/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import com.gs.collections.api.block.procedure.Procedure;
import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.impl.factory.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker;
import org.springframework.util.Assert;

/**
 * Routes {@link KafkaMessage}s to a fixed set of {@link QueueingMessageListenerInvoker}s.
 *
 * <p>On {@link #start()} the dispatcher creates {@code consumers} invokers and assigns the
 * configured partitions to them round-robin, in the iteration order of the partition
 * collection. Every message of a given partition is therefore enqueued on the same invoker.
 *
 * <p>{@link #start()} and {@link #stop(int)} are serialized on an internal monitor;
 * {@link #dispatch(KafkaMessage)} does not take that monitor and may be called concurrently
 * with them.
 */
class ConcurrentMessageListenerDispatcher {
    private static final Log log = LogFactory.getLog(ConcurrentMessageListenerDispatcher.class);
    private static final StartDelegateProcedure startDelegateProcedure = new StartDelegateProcedure();
    private static final StopDelegateProcedure stopDelegateProcedure = new StopDelegateProcedure();

    /** Guards the start/stop transitions. */
    private final Object lifecycleMonitor = new Object();
    private final Collection<Partition> partitions;
    private final int consumers;
    private final Object delegateListener;
    private final ErrorHandler errorHandler;
    private final OffsetManager offsetManager;
    private final int queueSize;
    private final Executor taskExecutor;
    private final boolean autoCommitOnError;
    private volatile boolean running;

    /**
     * Partition-to-invoker assignment; {@code null} while the dispatcher is stopped.
     * Volatile because {@link #dispatch(KafkaMessage)} reads it without holding the
     * lifecycle monitor.
     */
    private volatile MutableMap<Partition, QueueingMessageListenerInvoker> delegates;

    /**
     * Creates a dispatcher; no invokers are created until {@link #start()} is called.
     *
     * @param delegateListener the listener handed to every invoker; must be a
     *        {@link MessageListener} or an {@link AcknowledgingMessageListener}
     * @param errorHandler handed to every invoker
     * @param partitions the partitions to distribute across the invokers; must not be empty
     * @param offsetManager handed to every invoker
     * @param consumers the number of invokers to create; must be between 1 and the number
     *        of partitions (inclusive)
     * @param queueSize handed to every invoker
     * @param taskExecutor handed to every invoker
     * @param autoCommitOnError handed to every invoker
     * @throws IllegalArgumentException if any of the constraints above is violated
     */
    public ConcurrentMessageListenerDispatcher(Object delegateListener, ErrorHandler errorHandler, Collection<Partition> partitions, OffsetManager offsetManager, int consumers, int queueSize, Executor taskExecutor, boolean autoCommitOnError) {
        // The null check must come first: null also fails the instanceof test below and
        // would otherwise be reported with the less specific "Either a ..." message.
        Assert.notNull(delegateListener, "A delegate must be provided");
        Assert.isTrue(delegateListener instanceof MessageListener || delegateListener instanceof AcknowledgingMessageListener, "Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        Assert.notEmpty(partitions, "A set of partitions must be provided");
        // Zero would make the round-robin modulo in initializeAndStartDispatching() divide
        // by zero; a negative value would be rejected by the ArrayList constructor there.
        Assert.isTrue(consumers > 0, "The number of consumers must be greater than zero");
        Assert.isTrue(consumers <= partitions.size(), "The number of consumers must be smaller or equal to the number of partitions");
        this.delegateListener = delegateListener;
        this.errorHandler = errorHandler;
        this.partitions = partitions;
        this.offsetManager = offsetManager;
        this.consumers = Math.min(partitions.size(), consumers);
        this.queueSize = queueSize;
        this.taskExecutor = taskExecutor;
        this.autoCommitOnError = autoCommitOnError;
    }

    /**
     * Creates and starts the invokers if the dispatcher is not already running.
     * Calling this method on a running dispatcher has no effect.
     */
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                this.initializeAndStartDispatching();
                this.running = true;
            }
        }
    }

    /**
     * Stops every distinct invoker and discards the partition assignment.
     * Calling this method on a dispatcher that is not running only clears the assignment.
     *
     * @param stopTimeout passed unchanged to each invoker's {@code stop} method
     */
    public void stop(int stopTimeout) {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                // Flip the flag first so that dispatch() stops enqueueing as early as possible.
                this.running = false;
                // Several partitions can share one invoker; flip/keyBag/toSet yields each invoker once.
                this.delegates.flip().keyBag().toSet().forEachWith(stopDelegateProcedure, Integer.valueOf(stopTimeout));
            }
            this.delegates = null;
        }
    }

    /**
     * Enqueues the message on the invoker assigned to the message's partition.
     * The message is silently dropped when the dispatcher is not running.
     *
     * @param message the message to hand over
     * @throws IllegalArgumentException if the dispatcher is running but the message's
     *         partition is not one of the partitions it was configured with
     */
    public void dispatch(KafkaMessage message) {
        if (!this.running) {
            return;
        }
        // Read the field once: a concurrent stop() may null it after the running check.
        MutableMap<Partition, QueueingMessageListenerInvoker> currentDelegates = this.delegates;
        if (currentDelegates == null) {
            return;
        }
        QueueingMessageListenerInvoker invoker = currentDelegates.get(message.getMetadata().getPartition());
        if (invoker == null) {
            throw new IllegalArgumentException("No listener invoker is assigned to partition " + message.getMetadata().getPartition());
        }
        invoker.enqueue(message);
    }

    /**
     * Builds the invokers, assigns the partitions to them round-robin and starts each
     * distinct invoker. Called with the lifecycle monitor held.
     */
    private void initializeAndStartDispatching() {
        ArrayList<QueueingMessageListenerInvoker> invokers = new ArrayList<QueueingMessageListenerInvoker>(this.consumers);
        for (int i = 0; i < this.consumers; i++) {
            invokers.add(new QueueingMessageListenerInvoker(this.queueSize, this.offsetManager, this.delegateListener, this.errorHandler, this.taskExecutor, this.autoCommitOnError));
        }

        // Populate a local map and publish it in one step so that readers of the
        // volatile field never observe a partially filled assignment.
        MutableMap<Partition, QueueingMessageListenerInvoker> assignment = Maps.mutable.of();
        int next = 0;
        for (Partition partition : this.partitions) {
            assignment.put(partition, invokers.get(next++ % this.consumers));
        }
        this.delegates = assignment;

        assignment.flip().keyBag().toSet().forEach(startDelegateProcedure);
    }

    /** Starts the invoker it is applied to. */
    private static class StartDelegateProcedure
    implements Procedure<QueueingMessageListenerInvoker> {
        private StartDelegateProcedure() {
        }

        @Override
        public void value(QueueingMessageListenerInvoker delegate) {
            delegate.start();
        }
    }

    /**
     * Stops the invoker it is applied to with the supplied timeout. Exceptions are logged
     * at INFO level and not propagated, so one failing invoker does not prevent the
     * remaining ones from being stopped.
     */
    private static class StopDelegateProcedure
    implements Procedure2<QueueingMessageListenerInvoker, Integer> {
        private StopDelegateProcedure() {
        }

        @Override
        public void value(QueueingMessageListenerInvoker delegate, Integer stopTimeout) {
            try {
                delegate.stop(stopTimeout.intValue());
            }
            catch (Exception e) {
                if (log.isInfoEnabled()) {
                    log.info("Exception thrown while stopping dispatcher:", e);
                }
            }
        }
    }
}

