/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.Channel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ActiveObjectCounter;
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
import org.springframework.amqp.rabbit.listener.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.listener.FatalListenerStartupException;
import org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.aop.Advisor;
import org.springframework.aop.Pointcut;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.support.MetricType;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

public class SimpleMessageListenerContainer
extends AbstractMessageListenerContainer {
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000L;
    public static final int DEFAULT_PREFETCH_COUNT = 1;
    public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000L;
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    private volatile int prefetchCount = 1;
    private volatile int txSize = 1;
    private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
    private volatile int concurrentConsumers = 1;
    private long receiveTimeout = 1000L;
    private long shutdownTimeout = 5000L;
    private long recoveryInterval = 5000L;
    private Set<BlockingQueueConsumer> consumers;
    private final Object consumersMonitor = new Object();
    private PlatformTransactionManager transactionManager;
    private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
    private volatile Advice[] adviceChain = new Advice[0];
    private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter();
    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    private volatile boolean defaultRequeueRejected = true;
    private final ContainerDelegate delegate;
    private ContainerDelegate proxy = this.delegate = new ContainerDelegate(){

        @Override
        public void invokeListener(Channel channel, Message message) throws Exception {
            SimpleMessageListenerContainer.super.invokeListener(channel, message);
        }
    };

    public SimpleMessageListenerContainer() {
    }

    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        this.setConnectionFactory(connectionFactory);
    }

    public void setAdviceChain(Advice[] adviceChain) {
        this.adviceChain = adviceChain;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setConcurrentConsumers(int concurrentConsumers) {
        Assert.isTrue((concurrentConsumers > 0 ? 1 : 0) != 0, (String)"'concurrentConsumers' value must be at least 1 (one)");
        this.concurrentConsumers = concurrentConsumers;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public void setShutdownTimeout(long shutdownTimeout) {
        this.shutdownTimeout = shutdownTimeout;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"taskExecutor must not be null");
        this.taskExecutor = taskExecutor;
    }

    public void setPrefetchCount(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    public void setTxSize(int txSize) {
        this.txSize = txSize;
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
        this.transactionAttribute = transactionAttribute;
    }

    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        Assert.notNull((Object)messagePropertiesConverter, (String)"messagePropertiesConverter must not be null");
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
        this.defaultRequeueRejected = defaultRequeueRejected;
    }

    @Override
    protected void validateConfiguration() {
        CachingConnectionFactory cf;
        super.validateConfiguration();
        Assert.state((!this.getAcknowledgeMode().isAutoAck() || this.transactionManager == null ? 1 : 0) != 0, (String)"The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having an external transaction manager. Either use a different AcknowledgeMode or make sure the transactionManager is null.");
        if (this.getConnectionFactory() instanceof CachingConnectionFactory && (cf = (CachingConnectionFactory)this.getConnectionFactory()).getChannelCacheSize() < this.concurrentConsumers) {
            cf.setChannelCacheSize(this.concurrentConsumers);
            this.logger.warn((Object)("CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: " + this.concurrentConsumers));
        }
    }

    private void initializeProxy() {
        if (this.adviceChain.length == 0) {
            return;
        }
        ProxyFactory factory = new ProxyFactory();
        for (Advice advice : this.getAdviceChain()) {
            factory.addAdvisor((Advisor)new DefaultPointcutAdvisor(Pointcut.TRUE, advice));
        }
        factory.setProxyTargetClass(false);
        factory.addInterface(ContainerDelegate.class);
        factory.setTarget((Object)this.delegate);
        this.proxy = (ContainerDelegate)factory.getProxy();
    }

    protected final boolean sharedConnectionEnabled() {
        return true;
    }

    @Override
    protected void doInitialize() throws Exception {
        if (!this.isExposeListenerChannel() && this.transactionManager != null) {
            this.logger.warn((Object)"exposeListenerChannel=false is ignored when using a TransactionManager");
        }
        this.initializeProxy();
    }

    @ManagedMetric(metricType=MetricType.GAUGE)
    public int getActiveConsumerCount() {
        return this.cancellationLock.getCount();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        Object object = this.consumersMonitor;
        synchronized (object) {
            int newConsumers = this.initializeConsumers();
            if (this.consumers == null) {
                if (this.logger.isInfoEnabled()) {
                    this.logger.info((Object)"Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
                }
                return;
            }
            if (newConsumers <= 0) {
                if (this.logger.isInfoEnabled()) {
                    this.logger.info((Object)"Consumers are already running");
                }
                return;
            }
            HashSet<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
            for (BlockingQueueConsumer consumer : this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                this.taskExecutor.execute(processor);
            }
            for (AsyncMessageProcessingConsumer processor : processors) {
                FatalListenerStartupException startupException = processor.getStartupException();
                if (startupException == null) continue;
                throw new AmqpIllegalStateException("Fatal exception on listener startup", (Throwable)((Object)startupException));
            }
        }
    }

    @Override
    protected void doStop() {
        this.shutdown();
        super.doStop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doShutdown() {
        if (!this.isRunning()) {
            return;
        }
        try {
            this.logger.info((Object)"Waiting for workers to finish.");
            boolean finished = this.cancellationLock.await(this.shutdownTimeout, TimeUnit.MILLISECONDS);
            if (finished) {
                this.logger.info((Object)"Successfully waited for workers to finish.");
            } else {
                this.logger.info((Object)"Workers not finished.  Forcing connections to close.");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.warn((Object)"Interrupted waiting for workers.  Continuing with shutdown.");
        }
        Object object = this.consumersMonitor;
        synchronized (object) {
            this.consumers = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int initializeConsumers() {
        int count = 0;
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (this.consumers == null) {
                this.cancellationLock.reset();
                this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
                for (int i = 0; i < this.concurrentConsumers; ++i) {
                    BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();
                    this.consumers.add(consumer);
                    ++count;
                }
            }
        }
        return count;
    }

    @Override
    protected boolean isChannelLocallyTransacted(Channel channel) {
        return super.isChannelLocallyTransacted(channel) && this.transactionManager == null;
    }

    protected BlockingQueueConsumer createBlockingQueueConsumer() {
        String[] queues = this.getRequiredQueueNames();
        int actualPrefetchCount = this.prefetchCount > this.txSize ? this.prefetchCount : this.txSize;
        BlockingQueueConsumer consumer = new BlockingQueueConsumer(this.getConnectionFactory(), this.messagePropertiesConverter, this.cancellationLock, this.getAcknowledgeMode(), this.isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, queues);
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restart(BlockingQueueConsumer consumer) {
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (this.consumers != null) {
                try {
                    consumer.stop();
                    this.cancellationLock.release(consumer);
                    this.consumers.remove(consumer);
                    consumer = this.createBlockingQueueConsumer();
                    this.consumers.add(consumer);
                }
                catch (RuntimeException e) {
                    this.logger.warn((Object)("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage()));
                    throw e;
                }
                this.taskExecutor.execute(new AsyncMessageProcessingConsumer(consumer));
            }
        }
    }

    private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
        if (this.transactionManager != null) {
            try {
                return (Boolean)new TransactionTemplate(this.transactionManager, (TransactionDefinition)this.transactionAttribute).execute((TransactionCallback)new TransactionCallback<Boolean>(){

                    public Boolean doInTransaction(TransactionStatus status) {
                        ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false), SimpleMessageListenerContainer.this.getConnectionFactory(), true);
                        try {
                            return SimpleMessageListenerContainer.this.doReceiveAndExecute(consumer);
                        }
                        catch (RuntimeException e) {
                            throw e;
                        }
                        catch (Throwable e) {
                            throw new WrappedTransactionException(e);
                        }
                    }
                });
            }
            catch (WrappedTransactionException e) {
                throw e.getCause();
            }
        }
        return this.doReceiveAndExecute(consumer);
    }

    private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
        Channel channel = consumer.getChannel();
        for (int i = 0; i < this.txSize; ++i) {
            this.logger.trace((Object)"Waiting for message from consumer.");
            Message message = consumer.nextMessage(this.receiveTimeout);
            if (message == null) break;
            try {
                this.executeListener(channel, message);
                continue;
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                break;
            }
            catch (Throwable ex) {
                consumer.rollbackOnExceptionIfNecessary(ex);
                throw ex;
            }
        }
        return consumer.commitIfNecessary(this.isChannelLocallyTransacted(channel));
    }

    private Advice[] getAdviceChain() {
        return this.adviceChain;
    }

    @Override
    protected void invokeListener(Channel channel, Message message) throws Exception {
        this.proxy.invokeListener(channel, message);
    }

    protected void handleStartupFailure(Throwable t) throws Exception {
        try {
            long timeout = System.currentTimeMillis() + this.recoveryInterval;
            while (this.isActive() && System.currentTimeMillis() < timeout) {
                Thread.sleep(200L);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Unrecoverable interruption on consumer restart");
        }
    }

    private static class WrappedTransactionException
    extends RuntimeException {
        public WrappedTransactionException(Throwable cause) {
            super(cause);
        }
    }

    private class AsyncMessageProcessingConsumer
    implements Runnable {
        private final BlockingQueueConsumer consumer;
        private final CountDownLatch start;
        private volatile FatalListenerStartupException startupException;

        public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
            this.consumer = consumer;
            this.start = new CountDownLatch(1);
        }

        public FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException {
            this.start.await(60000L, TimeUnit.MILLISECONDS);
            return this.startupException;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            boolean aborted = false;
            try {
                try {
                    this.consumer.start();
                    this.start.countDown();
                }
                catch (FatalListenerStartupException ex) {
                    throw ex;
                }
                catch (Throwable t) {
                    this.start.countDown();
                    SimpleMessageListenerContainer.this.handleStartupFailure(t);
                    throw t;
                }
                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                    ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), SimpleMessageListenerContainer.this.getConnectionFactory());
                }
                boolean continuable = false;
                while (SimpleMessageListenerContainer.this.isActive() || continuable) {
                    try {
                        continuable = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer) && !SimpleMessageListenerContainer.this.isChannelTransacted();
                    }
                    catch (ListenerExecutionFailedException listenerExecutionFailedException) {}
                }
            }
            catch (InterruptedException e) {
                SimpleMessageListenerContainer.this.logger.debug((Object)"Consumer thread interrupted, processing stopped.");
                Thread.currentThread().interrupt();
                aborted = true;
            }
            catch (FatalListenerStartupException ex) {
                SimpleMessageListenerContainer.this.logger.error((Object)"Consumer received fatal exception on startup", (Throwable)((Object)ex));
                this.startupException = ex;
                aborted = true;
            }
            catch (FatalListenerExecutionException ex) {
                SimpleMessageListenerContainer.this.logger.error((Object)"Consumer received fatal exception during processing", (Throwable)((Object)ex));
                aborted = true;
            }
            catch (Throwable t) {
                if (SimpleMessageListenerContainer.this.logger.isDebugEnabled() || !(t instanceof AmqpConnectException)) {
                    SimpleMessageListenerContainer.this.logger.warn((Object)"Consumer raised exception, processing can restart if the connection factory supports it", t);
                } else {
                    SimpleMessageListenerContainer.this.logger.warn((Object)("Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: " + t));
                }
            }
            finally {
                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                }
            }
            this.start.countDown();
            if (!SimpleMessageListenerContainer.this.isActive() || aborted) {
                SimpleMessageListenerContainer.this.logger.debug((Object)("Cancelling " + this.consumer));
                try {
                    this.consumer.stop();
                }
                catch (AmqpException e) {
                    SimpleMessageListenerContainer.this.logger.info((Object)"Could not cancel message consumer", (Throwable)e);
                }
                if (aborted) {
                    SimpleMessageListenerContainer.this.logger.info((Object)"Stopping container from aborted consumer");
                    SimpleMessageListenerContainer.this.stop();
                }
            } else {
                SimpleMessageListenerContainer.this.logger.info((Object)("Restarting " + this.consumer));
                SimpleMessageListenerContainer.this.restart(this.consumer);
            }
        }
    }

    public static interface ContainerDelegate {
        public void invokeListener(Channel var1, Message var2) throws Exception;
    }
}

