/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.DefaultAcknowledgment;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.util.ObjectUtils;
import reactor.core.processor.RingBufferProcessor;

/**
 * Buffers {@link KafkaMessage}s in a {@link RingBufferProcessor} and dispatches them, on a
 * single task submitted to the configured executor, to either a {@link MessageListener} or an
 * {@link AcknowledgingMessageListener}.
 *
 * <p>With a plain {@link MessageListener} the offset is updated through the {@link OffsetManager}
 * after each successful invocation. With an {@link AcknowledgingMessageListener} the listener
 * receives a {@link DefaultAcknowledgment} and this class does not update the offset itself on
 * the success path.
 *
 * <p>{@link #start()} and {@link #stop(long)} are synchronized on this instance;
 * {@link #enqueue(KafkaMessage)} is not and relies on volatile fields.
 */
class QueueingMessageListenerInvoker {
    private static final Log log = LogFactory.getLog(QueueingMessageListenerInvoker.class);
    private final MessageListener messageListener;
    private final AcknowledgingMessageListener acknowledgingMessageListener;
    private final OffsetManager offsetManager;
    private final ErrorHandler errorHandler;
    private final int capacity;
    private final boolean autoCommitOnError;
    private final ExecutorService executorService;
    private volatile RingBufferProcessor<KafkaMessage> ringBufferProcessor;
    private volatile CancelableSingleTaskExecutorService cancelableExecutorService;
    private volatile boolean running = false;
    private volatile CountDownLatch shutdownLatch;

