/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final TopicPartitionInitialOffset[] topicPartitions;
    private ListenerConsumer listenerConsumer;
    private ListenableFuture<?> listenerConsumerFuture;
    private MessageListener<K, V> listener;
    private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        this(consumerFactory, containerProperties, null);
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset ... topicPartitions) {
        super(containerProperties);
        Assert.notNull(consumerFactory, (String)"A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
        this.topicPartitions = topicPartitions != null ? Arrays.copyOf(topicPartitions, topicPartitions.length) : containerProperties.getTopicPartitions();
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        if (this.listenerConsumer.definedPartitions != null) {
            return Collections.unmodifiableCollection(this.listenerConsumer.definedPartitions.keySet());
        }
        if (this.listenerConsumer.assignedPartitions != null) {
            return Collections.unmodifiableCollection(this.listenerConsumer.assignedPartitions);
        }
        return null;
    }

    @Override
    protected void doStart() {
        if (this.isRunning()) {
            return;
        }
        this.setRunning(true);
        Object messageListener = this.getContainerProperties().getMessageListener();
        Assert.state((messageListener != null ? 1 : 0) != 0, (String)"A MessageListener is required");
        if (messageListener instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener)messageListener;
        } else if (messageListener instanceof MessageListener) {
            this.listener = (MessageListener)messageListener;
        } else {
            throw new IllegalStateException("messageListener must be 'MessageListener' or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (this.getContainerProperties().getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((this.getBeanName() == null ? "" : this.getBeanName()) + "-kafka-consumer-");
            this.getContainerProperties().setConsumerTaskExecutor((AsyncListenableTaskExecutor)consumerExecutor);
        }
        if (this.getContainerProperties().getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor((this.getBeanName() == null ? "" : this.getBeanName()) + "-kafka-listener-");
            this.getContainerProperties().setListenerTaskExecutor((AsyncListenableTaskExecutor)listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        this.listenerConsumerFuture = this.getContainerProperties().getConsumerTaskExecutor().submitListenable((Runnable)((Object)this.listenerConsumer));
    }

    @Override
    protected void doStop(final Runnable callback) {
        if (this.isRunning()) {
            this.listenerConsumerFuture.addCallback((ListenableFutureCallback)new ListenableFutureCallback<Object>(){

                public void onFailure(Throwable e) {
                    KafkaMessageListenerContainer.this.logger.error((Object)"Error while stopping the container: ", e);
                    if (callback != null) {
                        callback.run();
                    }
                }

                public void onSuccess(Object result) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger.debug((Object)(KafkaMessageListenerContainer.this + " stopped normally"));
                    }
                    if (callback != null) {
                        callback.run();
                    }
                }
            });
            this.setRunning(false);
            this.listenerConsumer.consumer.wakeup();
        }
    }

    public String toString() {
        return "KafkaMessageListenerContainer [id=" + this.getBeanName() + ", topicPartitions=" + this.getAssignedPartitions() + "]";
    }

    private static final class LoggingCommitCallback
    implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

        private LoggingCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                logger.error((Object)("Commit failed for " + offsets), (Throwable)exception);
            } else if (logger.isDebugEnabled()) {
                logger.debug((Object)("Commits for " + offsets + " completed"));
            }
        }
    }

    private final class ListenerConsumer
    implements SchedulingAwareRunnable {
        private final Log logger = LogFactory.getLog(ListenerConsumer.class);
        private final ContainerProperties containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
        private final OffsetCommitCallback commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
        private final Consumer<K, V> consumer;
        private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> manualOffsets = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();
        private final Map<String, Map<Integer, Long>> offsets = new HashMap<String, Map<Integer, Long>>();
        private final MessageListener<K, V> listener;
        private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
        private final boolean autoCommit = KafkaMessageListenerContainer.access$500(KafkaMessageListenerContainer.this).isAutoCommit();
        private final boolean isManualAck = this.containerProperties.getAckMode().equals((Object)AbstractMessageListenerContainer.AckMode.MANUAL);
        private final boolean isManualImmediateAck = this.containerProperties.getAckMode().equals((Object)AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE) || this.containerProperties.getAckMode().equals((Object)AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE_SYNC);
        private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
        private final boolean isRecordAck = this.containerProperties.getAckMode().equals((Object)AbstractMessageListenerContainer.AckMode.RECORD);
        private final BlockingQueue<ConsumerRecords<K, V>> recordsToProcess = new LinkedBlockingQueue(this.containerProperties.getQueueDepth());
        private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue();
        private final ApplicationEventPublisher applicationEventPublisher = KafkaMessageListenerContainer.this.getApplicationEventPublisher();
        private volatile Map<TopicPartition, Long> definedPartitions;
        private ConsumerRecords<K, V> unsent;
        private volatile Collection<TopicPartition> assignedPartitions;
        private int count;
        private volatile org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ListenerInvoker invoker;
        private long last;
        private volatile Future<?> listenerInvokerFuture;
        private boolean paused;

        private ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener) {
            Assert.state((!this.isAnyManualAck || !this.autoCommit ? 1 : 0) != 0, (String)("Consumer cannot be configured for auto commit for ackMode " + (Object)((Object)this.containerProperties.getAckMode())));
            final Consumer consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
            ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    if (!ListenerConsumer.this.autoCommit) {
                        if (ListenerConsumer.this.logger.isTraceEnabled()) {
                            ListenerConsumer.this.logger.trace((Object)"Received partition revocation notification, and will stop the invoker.");
                        }
                        if (ListenerConsumer.this.listenerInvokerFuture != null) {
                            ListenerConsumer.this.stopInvokerAndCommitManualAcks();
                            ListenerConsumer.this.recordsToProcess.clear();
                            ListenerConsumer.this.unsent = null;
                        } else if (!CollectionUtils.isEmpty(partitions)) {
                            ListenerConsumer.this.logger.error((Object)"Invalid state: the invoker was not active, but the consumer had allocated partitions");
                        }
                    } else if (ListenerConsumer.this.logger.isTraceEnabled()) {
                        ListenerConsumer.this.logger.trace((Object)"Received partition revocation notification, but the container is in autocommit mode, so transition will be handled by the consumer");
                    }
                    KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    ListenerConsumer.this.assignedPartitions = partitions;
                    if (!ListenerConsumer.this.autoCommit) {
                        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                        for (TopicPartition partition : partitions) {
                            offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
                        }
                        if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
                            ListenerConsumer.this.consumer.commitSync(offsets);
                        } else {
                            ListenerConsumer.this.consumer.commitAsync(offsets, KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
                        }
                    }
                    if (!ListenerConsumer.this.autoCommit && KafkaMessageListenerContainer.this.isRunning() && !CollectionUtils.isEmpty(partitions)) {
                        ListenerConsumer.this.startInvoker();
                    }
                    KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener().onPartitionsAssigned(partitions);
                }
            };
            if (KafkaMessageListenerContainer.this.topicPartitions == null) {
                if (this.containerProperties.getTopicPattern() != null) {
                    consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
                } else {
                    consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
                }
            } else {
                List<TopicPartitionInitialOffset> topicPartitions = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap<TopicPartition, Long>(topicPartitions.size());
                for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
                    this.definedPartitions.put(topicPartition.topicPartition(), topicPartition.initialOffset());
                }
                consumer.assign(new ArrayList<TopicPartition>(this.definedPartitions.keySet()));
            }
            this.consumer = consumer;
            this.listener = listener;
            this.acknowledgingMessageListener = ackListener;
        }

        private void startInvoker() {
            this.invoker = new ListenerInvoker();
            this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor().submit((Runnable)this.invoker);
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            long lastReceive;
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                this.initPartitionsIfNeeded();
                if (!this.autoCommit) {
                    this.startInvoker();
                }
            }
            long lastAlertAt = lastReceive = System.currentTimeMillis();
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    long now;
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace((Object)("Polling (paused=" + this.paused + ")..."));
                    }
                    ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Received: " + records.count() + " records"));
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        this.handleManualAcks();
                        if (this.autoCommit) {
                            this.invokeListener(records);
                        } else if (this.sendToListener(records) && this.assignedPartitions != null) {
                            this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));
                            this.paused = true;
                            this.unsent = records;
                        }
                    } else if (this.containerProperties.getIdleEventInterval() != null && (now = System.currentTimeMillis()) > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        this.publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                    }
                    this.unsent = this.checkPause(this.unsent);
                    if (this.autoCommit) continue;
                    this.processCommits();
                }
                catch (WakeupException e) {
                    this.unsent = this.checkPause(this.unsent);
                }
                catch (Exception e) {
                    if (this.containerProperties.getErrorHandler() != null) {
                        this.containerProperties.getErrorHandler().handle(e, null);
                        continue;
                    }
                    this.logger.error((Object)"Container exception", (Throwable)e);
                }
            }
            if (this.listenerInvokerFuture != null) {
                this.stopInvokerAndCommitManualAcks();
            }
            try {
                this.consumer.unsubscribe();
            }
            catch (WakeupException wakeupException) {
                // empty catch block
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info((Object)"Consumer stopped");
            }
        }

        private void publishIdleContainerEvent(long idleTime) {
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new ListenerContainerIdleEvent(KafkaMessageListenerContainer.this, idleTime, KafkaMessageListenerContainer.this.getBeanName(), KafkaMessageListenerContainer.this.getAssignedPartitions()));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void stopInvokerAndCommitManualAcks() {
            long now = System.currentTimeMillis();
            ((ListenerInvoker)this.invoker).stop();
            long remaining = this.containerProperties.getShutdownTimeout() + now - System.currentTimeMillis();
            try {
                this.listenerInvokerFuture.get(remaining, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                this.logger.error((Object)"Error while shutting down the listener invoker:", (Throwable)e);
            }
            catch (TimeoutException e) {
                this.logger.info((Object)"Invoker timed out while waiting for shutdown and will be canceled.");
                this.listenerInvokerFuture.cancel(true);
            }
            finally {
                this.listenerInvokerFuture = null;
            }
            this.handleManualAcks();
            this.processCommits();
            if (this.offsets.size() > 0) {
                this.commitIfNecessary();
            }
            this.invoker = null;
        }

        private ConsumerRecords<K, V> checkPause(ConsumerRecords<K, V> unsent) {
            if (this.paused && this.recordsToProcess.size() < this.containerProperties.getQueueDepth()) {
                this.consumer.resume(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));
                this.paused = false;
                if (unsent != null) {
                    try {
                        this.sendToListener(unsent);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new KafkaException("Interrupted while sending to listener", e);
                    }
                }
                return null;
            }
            return unsent;
        }

        private boolean sendToListener(ConsumerRecords<K, V> records) throws InterruptedException {
            if (this.containerProperties.isPauseEnabled() && CollectionUtils.isEmpty(this.definedPartitions)) {
                return !this.recordsToProcess.offer(records, this.containerProperties.getPauseAfter(), TimeUnit.MILLISECONDS);
            }
            this.recordsToProcess.put(records);
            return false;
        }

        private void handleManualAcks() {
            if (this.isAnyManualAck) {
                ConsumerRecord record = (ConsumerRecord)this.acks.poll();
                while (record != null) {
                    this.manualAck(record);
                    record = (ConsumerRecord)this.acks.poll();
                }
            }
        }

        private void manualAck(ConsumerRecord<K, V> record) {
            if (this.isManualImmediateAck) {
                try {
                    this.ackImmediate(record);
                }
                catch (WakeupException wakeupException) {}
            } else {
                this.updateManualOffset(record);
            }
        }

        private void ackImmediate(ConsumerRecord<K, V> record) {
            Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Committing: " + commits));
            }
            if (this.containerProperties.getAckMode().equals((Object)AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE)) {
                this.consumer.commitAsync(commits, this.commitCallback);
            } else {
                this.consumer.commitSync(commits);
            }
        }

        private void invokeListener(ConsumerRecords<K, V> records) {
            Iterator iterator = records.iterator();
            while (iterator.hasNext() && (this.autoCommit || this.invoker != null && ((ListenerInvoker)this.invoker).active)) {
                ConsumerRecord record = (ConsumerRecord)iterator.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace((Object)("Processing " + record));
                }
                try {
                    if (this.acknowledgingMessageListener != null) {
                        this.acknowledgingMessageListener.onMessage(record, new ConsumerAcknowledgment(record));
                    } else {
                        this.listener.onMessage(record);
                    }
                    this.acks.add(record);
                    if (!this.isManualImmediateAck && !this.isRecordAck) continue;
                    this.consumer.wakeup();
                }
                catch (Exception e) {
                    if (this.containerProperties.isAckOnError()) {
                        this.acks.add(record);
                    }
                    if (this.containerProperties.getErrorHandler() != null) {
                        this.containerProperties.getErrorHandler().handle(e, record);
                        continue;
                    }
                    this.logger.error((Object)("Listener threw an exception and no error handler for " + record), (Throwable)e);
                }
            }
        }

        private void processCommits() {
            this.count += this.acks.size();
            AbstractMessageListenerContainer.AckMode ackMode = this.containerProperties.getAckMode();
            if (!this.isManualImmediateAck) {
                boolean countExceeded;
                if (!this.isManualAck) {
                    this.updatePendingOffsets();
                }
                boolean bl = countExceeded = this.count >= this.containerProperties.getAckCount();
                if (ackMode.equals((Object)AbstractMessageListenerContainer.AckMode.BATCH) || ackMode.equals((Object)AbstractMessageListenerContainer.AckMode.COUNT) && countExceeded) {
                    this.commitIfNecessary();
                    this.count = 0;
                } else {
                    boolean elapsed;
                    long now = System.currentTimeMillis();
                    boolean bl2 = elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals((Object)AbstractMessageListenerContainer.AckMode.TIME) && elapsed) {
                        this.commitIfNecessary();
                        this.last = now;
                    } else if ((ackMode.equals((Object)AbstractMessageListenerContainer.AckMode.COUNT_TIME) || this.isManualAck) && (elapsed || countExceeded)) {
                        this.commitIfNecessary();
                        this.last = now;
                        this.count = 0;
                    }
                }
            }
        }

        private void initPartitionsIfNeeded() {
            for (Map.Entry<TopicPartition, Long> entry : this.definedPartitions.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                Long offset = entry.getValue();
                if (offset == null) continue;
                long newOffset = offset;
                if (offset < 0L) {
                    this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
                    newOffset = this.consumer.position(topicPartition) + offset;
                }
                this.consumer.seek(topicPartition, newOffset);
                if (!this.logger.isDebugEnabled()) continue;
                this.logger.debug((Object)("Reset " + topicPartition + " to offset " + newOffset));
            }
        }

        private void updatePendingOffsets() {
            ConsumerRecord record = (ConsumerRecord)this.acks.poll();
            while (record != null) {
                this.addOffset(record);
                record = (ConsumerRecord)this.acks.poll();
            }
        }

        private void addOffset(ConsumerRecord<K, V> record) {
            if (!this.offsets.containsKey(record.topic())) {
                this.offsets.put(record.topic(), new HashMap());
            }
            this.offsets.get(record.topic()).put(record.partition(), record.offset());
        }

        private void updateManualOffset(ConsumerRecord<K, V> record) {
            if (!this.manualOffsets.containsKey(record.topic())) {
                this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap());
            }
            ((ConcurrentMap)this.manualOffsets.get(record.topic())).put(record.partition(), record.offset());
        }

        private void commitIfNecessary() {
            HashMap<TopicPartition, OffsetAndMetadata> commits = new HashMap<TopicPartition, OffsetAndMetadata>();
            if (this.isManualAck) {
                for (Map.Entry entry : this.manualOffsets.entrySet()) {
                    Iterator iterator = ((ConcurrentMap)entry.getValue()).entrySet().iterator();
                    while (iterator.hasNext()) {
                        Map.Entry offset = iterator.next();
                        commits.put(new TopicPartition((String)entry.getKey(), ((Integer)offset.getKey()).intValue()), new OffsetAndMetadata((Long)offset.getValue() + 1L));
                        iterator.remove();
                    }
                }
            } else {
                for (Map.Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                    for (Map.Entry<Integer, Long> offset : entry.getValue().entrySet()) {
                        commits.put(new TopicPartition(entry.getKey(), offset.getKey().intValue()), new OffsetAndMetadata(offset.getValue() + 1L));
                    }
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Committing: " + commits));
            }
            if (!commits.isEmpty()) {
                try {
                    if (this.containerProperties.isSyncCommits()) {
                        this.consumer.commitSync(commits);
                    } else {
                        this.consumer.commitAsync(commits, this.commitCallback);
                    }
                }
                catch (WakeupException wakeupException) {
                    // empty catch block
                }
            }
        }

        private final class ConsumerAcknowledgment
        implements Acknowledgment {
            private final ConsumerRecord<K, V> record;

            private ConsumerAcknowledgment(ConsumerRecord<K, V> record) {
                this.record = record;
            }

            @Override
            public void acknowledge() {
                try {
                    ListenerConsumer.this.acks.put(this.record);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new KafkaException("Interrupted while queuing ack for " + this.record, e);
                }
                ListenerConsumer.this.consumer.wakeup();
            }

            public String toString() {
                return "Acknowledgment for " + this.record;
            }
        }

        private final class ListenerInvoker
        implements SchedulingAwareRunnable {
            private final CountDownLatch exitLatch = new CountDownLatch(1);
            private volatile boolean active = true;
            private volatile Thread executingThread;

            private ListenerInvoker() {
            }

            public void run() {
                Assert.isTrue((boolean)this.active, (String)"This instance is not active anymore");
                try {
                    this.executingThread = Thread.currentThread();
                    while (this.active) {
                        try {
                            ConsumerRecords records = (ConsumerRecords)ListenerConsumer.this.recordsToProcess.poll(1L, TimeUnit.SECONDS);
                            if (this.active) {
                                if (records != null) {
                                    ListenerConsumer.this.invokeListener(records);
                                } else if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                    ListenerConsumer.this.logger.trace((Object)"No records to process");
                                }
                            }
                        }
                        catch (InterruptedException e) {
                            if (!this.active) {
                                Thread.currentThread().interrupt();
                            }
                            ListenerConsumer.this.logger.debug((Object)"Interrupt ignored");
                        }
                        if (ListenerConsumer.this.isManualImmediateAck || !this.active) continue;
                        ListenerConsumer.this.consumer.wakeup();
                    }
                }
                finally {
                    this.active = false;
                    this.exitLatch.countDown();
                }
            }

            public boolean isLongLived() {
                return true;
            }

            private void stop() {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug((Object)"Stopping invoker");
                }
                this.active = false;
                try {
                    if (!this.exitLatch.await(KafkaMessageListenerContainer.this.getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS) && this.executingThread != null) {
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug((Object)"Interrupting invoker");
                        }
                        this.executingThread.interrupt();
                    }
                }
                catch (InterruptedException e) {
                    if (this.executingThread != null) {
                        this.executingThread.interrupt();
                    }
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

