/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultConsumer;

public class KafkaConsumer
extends DefaultConsumer {
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;
    private ConsumerConnector consumer;

    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        if (endpoint.getZookeeperConnect() == null) {
            throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
        }
        if (endpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties props = this.endpoint.getConfiguration().createConsumerProperties();
        props.put("zookeeper.connect", this.endpoint.getZookeeperConnect());
        props.put("group.id", this.endpoint.getGroupId());
        return props;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.log.info("Starting Kafka consumer");
        this.consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(this.getProps()));
        HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.endpoint.getTopic(), this.endpoint.getConsumerStreams());
        Map consumerMap = this.consumer.createMessageStreams(topicCountMap);
        List streams = (List)consumerMap.get(this.endpoint.getTopic());
        this.executor = this.endpoint.createExecutor();
        for (KafkaStream stream : streams) {
            this.executor.submit(new ConsumerTask((KafkaStream<byte[], byte[]>)stream));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.log.info("Stopping Kafka consumer");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    class ConsumerTask
    implements Runnable {
        private KafkaStream<byte[], byte[]> stream;

        public ConsumerTask(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            for (MessageAndMetadata mm : this.stream) {
                Exchange exchange = KafkaConsumer.this.endpoint.createKafkaExchange((MessageAndMetadata<byte[], byte[]>)mm);
                try {
                    KafkaConsumer.this.processor.process(exchange);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