    /**
     * Creates an invoker around the given listener delegate.
     *
     * <p>NOTE(review): when {@code executor} is {@code null} a single-thread executor is created
     * here and this class never shuts it down; confirm the owner's lifecycle makes that acceptable.
     *
     * @param capacity size passed to the ring buffer created in {@link #start()}
     * @param offsetManager receives offset updates after dispatching
     * @param delegate a {@link MessageListener} or an {@link AcknowledgingMessageListener}; the
     *        {@link MessageListener} check is performed first
     * @param errorHandler invoked with exceptions thrown by the listener; may be {@code null}
     * @param executor runs the dispatching task; if {@code null} a new single-thread executor is used
     * @param autoCommitOnError whether the offset is also updated after the error handler has
     *        handled a listener exception
     * @throws IllegalArgumentException if {@code delegate} is neither supported listener type
     */
    public QueueingMessageListenerInvoker(int capacity, OffsetManager offsetManager, Object delegate, ErrorHandler errorHandler, Executor executor, boolean autoCommitOnError) {
        this.capacity = capacity;
        this.autoCommitOnError = autoCommitOnError;
        if (delegate instanceof MessageListener) {
            this.messageListener = (MessageListener) delegate;
            this.acknowledgingMessageListener = null;
        } else if (delegate instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener) delegate;
            this.messageListener = null;
        } else {
            throw new IllegalArgumentException("Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        }
        this.offsetManager = offsetManager;
        this.errorHandler = errorHandler;
        this.executorService = executor != null
                ? new ExecutorServiceAdapter(new ConcurrentTaskExecutor(executor))
                : Executors.newSingleThreadExecutor();
    }

    /**
     * Publishes a message to the ring buffer for asynchronous dispatching. Messages offered while
     * the invoker is not running are dropped.
     *
     * @param message the message to dispatch
     */
    public void enqueue(KafkaMessage message) {
        if (!this.running) {
            return;
        }
        // Snapshot the field: stop() may null it between the running check and the call.
        RingBufferProcessor<KafkaMessage> processor = this.ringBufferProcessor;
        if (processor != null) {
            processor.onNext(message);
        }
    }

    /**
     * Creates the ring buffer processor, subscribes the dispatching subscriber and marks the
     * invoker as running. Does nothing if already running.
     */
    public synchronized void start() {
        if (this.running) {
            return;
        }
        CancelableSingleTaskExecutorService cancelable = new CancelableSingleTaskExecutorService(this.executorService);
        RingBufferProcessor<KafkaMessage> processor = RingBufferProcessor.share(cancelable, this.capacity);
        processor.subscribe(new KafkaMessageDispatchingSubscriber());
        this.cancelableExecutorService = cancelable;
        this.ringBufferProcessor = processor;
        // Published last so enqueue() never observes running == true without a processor.
        this.running = true;
    }

    /**
     * Stops dispatching: cancels the submitted task, completes the processor and waits for the
     * subscriber's terminal signal for at most {@code stopTimeout} milliseconds. Does nothing if
     * the invoker is not running. If the waiting thread is interrupted, its interrupt flag is
     * restored.
     *
     * @param stopTimeout maximum time to wait for the terminal signal, in milliseconds
     */
    public synchronized void stop(long stopTimeout) {
        if (!this.running) {
            return;
        }
        this.running = false;
        RingBufferProcessor<KafkaMessage> processor = this.ringBufferProcessor;
        if (processor == null) {
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        this.shutdownLatch = latch;
        // Same order as before: cancel the running task first, then complete the processor.
        this.cancelableExecutorService.cancelTask();
        processor.onComplete();
        this.ringBufferProcessor = null;
        try {
            latch.await(stopTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.shutdownLatch = null;
        }
    }

    /**
     * Counts down the shutdown latch if a stop is currently waiting on it. Works on a local
     * snapshot because {@link #stop(long)} resets the field to {@code null} concurrently.
     */
    private void releaseShutdownLatch() {
        CountDownLatch latch = this.shutdownLatch;
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Subscriber that hands each message from the ring buffer to the configured listener.
     */
    private class KafkaMessageDispatchingSubscriber
    implements Subscriber<KafkaMessage> {
        private KafkaMessageDispatchingSubscriber() {
        }

        /** Requests an unbounded number of messages. */
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        /**
         * Dispatches the message to the listener while the invoker is running; otherwise the
         * message is discarded (and reported at debug level) without touching offsets.
         */
        @Override
        public void onNext(KafkaMessage kafkaMessage) {
            if (!QueueingMessageListenerInvoker.this.running) {
                if (log.isDebugEnabled()) {
                    log.debug("Message discarded on shutdown (no offsets have been committed): " + ObjectUtils.nullSafeToString(kafkaMessage));
                }
                return;
            }
            try {
                if (QueueingMessageListenerInvoker.this.messageListener != null) {
                    QueueingMessageListenerInvoker.this.messageListener.onMessage(kafkaMessage);
                    updateOffset(kafkaMessage);
                } else {
                    QueueingMessageListenerInvoker.this.acknowledgingMessageListener.onMessage(kafkaMessage, new DefaultAcknowledgment(QueueingMessageListenerInvoker.this.offsetManager, kafkaMessage));
                }
            }
            catch (Exception e) {
                if (QueueingMessageListenerInvoker.this.errorHandler != null) {
                    QueueingMessageListenerInvoker.this.errorHandler.handle(e, kafkaMessage);
                    if (QueueingMessageListenerInvoker.this.autoCommitOnError) {
                        updateOffset(kafkaMessage);
                    }
                } else {
                    // Without an error handler the exception used to vanish without a trace.
                    log.warn("Listener threw an exception and no error handler is configured; message: " + ObjectUtils.nullSafeToString(kafkaMessage), e);
                }
            }
        }

        /**
         * Reports the failure and releases a pending {@link QueueingMessageListenerInvoker#stop(long)},
         * since no completion signal follows an error signal.
         */
        @Override
        public void onError(Throwable t) {
            if (QueueingMessageListenerInvoker.this.running) {
                log.error("Message dispatching terminated with an error", t);
            } else if (log.isDebugEnabled()) {
                log.debug("Message dispatching terminated with an error during shutdown", t);
            }
            QueueingMessageListenerInvoker.this.releaseShutdownLatch();
        }

        /** Releases a pending {@link QueueingMessageListenerInvoker#stop(long)}. */
        @Override
        public void onComplete() {
            QueueingMessageListenerInvoker.this.releaseShutdownLatch();
        }

        /** Records the message's next offset for its partition with the offset manager. */
        private void updateOffset(KafkaMessage kafkaMessage) {
            QueueingMessageListenerInvoker.this.offsetManager.updateOffset(kafkaMessage.getMetadata().getPartition(), kafkaMessage.getMetadata().getNextOffset());
        }
    }

    /**
     * {@link ExecutorService} facade that accepts exactly one task, submits it to the wrapped
     * executor and allows that task to be cancelled. Shutdown and termination operations are
     * not supported and throw {@link IllegalStateException}.
     */
    private static class CancelableSingleTaskExecutorService
    extends AbstractExecutorService {
        private Future<?> submittedTask;
        private final ExecutorService executor;

        public CancelableSingleTaskExecutorService(ExecutorService executor) {
            this.executor = executor;
        }

        /**
         * Submits the task to the wrapped executor and remembers its future.
         *
         * @throws IllegalArgumentException if a task is already held
         */
        @Override
        public void execute(Runnable task) {
            if (this.submittedTask != null) {
                throw new IllegalArgumentException("Cannot submit more than one task");
            }
            this.submittedTask = this.executor.submit(task);
        }

        @Override
        public void shutdown() {
            throw new IllegalStateException("Manual shutdown not supported");
        }

        @Override
        public List<Runnable> shutdownNow() {
            throw new IllegalStateException("Manual shutdown not supported");
        }

        /** Always {@code false}: this facade is never shut down. */
        @Override
        public boolean isShutdown() {
            return false;
        }

        /** Always {@code false}: this facade is never terminated. */
        @Override
        public boolean isTerminated() {
            return false;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            throw new IllegalStateException("Manual termination not supported");
        }

        /** Cancels the held task, interrupting it if it is running, and forgets it. */
        private void cancelTask() {
            Future<?> task = this.submittedTask;
            if (task != null) {
                task.cancel(true);
                this.submittedTask = null;
            }
        }
    }
}

