/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.inbound.endpoint.protocol.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Blacklist;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.synapse.SynapseException;
import org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener;
import org.wso2.carbon.inbound.endpoint.protocol.kafka.InjectHandler;

public class KAFKAMessageListener
extends AbstractKafkaMessageListener {
    public KAFKAMessageListener(int threadCount, List<String> topics, Properties kafkaProperties, InjectHandler injectHandler) throws Exception {
        this.threadCount = threadCount;
        this.topics = topics;
        this.kafkaProperties = kafkaProperties;
        this.injectHandler = injectHandler;
    }

    @Override
    public boolean createKafkaConsumerConnector() throws Exception {
        boolean isCreated;
        log.debug((Object)"Create the connection and start to consume the streams");
        try {
            if (this.consumerConnector == null) {
                log.info((Object)"Creating Kafka Consumer Connector...");
                if (!this.kafkaProperties.containsKey("consumer.timeout.ms")) {
                    this.kafkaProperties.put("consumer.timeout.ms", "3000");
                }
                this.consumerConnector = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(this.kafkaProperties));
                log.info((Object)"Kafka Consumer Connector is created");
                this.start();
            }
            isCreated = true;
        }
        catch (ZkTimeoutException toe) {
            log.error((Object)(" Error in Creating Kafka Consumer Connector | ZkTimeout" + toe.getMessage()));
            throw new SynapseException(" Error in Creating Kafka Consumer Connector| ZkTimeout");
        }
        catch (Exception e) {
            log.error((Object)(" Error in Creating Kafka Consumer Connector." + e.getMessage()), (Throwable)e);
            throw new SynapseException(" Error in Creating Kafka Consumer Connector ", (Throwable)e);
        }
        return isCreated;
    }

    @Override
    public void start() throws Exception {
        log.debug((Object)"Start to consume the streams");
        try {
            log.info((Object)"Starting KAFKA consumer...");
            HashMap<String, Integer> topicCount = new HashMap<String, Integer>();
            if (this.topics != null && this.topics.size() > 0) {
                for (Object topic : this.topics) {
                    topicCount.put((String)topic, this.threadCount);
                }
                Map consumerStreams = this.consumerConnector.createMessageStreams(topicCount);
                this.consumerIte = new ArrayList();
                for (String topic : this.topics) {
                    List streams = (List)consumerStreams.get(topic);
                    this.startConsumers(streams);
                }
            } else if (this.kafkaProperties.getProperty("topic.filter") != null) {
                boolean isFromWhiteList = this.kafkaProperties.getProperty("filter.from.whitelist") == null || this.kafkaProperties.getProperty("filter.from.whitelist").isEmpty() ? Boolean.TRUE : Boolean.parseBoolean(this.kafkaProperties.getProperty("filter.from.whitelist"));
                List consumerStreams = isFromWhiteList ? this.consumerConnector.createMessageStreamsByFilter((TopicFilter)new Whitelist(this.kafkaProperties.getProperty("topic.filter")), this.threadCount) : this.consumerConnector.createMessageStreamsByFilter((TopicFilter)new Blacklist(this.kafkaProperties.getProperty("topic.filter")), this.threadCount);
                this.startConsumers(consumerStreams);
            }
        }
        catch (Exception e) {
            log.error((Object)("Error while Starting KAFKA consumer." + e.getMessage()), (Throwable)e);
            throw new SynapseException("Error while Starting KAFKA consumer.", (Throwable)e);
        }
    }

    protected void startConsumers(List<KafkaStream<byte[], byte[]>> streams) {
        if (streams.size() >= 1) {
            this.consumerIte.add(streams.get(0).iterator());
        }
    }

    @Override
    public void injectMessageToESB(String name) {
        if (this.consumerIte.size() == 1) {
            this.injectMessageToESB(name, (ConsumerIterator<byte[], byte[]>)((ConsumerIterator)this.consumerIte.get(0)));
        } else {
            log.debug((Object)"There are multiple topics to consume from not a single topic");
        }
    }

    public void injectMessageToESB(String sequenceName, ConsumerIterator<byte[], byte[]> consumerIterator) {
        byte[] msg = (byte[])consumerIterator.next().message();
        this.injectHandler.invoke(msg, sequenceName);
    }

    @Override
    public boolean hasNext() {
        if (this.consumerIte.size() == 1) {
            return this.hasNext((ConsumerIterator<byte[], byte[]>)((ConsumerIterator)this.consumerIte.get(0)));
        }
        log.debug((Object)"There are multiple topics to consume from not a single topic,");
        return false;
    }

    public boolean hasNext(ConsumerIterator<byte[], byte[]> consumerIterator) {
        try {
            return consumerIterator.hasNext();
        }
        catch (ConsumerTimeoutException e) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Topic has no new messages to consume.");
            }
            return false;
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Kafka listener is interrupted by server shutdown.", (Throwable)e);
            }
            return false;
        }
    }

    @Override
    public boolean hasMultipleTopicsToConsume() {
        return this.consumerIte.size() > 1;
    }

    @Override
    public void consumeMultipleTopics(String name) {
        for (ConsumerIterator consumerIterator : this.consumerIte) {
            if (!this.hasNext((ConsumerIterator<byte[], byte[]>)consumerIterator)) continue;
            this.injectMessageToESB(name, (ConsumerIterator<byte[], byte[]>)consumerIterator);
        }
    }
}

