/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
import org.apache.camel.impl.DefaultConsumer;

public class RabbitMQConsumer
extends DefaultConsumer {
    ExecutorService executor;
    Connection conn;
    Channel channel;
    private final RabbitMQEndpoint endpoint;

    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.log.info("Starting RabbitMQ consumer");
        this.executor = this.endpoint.createExecutor();
        this.log.debug("Using executor {}", (Object)this.executor);
        this.conn = this.endpoint.connect(this.executor);
        this.log.debug("Using conn {}", (Object)this.conn);
        this.channel = this.conn.createChannel();
        this.log.debug("Using channel {}", (Object)this.channel);
        this.channel.exchangeDeclare(this.endpoint.getExchangeName(), "direct", this.endpoint.isDurable(), this.endpoint.isAutoDelete(), new HashMap());
        this.channel.queueDeclare(this.endpoint.getQueue(), this.endpoint.isDurable(), false, this.endpoint.isAutoDelete(), null);
        this.channel.queueBind(this.endpoint.getQueue(), this.endpoint.getExchangeName(), this.endpoint.getRoutingKey() == null ? "" : this.endpoint.getRoutingKey());
        this.channel.basicConsume(this.endpoint.getQueue(), this.endpoint.isAutoAck(), (Consumer)new RabbitConsumer(this, this.channel));
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.log.info("Stopping RabbitMQ consumer");
        if (this.conn != null) {
            try {
                this.conn.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.channel = null;
        this.conn = null;
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    class RabbitConsumer
    extends com.rabbitmq.client.DefaultConsumer {
        private final RabbitMQConsumer consumer;
        private final Channel channel;

        public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) {
            super(channel);
            this.consumer = consumer;
            this.channel = channel;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            Exchange exchange = this.consumer.endpoint.createRabbitExchange(envelope, body);
            RabbitMQConsumer.this.log.trace("Created exchange [exchange={}]", new Object[]{exchange});
            try {
                this.consumer.getProcessor().process(exchange);
                long deliveryTag = envelope.getDeliveryTag();
                RabbitMQConsumer.this.log.trace("Acknowleding receipt [delivery_tag={}]", (Object)deliveryTag);
                this.channel.basicAck(deliveryTag, false);
            }
            catch (Exception e) {
                RabbitMQConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)e);
            }
        }
    }
}

