/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
import org.apache.camel.impl.DefaultConsumer;

public class RabbitMQConsumer
extends DefaultConsumer {
    ExecutorService executor;
    Connection conn;
    private int closeTimeout = 30000;
    private final RabbitMQEndpoint endpoint;
    private StartConsumerCallable startConsumerCallable;
    private final List<RabbitConsumer> consumers = new ArrayList<RabbitConsumer>();

    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
    }

    public RabbitMQEndpoint getEndpoint() {
        return (RabbitMQEndpoint)super.getEndpoint();
    }

    private void openConnection() throws IOException {
        this.log.trace("Creating connection...");
        this.conn = this.getEndpoint().connect(this.executor);
        this.log.debug("Created connection: {}", (Object)this.conn);
    }

    private Channel openChannel() throws IOException {
        this.log.trace("Creating channel...");
        Channel channel = this.conn.createChannel();
        this.log.debug("Created channel: {}", (Object)channel);
        if (this.endpoint.isPrefetchEnabled()) {
            channel.basicQos(this.endpoint.getPrefetchSize(), this.endpoint.getPrefetchCount(), this.endpoint.isPrefetchGlobal());
        }
        return channel;
    }

    private void startConsumers() throws IOException {
        Channel channel = this.openChannel();
        if (this.getEndpoint().isDeclare()) {
            this.endpoint.declareExchangeAndQueue(channel);
        }
        this.startConsumer(channel);
        for (int i = 1; i < this.endpoint.getConcurrentConsumers(); ++i) {
            channel = this.openChannel();
            this.startConsumer(channel);
        }
    }

    private void startConsumer(Channel channel) throws IOException {
        RabbitConsumer consumer = new RabbitConsumer(this, channel);
        consumer.start();
        this.consumers.add(consumer);
    }

    protected void doStart() throws Exception {
        this.executor = this.endpoint.createExecutor();
        this.log.debug("Using executor {}", (Object)this.executor);
        try {
            this.openConnection();
            this.startConsumers();
        }
        catch (Exception e) {
            this.log.info("Connection failed, will start background thread to retry!", (Throwable)e);
            Integer networkRecoveryInterval = this.getEndpoint().getNetworkRecoveryInterval();
            long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? (long)networkRecoveryInterval.intValue() : 100L;
            this.startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
            this.executor.submit(this.startConsumerCallable);
        }
    }

    private void closeConnectionAndChannel() throws IOException {
        if (this.startConsumerCallable != null) {
            this.startConsumerCallable.stop();
        }
        for (RabbitConsumer consumer : this.consumers) {
            consumer.stop();
        }
        this.consumers.clear();
        if (this.conn != null) {
            this.log.debug("Closing connection: {} with timeout: {} ms.", (Object)this.conn, (Object)this.closeTimeout);
            this.conn.close(this.closeTimeout);
            this.conn = null;
        }
    }

    protected void doStop() throws Exception {
        this.closeConnectionAndChannel();
        if (this.executor != null) {
            if (this.endpoint != null && this.endpoint.getCamelContext() != null) {
                this.endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
            this.executor = null;
        }
    }

    private class StartConsumerCallable
    implements Callable<Void> {
        private final long connectionRetryInterval;
        private final AtomicBoolean running = new AtomicBoolean(true);

        public StartConsumerCallable(long connectionRetryInterval) {
            this.connectionRetryInterval = connectionRetryInterval;
        }

        public void stop() {
            this.running.set(false);
            RabbitMQConsumer.this.startConsumerCallable = null;
        }

        @Override
        public Void call() throws Exception {
            boolean connectionFailed = true;
            while (this.running.get() && connectionFailed) {
                try {
                    RabbitMQConsumer.this.openConnection();
                    connectionFailed = false;
                }
                catch (Exception e) {
                    RabbitMQConsumer.this.log.info("Connection failed, will retry in {}" + this.connectionRetryInterval + "ms", (Throwable)e);
                    Thread.sleep(this.connectionRetryInterval);
                }
            }
            if (!connectionFailed) {
                RabbitMQConsumer.this.startConsumers();
            }
            this.stop();
            return null;
        }
    }

    class RabbitConsumer
    extends com.rabbitmq.client.DefaultConsumer {
        private final RabbitMQConsumer consumer;
        private final Channel channel;
        private String tag;

        public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) {
            super(channel);
            this.consumer = consumer;
            this.channel = channel;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            boolean sendReply;
            Exchange exchange = this.consumer.endpoint.createRabbitExchange(envelope, properties, body);
            this.mergeAmqpProperties(exchange, properties);
            boolean bl = sendReply = properties.getReplyTo() != null;
            if (sendReply && !exchange.getPattern().isOutCapable()) {
                exchange.setPattern(ExchangePattern.InOut);
            }
            RabbitMQConsumer.this.log.trace("Created exchange [exchange={}]", (Object)exchange);
            long deliveryTag = envelope.getDeliveryTag();
            try {
                this.consumer.getProcessor().process(exchange);
            }
            catch (Exception e) {
                exchange.setException((Throwable)e);
            }
            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
            if (!exchange.isFailed()) {
                if (sendReply && exchange.getPattern().isOutCapable()) {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().headers(msg.getHeaders()).correlationId(properties.getCorrelationId()).build();
                    this.channel.basicPublish("", properties.getReplyTo(), replyProps, (byte[])msg.getBody(byte[].class));
                }
                if (!this.consumer.endpoint.isAutoAck()) {
                    RabbitMQConsumer.this.log.trace("Acknowledging receipt [delivery_tag={}]", (Object)deliveryTag);
                    this.channel.basicAck(deliveryTag, false);
                }
            } else {
                boolean isRequeueHeaderSet = (Boolean)msg.getHeader("rabbitmq.REQUEUE", (Object)false, Boolean.TYPE);
                if (deliveryTag != 0L && !this.consumer.endpoint.isAutoAck()) {
                    RabbitMQConsumer.this.log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", (Object)deliveryTag, (Object)isRequeueHeaderSet);
                    if (isRequeueHeaderSet) {
                        this.channel.basicReject(deliveryTag, true);
                    } else {
                        this.channel.basicReject(deliveryTag, false);
                    }
                }
                if (exchange.getException() != null) {
                    RabbitMQConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)exchange.getException());
                }
            }
        }

        private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
            if (properties.getType() != null) {
                exchange.getIn().setHeader("rabbitmq.TYPE", (Object)properties.getType());
            }
            if (properties.getAppId() != null) {
                exchange.getIn().setHeader("rabbitmq.APP_ID", (Object)properties.getAppId());
            }
            if (properties.getClusterId() != null) {
                exchange.getIn().setHeader("rabbitmq.CLUSTERID", (Object)properties.getClusterId());
            }
            if (properties.getContentEncoding() != null) {
                exchange.getIn().setHeader("rabbitmq.CONTENT_ENCODING", (Object)properties.getContentEncoding());
            }
            if (properties.getContentType() != null) {
                exchange.getIn().setHeader("rabbitmq.CONTENT_TYPE", (Object)properties.getContentType());
            }
            if (properties.getCorrelationId() != null) {
                exchange.getIn().setHeader("rabbitmq.CORRELATIONID", (Object)properties.getCorrelationId());
            }
            if (properties.getExpiration() != null) {
                exchange.getIn().setHeader("rabbitmq.EXPIRATION", (Object)properties.getExpiration());
            }
            if (properties.getMessageId() != null) {
                exchange.getIn().setHeader("rabbitmq.MESSAGE_ID", (Object)properties.getMessageId());
            }
            if (properties.getPriority() != null) {
                exchange.getIn().setHeader("rabbitmq.PRIORITY", (Object)properties.getPriority());
            }
            if (properties.getReplyTo() != null) {
                exchange.getIn().setHeader("rabbitmq.REPLY_TO", (Object)properties.getReplyTo());
            }
            if (properties.getTimestamp() != null) {
                exchange.getIn().setHeader("rabbitmq.TIMESTAMP", (Object)properties.getTimestamp());
            }
            if (properties.getUserId() != null) {
                exchange.getIn().setHeader("rabbitmq.USERID", (Object)properties.getUserId());
            }
        }

        public void start() throws IOException {
            this.tag = this.channel.basicConsume(RabbitMQConsumer.this.endpoint.getQueue(), RabbitMQConsumer.this.endpoint.isAutoAck(), (Consumer)this);
        }

        public void stop() throws IOException {
            if (this.tag != null) {
                this.channel.basicCancel(this.tag);
            }
            this.channel.close();
        }
    }
}

