/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.tools.TestRecord;
import kafka.tools.TestRecord$;
import kafka.utils.Exit$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import org.junit.jupiter.api.Assertions;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.StringOps$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

public final class LogCompactionTester$ {
    /** The single shared instance of this class; the constructor is private. */
    public static final LogCompactionTester$ MODULE$ = new LogCompactionTester$();
    /**
     * Read-ahead limit handed to {@link BufferedReader#mark(int)} by {@code peekLine}.
     * NOTE(review): 4906 looks like a transposition of 4096 - confirm before changing.
     */
    private static final int ReadAheadLimit = 4906;

    /** Returns the {@code ReadAheadLimit} constant. */
    private int ReadAheadLimit() {
        return ReadAheadLimit;
    }

    /**
     * Command-line entry point: creates the test topics, produces keyed messages to them,
     * sleeps, consumes everything back and validates the consumed data against the
     * produced data. The intermediate data files are deleted once validation has passed.
     *
     * @param args command-line arguments; usage is printed (and the tool exits) when empty.
     *             {@code bootstrap-server} and {@code messages} are checked as required.
     */
    public void main(String[] args) {
        OptionParser parser = new OptionParser(false);
        ArgumentAcceptingOptionSpec numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(Long.MAX_VALUE), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec messageCompressionOpt = parser.accepts("compression-type", "message compression type").withOptionalArg().describedAs("compressionType").ofType(String.class).defaultsTo((Object)"none", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(5), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec topicsOpt = parser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        // The value is multiplied by 1000 before Thread.sleep and reported as seconds below,
        // so the help text must advertise seconds (it previously claimed milliseconds).
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser.accepts("sleep", "Time in seconds to sleep between production and consumption.").withRequiredArg().describedAs("seconds").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        OptionSet options = parser.parse(args);
        if (args.length == 0) {
            CommandLineUtils.printUsageAndExit((OptionParser)parser, (String)"A tool to test log compaction. Valid options are: ");
        }
        CommandLineUtils.checkRequiredArgs((OptionParser)parser, (OptionSet)options, (OptionSpec[])new OptionSpec[]{brokerOpt, numMessagesOpt});
        long messages = (Long)options.valueOf((OptionSpec)numMessagesOpt);
        String compressionType = (String)options.valueOf((OptionSpec)messageCompressionOpt);
        int percentDeletes = (Integer)options.valueOf((OptionSpec)percentDeletesOpt);
        int dups = (Integer)options.valueOf((OptionSpec)numDupsOpt);
        String brokerUrl = (String)options.valueOf((OptionSpec)brokerOpt);
        int topicCount = (Integer)options.valueOf((OptionSpec)topicsOpt);
        int sleepSecs = (Integer)options.valueOf((OptionSpec)sleepSecsOpt);

        // One topic name per requested topic, all sharing a random test id.
        long testId = new Random().nextLong();
        String[] topics = new String[Math.max(topicCount, 0)];
        for (int i = 0; i < topics.length; ++i) {
            topics[i] = LogCompactionTester$.$anonfun$main$1(testId, i);
        }
        this.createTopics(brokerUrl, (scala.collection.immutable.Seq<String>)ArrayOps$.MODULE$.toSeq$extension(Predef$.MODULE$.refArrayOps((Object[])topics)));

        Predef$.MODULE$.println((Object)("Producing " + messages + " messages..to topics " + String.join(",", topics)));
        Path producedDataFilePath = this.produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes);

        Predef$.MODULE$.println((Object)("Sleeping for " + sleepSecs + " seconds..."));
        // Widen before multiplying so large values cannot overflow int arithmetic.
        Thread.sleep((long)sleepSecs * 1000L);

        Predef$.MODULE$.println((Object)"Consuming messages...");
        Path consumedDataFilePath = this.consumeMessages(brokerUrl, topics);

        int producedLines = this.lineCount(producedDataFilePath);
        int consumedLines = this.lineCount(consumedDataFilePath);
        // Report 0% instead of NaN when nothing was produced.
        double reduction = producedLines == 0 ? 0.0 : 100.0 * (1.0 - (double)consumedLines / (double)producedLines);
        Predef$.MODULE$.println((Object)String.format("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).", producedLines, consumedLines, reduction));

        Predef$.MODULE$.println((Object)"De-duplicating and validating output files...");
        this.validateOutput(producedDataFilePath.toFile(), consumedDataFilePath.toFile());
        Utils.delete((File)producedDataFilePath.toFile());
        Utils.delete((File)consumedDataFilePath.toFile());
        Predef$.MODULE$.println((Object)"Data verification is completed");
    }

