/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.jspecify.annotations.Nullable;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.log.LogMessage;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.listener.AbstractShareKafkaMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingShareConsumerAwareMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
import org.springframework.kafka.support.ShareAcknowledgment;
import org.springframework.util.Assert;

public class ShareKafkaMessageListenerContainer<K, V>
extends AbstractShareKafkaMessageListenerContainer<K, V> {
    // Poll timeout in milliseconds.
    // NOTE(review): this constant is not referenced by name anywhere in this decompiled view;
    // the poll loop uses the literal 1000L — presumably the compiler inlined it. Confirm against the original source.
    private static final int POLL_TIMEOUT = 1000;
    // Optional base client id; when null, determineClientId() falls back to the bean name.
    private @Nullable String clientId;
    // Number of share consumers created by doStart(); guarded against changes while running by setConcurrency().
    private int concurrency = 1;
    // Consumers created by doStart(); read by metrics() and cleared by doStop().
    private final List<ShareListenerConsumer> consumers = new ArrayList<ShareListenerConsumer>();
    // One future per consumer task submitted in doStart(); doStop() joins on all of them.
    private final List<CompletableFuture<Void>> consumerFutures = new ArrayList<CompletableFuture<Void>>();
    // Counted down when a consumer publishes its "starting" event.
    // NOTE(review): nothing in this view awaits the latch — confirm whether it is still needed.
    private volatile CountDownLatch startLatch = new CountDownLatch(1);

    /**
     * Create a container backed by the given share consumer factory.
     *
     * @param shareConsumerFactory the factory used to create share consumers; must not be null
     * @param containerProperties the container configuration
     */
    public ShareKafkaMessageListenerContainer(
            ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
            ContainerProperties containerProperties) {

        super(shareConsumerFactory, containerProperties);
        Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
    }

    /**
     * Return the configured base client id.
     *
     * @return the client id, or {@code null} if none was set
     */
    public @Nullable String getClientId() {
        return clientId;
    }

    /**
     * Set the base client id used when creating share consumers.
     *
     * @param clientId the client id
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Set the number of share consumers to create when the container starts.
     *
     * @param concurrency the consumer count; must be greater than 0
     * @throws IllegalArgumentException if {@code concurrency} is not positive
     * @throws IllegalStateException if the container is currently running
     */
    public void setConcurrency(int concurrency) {
        boolean positive = concurrency > 0;
        Assert.isTrue(positive, "concurrency must be greater than 0");
        boolean stopped = !this.isRunning();
        Assert.state(stopped, "Cannot change concurrency while container is running");
        this.concurrency = concurrency;
    }

    /**
     * Report whether the container is in its expected state, which for this
     * implementation is simply "running".
     *
     * @return {@code true} if the container is running
     */
    @Override
    public boolean isInExpectedState() {
        boolean running = this.isRunning();
        return running;
    }

    /**
     * Collect the metrics of every active share consumer, keyed by the consumer's client id.
     * Consumers without a client id are skipped. The lifecycle lock is held while the
     * consumer list is read.
     *
     * @return an unmodifiable map of client id to that consumer's metrics; empty when no
     * consumers exist
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        this.lifecycleLock.lock();
        try {
            if (this.consumers.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<String, Map<MetricName, ? extends Metric>> metricsByClientId = new HashMap<>();
            for (ShareListenerConsumer listenerConsumer : this.consumers) {
                Map<MetricName, ? extends Metric> consumerMetrics = listenerConsumer.consumer.metrics();
                String id = listenerConsumer.getClientId();
                if (id != null) {
                    metricsByClientId.put(id, consumerMetrics);
                }
            }
            return Collections.unmodifiableMap(metricsByClientId);
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Start the container: resolve (or create) the listener task executor, validate the
     * configured listener, mark the container running and submit one
     * {@code ShareListenerConsumer} task per configured concurrency slot.
     * Does nothing if the container is already running.
     *
     * @throws IllegalStateException if no message listener is configured, or if explicit
     * acknowledgment mode is enabled without an acknowledging listener
     */
    @Override
    protected void doStart() {
        if (this.isRunning()) {
            return;
        }
        ContainerProperties props = this.getContainerProperties();
        Object configuredListener = props.getMessageListener();

        // Fall back to a simple executor named after the bean when none was supplied,
        // and store it back on the container properties.
        AsyncTaskExecutor taskExecutor = props.getListenerTaskExecutor();
        if (taskExecutor == null) {
            String threadPrefix = (this.getBeanName() == null ? "" : this.getBeanName()) + "-C-";
            taskExecutor = new SimpleAsyncTaskExecutor(threadPrefix);
            props.setListenerTaskExecutor(taskExecutor);
        }

        GenericMessageListener<?> listener = (GenericMessageListener<?>) configuredListener;
        Assert.state(listener != null, "'messageListener' cannot be null");

        if (props.isExplicitShareAcknowledgment()) {
            boolean acknowledging = listener instanceof AcknowledgingShareConsumerAwareMessageListener;
            Assert.state(acknowledging, "Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener. Current listener type: " + listener.getClass().getName() + ". Either use implicit acknowledgment mode or provide a listener that can handle acknowledgments.");
        }

        this.setRunning(true);
        int slot = 0;
        while (slot < this.concurrency) {
            ShareListenerConsumer listenerConsumer = new ShareListenerConsumer(listener, this.determineClientId(slot));
            this.consumers.add(listenerConsumer);
            this.consumerFutures.add(CompletableFuture.runAsync(listenerConsumer, taskExecutor));
            slot++;
        }
    }

    /**
     * Derive the client id for the consumer at the given index. The base is the explicit
     * client id when set, otherwise the bean name; when more than one consumer is
     * configured the index is appended as {@code "-<index>"}.
     *
     * @param index the zero-based consumer index
     * @return the client id for that consumer
     */
    private String determineClientId(int index) {
        String base = (this.clientId == null) ? this.getBeanName() : this.clientId;
        return (this.concurrency > 1) ? base + "-" + index : base;
    }

    /**
     * Stop the container: flag it as not running so the consumer loops exit, then wait
     * under the lifecycle lock for every consumer task to finish.
     * <p>
     * The consumer and future lists are cleared in {@code finally}. {@code allOf(...).join()}
     * only returns or throws once all tasks have completed, so even when a task failed
     * there is nothing left to wait for, and stale entries must not survive into the next
     * start or be reported by {@code metrics()}.
     */
    @Override
    protected void doStop() {
        this.setRunning(false);
        this.lifecycleLock.lock();
        try {
            CompletableFuture.allOf(this.consumerFutures.toArray(new CompletableFuture<?>[0])).join();
        }
        catch (Exception e) {
            // A consumer task completed exceptionally; it has already terminated, so just report it.
            this.logger.error(e, "Error waiting for consumer threads to stop");
        }
        finally {
            try {
                this.consumers.clear();
                this.consumerFutures.clear();
            }
            finally {
                this.lifecycleLock.unlock();
            }
        }
    }

    /**
     * Count down the start latch and, if an event publisher is available, publish a
     * {@link ConsumerStartingEvent} with this container as both source and container.
     */
    private void publishConsumerStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher == null) {
            return;
        }
        eventPublisher.publishEvent(new ConsumerStartingEvent(this, this));
    }

    /**
     * Publish a {@link ConsumerStartedEvent} with this container as both source and
     * container, if an event publisher is available.
     */
    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher == null) {
            return;
        }
        eventPublisher.publishEvent(new ConsumerStartedEvent(this, this));
    }

    private class ShareListenerConsumer
    implements Runnable {
        // Logger shared with the enclosing container (assigned from the container's logger in the constructor).
        private final LogAccessor logger;
        // The share consumer created by the container's factory; polled in run() and closed in wrapUp().
        private final ShareConsumer<K, V> consumer;
        // The user listener invoked for every record.
        private final GenericMessageListener<?> genericListener;
        // Group id of the enclosing container; used in the stop log message and toString().
        private final @Nullable String consumerGroupId;
        // Client id assigned to this consumer by the container.
        private final @Nullable String clientId;
        // Explicit mode only: records handed to the listener whose acknowledgment has not been processed yet.
        // NOTE(review): the fully-qualified '$' type name below is a decompiler artifact for the nested ShareConsumerAcknowledgment class.
        private final Map<ConsumerRecord<K, V>, org.springframework.kafka.listener.ShareKafkaMessageListenerContainer$ShareListenerConsumer.ShareConsumerAcknowledgment> pendingAcknowledgments;
        // Explicit mode only: time (System.currentTimeMillis) each pending record was delivered or last warned about.
        private final Map<ConsumerRecord<K, V>, Long> acknowledgmentTimestamps;
        // NOTE(review): not used anywhere in this view — confirm whether it can be removed.
        private final Object acknowledgmentLock;
        // Acknowledgments requested through ShareConsumerAcknowledgment; drained by processQueuedAcknowledgments() in the poll loop.
        private final ConcurrentLinkedQueue<PendingAcknowledgment<K, V>> acknowledgmentQueue;
        // True when the container properties request explicit share acknowledgment.
        private final boolean isExplicitMode;
        // Acknowledgment timeout in milliseconds, taken from the container properties.
        private final long ackTimeoutMs;

        /**
         * Create a consumer task: obtain a share consumer from the container's factory,
         * capture the acknowledgment settings from the container properties and subscribe
         * to the configured topics.
         *
         * @param listener the listener to invoke for each record
         * @param consumerClientId the client id for the underlying share consumer
         */
        ShareListenerConsumer(GenericMessageListener<?> listener, String consumerClientId) {
            this.logger = ShareKafkaMessageListenerContainer.this.logger;
            this.consumerGroupId = getGroupId();

            // Acknowledgment bookkeeping.
            this.pendingAcknowledgments = new ConcurrentHashMap<>();
            this.acknowledgmentTimestamps = new ConcurrentHashMap<>();
            this.acknowledgmentLock = new Object();
            this.acknowledgmentQueue = new ConcurrentLinkedQueue<>();

            this.consumer = shareConsumerFactory.createShareConsumer(getGroupId(), consumerClientId);
            this.genericListener = listener;
            this.clientId = consumerClientId;

            ContainerProperties props = getContainerProperties();
            this.isExplicitMode = props.isExplicitShareAcknowledgment();
            this.ackTimeoutMs = props.getShareAcknowledgmentTimeout().toMillis();
            if (this.isExplicitMode) {
                this.logger.info("Share consumer configured for explicit acknowledgment mode");
            }

            this.consumer.subscribe(Arrays.asList(props.getTopics()));
        }

        /**
         * Return the client id assigned to this consumer.
         *
         * @return the client id, possibly {@code null}
         */
        @Nullable String getClientId() {
            return clientId;
        }

        /**
         * Main poll loop of this consumer task. Publishes the starting/started events, then,
         * while the container is running: applies queued acknowledgments, warns about
         * overdue acknowledgments (explicit mode), polls and dispatches any records.
         * The consumer is closed via {@code wrapUp()} on every exit path of the loop.
         */
        @Override
        public void run() {
            this.initialize();
            Exception exitThrowable = null;
            while (ShareKafkaMessageListenerContainer.this.isRunning()) {
                try {
                    ConsumerRecords records;
                    // Apply acknowledgments requested by listeners on this (the consumer's) thread.
                    this.processQueuedAcknowledgments();
                    if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) {
                        this.checkAcknowledgmentTimeouts();
                    }
                    try {
                        records = this.consumer.poll(Duration.ofMillis(1000L));
                    }
                    catch (IllegalStateException e) {
                        // In explicit mode with records still pending, this is treated as "poll is blocked
                        // until the outstanding records are acknowledged" (presumably raised by the consumer
                        // for that reason — confirm against the ShareConsumer contract): back off briefly and retry.
                        if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) {
                            this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() + " acknowledgments");
                            try {
                                Thread.sleep(10L);
                                continue;
                            }
                            catch (InterruptedException ie) {
                                // Restore the interrupt flag and leave the loop; wrapUp() runs below.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                        // Any other IllegalStateException is handled by the generic Exception handler below.
                        throw e;
                    }
                    if (records == null || records.count() <= 0) continue;
                    this.processRecords(records);
                }
                catch (Error e) {
                    // Errors are not recoverable: close the consumer and propagate to the executor/future.
                    this.logger.error((Throwable)e, (CharSequence)"Stopping share consumer due to an Error");
                    this.wrapUp();
                    throw e;
                }
                catch (Exception e) {
                    // Preserve the interrupt status if the failure was caused by an interruption.
                    if (e.getCause() instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    this.logger.error((Throwable)e, (CharSequence)"Error in share consumer poll loop");
                    exitThrowable = e;
                    break;
                }
            }
            if (exitThrowable != null) {
                this.logger.error(exitThrowable, (CharSequence)"ShareListenerConsumer exiting due to error");
            }
            this.wrapUp();
        }

        /**
         * Dispatch each polled record to the listener, then commit acknowledgments.
         * In explicit mode an acknowledgment handle is created and tracked (with its
         * delivery time) before the listener is invoked. Acknowledging listeners receive
         * the record, the handle (null in implicit mode) and the consumer; other listeners
         * receive only the record. Listener exceptions are routed to
         * {@code handleProcessingError}.
         *
         * @param records the records returned by the last poll
         */
        @SuppressWarnings("unchecked")
        private void processRecords(ConsumerRecords<K, V> records) {
            for (ConsumerRecord<K, V> polled : records) {
                ShareConsumerAcknowledgment handle = null;
                try {
                    if (this.isExplicitMode) {
                        handle = new ShareConsumerAcknowledgment(polled);
                        this.pendingAcknowledgments.put(polled, handle);
                        this.acknowledgmentTimestamps.put(polled, System.currentTimeMillis());
                    }
                    if (this.genericListener instanceof AcknowledgingShareConsumerAwareMessageListener) {
                        AcknowledgingShareConsumerAwareMessageListener<K, V> ackAware =
                                (AcknowledgingShareConsumerAwareMessageListener<K, V>) this.genericListener;
                        ackAware.onShareRecord(polled, handle, this.consumer);
                    }
                    else {
                        GenericMessageListener<ConsumerRecord<K, V>> plainListener =
                                (GenericMessageListener<ConsumerRecord<K, V>>) this.genericListener;
                        plainListener.onMessage(polled);
                    }
                }
                catch (Exception ex) {
                    this.handleProcessingError(polled, handle, ex);
                }
            }
            this.commitAcknowledgments();
        }

        /**
         * Handle an exception thrown by the listener for a record: log it and reject the record.
         * <p>
         * In explicit mode the record is dropped from both tracking maps before the reject is
         * queued through the acknowledgment handle. Removing the timestamp here matters:
         * once the record is no longer pending, the queued reject is not matched to a pending
         * entry and nothing else would remove the timestamp, so it would leak and keep
         * producing timeout warnings. In implicit mode the record is rejected directly on
         * the consumer. Failures while rejecting are logged, never propagated.
         *
         * @param record the record whose processing failed
         * @param acknowledgment the acknowledgment handle for the record, or {@code null} in implicit mode
         * @param e the exception thrown by the listener
         */
        private void handleProcessingError(ConsumerRecord<K, V> record, @Nullable ShareConsumerAcknowledgment acknowledgment, Exception e) {
            this.logger.error(e, "Error processing record: " + record);
            if (this.isExplicitMode && acknowledgment != null) {
                this.pendingAcknowledgments.remove(record);
                this.acknowledgmentTimestamps.remove(record);
                try {
                    // Throws IllegalStateException if the listener already acknowledged before failing.
                    acknowledgment.reject();
                }
                catch (Exception ackEx) {
                    this.logger.error(ackEx, "Failed to reject record after processing error");
                }
            }
            else {
                try {
                    this.consumer.acknowledge(record, AcknowledgeType.REJECT);
                }
                catch (Exception ackEx) {
                    this.logger.error(ackEx, "Failed to reject record after processing error");
                }
            }
        }

        /**
         * Synchronously commit acknowledgments on the consumer. A failure is logged and
         * otherwise ignored so the poll loop keeps running.
         */
        private void commitAcknowledgments() {
            try {
                this.consumer.commitSync();
            }
            catch (Exception commitFailure) {
                this.logger.error(commitFailure, "Failed to commit acknowledgments");
            }
        }

        /**
         * Stop tracking a record once its acknowledgment has been applied.
         * Has no effect outside explicit acknowledgment mode.
         *
         * @param record the record that was acknowledged
         */
        void onRecordAcknowledged(ConsumerRecord<K, V> record) {
            if (!this.isExplicitMode) {
                return;
            }
            this.pendingAcknowledgments.remove(record);
            this.acknowledgmentTimestamps.remove(record);
            this.logger.debug(() -> "Record acknowledged, " + this.pendingAcknowledgments.size() + " still pending");
        }

        /**
         * Drain the acknowledgment queue, applying each queued acknowledgment to the
         * consumer. When the record is still tracked as pending, its handle is notified
         * and the record is removed from tracking. A failure for one entry is logged and
         * does not stop the drain.
         */
        private void processQueuedAcknowledgments() {
            for (PendingAcknowledgment<K, V> next = this.acknowledgmentQueue.poll();
                    next != null;
                    next = this.acknowledgmentQueue.poll()) {

                // Effectively-final copy for use in the logging lambda.
                final PendingAcknowledgment<K, V> queued = next;
                try {
                    this.consumer.acknowledge(queued.record, queued.type);
                    ShareConsumerAcknowledgment handle = (ShareConsumerAcknowledgment) this.pendingAcknowledgments.get(queued.record);
                    if (handle != null) {
                        handle.notifyAcknowledged(queued.type);
                        this.onRecordAcknowledged(queued.record);
                    }
                }
                catch (Exception failure) {
                    this.logger.error(failure, () -> "Failed to process queued acknowledgment for record: " + queued.record);
                }
            }
        }

        /**
         * Warn about every tracked record that has been waiting for an acknowledgment
         * longer than the configured timeout. After warning, the record's timestamp is
         * reset to now so the next warning is issued only after another full timeout.
         * Does nothing outside explicit mode or when no timestamps are tracked.
         */
        private void checkAcknowledgmentTimeouts() {
            if (this.isExplicitMode && !this.acknowledgmentTimestamps.isEmpty()) {
                final long now = System.currentTimeMillis();
                for (Map.Entry<ConsumerRecord<K, V>, Long> tracked : this.acknowledgmentTimestamps.entrySet()) {
                    long waitedMs = now - tracked.getValue();
                    if (waitedMs > this.ackTimeoutMs) {
                        ConsumerRecord<K, V> overdue = tracked.getKey();
                        this.logger.warn(LogMessage.format("Record not acknowledged within timeout (%d seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record. Unacknowledged record: topic='%s', partition=%d, offset=%d", (Object) (this.ackTimeoutMs / 1000L), (Object) overdue.topic(), (Object) overdue.partition(), (Object) overdue.offset()));
                        this.acknowledgmentTimestamps.put(overdue, now);
                    }
                }
            }
        }

        /**
         * Publish the consumer "starting" event followed by the "started" event through
         * the enclosing container.
         */
        protected void initialize() {
            ShareKafkaMessageListenerContainer<K, V> container = ShareKafkaMessageListenerContainer.this;
            container.publishConsumerStartingEvent();
            container.publishConsumerStartedEvent();
        }

        /**
         * Close the underlying share consumer and log that this consumer has stopped.
         */
        private void wrapUp() {
            this.consumer.close();
            this.logger.info(() -> {
                String stoppedMessage = this.consumerGroupId + ": Consumer stopped";
                return stoppedMessage;
            });
        }

        /**
         * Describe this consumer by its group id and client id.
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("ShareKafkaMessageListenerContainer.ShareListenerConsumer [consumerGroupId=");
            description.append(this.consumerGroupId);
            description.append(", clientId=").append(this.clientId);
            description.append("]");
            return description.toString();
        }

        private class ShareConsumerAcknowledgment
        implements ShareAcknowledgment {
            private final ConsumerRecord<K, V> record;
            private final AtomicReference<AcknowledgeType> acknowledgmentType = new AtomicReference();

            ShareConsumerAcknowledgment(ConsumerRecord<K, V> record) {
                this.record = record;
            }

            @Override
            public void acknowledge() {
                this.acknowledgeInternal(AcknowledgeType.ACCEPT);
            }

            @Override
            public void release() {
                this.acknowledgeInternal(AcknowledgeType.RELEASE);
            }

            @Override
            public void reject() {
                this.acknowledgeInternal(AcknowledgeType.REJECT);
            }

            private void acknowledgeInternal(AcknowledgeType type) {
                if (!this.acknowledgmentType.compareAndSet(null, type)) {
                    throw new IllegalStateException(String.format("Record at offset %d has already been acknowledged with type %s", this.record.offset(), this.acknowledgmentType.get()));
                }
                ShareListenerConsumer.this.acknowledgmentQueue.offer(new PendingAcknowledgment(this.record, type));
            }

            void notifyAcknowledged(AcknowledgeType type) {
                this.acknowledgmentType.set(type);
            }

            boolean isAcknowledged() {
                return this.acknowledgmentType.get() != null;
            }

            @Nullable AcknowledgeType getAcknowledgmentType() {
                return this.acknowledgmentType.get();
            }

            ConsumerRecord<K, V> getRecord() {
                return this.record;
            }

            public String toString() {
                return "ShareConsumerAcknowledgment{topic=" + this.record.topic() + ", partition=" + this.record.partition() + ", offset=" + this.record.offset() + ", acknowledged=" + this.isAcknowledged() + ", type=" + String.valueOf(this.getAcknowledgmentType()) + "}";
            }
        }
    }

    /**
     * Immutable pairing of a record with the acknowledgment type requested for it;
     * the element type of the consumer's acknowledgment queue.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    private static class PendingAcknowledgment<K, V> {
        /** The record to acknowledge. */
        private final ConsumerRecord<K, V> record;
        /** The acknowledgment type to apply to the record. */
        private final AcknowledgeType type;

        PendingAcknowledgment(ConsumerRecord<K, V> ackRecord, AcknowledgeType ackType) {
            record = ackRecord;
            type = ackType;
        }
    }
}

