/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import kafka.cluster.Broker;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Partition;
import scala.collection.Iterable;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class KafkaBinderHealthIndicator
implements HealthIndicator {
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties configurationProperties;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties configurationProperties) {
        this.binder = binder;
        this.configurationProperties = configurationProperties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Health health() {
        Health health;
        ZkClient zkClient = null;
        try {
            zkClient = new ZkClient(this.configurationProperties.getZkConnectionString(), this.configurationProperties.getZkSessionTimeout(), this.configurationProperties.getZkConnectionTimeout(), (ZkSerializer)ZKStringSerializer$.MODULE$);
            HashSet<String> brokersInClusterSet = new HashSet<String>();
            Seq allBrokersInCluster = ZkUtils$.MODULE$.getAllBrokersInCluster(zkClient);
            Collection brokersInCluster = JavaConversions.asJavaCollection((Iterable)allBrokersInCluster);
            for (Broker broker : brokersInCluster) {
                brokersInClusterSet.add(broker.connectionString());
            }
            HashSet<String> downMessages = new HashSet<String>();
            for (Map.Entry<String, Collection<Partition>> entry : this.binder.getTopicsInUse().entrySet()) {
                for (Partition partition : entry.getValue()) {
                    BrokerAddress address = this.binder.getConnectionFactory().getLeader(partition);
                    if (brokersInClusterSet.contains(address.toString())) continue;
                    downMessages.add(address.toString());
                }
            }
            if (downMessages.isEmpty()) {
                health = Health.up().build();
                return health;
            }
            health = Health.down().withDetail("Following brokers are down: ", (Object)((Object)downMessages).toString()).build();
        }
        catch (Exception e) {
            Health health2 = Health.down((Exception)e).build();
            return health2;
        }
        finally {
            if (zkClient != null) {
                try {
                    zkClient.close();
                }
                catch (Exception exception) {}
            }
        }
        return health;
    }
}

