/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.noop.NoopGauge;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;

public class KafkaBinderMetrics
implements MeterBinder,
ApplicationListener<BindingCreatedEvent>,
AutoCloseable {
    private static final int DEFAULT_TIMEOUT = 5;
    private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
    public static final String OFFSET_LAG_METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    private ConsumerFactory<?, ?> defaultConsumerFactory;
    private final MeterRegistry meterRegistry;
    private Map<String, Consumer<?, ?>> metadataConsumers;
    private int timeout = 5;
    private final Map<String, Long> lastUnconsumedMessagesValues = new ConcurrentHashMap<String, Long>();
    ScheduledExecutorService scheduler;
    private final ReentrantLock consumerFactoryLock = new ReentrantLock();

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
        this.binder = binder;
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.defaultConsumerFactory = defaultConsumerFactory;
        this.meterRegistry = meterRegistry;
        this.metadataConsumers = new ConcurrentHashMap();
    }

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties) {
        this(binder, binderConfigurationProperties, null, null);
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void bindTo(MeterRegistry registry) {
        if (this.scheduler != null) {
            LOG.info((Object)("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor)this.scheduler).getPoolSize() + " threads"));
            this.scheduler.shutdown();
        }
        this.scheduler = Executors.newScheduledThreadPool(this.binder.getTopicsInUse().size());
        for (Map.Entry<String, TopicInformation> topicInfo : this.binder.getTopicsInUse().entrySet()) {
            String group;
            String topic;
            ToDoubleFunction<KafkaBinderMetrics> offsetComputation;
            Gauge register;
            if (!topicInfo.getValue().isConsumerTopic() || (register = Gauge.builder((String)OFFSET_LAG_METRIC_NAME, (Object)this, offsetComputation = this.computeOffsetComputationFunction(topic = topicInfo.getKey(), group = topicInfo.getValue().consumerGroup())).tag("group", group).tag("topic", topic).description("Unconsumed messages for a particular group and topic").register(registry)) instanceof NoopGauge) continue;
            this.lastUnconsumedMessagesValues.put(topic + "-" + group, 0L);
            this.scheduler.scheduleWithFixedDelay(() -> this.computeUnconsumedMessages(topic, group), 1L, this.binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(), TimeUnit.SECONDS);
        }
    }

    private ToDoubleFunction<KafkaBinderMetrics> computeOffsetComputationFunction(String topic, String group) {
        if (this.binderConfigurationProperties.getMetrics().isDefaultOffsetLagMetricsEnabled()) {
            return o -> this.computeAndGetUnconsumedMessagesWithTimeout(topic, group);
        }
        return o -> this.lastUnconsumedMessagesValues.get(topic + "-" + group).longValue();
    }

    private long computeAndGetUnconsumedMessagesWithTimeout(String topic, String group) {
        Future<Long> future = this.scheduler.submit(() -> this.computeUnconsumedMessages(topic, group));
        try {
            return future.get(this.timeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return this.lastUnconsumedMessagesValues.get(topic + "-" + group);
        }
        catch (ExecutionException | TimeoutException ex) {
            return this.lastUnconsumedMessagesValues.get(topic + "-" + group);
        }
    }

    private long computeUnconsumedMessages(String topic, String group) {
        long lag = 0L;
        try {
            lag = this.findTotalTopicGroupLag(topic, group, this.metadataConsumers);
            this.lastUnconsumedMessagesValues.put(topic + "-" + group, lag);
        }
        catch (Exception ex) {
            LOG.debug((Object)("Cannot generate metric for topic: " + topic), (Throwable)ex);
        }
        return lag;
    }

    private long findTotalTopicGroupLag(String topic, String group, Map<String, Consumer<?, ?>> metadataConsumers) {
        long lag = 0L;
        Consumer metadataConsumer = metadataConsumers.computeIfAbsent(group, g -> this.createConsumerFactory().createConsumer(g, "monitoring"));
        List partitionInfos = metadataConsumer.partitionsFor(topic);
        LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        Map endOffsets = metadataConsumer.endOffsets(topicPartitions);
        Map committedOffsets = metadataConsumer.committed(endOffsets.keySet());
        Map beginningOffsets = metadataConsumer.beginningOffsets(endOffsets.keySet());
        for (Map.Entry endOffset : endOffsets.entrySet()) {
            OffsetAndMetadata current = (OffsetAndMetadata)committedOffsets.get(endOffset.getKey());
            Long beginningOffset = (Long)beginningOffsets.get(endOffset.getKey());
            lag += ((Long)endOffset.getValue()).longValue();
            if (current != null) {
                lag -= current.offset();
                continue;
            }
            if (beginningOffset == null) continue;
            lag -= beginningOffset.longValue();
        }
        return lag;
    }

    private ConsumerFactory<?, ?> createConsumerFactory() {
        if (this.defaultConsumerFactory == null) {
            try {
                this.consumerFactoryLock.lock();
                if (this.defaultConsumerFactory == null) {
                    HashMap<String, Object> props = new HashMap<String, Object>();
                    props.put("key.deserializer", ByteArrayDeserializer.class);
                    props.put("value.deserializer", ByteArrayDeserializer.class);
                    Map mergedConfig = this.binderConfigurationProperties.mergedConsumerConfiguration();
                    if (!ObjectUtils.isEmpty((Object)mergedConfig)) {
                        props.putAll(mergedConfig);
                    }
                    if (!props.containsKey("bootstrap.servers")) {
                        props.put("bootstrap.servers", this.binderConfigurationProperties.getKafkaConnectionString());
                    }
                    this.defaultConsumerFactory = new DefaultKafkaConsumerFactory(props);
                }
            }
            finally {
                this.consumerFactoryLock.unlock();
            }
        }
        return this.defaultConsumerFactory;
    }

    public void onApplicationEvent(BindingCreatedEvent event) {
        if (this.meterRegistry != null) {
            this.bindTo(this.meterRegistry);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.meterRegistry != null) {
            this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(arg_0 -> ((MeterRegistry)this.meterRegistry).remove(arg_0));
        }
        Optional.ofNullable(this.scheduler).ifPresent(ExecutorService::shutdown);
    }
}

