/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.boot.actuate.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.Assert;

public class KafkaHealthIndicator
extends AbstractHealthIndicator {
    static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
    private final KafkaAdmin kafkaAdmin;
    private final DescribeClusterOptions describeOptions;

    public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
        Assert.notNull((Object)kafkaAdmin, (String)"KafkaAdmin must not be null");
        this.kafkaAdmin = kafkaAdmin;
        this.describeOptions = (DescribeClusterOptions)new DescribeClusterOptions().timeoutMs(Integer.valueOf((int)requestTimeout));
    }

    @Override
    protected void doHealthCheck(Health.Builder builder) throws Exception {
        try (AdminClient adminClient = AdminClient.create((Map)this.kafkaAdmin.getConfig());){
            DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
            String brokerId = ((Node)result.controller().get()).idString();
            int replicationFactor = this.getReplicationFactor(brokerId, adminClient);
            int nodes = ((Collection)result.nodes().get()).size();
            Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
            builder.status(status).withDetail("clusterId", result.clusterId().get()).withDetail("brokerId", brokerId).withDetail("nodes", nodes);
        }
    }

    private int getReplicationFactor(String brokerId, AdminClient adminClient) throws ExecutionException, InterruptedException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        Map kafkaConfig = (Map)adminClient.describeConfigs(Collections.singletonList(configResource)).all().get();
        Config brokerConfig = (Config)kafkaConfig.get(configResource);
        return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
    }
}

