/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.common.KafkaException;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.ZkFourLetterWords;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import scala.Option;
import scala.collection.Seq;

public class EmbeddedKafkaBroker
implements InitializingBean,
DisposableBean {
    /** Assertion message used by methods that require the embedded ZooKeeper to exist. */
    private static final String BROKER_NEEDED = "Broker must be started before this method can be called";
    /** Host used for the embedded ZooKeeper socket and for the reported broker addresses. */
    private static final String LOOPBACK = "127.0.0.1";
    /** Class logger; assigned in the static initializer. */
    private static final LogAccessor logger;
    // NOTE(review): presumably the bean name this broker is registered under; not referenced in this file.
    public static final String BEAN_NAME = "embeddedKafka";
    /** Default name of the system property that receives the broker list in afterPropertiesSet(). */
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    /** Name of the system property that receives the ZooKeeper connection string in afterPropertiesSet(). */
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    /** System property consulted for the broker-list property name when none was configured explicitly. */
    public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    // The three defaults below mirror the literal initial values of adminTimeout (seconds),
    // zkSessionTimeout and zkConnectionTimeout further down.
    public static final int DEFAULT_ADMIN_TIMEOUT = 10;
    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = 18000;
    /** Reflective handle on KafkaServer.brokerState(); null unless it returns an AtomicReference (see static initializer). */
    private static final Method GET_BROKER_STATE_METHOD;
    /** Reflective handle on TestUtils.boundPort(..); resolved in the static initializer. */
    private static final Method BOUND_PORT_METHOD;
    /** Number of brokers to start. */
    private final int count;
    /** Passed through to the broker configuration created in createBrokerProperties(int). */
    private final boolean controlledShutdown;
    /** Names of the topics tracked by this instance. */
    private final Set<String> topics;
    /** Partition count used for topics created by name and as the default num.partitions. */
    private final int partitionsPerTopic;
    /** Servers created by afterPropertiesSet(), in broker-index order. */
    private final List<KafkaServer> kafkaServers = new ArrayList<KafkaServer>();
    /** User-supplied broker properties copied over the generated configuration of every broker. */
    private final Map<String, Object> brokerProperties = new HashMap<String, Object>();
    private EmbeddedZookeeper zookeeper;
    /** "host:port" of the embedded ZooKeeper; set in afterPropertiesSet(), cleared in destroy(). */
    private String zkConnect;
    /** Requested ZooKeeper port; 0 makes EmbeddedZookeeper pick one. */
    private int zkPort;
    /** One port per broker; entries equal to 0 are replaced with the bound port after startup. */
    private int[] kafkaPorts;
    /** Maximum wait for topic-creation futures. */
    private Duration adminTimeout = Duration.ofSeconds(10L);
    // Both values are handed to the ZooKeeperClient constructor; presumably milliseconds - TODO confirm.
    private int zkConnectionTimeout = 18000;
    private int zkSessionTimeout = 18000;
    /** Name of the system property that receives the broker list; resolved lazily in afterPropertiesSet(). */
    private String brokerListProperty;
    /** Lazily created by getZooKeeperClient(); closed in destroy(). */
    private volatile ZooKeeperClient zooKeeperClient;

    /**
     * Create an instance with the given number of brokers, controlled shutdown
     * disabled and no initial topics.
     * @param count the number of brokers.
     */
    public EmbeddedKafkaBroker(int count) {
        this(count, false);
    }

    /**
     * Create an instance with the given number of brokers and two partitions per topic.
     * @param count the number of brokers.
     * @param controlledShutdown the controlled-shutdown flag passed to the broker configuration.
     * @param topics the topics to create once the brokers are started.
     */
    public EmbeddedKafkaBroker(int count, boolean controlledShutdown, String ... topics) {
        this(count, controlledShutdown, 2, topics);
    }

    /**
     * Create an instance with the given number of brokers and partitions per topic.
     * @param count the number of brokers.
     * @param controlledShutdown the controlled-shutdown flag passed to the broker configuration.
     * @param partitions the partitions per topic.
     * @param topics the topics to create once the brokers are started; {@code null} means none.
     */
    public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String ... topics) {
        this.count = count;
        // One slot per broker; the default 0 requests a random port.
        this.kafkaPorts = new int[count];
        this.controlledShutdown = controlledShutdown;
        this.partitionsPerTopic = partitions;
        if (topics == null) {
            this.topics = new HashSet<>();
        }
        else {
            this.topics = new HashSet<>(Arrays.asList(topics));
        }
    }

    /**
     * Add all of the supplied entries to the broker properties that are applied
     * to every broker when it is configured.
     * @param properties the properties to add.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker brokerProperties(Map<String, String> properties) {
        final Map<String, Object> target = this.brokerProperties;
        target.putAll(properties);
        return this;
    }

    /**
     * Set a single broker property, replacing any value previously stored under the same key.
     * @param property the property name.
     * @param value the property value.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value) {
        final Map<String, Object> target = this.brokerProperties;
        target.put(property, value);
        return this;
    }

    /**
     * Set explicit ports for the brokers; exactly one port per broker is required.
     * The supplied array is copied.
     * @param ports the ports, one per broker; use 0 for a random port.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker kafkaPorts(int ... ports) {
        boolean onePortPerBroker = ports.length == this.count;
        Assert.isTrue(onePortPerBroker, "A port must be provided for each instance [" + this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port");
        this.kafkaPorts = ports.clone();
        return this;
    }

    /**
     * Set the name of the system property that receives the broker list when
     * the brokers are started.
     * @param brokerListProperty the system property name.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
        final EmbeddedKafkaBroker self = this;
        self.brokerListProperty = brokerListProperty;
        return self;
    }

    /**
     * Set the port the embedded ZooKeeper should listen on.
     * @param port the port; 0 lets the embedded ZooKeeper choose one.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker zkPort(int port) {
        final EmbeddedKafkaBroker self = this;
        self.zkPort = port;
        return self;
    }

    /**
     * Return the ZooKeeper port.
     * @return the port reported by the embedded ZooKeeper once it exists, otherwise the configured port.
     */
    public int getZkPort() {
        if (this.zookeeper == null) {
            return this.zkPort;
        }
        return this.zookeeper.getPort();
    }

    /**
     * Set the port the embedded ZooKeeper should listen on.
     * @param zkPort the port; 0 lets the embedded ZooKeeper choose one.
     */
    public void setZkPort(int zkPort) {
        final int requestedPort = zkPort;
        this.zkPort = requestedPort;
    }

    /**
     * Set the maximum time to wait for admin (topic creation) operations.
     * @param adminTimeout the timeout in seconds.
     * @return this broker, for call chaining.
     */
    public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
        final Duration timeout = Duration.ofSeconds(adminTimeout);
        this.adminTimeout = timeout;
        return this;
    }

    /**
     * Set the maximum time to wait for admin (topic creation) operations.
     * @param adminTimeout the timeout in seconds.
     */
    public void setAdminTimeout(int adminTimeout) {
        final Duration timeout = Duration.ofSeconds(adminTimeout);
        this.adminTimeout = timeout;
    }

    /**
     * Set the connection timeout handed to the {@link ZooKeeperClient} created by
     * {@link #getZooKeeperClient()}.
     * @param zkConnectionTimeout the connection timeout.
     * @return this broker, for call chaining.
     */
    public synchronized EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) {
        final EmbeddedKafkaBroker self = this;
        self.zkConnectionTimeout = zkConnectionTimeout;
        return self;
    }

    /**
     * Set the session timeout handed to the {@link ZooKeeperClient} created by
     * {@link #getZooKeeperClient()}.
     * @param zkSessionTimeout the session timeout.
     * @return this broker, for call chaining.
     */
    public synchronized EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {
        final EmbeddedKafkaBroker self = this;
        self.zkSessionTimeout = zkSessionTimeout;
        return self;
    }

    /**
     * Start the embedded ZooKeeper and all brokers, create the configured topics and
     * publish the broker list and ZooKeeper connection string as system properties.
     * <p>If the calling thread is interrupted while ZooKeeper is starting, the interrupt
     * status is restored before the failure is reported.
     * @throws IllegalStateException if ZooKeeper cannot be created or a broker port
     * cannot be determined.
     */
    @Override
    public void afterPropertiesSet() {
        overrideExitMethods();
        try {
            this.zookeeper = new EmbeddedZookeeper(this.zkPort);
        }
        catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interrupt: callers further up may rely on it.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Failed to create embedded Zookeeper", e);
        }
        this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
        this.kafkaServers.clear();
        // A user-supplied log directory is honoured only for a single broker;
        // otherwise every broker gets its own temporary directory.
        boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
        for (int i = 0; i < this.count; ++i) {
            Properties brokerConfigProperties = createBrokerProperties(i);
            brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
            brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
            brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
            // User properties are applied last so that they override the values set above.
            this.brokerProperties.forEach(brokerConfigProperties::put);
            if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
                brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(this.partitionsPerTopic));
            }
            if (!userLogDir) {
                logDir(brokerConfigProperties);
            }
            KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
            this.kafkaServers.add(server);
            if (this.kafkaPorts[i] == 0) {
                // A random port was requested; record the port the broker actually bound.
                try {
                    this.kafkaPorts[i] = (Integer) BOUND_PORT_METHOD.invoke(null, server, SecurityProtocol.PLAINTEXT);
                }
                catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                    throw new IllegalStateException("Failed to determine broker port", e);
                }
            }
        }
        createKafkaTopics(this.topics);
        // Resolve the property name: explicit setting, then system property, then the default.
        if (this.brokerListProperty == null) {
            this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
        }
        if (this.brokerListProperty == null) {
            this.brokerListProperty = SPRING_EMBEDDED_KAFKA_BROKERS;
        }
        System.setProperty(this.brokerListProperty, getBrokersAsString());
        System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
    }

    /**
     * Point the broker's log directory at a freshly created temporary directory.
     * @param brokerConfigProperties the broker configuration to update.
     * @throws UncheckedIOException if the temporary directory cannot be created.
     */
    private void logDir(Properties brokerConfigProperties) {
        final String prefix = "spring.kafka." + UUID.randomUUID();
        final String directory;
        try {
            directory = Files.createTempDirectory(prefix).toString();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        brokerConfigProperties.put(KafkaConfig.LogDirProp(), directory);
    }

    /**
     * Replace Kafka's exit and halt procedures with ones that only log the call.
     * At debug level a stack trace of the caller is logged; otherwise a warning is logged.
     */
    private void overrideExitMethods() {
        final String template = "Exit.%s(%d, %s) called";
        Exit.setExitProcedure((status, text) -> {
            String logLine = String.format(template, "exit", status, text);
            if (!logger.isDebugEnabled()) {
                logger.warn(logLine);
                return;
            }
            // The exception is created only to capture the stack of the caller.
            logger.debug(new RuntimeException(), logLine);
        });
        Exit.setHaltProcedure((status, text) -> {
            String logLine = String.format(template, "halt", status, text);
            if (!logger.isDebugEnabled()) {
                logger.warn(logLine);
                return;
            }
            logger.debug(new RuntimeException(), logLine);
        });
    }

    /**
     * Build the base configuration for the broker at index {@code i} by delegating to
     * Kafka's {@code TestUtils.createBrokerConfig}.
     * <p>The values taken from this instance are, in positional order: the broker index,
     * the ZooKeeper connection string, the controlled-shutdown flag, the configured port
     * for this broker, the partitions per topic and the broker count (narrowed to short).
     * The remaining arguments are fixed literals.
     * NOTE(review): the meaning of each literal is defined by the Kafka TestUtils
     * signature in use - confirm against that version before changing any of them.
     * @param i the broker index.
     * @return the generated broker properties.
     */
    private Properties createBrokerProperties(int i) {
        return TestUtils.createBrokerConfig((int)i, (String)this.zkConnect, (boolean)this.controlledShutdown, (boolean)true, (int)this.kafkaPorts[i], (Option)Option.apply(null), (Option)Option.apply(null), (Option)Option.apply(null), (boolean)true, (boolean)false, (int)0, (boolean)false, (int)0, (boolean)false, (int)0, (Option)Option.apply(null), (int)1, (boolean)false, (int)this.partitionsPerTopic, (short)((short)this.count));
    }

    /**
     * Create the named topics with the default partition count and record them
     * in the set of embedded topics once creation has succeeded.
     * @param topicsToAdd the topic names.
     * @throws IllegalArgumentException if the embedded ZooKeeper has not been created yet.
     */
    public void addTopics(String ... topicsToAdd) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        Set<String> requested = new HashSet<>(Arrays.asList(topicsToAdd));
        createKafkaTopics(requested);
        this.topics.addAll(requested);
    }

    /**
     * Create the supplied topics. Each topic name is recorded in the set of embedded
     * topics and must not already be present; the requested replication must not
     * exceed the number of brokers.
     * @param topicsToAdd the topics.
     * @throws IllegalArgumentException if the embedded ZooKeeper has not been created yet,
     * a topic is already known, or the replication request is not supported.
     */
    public void addTopics(NewTopic ... topicsToAdd) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        for (NewTopic topic : topicsToAdd) {
            boolean added = this.topics.add(topic.name());
            Assert.isTrue(added, () -> "topic already exists: " + topic);
            boolean replicationSupported = topic.replicationFactor() <= this.count
                    && (topic.replicasAssignments() == null || topic.replicasAssignments().size() <= this.count);
            Assert.isTrue(replicationSupported, () -> "Embedded kafka does not support the requested replication factor: " + topic);
        }
        List<NewTopic> requested = Arrays.asList(topicsToAdd);
        doWithAdmin(admin -> createTopics(admin, requested));
    }

    /**
     * Create one topic per name, each with the default partition count and a
     * replication factor equal to the number of brokers.
     * @param topicsToCreate the topic names.
     */
    private void createKafkaTopics(Set<String> topicsToCreate) {
        doWithAdmin(admin -> {
            List<NewTopic> newTopics = new ArrayList<>();
            for (String name : topicsToCreate) {
                newTopics.add(new NewTopic(name, this.partitionsPerTopic, (short) this.count));
            }
            createTopics(admin, newTopics);
        });
    }

    /**
     * Create the topics and wait, up to the admin timeout, for all of them to be created.
     * <p>If the wait is interrupted, the thread's interrupt status is restored before
     * the failure is reported.
     * @param admin the admin client to use.
     * @param newTopics the topics to create.
     * @throws KafkaException wrapping any failure, timeout or interruption.
     */
    private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        try {
            createTopics.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers; the wrapper exception alone would hide it.
            Thread.currentThread().interrupt();
            throw new KafkaException(e);
        }
        catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    /**
     * Record the named topics in the set of embedded topics and create them with the
     * default partition count, reporting the outcome per topic.
     * @param topicsToAdd the topic names.
     * @return a map of topic name to the exception raised while creating it, or
     * {@code null} for a topic that was created successfully.
     * @throws IllegalArgumentException if the embedded ZooKeeper has not been created yet.
     */
    public Map<String, Exception> addTopicsWithResults(String ... topicsToAdd) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        Set<String> requested = new HashSet<>(Arrays.asList(topicsToAdd));
        this.topics.addAll(requested);
        Map<String, Exception> outcome = createKafkaTopicsWithResults(requested);
        return outcome;
    }

    /**
     * Create the supplied topics, reporting the outcome per topic. Each topic name is
     * recorded in the set of embedded topics and must not already be present; the
     * requested replication must not exceed the number of brokers.
     * @param topicsToAdd the topics.
     * @return a map of topic name to the exception raised while creating it, or
     * {@code null} for a topic that was created successfully.
     * @throws IllegalArgumentException if the embedded ZooKeeper has not been created yet,
     * a topic is already known, or the replication request is not supported.
     */
    public Map<String, Exception> addTopicsWithResults(NewTopic ... topicsToAdd) {
        Assert.notNull(this.zookeeper, BROKER_NEEDED);
        for (NewTopic topic : topicsToAdd) {
            boolean added = this.topics.add(topic.name());
            Assert.isTrue(added, () -> "topic already exists: " + topic);
            boolean replicationSupported = topic.replicationFactor() <= this.count
                    && (topic.replicasAssignments() == null || topic.replicasAssignments().size() <= this.count);
            Assert.isTrue(replicationSupported, () -> "Embedded kafka does not support the requested replication factor: " + topic);
        }
        List<NewTopic> requested = Arrays.asList(topicsToAdd);
        return doWithAdminFunction(admin -> createTopicsWithResults(admin, requested));
    }

    /**
     * Create one topic per name, each with the default partition count and a
     * replication factor equal to the number of brokers, reporting the outcome per topic.
     * @param topicsToCreate the topic names.
     * @return a map of topic name to creation failure, or {@code null} on success.
     */
    private Map<String, Exception> createKafkaTopicsWithResults(Set<String> topicsToCreate) {
        return doWithAdminFunction(admin -> {
            List<NewTopic> newTopics = new ArrayList<>();
            for (String name : topicsToCreate) {
                newTopics.add(new NewTopic(name, this.partitionsPerTopic, (short) this.count));
            }
            return createTopicsWithResults(admin, newTopics);
        });
    }

    /**
     * Create the topics and wait, up to the admin timeout per topic, for each creation
     * to finish, collecting the individual outcomes instead of failing fast.
     * <p>If the wait is interrupted, the thread's interrupt status is restored and the
     * {@link InterruptedException} is recorded as that topic's result.
     * @param admin the admin client to use.
     * @param newTopics the topics to create.
     * @return a mutable map of topic name to the exception raised while creating it;
     * the value is {@code null} for a topic that was created successfully.
     */
    private Map<String, Exception> createTopicsWithResults(AdminClient admin, List<NewTopic> newTopics) {
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        Map<String, Exception> results = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : createTopics.values().entrySet()) {
            Exception failure = null;
            try {
                entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller while still reporting it per topic.
                Thread.currentThread().interrupt();
                failure = e;
            }
            catch (ExecutionException | TimeoutException e) {
                failure = e;
            }
            // HashMap is used deliberately: successful topics are stored with a null value.
            results.put(entry.getKey(), failure);
        }
        return results;
    }

    /**
     * Create an {@link AdminClient} connected to the embedded brokers, pass it to the
     * callback and close it afterwards, even if the callback throws.
     * @param callback the operation to run with the admin client.
     */
    public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
        // AdminClient.create expects Map<String, Object>; a Map<String, String> is not assignable.
        Map<String, Object> adminConfigs = new HashMap<>();
        adminConfigs.put("bootstrap.servers", getBrokersAsString());
        try (AdminClient admin = AdminClient.create(adminConfigs)) {
            callback.accept(admin);
        }
    }

    /**
     * Create an {@link AdminClient} connected to the embedded brokers, apply the callback
     * to it and close it afterwards, even if the callback throws.
     * @param callback the operation to run with the admin client.
     * @param <T> the result type.
     * @return the value returned by the callback.
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback) {
        // AdminClient.create expects Map<String, Object>; a Map<String, String> is not assignable.
        Map<String, Object> adminConfigs = new HashMap<>();
        adminConfigs.put("bootstrap.servers", getBrokersAsString());
        try (AdminClient admin = AdminClient.create(adminConfigs)) {
            return callback.apply(admin);
        }
    }

    /**
     * Stop the brokers and the embedded ZooKeeper and remove the system properties
     * published by {@link #afterPropertiesSet()}.
     * <p>Cleanup is best-effort: a failure in one step is logged at debug level and the
     * remaining steps still run. The method is safe to call before the broker has been
     * started, in which case there is nothing to stop.
     */
    @Override
    public void destroy() {
        // Properties.remove(null) throws NullPointerException, and the name is only
        // resolved once afterPropertiesSet() has run.
        if (this.brokerListProperty != null) {
            System.getProperties().remove(this.brokerListProperty);
        }
        System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (brokerRunning(kafkaServer)) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            }
            catch (Exception e) {
                logger.debug(e, "Failed to shut down broker; continuing with cleanup");
            }
            try {
                CoreUtils.delete(kafkaServer.config().logDirs());
            }
            catch (Exception e) {
                logger.debug(e, "Failed to delete broker log directories; continuing with cleanup");
            }
        }
        synchronized (this) {
            if (this.zooKeeperClient != null) {
                this.zooKeeperClient.close();
                // Drop the closed client so getZooKeeperClient() creates a fresh one next time.
                this.zooKeeperClient = null;
            }
        }
        if (this.zookeeper != null) {
            try {
                this.zookeeper.shutdown();
                this.zkConnect = null;
            }
            catch (Exception e) {
                logger.debug(e, "Failed to shut down embedded Zookeeper");
            }
        }
    }

    /**
     * Determine whether the broker should be treated as running during shutdown.
     * The broker state is read directly; if that call fails with {@link NoSuchMethodError},
     * the reflective accessor resolved in the static initializer is tried instead.
     * @param kafkaServer the broker to inspect.
     * @return {@code false} only when the broker reports the not-running state;
     * {@code true} otherwise, including when the state cannot be determined.
     */
    private boolean brokerRunning(KafkaServer kafkaServer) {
        try {
            boolean notRunning = kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING);
            return !notRunning;
        }
        catch (NoSuchMethodError error) {
            if (GET_BROKER_STATE_METHOD == null) {
                logger.debug("Could not determine broker state during shutdown");
                return true;
            }
            try {
                String state = GET_BROKER_STATE_METHOD.invoke(kafkaServer).toString();
                return !state.equals("NOT_RUNNING");
            }
            catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
                logger.debug(ex, "Could not determine broker state during shutdown");
                return true;
            }
        }
    }

    /**
     * Return the names of the embedded topics.
     * @return a new mutable set holding a copy of the topic names.
     */
    public Set<String> getTopics() {
        Set<String> copy = new HashSet<>();
        copy.addAll(this.topics);
        return copy;
    }

    /**
     * Return the brokers created by {@link #afterPropertiesSet()}.
     * @return the internal list of brokers (not a copy).
     */
    public List<KafkaServer> getKafkaServers() {
        final List<KafkaServer> servers = this.kafkaServers;
        return servers;
    }

    /**
     * Return the broker at the given position in the list of brokers.
     * @param id the index of the broker.
     * @return the broker.
     * @throws IndexOutOfBoundsException if no broker exists at that index.
     */
    public KafkaServer getKafkaServer(int id) {
        final List<KafkaServer> servers = this.kafkaServers;
        return servers.get(id);
    }

    /**
     * Return the embedded ZooKeeper.
     * @return the embedded ZooKeeper, or {@code null} if it has not been created yet.
     */
    public EmbeddedZookeeper getZookeeper() {
        final EmbeddedZookeeper embedded = this.zookeeper;
        return embedded;
    }

    /**
     * Return the ZooKeeper client, creating it on first use with the current
     * connection string and the configured session and connection timeouts.
     * @return the shared client instance.
     */
    public synchronized ZooKeeperClient getZooKeeperClient() {
        ZooKeeperClient client = this.zooKeeperClient;
        if (client != null) {
            return client;
        }
        client = new ZooKeeperClient(this.zkConnect, this.zkSessionTimeout, this.zkConnectionTimeout, 1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK", new ZKClientConfig(), "embeddedKafkaZK");
        this.zooKeeperClient = client;
        return client;
    }

    /**
     * Return the ZooKeeper connection string.
     * @return the {@code host:port} string set when the broker was started, or
     * {@code null} before startup and after a successful shutdown.
     */
    public String getZookeeperConnectionString() {
        final String connect = this.zkConnect;
        return connect;
    }

    /**
     * Return the address of the broker at the given index, built from the loopback
     * host and the port of the first listener in that broker's configuration.
     * NOTE(review): this reads the configured listener port while
     * {@link #getBrokerAddresses()} reads the recorded ports - confirm both agree
     * when a random port (0) was requested.
     * @param i the index of the broker.
     * @return the broker address.
     */
    public BrokerAddress getBrokerAddress(int i) {
        KafkaConfig config = this.kafkaServers.get(i).config();
        EndPoint firstListener = (EndPoint) config.listeners().apply(0);
        return new BrokerAddress(LOOPBACK, firstListener.port());
    }

    /**
     * Return the addresses of all brokers, one per recorded port, on the loopback host.
     * @return a new array of broker addresses in broker-index order.
     */
    public BrokerAddress[] getBrokerAddresses() {
        return Arrays.stream(this.kafkaPorts)
                .mapToObj(port -> new BrokerAddress(LOOPBACK, port))
                .toArray(BrokerAddress[]::new);
    }

    /**
     * Return the number of partitions used for topics created by name.
     * @return the partitions per topic.
     */
    public int getPartitionsPerTopic() {
        final int partitions = this.partitionsPerTopic;
        return partitions;
    }

    /**
     * Shut down every broker whose first configured listener (host and port) equals
     * the supplied address, waiting for each shutdown to complete.
     * @param brokerAddress the address of the broker(s) to stop.
     */
    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer candidate : getKafkaServers()) {
            EndPoint listener = (EndPoint) candidate.config().listeners().apply(0);
            BrokerAddress candidateAddress = new BrokerAddress(listener.host(), listener.port());
            if (brokerAddress.equals(candidateAddress)) {
                candidate.shutdown();
                candidate.awaitShutdown();
            }
        }
    }

    /**
     * Start the broker at the given index again, retrying on any exception.
     * Up to 10 attempts are made with an exponential back-off starting at 100 ms,
     * doubling each time and capped at 1000 ms.
     * @param index the index of the broker to start.
     * @throws Exception if the broker still fails to start after the last attempt.
     */
    public void restart(int index) throws Exception {
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(100L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(1000L);

        SimpleRetryPolicy retryOnAnyException = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryOnAnyException);
        template.setBackOffPolicy(backOff);
        template.execute(context -> {
            this.kafkaServers.get(index).startup();
            return null;
        });
    }

    /**
     * Return the broker addresses as a comma-separated list, in the form used for
     * the {@code bootstrap.servers} setting.
     * @return the joined addresses; an empty string when there are no brokers
     * (the previous hand-rolled join threw {@link StringIndexOutOfBoundsException}
     * in that case).
     */
    public String getBrokersAsString() {
        return Arrays.stream(getBrokerAddresses())
                .map(BrokerAddress::toString)
                .collect(Collectors.joining(","));
    }

    /**
     * Subscribe the consumer to all embedded topics, seeking to the beginning
     * once partitions are assigned.
     * @param consumer the consumer.
     */
    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) {
        String[] allTopics = this.topics.toArray(new String[0]);
        consumeFromEmbeddedTopics(consumer, allTopics);
    }

    /**
     * Subscribe the consumer to all embedded topics.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end of the assigned partitions
     * instead of the beginning.
     */
    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd) {
        String[] allTopics = this.topics.toArray(new String[0]);
        consumeFromEmbeddedTopics(consumer, seekToEnd, allTopics);
    }

    /**
     * Subscribe the consumer to a single embedded topic, seeking to the beginning
     * once partitions are assigned.
     * @param consumer the consumer.
     * @param topic the topic; it must be one of the embedded topics.
     */
    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
        String[] single = {topic};
        consumeFromEmbeddedTopics(consumer, single);
    }

    /**
     * Subscribe the consumer to a single embedded topic.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end of the assigned partitions
     * instead of the beginning.
     * @param topic the topic; it must be one of the embedded topics.
     */
    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic) {
        String[] single = {topic};
        consumeFromEmbeddedTopics(consumer, seekToEnd, single);
    }

    /**
     * Subscribe the consumer to the given embedded topics, seeking to the beginning
     * once partitions are assigned.
     * @param consumer the consumer.
     * @param topicsToConsume the topics; each must be one of the embedded topics.
     * @throws IllegalStateException if a topic is not embedded or no partitions are assigned.
     */
    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String ... topicsToConsume) {
        consumeFromEmbeddedTopics(consumer, false, topicsToConsume);
    }

    /**
     * Subscribe the consumer to the given embedded topics, poll until partitions have
     * been assigned and then seek the assigned partitions to the beginning or the end.
     * <p>Polling is done in steps of 100 ms for at most 600 polls.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end of the assigned partitions
     * instead of the beginning.
     * @param topicsToConsume the topics; each must be one of the embedded topics.
     * @throws IllegalStateException if a topic is not embedded or no partitions are
     * assigned within the polling limit.
     */
    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String ... topicsToConsume) {
        List<String> notEmbedded = Arrays.stream(topicsToConsume)
                .filter(topic -> !this.topics.contains(topic))
                .collect(Collectors.toList());
        if (!notEmbedded.isEmpty()) {
            throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
        }
        // Filled in by the rebalance listener, which runs during consumer.poll().
        final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
        consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to do; only the assignment is of interest here.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                assigned.set(partitions);
                logger.debug(() -> "partitions assigned: " + partitions);
            }
        });
        int n = 0;
        while (assigned.get() == null && n++ < 600) {
            consumer.poll(Duration.ofMillis(100L));
        }
        if (assigned.get() == null) {
            throw new IllegalStateException("Failed to be assigned partitions from the embedded topics");
        }
        logger.debug(() -> "Partitions assigned " + assigned.get() + "; re-seeking to " + (seekToEnd ? "end; " : "beginning"));
        if (seekToEnd) {
            consumer.seekToEnd(assigned.get());
        }
        else {
            consumer.seekToBeginning(assigned.get());
        }
        logger.debug("Subscription Initiated");
    }

    // Resolves the logger and the two reflective handles used to cope with
    // differences between Kafka versions. Any lookup failure aborts class
    // initialization with an IllegalStateException naming the client version.
    static {
        Method method;
        logger = new LogAccessor(LogFactory.getLog(EmbeddedKafkaBroker.class));
        try {
            method = KafkaServer.class.getDeclaredMethod("brokerState", new Class[0]);
            // The handle is kept only when brokerState() returns an AtomicReference;
            // otherwise it is null and brokerRunning(..) has no reflective fallback.
            GET_BROKER_STATE_METHOD = method.getReturnType().equals(AtomicReference.class) ? method : null;
        }
        catch (NoSuchMethodException | SecurityException e) {
            throw new IllegalStateException("Failed to determine KafkaServer.brokerState() method; client version: " + AppInfoParser.getVersion(), e);
        }
        method = null;
        try {
            // First choice: the boundPort overload that takes a KafkaServer.
            method = TestUtils.class.getDeclaredMethod("boundPort", KafkaServer.class, SecurityProtocol.class);
        }
        catch (NoSuchMethodException | SecurityException e) {
            try {
                // Fallback: the boundPort overload that takes a KafkaBroker.
                method = TestUtils.class.getDeclaredMethod("boundPort", KafkaBroker.class, SecurityProtocol.class);
            }
            catch (NoSuchMethodException | SecurityException e1) {
                // Report the first failure as the cause and attach the second as suppressed.
                IllegalStateException isx = new IllegalStateException("Failed to determine TestUtils.boundPort() method; client version: " + AppInfoParser.getVersion(), e);
                isx.addSuppressed(e1);
                throw isx;
            }
        }
        BOUND_PORT_METHOD = method;
    }

    public static final class EmbeddedZookeeper {
        private static final int THREE_K = 3000;
        private static final int HUNDRED = 100;
        private static final int TICK_TIME = 800;
        private final NIOServerCnxnFactory factory;
        private final ZooKeeperServer zookeeper;
        private final int port;
        private final File snapshotDir = TestUtils.tempDir();
        private final File logDir = TestUtils.tempDir();

        public EmbeddedZookeeper(int zkPort) throws IOException, InterruptedException {
            System.setProperty("zookeeper.forceSync", "no");
            this.zookeeper = new ZooKeeperServer(this.snapshotDir, this.logDir, 800);
            this.factory = new NIOServerCnxnFactory();
            InetSocketAddress addr = new InetSocketAddress(EmbeddedKafkaBroker.LOOPBACK, zkPort == 0 ? TestUtils.RandomPort() : zkPort);
            this.factory.configure(addr, 0);
            this.factory.startup(this.zookeeper);
            this.port = this.zookeeper.getClientPort();
        }

        public int getPort() {
            return this.port;
        }

        public File getSnapshotDir() {
            return this.snapshotDir;
        }

        public File getLogDir() {
            return this.logDir;
        }

        public void shutdown() throws IOException {
            try {
                this.factory.shutdown();
            }
            catch (Exception e) {
                logger.error((Throwable)e, (CharSequence)"ZK shutdown failed");
            }
            int n = 0;
            while (n++ < 100) {
                try {
                    ZkFourLetterWords.sendStat((String)EmbeddedKafkaBroker.LOOPBACK, (int)this.port, (int)3000);
                    Thread.sleep(100L);
                }
                catch (Exception e) {
                    // empty catch block
                    break;
                }
            }
            if (n == 100) {
                logger.debug((CharSequence)"Zookeeper failed to stop");
            }
            try {
                this.zookeeper.getZKDatabase().close();
            }
            catch (Exception e) {
                logger.error((Throwable)e, (CharSequence)"ZK db close failed");
            }
            Utils.delete((File)this.logDir);
            Utils.delete((File)this.snapshotDir);
        }
    }
}

