/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test.rule;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.rules.ExternalResource;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaRule;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import scala.Option;
import scala.collection.Seq;

public class KafkaEmbedded
extends ExternalResource
implements KafkaRule,
InitializingBean,
DisposableBean {
    private static final Log logger = LogFactory.getLog(KafkaEmbedded.class);
    public static final String BEAN_NAME = "kafkaEmbedded";
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;
    private final int count;
    private final boolean controlledShutdown;
    private final String[] topics;
    private final int partitionsPerTopic;
    private final List<KafkaServer> kafkaServers = new ArrayList<KafkaServer>();
    private final Map<String, Object> brokerProperties = new HashMap<String, Object>();
    private EmbeddedZookeeper zookeeper;
    private ZkClient zookeeperClient;
    private String zkConnect;
    private int[] kafkaPorts;

    public KafkaEmbedded(int count) {
        this(count, false, new String[0]);
    }

    public KafkaEmbedded(int count, boolean controlledShutdown, String ... topics) {
        this(count, controlledShutdown, 2, topics);
    }

    public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String ... topics) {
        this.count = count;
        this.kafkaPorts = new int[this.count];
        this.controlledShutdown = controlledShutdown;
        this.topics = topics != null ? topics : new String[0];
        this.partitionsPerTopic = partitions;
    }

    public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties.putAll(brokerProperties);
        return this;
    }

    public KafkaEmbedded brokerProperty(String property, Object value) {
        this.brokerProperties.put(property, value);
        return this;
    }

    public void setKafkaPorts(int ... kafkaPorts) {
        Assert.isTrue((kafkaPorts.length == this.count ? 1 : 0) != 0, (String)("A port must be provided for each instance [" + this.count + "], provided: " + Arrays.toString(kafkaPorts) + ", use 0 for a random port"));
        this.kafkaPorts = kafkaPorts;
    }

    public void afterPropertiesSet() throws Exception {
        this.before();
    }

    public void before() throws Exception {
        this.startZookeeper();
        int zkConnectionTimeout = 6000;
        int zkSessionTimeout = 6000;
        this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
        this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout, (ZkSerializer)ZKStringSerializer$.MODULE$);
        this.kafkaServers.clear();
        for (int i = 0; i < this.count; ++i) {
            Properties brokerConfigProperties = TestUtils.createBrokerConfig((int)i, (String)this.zkConnect, (boolean)this.controlledShutdown, (boolean)true, (int)this.kafkaPorts[i], (Option)Option.apply(null), (Option)Option.apply(null), (Option)Option.apply(null), (boolean)true, (boolean)false, (int)0, (boolean)false, (int)0, (boolean)false, (int)0, (Option)Option.apply(null), (int)1);
            brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
            if (this.brokerProperties != null) {
                this.brokerProperties.forEach(brokerConfigProperties::put);
            }
            KafkaServer server = TestUtils.createServer((KafkaConfig)new KafkaConfig((Map)brokerConfigProperties), (Time)Time.SYSTEM);
            this.kafkaServers.add(server);
            if (this.kafkaPorts[i] != 0) continue;
            this.kafkaPorts[i] = TestUtils.boundPort((KafkaServer)server, (SecurityProtocol)SecurityProtocol.PLAINTEXT);
        }
        HashMap<String, String> adminConfigs = new HashMap<String, String>();
        adminConfigs.put("bootstrap.servers", this.getBrokersAsString());
        AdminClient admin = AdminClient.create(adminConfigs);
        List newTopics = Arrays.stream(this.topics).map(t -> new NewTopic(t, this.partitionsPerTopic, (short)this.count)).collect(Collectors.toList());
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        createTopics.all().get();
        admin.close();
        System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, this.getBrokersAsString());
        System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, this.getZookeeperConnectionString());
    }

    public void destroy() throws Exception {
        this.after();
    }

    public void after() {
        System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
        System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (kafkaServer.brokerState().currentState() != NotRunning.state()) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                CoreUtils.delete((Seq)kafkaServer.config().logDirs());
            }
            catch (Exception exception) {}
        }
        try {
            this.zookeeperClient.close();
        }
        catch (ZkInterruptedException zkInterruptedException) {
            // empty catch block
        }
        try {
            this.zookeeper.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public KafkaServer getKafkaServer(int id) {
        return this.kafkaServers.get(id);
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    @Override
    public ZkClient getZkClient() {
        return this.zookeeperClient;
    }

    @Override
    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    public BrokerAddress getBrokerAddress(int i) {
        KafkaServer kafkaServer = this.kafkaServers.get(i);
        return new BrokerAddress("127.0.0.1", kafkaServer.config().port());
    }

    @Override
    public BrokerAddress[] getBrokerAddresses() {
        ArrayList<BrokerAddress> addresses = new ArrayList<BrokerAddress>();
        for (int i = 0; i < this.kafkaPorts.length; ++i) {
            addresses.add(new BrokerAddress("127.0.0.1", this.kafkaPorts[i]));
        }
        return addresses.toArray(new BrokerAddress[addresses.size()]);
    }

    @Override
    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : this.getKafkaServers()) {
            if (!brokerAddress.equals(new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port()))) continue;
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
        }
    }

    public void startZookeeper() {
        this.zookeeper = new EmbeddedZookeeper();
    }

    @Deprecated
    public void bounce(int index, boolean waitForPropagation) {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void bounce(int index) {
        this.bounce(index, true);
    }

    public void restart(final int index) throws Exception {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMaxInterval(1000L);
        backOffPolicy.setMultiplier(2.0);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        retryTemplate.execute((RetryCallback)new RetryCallback<Void, Exception>(){

            public Void doWithRetry(RetryContext context) throws Exception {
                ((KafkaServer)KafkaEmbedded.this.kafkaServers.get(index)).startup();
                return null;
            }
        });
    }

    @Deprecated
    public void waitUntilSynced(String topic, int brokerId) {
        throw new UnsupportedOperationException();
    }

    @Override
    public String getBrokersAsString() {
        StringBuilder builder = new StringBuilder();
        for (BrokerAddress brokerAddress : this.getBrokerAddresses()) {
            builder.append(brokerAddress.toString()).append(',');
        }
        return builder.substring(0, builder.length() - 1);
    }

    @Override
    public boolean isEmbedded() {
        return true;
    }

    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
        this.consumeFromEmbeddedTopics(consumer, this.topics);
    }

    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) throws Exception {
        this.consumeFromEmbeddedTopics(consumer, topic);
    }

    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String ... topics) throws Exception {
        for (String topic : topics) {
            ((ObjectArrayAssert)Assertions.assertThat((Object[])this.topics).as("topic '" + topic + "' is not in embedded topic list", new Object[0])).contains((Object[])new String[]{topic});
        }
        final CountDownLatch consumerLatch = new CountDownLatch(1);
        consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                consumerLatch.countDown();
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("partitions assigned: " + partitions));
                }
            }
        });
        ConsumerRecords records = consumer.poll(0L);
        if (records.count() > 0) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Records received on initial poll for assignment; re-seeking to beginning; " + records.partitions().stream().flatMap(p -> records.records(p).stream()).map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList())));
            }
            consumer.seekToBeginning((Collection)records.partitions());
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)consumerLatch.await(30L, TimeUnit.SECONDS)).as("Failed to be assigned partitions from the embedded topics", new Object[0])).isTrue();
        logger.debug((Object)"Subscription Initiated");
    }
}

