/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.core;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.function.Function2;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.api.tuple.Pair;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.tuple.Tuples;
import com.gs.collections.impl.utility.LazyIterate;
import com.gs.collections.impl.utility.MapIterate;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetCommitRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Connection;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaMessageMetadata;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.core.ResultBuilder;
import org.springframework.util.Assert;

public class DefaultConnection
implements Connection {
    private static Log log = LogFactory.getLog(DefaultConnection.class);
    private final AtomicInteger correlationIdCounter = new AtomicInteger(new Random(new Date().getTime()).nextInt());
    private final SimpleConsumer simpleConsumer;
    private final BrokerAddress brokerAddress;
    private int minBytes;
    private int maxWait;

    public DefaultConnection(BrokerAddress brokerAddress, String clientId, int bufferSize, int soTimeout, int minBytes, int maxWait) {
        this.brokerAddress = brokerAddress;
        this.minBytes = minBytes;
        this.maxWait = maxWait;
        this.simpleConsumer = new SimpleConsumer(brokerAddress.getHost(), brokerAddress.getPort(), soTimeout, bufferSize, clientId);
    }

    @Override
    public BrokerAddress getBrokerAddress() {
        return this.brokerAddress;
    }

    @Override
    public void close() {
        this.simpleConsumer.close();
    }

    @Override
    public Result<KafkaMessageBatch> fetch(FetchRequest ... requests) throws ConsumerException {
        FetchResponse fetchResponse;
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        for (FetchRequest request : requests) {
            Partition partition = request.getPartition();
            long offset = request.getOffset();
            int maxSize = request.getMaxSizeInBytes();
            fetchRequestBuilder.addFetch(partition.getTopic(), partition.getId(), offset, maxSize);
        }
        try {
            fetchResponse = this.simpleConsumer.fetch(fetchRequestBuilder.maxWait(this.maxWait).minBytes(this.minBytes).build());
        }
        catch (Exception e) {
            throw new ConsumerException(e);
        }
        ResultBuilder resultBuilder = new ResultBuilder();
        for (FetchRequest request : requests) {
            Partition partition = request.getPartition();
            if (log.isTraceEnabled()) {
                log.trace((Object)("Reading from " + partition + "@" + request.getOffset()));
            }
            short errorCode = fetchResponse.errorCode(partition.getTopic(), partition.getId());
            if (ErrorMapping.NoError() == errorCode) {
                ByteBufferMessageSet messageSet = fetchResponse.messageSet(partition.getTopic(), partition.getId());
                MutableList kafkaMessages = LazyIterate.collect((Iterable)messageSet, (Function)new ConvertToKafkaMessageFunction(request)).toList();
                long highWatermark = fetchResponse.highWatermark(partition.getTopic(), partition.getId());
                resultBuilder.add(partition).withResult(new KafkaMessageBatch(partition, (List<KafkaMessage>)kafkaMessages, highWatermark));
                continue;
            }
            resultBuilder.add(partition).withError(errorCode);
        }
        return resultBuilder.build();
    }

    @Override
    public Result<Long> fetchStoredOffsetsForConsumer(String consumerId, Partition ... partitions) throws ConsumerException {
        FastList topicsAndPartitions = FastList.newList(Arrays.asList(partitions)).collect((Function)new ConvertToTopicAndPartitionFunction());
        OffsetFetchRequest offsetFetchRequest = new OffsetFetchRequest(consumerId, (List)topicsAndPartitions, kafka.api.OffsetFetchRequest.CurrentVersion(), this.createCorrelationId().intValue(), this.simpleConsumer.clientId());
        OffsetFetchResponse offsetFetchResponse = null;
        try {
            offsetFetchResponse = this.simpleConsumer.fetchOffsets(offsetFetchRequest);
        }
        catch (Exception e) {
            throw new ConsumerException(e);
        }
        ResultBuilder resultBuilder = new ResultBuilder();
        for (Partition partition : partitions) {
            OffsetMetadataAndError offsetMetadataAndError = (OffsetMetadataAndError)offsetFetchResponse.offsets().get(new TopicAndPartition(partition.getTopic(), partition.getId()));
            short errorCode = offsetMetadataAndError.error();
            if (ErrorMapping.NoError() == errorCode) {
                resultBuilder.add(partition).withResult(offsetMetadataAndError.offset());
                continue;
            }
            resultBuilder.add(partition).withError(errorCode);
        }
        return resultBuilder.build();
    }

    @Override
    public Result<Long> fetchInitialOffset(long referenceTime, Partition ... partitions) throws ConsumerException {
        Assert.isTrue((partitions.length > 0 ? 1 : 0) != 0, (String)"Must provide at least one partition");
        HashMap<TopicAndPartition, PartitionOffsetRequestInfo> infoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        for (Partition partition : partitions) {
            infoMap.put(new TopicAndPartition(partition.getTopic(), partition.getId()), new PartitionOffsetRequestInfo(referenceTime, 1));
        }
        OffsetRequest offsetRequest = new OffsetRequest(infoMap, kafka.api.OffsetRequest.CurrentVersion(), this.simpleConsumer.clientId());
        OffsetResponse offsetResponse = null;
        try {
            offsetResponse = this.simpleConsumer.getOffsetsBefore(offsetRequest);
        }
        catch (Exception e) {
            throw new ConsumerException(e);
        }
        ResultBuilder resultBuilder = new ResultBuilder();
        for (Partition partition : partitions) {
            short errorCode = offsetResponse.errorCode(partition.getTopic(), partition.getId());
            if (ErrorMapping.NoError() == errorCode) {
                long[] offsets = offsetResponse.offsets(partition.getTopic(), partition.getId());
                if (offsets.length == 0) {
                    throw new ConsumerException("Inconsistent response: no error has been returned, but no offsets either");
                }
                resultBuilder.add(partition).withResult(offsets[0]);
                continue;
            }
            resultBuilder.add(partition).withError(errorCode);
        }
        return resultBuilder.build();
    }

    @Override
    public Result<Void> commitOffsetsForConsumer(String consumerId, Map<Partition, Long> offsets) throws ConsumerException {
        MutableMap requestInfo = MapIterate.collect(offsets, (Function2)new CreateRequestInfoMapEntryFunction());
        OffsetCommitResponse offsetCommitResponse = null;
        try {
            offsetCommitResponse = this.simpleConsumer.commitOffsets(new kafka.javaapi.OffsetCommitRequest(consumerId, (Map)requestInfo, this.createCorrelationId().intValue(), this.simpleConsumer.clientId(), OffsetCommitRequest.CurrentVersion()));
        }
        catch (Exception e) {
            throw new ConsumerException(e);
        }
        ResultBuilder resultBuilder = new ResultBuilder();
        for (TopicAndPartition topicAndPartition : requestInfo.keySet()) {
            if (!offsetCommitResponse.errors().containsKey(topicAndPartition)) continue;
            Partition partition = new Partition(topicAndPartition.topic(), topicAndPartition.partition());
            resultBuilder.add(partition).withError((Short)offsetCommitResponse.errors().get(topicAndPartition));
        }
        return resultBuilder.build();
    }

    @Override
    @Deprecated
    public Result<BrokerAddress> findLeaders(String ... topics) throws ConsumerException {
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topics), this.createCorrelationId().intValue());
        TopicMetadataResponse topicMetadataResponse = null;
        try {
            topicMetadataResponse = this.simpleConsumer.send(topicMetadataRequest);
        }
        catch (Exception e) {
            throw new ConsumerException(e);
        }
        ResultBuilder resultBuilder = new ResultBuilder();
        for (TopicMetadata topicMetadata : topicMetadataResponse.topicsMetadata()) {
            if (topicMetadata.errorCode() != ErrorMapping.NoError()) {
                resultBuilder.add(new Partition(topicMetadata.topic(), -1)).withError(topicMetadata.errorCode());
                continue;
            }
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                Partition partition = new Partition(topicMetadata.topic(), partitionMetadata.partitionId());
                if (ErrorMapping.NoError() == partitionMetadata.errorCode()) {
                    Broker leader = partitionMetadata.leader();
                    BrokerAddress result = new BrokerAddress(leader.host(), leader.port());
                    resultBuilder.add(partition).withResult(result);
                    continue;
                }
                resultBuilder.add(partition).withError(partitionMetadata.errorCode());
            }
        }
        return resultBuilder.build();
    }

    private Integer createCorrelationId() {
        return this.correlationIdCounter.incrementAndGet();
    }

    private static class CreateRequestInfoMapEntryFunction
    implements Function2<Partition, Long, Pair<TopicAndPartition, OffsetAndMetadata>> {
        private CreateRequestInfoMapEntryFunction() {
        }

        public Pair<TopicAndPartition, OffsetAndMetadata> value(Partition partition, Long offset) {
            return Tuples.pair((Object)new TopicAndPartition(partition.getTopic(), partition.getId()), (Object)new OffsetAndMetadata(offset.longValue(), OffsetAndMetadata.NoMetadata(), OffsetAndMetadata.InvalidTime()));
        }
    }

    private static class ConvertToTopicAndPartitionFunction
    implements Function<Partition, TopicAndPartition> {
        private ConvertToTopicAndPartitionFunction() {
        }

        public TopicAndPartition valueOf(Partition partition) {
            return new TopicAndPartition(partition.getTopic(), partition.getId());
        }
    }

    private static class ConvertToKafkaMessageFunction
    implements Function<MessageAndOffset, KafkaMessage> {
        private final FetchRequest request;

        public ConvertToKafkaMessageFunction(FetchRequest request) {
            this.request = request;
        }

        public KafkaMessage valueOf(MessageAndOffset messageAndOffset) {
            return new KafkaMessage(messageAndOffset.message(), new KafkaMessageMetadata(this.request.getPartition(), messageAndOffset.offset(), messageAndOffset.nextOffset()));
        }
    }
}

