/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.seda;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.seda.SedaEndpoint;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Consumer that drains the {@link SedaEndpoint}'s queue and hands every
 * exchange to the configured {@link Processor}.
 *
 * <p>On start, a fixed thread pool sized by
 * {@code endpoint.getConcurrentConsumers()} is created and this instance is
 * submitted to it once per thread, so {@link #run()} may execute concurrently
 * on several threads. On stop, the pool is shut down with
 * {@link ExecutorService#shutdownNow()}.
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable {
    private static final Log LOG = LogFactory.getLog(SedaConsumer.class);

    /** How long a single poll waits for an exchange before re-checking the run state. */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    private final SedaEndpoint endpoint;
    private final Processor processor;
    private ExecutorService executor;

    /**
     * Creates a consumer for the given endpoint.
     *
     * @param endpoint  the endpoint whose queue is polled
     * @param processor the processor each polled exchange is passed to
     */
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }

    @Override
    public String toString() {
        return "SedaConsumer[" + endpoint.getEndpointUri() + "]";
    }

    /**
     * Polls the endpoint queue until this service is no longer allowed to run.
     *
     * <p>Exceptions thrown by the processor are logged and do not end the
     * loop. If the run state changes between polling an exchange and
     * processing it, the exchange is put back on the queue instead of being
     * processed.
     */
    public void run() {
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        while (queue != null && isRunAllowed()) {
            Exchange exchange;
            try {
                exchange = queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
                }
                // Nothing was taken from the queue; the loop condition decides whether to go on.
                continue;
            }
            if (exchange == null) {
                // Poll timed out without an exchange.
                continue;
            }
            if (isRunAllowed()) {
                try {
                    processor.process(exchange);
                } catch (Exception e) {
                    LOG.error("Seda queue caught: " + e, e);
                }
            } else {
                LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                requeue(queue, exchange);
            }
        }
    }

    /**
     * Puts an already-polled exchange back on the queue.
     *
     * <p>{@code doStop()} interrupts the worker threads via
     * {@code shutdownNow()}, so the blocking {@code put} can be aborted before
     * the exchange is enqueued. In that case a non-blocking {@code offer} is
     * attempted so the exchange is not silently dropped, and the thread's
     * interrupt status is restored afterwards.
     */
    private void requeue(BlockingQueue<Exchange> queue, Exchange exchange) {
        try {
            queue.put(exchange);
        } catch (InterruptedException e) {
            // offer() does not block and is not interruptible, so it is safe to call here.
            boolean requeued = queue.offer(exchange);
            if (requeued) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Interrupted while putting exchange back on the seda queue, re-queued it without blocking: " + exchange);
                }
            } else {
                LOG.warn("Interrupted while putting exchange back on the seda queue and the queue is full, exchange is lost: " + exchange);
            }
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the worker pool, submits this consumer once per worker and
     * notifies the endpoint that the consumer has started.
     */
    protected void doStart() throws Exception {
        int poolSize = endpoint.getConcurrentConsumers();
        // NOTE(review): the trailing 'true' is presumably the daemon-thread flag; confirm against ExecutorServiceHelper.
        executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
        for (int i = 0; i < poolSize; ++i) {
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    /**
     * Notifies the endpoint that the consumer has stopped and shuts the worker
     * pool down. The pool is shut down even if the notification throws, and a
     * stop without a prior successful start is tolerated.
     */
    protected void doStop() throws Exception {
        try {
            endpoint.onStopped(this);
        } finally {
            ExecutorService current = executor;
            executor = null;
            if (current != null) {
                current.shutdownNow();
            }
        }
    }
}

