/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.File;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.tools.MirrorMaker;
import kafka.tools.MirrorMaker$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0005\u00114AAC\u0006\u0001!!)q\u0003\u0001C\u00011!)1\u0004\u0001C!9!91\u0006\u0001b\u0001\n\u0003a\u0003BB\u001d\u0001A\u0003%Q\u0006C\u0003;\u0001\u0011\u00053\bC\u0003J\u0001\u0011\u00053\bC\u0003O\u0001\u0011\u00051\bC\u0003a\u0001\u0011\u00051\bC\u0003c\u0001\u0011\u00051H\u0001\u000eNSJ\u0014xN]'bW\u0016\u0014\u0018J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\r\u001b\u0005)Ao\\8mg*\ta\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0002C\u0001\n\u0016\u001b\u0005\u0019\"B\u0001\u000b\u000e\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8\n\u0005Y\u0019\"AF&bM.\f7+\u001a:wKJ$Vm\u001d;ICJtWm]:\u0002\rqJg.\u001b;?)\u0005I\u0002C\u0001\u000e\u0001\u001b\u0005Y\u0011aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0016\u0003u\u00012AH\u0012&\u001b\u0005y\"B\u0001\u0011\"\u0003)\u0019w\u000e\u001c7fGRLwN\u001c\u0006\u0002E\u0005)1oY1mC&\u0011Ae\b\u0002\u0004'\u0016\f\bC\u0001\u0014*\u001b\u00059#B\u0001\u0015\u000e\u0003\u0019\u0019XM\u001d<fe&\u0011!f\n\u0002\f\u0017\u000647.Y\"p]\u001aLw-\u0001\u0004fq&$X\rZ\u000b\u0002[A\u0011afN\u0007\u0002_)\u0011\u0001'M\u0001\u0007CR|W.[2\u000b\u0005I\u001a\u0014AC2p]\u000e,(O]3oi*\u0011A'N\u0001\u0005kRLGNC\u00017\u0003\u0011Q\u0017M^1\n\u0005az#!D!u_6L7MQ8pY\u0016\fg.A\u0004fq&$X\r\u001a\u0011\u0002\u000bM,G/\u00169\u0015\u0003q\u0002\"!\u0010 \u000e\u0003\u0005J!aP\u0011\u0003\tUs\u0017\u000e\u001e\u0015\u0003\u000b\u0005\u0003\"AQ$\u000e\u0003\rS!\u0001R#\u0002\u000b),h.\u001b;\u000b\u0003\u0019\u000b1a\u001c:h\u0013\tA5I\u0001\u0004CK\u001a|'/Z\u0001\ti\u0016\f'\u000fR8x]\"\u0012aa\u0013\t\u0003\u00052K!!T\"\u0003\u000b\u00053G/\u001a:\u0002MQ,7\u000f^\"p[6LGo\u00144gg\u0016$8\u000f\u00165s_^$\u0016.\\3pkR,\u0005pY3qi&|g\u000e\u000b\u0003\b!N#\u0006C\u0001\"R\u0013\t\u00116I\u0001\u0003UKN$\u0018\u0001C3ya\u0016\u001cG/\u001a3$\u0003U\u0003\"A\u00160\u000e\u0003]S!\u0001W-\u0002\r\u0015\u0014(o\u001c:t\u0015\tQ6,\u0001\u0004d_6lwN\u001c\u0006\u0003\u001dqS!!X#\u0002\r\u0005\u0004\u0018m\u00195f\u0013\tyvK\u0001\tUS6,w.\u001e;Fq\u000e,\u0007\u000f^5p]\u0006AC/Z:u\u0007>lW.\u001b;PM\u001a\u001cX\r^:SK6|g/\u001a(p]\u0016C\u0018n\u001d;f]R$v\u000e]5dg\"\u0012\u0001\u0002U\u0001\u0018i\u0016\u001cHoQ8n[\u0006\u001cV\r]1sCR,GMU3hKbD#!\u0003)")
public class MirrorMakerIntegrationTest
extends KafkaServerTestHarness {
    private final AtomicBoolean exited = new AtomicBoolean(false);

    @Override
    public Seq<KafkaConfig> generateConfigs() {
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, this.zkConnect(), true, true, (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, false, false, (Map<Object, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$)), 1, false, 1, (short)1).map((Function1 & Serializable)x$1 -> {
            Properties fromProps_overrides = new Properties();
            return KafkaConfig$.MODULE$.fromProps(x$1, fromProps_overrides, true);
        });
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override
    @Before
    public void setUp() {
        Exit.setExitProcedure((x$2, x$3) -> this.exited().set(true));
        super.setUp();
    }

    @Override
    @After
    public void tearDown() {
        super.tearDown();
        try {
            Assert.assertFalse((boolean)this.exited().get());
        }
        finally {
            Exit.resetExitProcedure();
        }
    }

    @Test(expected=TimeoutException.class)
    public void testCommitOffsetsThrowTimeoutException() {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", this.brokerList());
        consumerProps.put("default.api.timeout.ms", "1");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"any"));
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("test", 0), (Object)0L);
        mirrorMakerConsumer.commit();
    }

    @Test
    public void testCommitOffsetsRemoveNonExistentTopics() {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", this.brokerList());
        consumerProps.put("default.api.timeout.ms", "2000");
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"any"));
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("nonexistent-topic1", 0), (Object)0L);
        mirrorMakerConsumer.offsets().put((Object)new TopicPartition("nonexistent-topic2", 0), (Object)0L);
        MirrorMaker$.MODULE$.commitOffsets(mirrorMakerConsumer);
        Assert.assertTrue((String)"Offsets for non-existent topics should be removed", (boolean)mirrorMakerConsumer.offsets().isEmpty());
    }

    @Test
    public void testCommaSeparatedRegex() {
        String topic = "new-topic";
        String msg = "a test message";
        String brokerList = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), SecurityProtocol.PLAINTEXT);
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", brokerList);
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        MirrorMaker.MirrorMakerProducer producer = new MirrorMaker.MirrorMakerProducer(true, producerProps);
        MirrorMaker$.MODULE$.producer_$eq(producer);
        MirrorMaker$.MODULE$.producer().send(new ProducerRecord(topic, (Object)msg.getBytes()));
        MirrorMaker$.MODULE$.producer().close();
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("bootstrap.servers", brokerList);
        KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        MirrorMaker.ConsumerWrapper mirrorMakerConsumer = new MirrorMaker.ConsumerWrapper((Consumer)consumer, (Option)None$.MODULE$, (Option)new Some((Object)"another_topic,new.*,foo"));
        mirrorMakerConsumer.init();
        try {
            long l = 15000L;
            long l2 = 100L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!MirrorMakerIntegrationTest.$anonfun$testCommaSeparatedRegex$1(mirrorMakerConsumer, topic, msg)) {
                if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                    throw Assertions$.MODULE$.fail("MirrorMaker consumer should read the expected message from the expected topic within the timeout", new Position("TestUtils.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 847));
                }
                Thread.sleep(Math.min(l, l2));
            }
        }
        finally {
            consumer.close();
        }
    }

    public static final /* synthetic */ boolean $anonfun$testCommaSeparatedRegex$1(MirrorMaker.ConsumerWrapper mirrorMakerConsumer$1, String topic$1, String msg$1) {
        boolean bl;
        try {
            ConsumerRecord data = mirrorMakerConsumer$1.receive();
            String string = data.topic();
            if ((string != null ? !string.equals(topic$1) : topic$1 != null) || !new String((byte[])data.value()).equals(msg$1)) {
                return false;
            }
            bl = true;
        }
        catch (MirrorMaker.NoRecordsException noRecordsException) {
            bl = false;
        }
        return bl;
    }

    public static final /* synthetic */ String $anonfun$testCommaSeparatedRegex$2() {
        return "MirrorMaker consumer should read the expected message from the expected topic within the timeout";
    }
}

