/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import com.gs.collections.api.RichIterable;
import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.block.procedure.Procedure;
import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.collection.MutableCollection;
import com.gs.collections.api.list.ImmutableList;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.api.multimap.Multimap;
import com.gs.collections.api.multimap.MutableMultimap;
import com.gs.collections.api.partition.PartitionIterable;
import com.gs.collections.impl.block.factory.Functions;
import com.gs.collections.impl.block.function.checked.CheckedFunction;
import com.gs.collections.impl.factory.Lists;
import com.gs.collections.impl.factory.Multimaps;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.utility.ArrayIterate;
import com.gs.collections.impl.utility.Iterate;
import com.gs.collections.impl.utility.MapIterate;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import kafka.common.ErrorMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaTemplate;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.ConcurrentMessageListenerDispatcher;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.LoggingErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.MetadataStoreOffsetManager;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class KafkaMessageListenerContainer
implements SmartLifecycle {
    public static final int DEFAULT_WAIT_FOR_LEADER_REFRESH_RETRY = 5000;
    private static final int DEFAULT_STOP_TIMEOUT = 1000;
    private static final Log log = LogFactory.getLog(KafkaMessageListenerContainer.class);
    public static final Function<Map.Entry<Partition, ?>, Partition> keyFunction = Functions.getKeyFunction();
    private final GetOffsetForPartitionFunction getOffset = new GetOffsetForPartitionFunction();
    private final PartitionToLeaderFunction getLeader = new PartitionToLeaderFunction();
    private final Function<Partition, Partition> passThru = Functions.getPassThru();
    private final LaunchFetchTaskProcedure launchFetchTask = new LaunchFetchTaskProcedure();
    private final Object lifecycleMonitor = new Object();
    private final KafkaTemplate kafkaTemplate;
    private final String[] topics;
    private Partition[] partitions;
    public boolean autoStartup = true;
    private Executor fetchTaskExecutor;
    private Executor adminTaskExecutor = Executors.newSingleThreadExecutor();
    private Executor dispatcherTaskExecutor;
    private int concurrency = 1;
    private volatile boolean running = false;
    private int maxFetch = 307200;
    private int queueSize = 1024;
    private int stopTimeout = 1000;
    private Object messageListener;
    private ErrorHandler errorHandler = new LoggingErrorHandler();
    private volatile OffsetManager offsetManager;
    private ConcurrentMap<Partition, Long> fetchOffsets;
    private ConcurrentMessageListenerDispatcher messageDispatcher;
    private final MutableMultimap<BrokerAddress, Partition> partitionsByBrokerMap;

    public KafkaMessageListenerContainer(ConnectionFactory connectionFactory, Partition ... partitions) {
        this.partitionsByBrokerMap = Multimaps.mutable.set.with();
        Assert.notNull((Object)connectionFactory, (String)"A connection factory must be supplied");
        Assert.notEmpty((Object[])partitions, (String)"A list of partitions must be provided");
        Assert.noNullElements((Object[])partitions, (String)"The list of partitions cannot contain null elements");
        this.kafkaTemplate = new KafkaTemplate(connectionFactory);
        this.partitions = partitions;
        this.topics = null;
    }

    public KafkaMessageListenerContainer(ConnectionFactory connectionFactory, String ... topics) {
        this.partitionsByBrokerMap = Multimaps.mutable.set.with();
        Assert.notNull((Object)connectionFactory, (String)"A connection factory must be supplied");
        Assert.notNull((Object)topics, (String)"A list of topics must be provided");
        Assert.noNullElements((Object[])topics, (String)"The list of topics cannot contain null elements");
        this.kafkaTemplate = new KafkaTemplate(connectionFactory);
        this.topics = topics;
    }

    public OffsetManager getOffsetManager() {
        return this.offsetManager;
    }

    public void setOffsetManager(OffsetManager offsetManager) {
        this.offsetManager = offsetManager;
    }

    public Object getMessageListener() {
        return this.messageListener;
    }

    public void setMessageListener(Object messageListener) {
        Assert.isTrue((messageListener instanceof MessageListener || messageListener instanceof AcknowledgingMessageListener ? 1 : 0) != 0, (String)("Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided"));
        this.messageListener = messageListener;
    }

    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int concurrency) {
        this.concurrency = concurrency;
    }

    public void setStopTimeout(int stopTimeout) {
        this.stopTimeout = stopTimeout;
    }

    public int getStopTimeout() {
        return this.stopTimeout;
    }

    public Executor getFetchTaskExecutor() {
        return this.fetchTaskExecutor;
    }

    public void setFetchTaskExecutor(Executor fetchTaskExecutor) {
        this.fetchTaskExecutor = fetchTaskExecutor;
    }

    public Executor getAdminTaskExecutor() {
        return this.adminTaskExecutor;
    }

    public void setAdminTaskExecutor(Executor adminTaskExecutor) {
        this.adminTaskExecutor = adminTaskExecutor;
    }

    public void setDispatcherTaskExecutor(Executor dispatcherTaskExecutor) {
        this.dispatcherTaskExecutor = dispatcherTaskExecutor;
    }

    public int getMaxFetch() {
        return this.maxFetch;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setQueueSize(int queueSize) {
        Assert.isTrue((queueSize > 0 && Integer.bitCount(queueSize) == 1 ? 1 : 0) != 0, (String)"'queueSize' must be a positive number and a power of 2");
        this.queueSize = queueSize;
    }

    public void setMaxFetch(int maxFetch) {
        this.maxFetch = maxFetch;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop(Runnable callback) {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                try {
                    this.offsetManager.flush();
                }
                catch (IOException e) {
                    log.error((Object)"Error while flushing:", (Throwable)e);
                }
                this.messageDispatcher.stop(this.stopTimeout);
            }
        }
        if (callback != null) {
            callback.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            if (!this.running) {
                if (this.partitions == null) {
                    this.partitions = KafkaMessageListenerContainer.getPartitionsForTopics(this.kafkaTemplate.getConnectionFactory(), this.topics);
                }
                this.running = true;
                if (this.offsetManager == null) {
                    this.offsetManager = new MetadataStoreOffsetManager(this.kafkaTemplate.getConnectionFactory());
                }
                ImmutableList partitionsAsList = Lists.immutable.with((Object[])this.partitions);
                this.fetchOffsets = new ConcurrentHashMap<Partition, Long>((Map<Partition, Long>)partitionsAsList.toMap(this.passThru, (Function)this.getOffset));
                this.messageDispatcher = new ConcurrentMessageListenerDispatcher(this.messageListener, this.errorHandler, Arrays.asList(this.partitions), this.offsetManager, this.concurrency, this.queueSize, this.dispatcherTaskExecutor);
                this.messageDispatcher.start();
                this.partitionsByBrokerMap.clear();
                this.partitionsByBrokerMap.putAll((Multimap)partitionsAsList.groupBy((Function)this.getLeader));
                if (this.fetchTaskExecutor == null) {
                    this.fetchTaskExecutor = Executors.newFixedThreadPool(this.partitionsByBrokerMap.size());
                }
                this.partitionsByBrokerMap.forEachKey((Procedure)this.launchFetchTask);
            }
        }
    }

    public void stop() {
        this.stop(null);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    private static Partition[] getPartitionsForTopics(ConnectionFactory connectionFactory, String[] topics) {
        MutableList partitionList = ArrayIterate.flatCollect((Object[])topics, (Function)new GetPartitionsForTopic(connectionFactory));
        return (Partition[])partitionList.toArray((Object[])new Partition[partitionList.size()]);
    }

    private class AddPartitionToBrokerProcedure
    implements Procedure2<Partition, BrokerAddress> {
        private AddPartitionToBrokerProcedure() {
        }

        public void value(Partition partition, BrokerAddress newBrokerAddress) {
            KafkaMessageListenerContainer.this.partitionsByBrokerMap.put((Object)newBrokerAddress, (Object)partition);
        }
    }

    private class PartitionToTopicFunction
    implements Function<Partition, String> {
        private PartitionToTopicFunction() {
        }

        public String valueOf(Partition object) {
            return object.getTopic();
        }
    }

    static class GetPartitionsForTopic
    extends CheckedFunction<String, Iterable<Partition>> {
        private final ConnectionFactory connectionFactory;

        public GetPartitionsForTopic(ConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }

        public Iterable<Partition> safeValueOf(String topic) throws Exception {
            return this.connectionFactory.getPartitions(topic);
        }
    }

    private class PartitionToFetchRequestFunction
    implements Function<Partition, FetchRequest> {
        private PartitionToFetchRequestFunction() {
        }

        public FetchRequest valueOf(Partition partition) {
            return new FetchRequest(partition, (Long)KafkaMessageListenerContainer.this.fetchOffsets.get(partition), KafkaMessageListenerContainer.this.maxFetch);
        }
    }

    private class LaunchFetchTaskProcedure
    implements Procedure<BrokerAddress> {
        private LaunchFetchTaskProcedure() {
        }

        public void value(BrokerAddress brokerAddress) {
            KafkaMessageListenerContainer.this.fetchTaskExecutor.execute((Runnable)((Object)new FetchTask(brokerAddress)));
        }
    }

    private class PartitionToLeaderFunction
    implements Function<Partition, BrokerAddress> {
        private PartitionToLeaderFunction() {
        }

        public BrokerAddress valueOf(Partition partition) {
            return KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().getLeader(partition);
        }
    }

    class GetOffsetForPartitionFunction
    extends CheckedFunction<Partition, Long> {
        GetOffsetForPartitionFunction() {
        }

        public Long safeValueOf(Partition object) throws Exception {
            try {
                return KafkaMessageListenerContainer.this.offsetManager.getOffset(object);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw e;
            }
        }
    }

    public class FetchTask
    implements SchedulingAwareRunnable {
        private final BrokerAddress brokerAddress;

        public FetchTask(BrokerAddress brokerAddress) {
            this.brokerAddress = brokerAddress;
        }

        public boolean isLongLived() {
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            boolean wasInterrupted = false;
            while (KafkaMessageListenerContainer.this.isRunning()) {
                HashSet<Partition> partitionsWithRemainingData;
                boolean hasErrors;
                MutableCollection fetchPartitions;
                MutableMultimap mutableMultimap = KafkaMessageListenerContainer.this.partitionsByBrokerMap;
                synchronized (mutableMultimap) {
                    fetchPartitions = KafkaMessageListenerContainer.this.partitionsByBrokerMap.get((Object)this.brokerAddress);
                    while (KafkaMessageListenerContainer.this.isRunning() && CollectionUtils.isEmpty((Collection)fetchPartitions)) {
                        try {
                            KafkaMessageListenerContainer.this.partitionsByBrokerMap.wait();
                            fetchPartitions = KafkaMessageListenerContainer.this.partitionsByBrokerMap.get((Object)this.brokerAddress);
                        }
                        catch (InterruptedException e) {
                            wasInterrupted = true;
                        }
                    }
                }
                if (!KafkaMessageListenerContainer.this.isRunning()) continue;
                do {
                    partitionsWithRemainingData = new HashSet<Partition>();
                    hasErrors = false;
                    try {
                        MutableCollection fetchRequests = fetchPartitions.collect((Function)new PartitionToFetchRequestFunction());
                        Result<KafkaMessageBatch> result = KafkaMessageListenerContainer.this.kafkaTemplate.receive((Iterable<FetchRequest>)fetchRequests);
                        Collection<KafkaMessageBatch> batches = result.getResults().values();
                        for (KafkaMessageBatch batch : batches) {
                            if (batch.getMessages().isEmpty()) continue;
                            long highestFetchedOffset = 0L;
                            for (KafkaMessage kafkaMessage : batch.getMessages()) {
                                if (kafkaMessage.getMetadata().getOffset() >= (Long)KafkaMessageListenerContainer.this.fetchOffsets.get(batch.getPartition())) {
                                    KafkaMessageListenerContainer.this.messageDispatcher.dispatch(kafkaMessage);
                                }
                                highestFetchedOffset = Math.max(highestFetchedOffset, kafkaMessage.getMetadata().getNextOffset());
                            }
                            KafkaMessageListenerContainer.this.fetchOffsets.replace(batch.getPartition(), highestFetchedOffset);
                            if (highestFetchedOffset >= batch.getHighWatermark()) continue;
                            partitionsWithRemainingData.add(batch.getPartition());
                        }
                        if (result.getErrors().size() <= 0) continue;
                        hasErrors = true;
                        PartitionIterable partitionByLeaderErrors = Iterate.partition(result.getErrors().entrySet(), (Predicate)new IsLeaderErrorPredicate());
                        RichIterable partitionsWithLeaderErrors = partitionByLeaderErrors.getSelected().collect(keyFunction);
                        this.resetLeaders((Iterable<Partition>)partitionsWithLeaderErrors);
                        PartitionIterable partitionsWithOffsetsOutOfRange = partitionByLeaderErrors.getRejected().partition((Predicate)new IsOffsetOutOfRangePredicate());
                        this.resetOffsets((Collection<Partition>)partitionsWithOffsetsOutOfRange.getSelected().collect(keyFunction).toSet());
                        this.stopFetchingFromPartitions((Iterable<Partition>)partitionsWithOffsetsOutOfRange.getRejected().collect(keyFunction));
                    }
                    catch (ConsumerException e) {
                        this.resetLeaders((Iterable<Partition>)fetchPartitions.toImmutable());
                    }
                } while (!hasErrors && KafkaMessageListenerContainer.this.isRunning() && !partitionsWithRemainingData.isEmpty());
            }
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }

        private void resetLeaders(Iterable<Partition> partitionsToReset) {
            this.stopFetchingFromPartitions(partitionsToReset);
            KafkaMessageListenerContainer.this.adminTaskExecutor.execute((Runnable)((Object)new UpdateLeadersTask(partitionsToReset)));
        }

        private void resetOffsets(Collection<Partition> partitionsToResetOffsets) {
            this.stopFetchingFromPartitions(partitionsToResetOffsets);
            KafkaMessageListenerContainer.this.adminTaskExecutor.execute(new UpdateOffsetsTask(partitionsToResetOffsets));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void stopFetchingFromPartitions(Iterable<Partition> partitions) {
            MutableMultimap mutableMultimap = KafkaMessageListenerContainer.this.partitionsByBrokerMap;
            synchronized (mutableMultimap) {
                for (Partition partition : partitions) {
                    KafkaMessageListenerContainer.this.partitionsByBrokerMap.remove((Object)this.brokerAddress, (Object)partition);
                }
            }
        }

        private class IsOffsetOutOfRangePredicate
        implements Predicate<Map.Entry<Partition, Short>> {
            private IsOffsetOutOfRangePredicate() {
            }

            public boolean accept(Map.Entry<Partition, Short> each) {
                return each.getValue() == ErrorMapping.OffsetOutOfRangeCode();
            }
        }

        private class IsLeaderErrorPredicate
        implements Predicate<Map.Entry<Partition, Short>> {
            private IsLeaderErrorPredicate() {
            }

            public boolean accept(Map.Entry<Partition, Short> each) {
                return each.getValue() == ErrorMapping.NotLeaderForPartitionCode() || each.getValue() == ErrorMapping.UnknownTopicOrPartitionCode();
            }
        }

        private class UpdateOffsetsTask
        implements Runnable {
            private final Collection<Partition> partitionsToResetOffsets;

            public UpdateOffsetsTask(Collection<Partition> partitionsToResetOffsets) {
                this.partitionsToResetOffsets = partitionsToResetOffsets;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                KafkaMessageListenerContainer.this.offsetManager.resetOffsets(this.partitionsToResetOffsets);
                for (Partition partition : this.partitionsToResetOffsets) {
                    KafkaMessageListenerContainer.this.fetchOffsets.replace(partition, KafkaMessageListenerContainer.this.offsetManager.getOffset(partition));
                }
                MutableMultimap mutableMultimap = KafkaMessageListenerContainer.this.partitionsByBrokerMap;
                synchronized (mutableMultimap) {
                    for (Partition partitionsToResetOffset : this.partitionsToResetOffsets) {
                        KafkaMessageListenerContainer.this.partitionsByBrokerMap.put((Object)FetchTask.this.brokerAddress, (Object)partitionsToResetOffset);
                    }
                    KafkaMessageListenerContainer.this.partitionsByBrokerMap.notifyAll();
                }
            }
        }

        private class UpdateLeadersTask
        implements SchedulingAwareRunnable {
            private final Iterable<Partition> partitionsToReset;

            public UpdateLeadersTask(Iterable<Partition> partitionsToReset) {
                this.partitionsToReset = partitionsToReset;
            }

            public boolean isLongLived() {
                return true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                boolean fetchCompleted = false;
                while (!fetchCompleted && KafkaMessageListenerContainer.this.isRunning()) {
                    try {
                        FastList partitionsAsList = FastList.newList(this.partitionsToReset);
                        FastList topics = partitionsAsList.collect((Function)new PartitionToTopicFunction()).distinct();
                        KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().refreshMetadata((Collection<String>)topics);
                        Map<Partition, BrokerAddress> leaders = KafkaMessageListenerContainer.this.kafkaTemplate.getConnectionFactory().getLeaders(this.partitionsToReset);
                        MutableMultimap mutableMultimap = KafkaMessageListenerContainer.this.partitionsByBrokerMap;
                        synchronized (mutableMultimap) {
                            MapIterate.forEachKeyValue(leaders, (Procedure2)new AddPartitionToBrokerProcedure());
                            KafkaMessageListenerContainer.this.partitionsByBrokerMap.notifyAll();
                        }
                        fetchCompleted = true;
                    }
                    catch (Exception e) {
                        if (!KafkaMessageListenerContainer.this.isRunning()) continue;
                        try {
                            Thread.sleep(5000L);
                        }
                        catch (InterruptedException e1) {
                            Thread.currentThread().interrupt();
                            log.error((Object)("Interrupted after refresh leaders failure for: " + Iterate.makeString(this.partitionsToReset, (String)",")));
                            fetchCompleted = true;
                        }
                    }
                }
            }
        }
    }
}