    /*
     * WARNING - void declaration
     */
    public void createTopics(String brokerUrl, scala.collection.immutable.Seq<String> topics) {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", brokerUrl);
        try (Admin adminClient = Admin.create((Properties)adminConfig);){
            scala.collection.immutable.Map topicConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}));
            List newTopics = CollectionConverters$.MODULE$.SeqHasAsJava((Seq)topics.map((Function1 & Serializable)name -> new NewTopic(name, 1, (short)1).configs(CollectionConverters$.MODULE$.MapHasAsJava((Map)topicConfigs).asJava()))).asJava();
            adminClient.createTopics((Collection)newTopics).all().get();
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 15000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (true) {
                scala.collection.immutable.Seq $anonfun$createTopics$2_allTopics = CollectionConverters$.MODULE$.SetHasAsScala((Set)adminClient.listTopics().names().get()).asScala().toSeq();
                scala.collection.immutable.Seq seq = (scala.collection.immutable.Seq)topics.filter((Function1 & Serializable)topicName -> BoxesRunTime.boxToBoolean((boolean)LogCompactionTester$.$anonfun$createTopics$3(allTopics, topicName)));
                Object var14_10 = null;
                if (!seq.isEmpty()) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                        Assertions.fail((String)new StringBuilder(31).append("timed out waiting for topics : ").append(seq).toString());
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
                    continue;
                }
                break;
            }
        }
    }

    /**
     * Counts the lines of a UTF-8 text file without loading the whole file into memory.
     *
     * @param filPath file whose lines are counted
     * @return number of lines in the file
     * @throws java.io.UncheckedIOException if the file cannot be opened or read
     * @throws ArithmeticException if the file has more than {@code Integer.MAX_VALUE} lines
     */
    public int lineCount(Path filPath) {
        // Stream the lines instead of materialising them in a list; the stream holds the
        // underlying file handle, so it has to be closed.
        try (java.util.stream.Stream<String> lines = Files.lines(filPath)) {
            return Math.toIntExact(lines.count());
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Sorts both data files externally, collapses each to its latest non-delete record per
     * topic/key, writes the collapsed sequences next to the inputs as {@code *.deduped}
     * files and compares them record by record.
     *
     * <p>Validation fails (via {@link #require}) if either side has records left over
     * once the other is exhausted, or if any compared pair differs. The {@code *.deduped}
     * files are deleted only when validation succeeds.
     *
     * @param producedDataFile log of the records that were produced
     * @param consumedDataFile log of the records that were consumed
     */
    public void validateOutput(File producedDataFile, File consumedDataFile) {
        File producedDedupedFile = new File(producedDataFile.getAbsolutePath() + ".deduped");
        File consumedDedupedFile = new File(consumedDataFile.getAbsolutePath() + ".deduped");
        // try-with-resources closes the sort output readers (previously never closed) and the
        // de-duplicated writers (previously leaked when the comparison loop threw).
        try (BufferedReader producedReader = this.externalSort(producedDataFile);
             BufferedReader consumedReader = this.externalSort(consumedDataFile)) {
            Iterator<TestRecord> produced = this.valuesIterator(producedReader);
            Iterator<TestRecord> consumed = this.valuesIterator(consumedReader);
            int total = 0;
            int mismatched = 0;
            try (BufferedWriter producedDeduped = Files.newBufferedWriter(producedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
                 BufferedWriter consumedDeduped = Files.newBufferedWriter(consumedDedupedFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0])) {
                while (produced.hasNext() && consumed.hasNext()) {
                    TestRecord p = (TestRecord)produced.next();
                    producedDeduped.write(p.toString());
                    producedDeduped.newLine();
                    TestRecord c = (TestRecord)consumed.next();
                    consumedDeduped.write(c.toString());
                    consumedDeduped.newLine();
                    if (!p.equals(c)) {
                        ++mismatched;
                    }
                    ++total;
                }
            }
            Predef$.MODULE$.println((Object)("Validated " + total + " values, " + mismatched + " mismatches."));
            // The iterators still read from the sort output, so these checks must run
            // before the readers are closed.
            this.require(!produced.hasNext(), () -> "Additional values produced not found in consumer log.");
            this.require(!consumed.hasNext(), () -> "Additional values consumed not found in producer log.");
            this.require(mismatched == 0, () -> "Non-zero number of row mismatches.");
        }
        Utils.delete((File)producedDedupedFile);
        Utils.delete((File)consumedDedupedFile);
    }

    /**
     * Validation guard: does nothing when {@code requirement} holds; otherwise prints
     * "Data validation failed : " followed by the evaluated message to standard error and
     * exits with status 1 through {@code Exit}.
     *
     * @param requirement condition that must be true
     * @param message     supplier of the failure detail; only evaluated on failure
     */
    public void require(boolean requirement, Function0<Object> message) {
        if (requirement) {
            return;
        }
        String detail = String.valueOf(message.apply());
        System.err.println("Data validation failed : " + detail);
        throw Exit$.MODULE$.exit(1, (Option)None$.MODULE$);
    }

    /**
     * Wraps a reader over sorted record lines in an iterator that yields, per topic/key
     * group, the record returned by {@code readNext}, skipping records flagged as deletes.
     *
     * @param reader reader positioned at the start of the sorted record lines
     * @return a Scala iterator over the surviving records; it ends when the reader does
     */
    public Iterator<TestRecord> valuesIterator(BufferedReader reader) {
        // Capture the (effectively final) parameter directly; the decompiled form passed it
        // to a non-existent AbstractIterator constructor and copied it from an undefined name.
        java.util.Iterator<TestRecord> nonDeleted = new AbstractIterator<TestRecord>(){
            @Override
            public TestRecord makeNext() {
                TestRecord next = LogCompactionTester$.MODULE$.readNext(reader);
                // Skip over keys whose latest record is a delete.
                while (next != null && next.delete()) {
                    next = LogCompactionTester$.MODULE$.readNext(reader);
                }
                if (next == null) {
                    return (TestRecord)this.allDone();
                }
                return next;
            }
        };
        return CollectionConverters$.MODULE$.IteratorHasAsScala(nonDeleted).asScala();
    }

    /**
     * Reads the next group of consecutive lines that share the same {@code topicAndKey}
     * and returns the last record of that group.
     *
     * @param reader source of record lines
     * @return the last record of the next group, or {@code null} when the reader is exhausted
     */
    public TestRecord readNext(BufferedReader reader) {
        String firstLine = reader.readLine();
        if (firstLine == null) {
            return null;
        }
        TestRecord latest = TestRecord$.MODULE$.parse(firstLine);
        // Peek ahead; only consume the peeked line once it is known to belong to this group.
        for (String peeked = this.peekLine(reader); peeked != null; peeked = this.peekLine(reader)) {
            TestRecord candidate = TestRecord$.MODULE$.parse(peeked);
            if (candidate == null) {
                return latest;
            }
            String candidateKey = candidate.topicAndKey();
            String latestKey = latest.topicAndKey();
            boolean sameKey = candidateKey == null ? latestKey == null : candidateKey.equals(latestKey);
            if (!sameKey) {
                return latest;
            }
            latest = candidate;
            reader.readLine();
        }
        return latest;
    }

    /**
     * Returns the next line of the reader without consuming it, by marking the current
     * position, reading a line and resetting back to the mark.
     *
     * @param reader reader to peek into
     * @return the upcoming line, or {@code null} at end of stream
     */
    public String peekLine(BufferedReader reader) {
        final int readAheadLimit = this.ReadAheadLimit();
        reader.mark(readAheadLimit);
        final String upcoming = reader.readLine();
        reader.reset();
        return upcoming;
    }

    /**
     * Starts an external {@code sort} process over the given file (stable, keyed on
     * fields 1-2, using a freshly created temporary directory) and returns a reader over
     * its standard output.
     *
     * <p>A background thread waits for the process; on a non-zero exit code it reports
     * the failure and copies whatever is available on the process's error stream to
     * {@code System.err}.
     *
     * @param file file to sort
     * @return UTF-8 reader over the sorted output, using a 10 MiB buffer
     */
    public BufferedReader externalSort(File file) {
        Path sortTempDir = Files.createTempDirectory("log_compaction_test", new FileAttribute[0]);
        Process process = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + sortTempDir, file.getAbsolutePath()).start();
        // Capture the process directly; the decompiled form passed it to a non-existent
        // Thread constructor and copied it from an undefined name.
        new Thread(){
            @Override
            public void run() {
                try {
                    if (process.waitFor() != 0) {
                        System.err.println("Process exited abnormally.");
                        while (process.getErrorStream().available() > 0) {
                            System.err.write(process.getErrorStream().read());
                        }
                    }
                } catch (InterruptedException e) {
                    // run() cannot throw checked exceptions; keep the interrupt visible.
                    Thread.currentThread().interrupt();
                } catch (java.io.IOException e) {
                    throw new java.io.UncheckedIOException(e);
                }
            }
        }.start();
        return new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8), 0xA00000);
    }

    /**
     * Produces {@code messages * topics.length} records, round-robin across the topics,
     * with keys drawn from a fixed-seed random generator, and logs every produced record
     * to a temporary file.
     *
     * <p>Within every run of 100 consecutive record indices, the first
     * {@code percentDeletes} are sent with a {@code null} value. All other records carry
     * their index as the value.
     *
     * @param brokerUrl       value used for the producer's {@code bootstrap.servers}
     * @param topics          topics to produce to
     * @param messages        number of records per topic
     * @param compressionType value used for the producer's {@code compression.type}
     * @param dups            expected number of records per key; divides {@code messages}
     *                        to obtain the number of distinct keys
     * @param percentDeletes  number of deletes per 100 records
     * @return path of the file the produced records were logged to
     */
    public Path produceMessages(String brokerUrl, String[] topics, long messages, String compressionType, int dups, int percentDeletes) {
        Properties producerProps = new Properties();
        producerProps.setProperty("max.block.ms", Long.toString(Long.MAX_VALUE));
        producerProps.setProperty("bootstrap.servers", brokerUrl);
        producerProps.setProperty("compression.type", compressionType);
        try (KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Random rand = new Random(1L);
            // Clamp into [1, Integer.MAX_VALUE]: a plain (int) cast turns the default
            // messages = Long.MAX_VALUE into -1, and messages < dups yields 0; either
            // makes Random.nextInt(bound) throw.
            int keyCount = (int)Math.max(1L, Math.min((long)Integer.MAX_VALUE, messages / (long)dups));
            Path producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt", new FileAttribute[0]);
            Predef$.MODULE$.println((Object)("Logging produce requests to " + producedFilePath));
            long totalRecords = messages * (long)topics.length;
            // try-with-resources so the log file is closed even if a send or write fails.
            try (BufferedWriter producedWriter = Files.newBufferedWriter(producedFilePath, StandardCharsets.UTF_8, new OpenOption[0])) {
                for (long i = 0L; i < totalRecords; ++i) {
                    String topic = topics[(int)(i % (long)topics.length)];
                    int key = rand.nextInt(keyCount);
                    boolean delete = i % 100L < (long)percentDeletes;
                    byte[] keyBytes = Integer.toString(key).getBytes(StandardCharsets.UTF_8);
                    // A null value is the tombstone for this key.
                    byte[] valueBytes = delete ? null : Long.toString(i).getBytes(StandardCharsets.UTF_8);
                    ProducerRecord msg = new ProducerRecord(topic, (Object)keyBytes, (Object)valueBytes);
                    producer.send(msg);
                    producedWriter.write(new TestRecord(topic, key, i, delete).toString());
                    producedWriter.newLine();
                }
            }
            return producedFilePath;
        }
    }

    /**
     * Builds a string-keyed, string-valued consumer that starts from the earliest offset
     * and uses a randomly suffixed {@code log-cleaner-test-} consumer group.
     *
     * @param brokerUrl value used for the consumer's {@code bootstrap.servers}
     * @return a new consumer; the caller is responsible for closing it
     */
    public Consumer<String, String> createConsumer(String brokerUrl) {
        String groupId = "log-cleaner-test-" + new Random().nextInt(Integer.MAX_VALUE);
        Properties settings = new Properties();
        settings.setProperty("group.id", groupId);
        settings.setProperty("bootstrap.servers", brokerUrl);
        settings.setProperty("auto.offset.reset", "earliest");
        StringDeserializer keyDeserializer = new StringDeserializer();
        StringDeserializer valueDeserializer = new StringDeserializer();
        return new KafkaConsumer(settings, (Deserializer)keyDeserializer, (Deserializer)valueDeserializer);
    }

    /**
     * Subscribes to the given topics and logs every consumed record to a temporary file,
     * stopping at the first poll (20 second timeout) that returns no records.
     *
     * @param brokerUrl value used for the consumer's {@code bootstrap.servers}
     * @param topics    topics to consume from
     * @return path of the file the consumed records were logged to
     */
    public Path consumeMessages(String brokerUrl, String[] topics) {
        // The consumer is a resource too: previously it leaked when creating the temp file
        // failed, and was not closed when closing the writer threw in the finally block.
        try (Consumer<String, String> consumer = this.createConsumer(brokerUrl)) {
            consumer.subscribe((Collection)CollectionConverters$.MODULE$.SeqHasAsJava((Seq)ArrayOps$.MODULE$.toSeq$extension(Predef$.MODULE$.refArrayOps((Object[])topics))).asJava());
            Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt", new FileAttribute[0]);
            Predef$.MODULE$.println((Object)("Logging consumed messages to " + consumedFilePath));
            // Declared after the consumer, so the writer is closed first, as before.
            try (BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8, new OpenOption[0])) {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(20L));
                    if (consumerRecords.isEmpty()) {
                        break;
                    }
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        LogCompactionTester$.$anonfun$consumeMessages$1(consumedWriter, record);
                    }
                }
            }
            return consumedFilePath;
        }
    }

    /**
     * Decodes the given buffer as UTF-8 text by delegating to {@code Utils.utf8}.
     *
     * @param buffer buffer holding the encoded text
     * @return the decoded string
     */
    public String readString(ByteBuffer buffer) {
        final String decoded = Utils.utf8(buffer);
        return decoded;
    }

    /**
     * Builds a test topic name of the form {@code log-cleaner-test-<testId>-<index>}.
     *
     * @param testId$1 random id shared by all topics of one run
     * @param x$1      index of the topic
     * @return the topic name
     */
    public static final /* synthetic */ String $anonfun$main$1(long testId$1, int x$1) {
        return "log-cleaner-test-" + testId$1 + "-" + x$1;
    }

    /**
     * Tells whether a topic is still missing from the listed topic names.
     *
     * @param allTopics$1 topic names currently listed
     * @param topicName   topic to look for
     * @return {@code true} when {@code topicName} is not contained in {@code allTopics$1}
     */
    public static final /* synthetic */ boolean $anonfun$createTopics$3(scala.collection.immutable.Seq allTopics$1, String topicName) {
        boolean listed = allTopics$1.contains((Object)topicName);
        return !listed;
    }

    /**
     * Lists the topics through the admin client, stores the requested topics that are not
     * listed yet into {@code pendingTopics$1.elem} and reports whether none are pending.
     *
     * @param adminClient$1   admin client used to list the topics
     * @param pendingTopics$1 holder that receives the still-missing topics
     * @param topics$1        topics that are expected to exist
     * @return {@code true} when every topic in {@code topics$1} is listed
     */
    public static final /* synthetic */ boolean $anonfun$createTopics$2(Admin adminClient$1, ObjectRef pendingTopics$1, scala.collection.immutable.Seq topics$1) {
        Set listedNames = (Set)adminClient$1.listTopics().names().get();
        scala.collection.immutable.Seq allTopics = CollectionConverters$.MODULE$.SetHasAsScala(listedNames).asScala().toSeq();
        scala.collection.immutable.Seq stillMissing = (scala.collection.immutable.Seq)topics$1.filter((Function1 & Serializable)topicName -> BoxesRunTime.boxToBoolean((boolean)LogCompactionTester$.$anonfun$createTopics$3(allTopics, (String)topicName)));
        pendingTopics$1.elem = stillMissing;
        return stillMissing.isEmpty();
    }

    /**
     * Formats the timeout message listing the topics held in {@code pendingTopics$1.elem}.
     *
     * @param pendingTopics$1 holder of the still-missing topics
     * @return the timeout message
     */
    public static final /* synthetic */ String $anonfun$createTopics$4(ObjectRef pendingTopics$1) {
        scala.collection.immutable.Seq pending = (scala.collection.immutable.Seq)pendingTopics$1.elem;
        return "timed out waiting for topics : " + pending;
    }

    /** Failure detail for produced records left over after the consumed records ran out. */
    public static final /* synthetic */ String $anonfun$validateOutput$1() {
        final String detail = "Additional values produced not found in consumer log.";
        return detail;
    }

    /** Failure detail for consumed records left over after the produced records ran out. */
    public static final /* synthetic */ String $anonfun$validateOutput$2() {
        final String detail = "Additional values consumed not found in producer log.";
        return detail;
    }

    /** Failure detail for the case where at least one compared pair of records differed. */
    public static final /* synthetic */ String $anonfun$validateOutput$3() {
        final String detail = "Non-zero number of row mismatches.";
        return detail;
    }

    /**
     * Writes one consumed record to the log as a {@code TestRecord} line. A record with a
     * {@code null} value is logged as a delete with value {@code -1}; otherwise the value
     * is parsed as a long. The key is parsed as an int.
     *
     * @param consumedWriter$1 writer of the consumed-records log
     * @param record           consumed record with string key and value
     */
    public static final /* synthetic */ void $anonfun$consumeMessages$1(BufferedWriter consumedWriter$1, ConsumerRecord record) {
        boolean delete = record.value() == null;
        long value;
        if (delete) {
            value = -1L;
        } else {
            value = StringOps$.MODULE$.toLong$extension(Predef$.MODULE$.augmentString((String)record.value()));
        }
        String topic = record.topic();
        int key = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString((String)record.key()));
        TestRecord logged = new TestRecord(topic, key, value, delete);
        consumedWriter$1.write(logged.toString());
        consumedWriter$1.newLine();
    }

    /** Private so that the only instance is the one stored in {@code MODULE$}. */
    private LogCompactionTester$() {
    }
}

