/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import kafka.admin.AdminUtils$;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping$;
import kafka.common.TopicExistsException;
import kafka.serializer.Decoder;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaTemplate;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.core.TopicNotFoundException;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.listener.AbstractOffsetManager;
import org.springframework.integration.kafka.listener.LongSerializerDecoder;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.integration.kafka.util.LoggingUtils;
import org.springframework.integration.kafka.util.MessageUtils;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

public class KafkaTopicOffsetManager
extends AbstractOffsetManager
implements InitializingBean {
    // Codec used both to decode record keys read from the offset topic and to serialize keys sent by the producer.
    private static final KeySerializerDecoder KEY_CODEC = new KeySerializerDecoder();
    // Codec used both to decode record payloads (offsets) and to serialize values sent by the producer.
    private static final LongSerializerDecoder VALUE_CODEC = new LongSerializerDecoder();
    // Topic-level configuration keys/values applied when the offset topic is created and checked when it is validated.
    public static final String CLEANUP_POLICY = "cleanup.policy";
    public static final String CLEANUP_POLICY_COMPACT = "compact";
    public static final String DELETE_RETENTION = "delete.retention.ms";
    public static final String SEGMENT_BYTES = "segment.bytes";
    // Zookeeper connection settings; used to open a ZkClient during initialization.
    private final ZookeeperConnect zookeeperConnect;
    // Name of the topic in which offsets are stored.
    private final String topic;
    // Template used to fetch the existing contents of the offset topic at startup.
    private final KafkaTemplate kafkaTemplate;
    // In-memory view of the offsets for this manager's consumer id, keyed by partition.
    private final ConcurrentMap<Partition, Long> data = new ConcurrentHashMap<Partition, Long>();
    // Compression type handed to the producer metadata.
    private ProducerMetadata.CompressionType compressionType = ProducerMetadata.CompressionType.none;
    // Producer that publishes offset updates; assigned in initializeProducer, so null until initialization completes.
    private Producer<Key, Long> producer;
    // Size passed to each FetchRequest when reading the offset topic at startup.
    private int maxSize = 10240;
    // Value written to the "segment.bytes" topic property on topic creation.
    private int segmentSize = 25600;
    // Value written to the "delete.retention.ms" topic property on topic creation.
    private int retentionTime = 60000;
    // Configurable through setReplicationFactor; see createCompactedTopicIfNotFound for how it is applied.
    private int replicationFactor;
    // Value passed to ProducerMetadata.setBatchBytes.
    private int batchBytes = 200;
    // Value sent as the producer "acks" property.
    private int requiredAcks = 1;
    // Value sent as the producer "linger.ms" property.
    private int maxQueueBufferingTime;

    /**
     * Creates an offset manager with no initial offsets.
     *
     * @param zookeeperConnect the Zookeeper connection settings
     * @param topic the name of the topic in which offsets are stored
     */
    public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, String topic) {
        // Delegate with an empty, mutable map of initial offsets.
        this(zookeeperConnect, topic, new HashMap<Partition, Long>());
    }

    /**
     * Creates an offset manager backed by a connection factory built from the given
     * Zookeeper settings.
     *
     * @param zookeeperConnect the Zookeeper connection settings; must not be {@code null}
     *        (note that this check only runs after the superclass constructor call)
     * @param topic the name of the topic in which offsets are stored
     * @param initialOffsets the initial offsets handed to the superclass
     */
    public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, String topic, Map<Partition, Long> initialOffsets) {
        super(new DefaultConnectionFactory(new ZookeeperConfiguration(zookeeperConnect)), initialOffsets);
        Assert.notNull(zookeeperConnect);
        this.topic = topic;
        this.zookeeperConnect = zookeeperConnect;
        this.kafkaTemplate = new KafkaTemplate(this.connectionFactory);
    }

    /**
     * Sets the size passed to each fetch request issued while reading the offset
     * topic during initialization.
     *
     * @param maxSize the fetch size to use
     */
    public void setMaxSize(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * Sets the compression type handed to the producer metadata when the offset
     * producer is created.
     *
     * @param compressionType the compression type to use
     */
    public void setCompressionCodec(ProducerMetadata.CompressionType compressionType) {
        this.compressionType = compressionType;
    }

    /**
     * Sets the value sent to the offset producer as its {@code linger.ms} property.
     *
     * @param maxQueueBufferingTime the linger value to use
     */
    public void setMaxQueueBufferingTime(int maxQueueBufferingTime) {
        this.maxQueueBufferingTime = maxQueueBufferingTime;
    }

    /**
     * Sets the value written to the {@code segment.bytes} topic property when the
     * offset topic is created.
     *
     * @param segmentSize the segment size to use
     */
    public void setSegmentSize(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /**
     * Sets the value written to the {@code delete.retention.ms} topic property when
     * the offset topic is created.
     *
     * @param retentionTime the retention value to use
     */
    public void setRetentionTime(int retentionTime) {
        this.retentionTime = retentionTime;
    }

    /**
     * Stores the replication factor intended for the offset topic.
     * NOTE(review): verify how {@code createCompactedTopicIfNotFound} consumes this
     * value before relying on it.
     *
     * @param replicationFactor the replication factor to store
     */
    public void setReplicationFactor(int replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    /**
     * Sets the batch size handed to the producer metadata when the offset producer
     * is created.
     *
     * @param batchBytes the batch size to use
     */
    public void setBatchBytes(int batchBytes) {
        this.batchBytes = batchBytes;
    }

    /**
     * Sets the value sent to the offset producer as its {@code acks} property.
     *
     * @param requiredAcks the acks value to use
     */
    public void setRequiredAcks(int requiredAcks) {
        this.requiredAcks = requiredAcks;
    }

    /**
     * Initializes the manager: initializes the connection factory, ensures the offset
     * topic exists and is valid, loads the offsets currently stored in partition 0 of
     * that topic and creates the producer used for subsequent updates.
     * <p>
     * The Zookeeper client opened here is only needed during initialization and is
     * always closed before this method returns.
     *
     * @throws Exception if any initialization step fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ((DefaultConnectionFactory)this.connectionFactory).afterPropertiesSet();
        String zkServers = this.zookeeperConnect.getZkConnect();
        int sessionTimeout = Integer.parseInt(this.zookeeperConnect.getZkSessionTimeout());
        int connectionTimeout = Integer.parseInt(this.zookeeperConnect.getZkConnectionTimeout());
        ZkClient zkClient = new ZkClient(zkServers, sessionTimeout, connectionTimeout, (ZkSerializer)ZKStringSerializer$.MODULE$);
        try {
            createCompactedTopicIfNotFound(zkClient);
            validateOffsetTopic(zkClient);
            // Offsets are always kept in the first (and only) partition of the topic.
            Partition offsetPartition = new Partition(this.topic, 0);
            BrokerAddress leader = this.connectionFactory.getLeader(offsetPartition);
            readOffsetData(offsetPartition, leader);
            initializeProducer(leader);
        }
        finally {
            try {
                zkClient.close();
            }
            catch (ZkInterruptedException closeFailure) {
                // Best effort: a failure to close must not mask the outcome of initialization.
                this.log.error("Error while closing Zookeeper client", closeFailure);
            }
        }
    }

    /**
     * Records the offset in the in-memory map and publishes it to the offset topic,
     * keyed by this manager's consumer id and the partition.
     *
     * @param partition the partition whose offset changed
     * @param offset the new offset
     */
    @Override
    protected void doUpdateOffset(Partition partition, long offset) {
        this.data.put(partition, offset);
        Key recordKey = new Key(this.consumerId, partition);
        ProducerRecord<Key, Long> offsetRecord = new ProducerRecord<Key, Long>(this.topic, recordKey, offset);
        this.producer.send(offsetRecord);
    }

    /**
     * Removes the offset from the in-memory map and publishes a record with a
     * {@code null} value for the same key to the offset topic.
     *
     * @param partition the partition whose offset is removed
     */
    @Override
    protected void doRemoveOffset(Partition partition) {
        this.data.remove(partition);
        Key recordKey = new Key(this.consumerId, partition);
        ProducerRecord<Key, Long> removalRecord = new ProducerRecord<Key, Long>(this.topic, recordKey, null);
        this.producer.send(removalRecord);
    }

    /**
     * Looks up the offset held in memory for the given partition.
     *
     * @param partition the partition to look up
     * @return the stored offset, or {@code null} when none is held for the partition
     */
    @Override
    protected Long doGetOffset(Partition partition) {
        Long storedOffset = this.data.get(partition);
        return storedOffset;
    }

    /**
     * Does nothing: this implementation performs no work on flush.
     *
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void flush() throws IOException {
        // Intentionally empty.
    }

    /**
     * Closes the offset producer, if one was created, and destroys the connection
     * factory.
     * <p>
     * The producer only exists once initialization has completed, so it is
     * null-checked here; the connection factory is destroyed even when closing the
     * producer fails.
     *
     * @throws IOException if destroying the connection factory fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        }
        finally {
            try {
                ((DefaultConnectionFactory)this.connectionFactory).destroy();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Creates the offset topic as a single-partition compacted topic, using the
     * configured retention time and segment size. An already existing topic is left
     * untouched.
     * <p>
     * The configured replication factor is honoured; when none was configured
     * (the field is still non-positive) a replication factor of 1 is used.
     *
     * @param zkClient the Zookeeper client used to create the topic
     */
    private void createCompactedTopicIfNotFound(ZkClient zkClient) {
        Properties topicConfig = new Properties();
        topicConfig.setProperty(CLEANUP_POLICY, CLEANUP_POLICY_COMPACT);
        topicConfig.setProperty(DELETE_RETENTION, String.valueOf(this.retentionTime));
        topicConfig.setProperty(SEGMENT_BYTES, String.valueOf(this.segmentSize));
        // Fall back to 1 only when no usable value was configured, instead of
        // overwriting whatever setReplicationFactor supplied.
        int effectiveReplicationFactor = this.replicationFactor > 0 ? this.replicationFactor : 1;
        try {
            AdminUtils$.MODULE$.createTopic(zkClient, this.topic, 1, effectiveReplicationFactor, topicConfig);
        }
        catch (TopicExistsException e) {
            this.log.debug("Topic already exists", e);
        }
    }

    /**
     * Checks that the offset topic has at most one partition and is configured with
     * {@code cleanup.policy=compact}.
     * <p>
     * The partition lookup is retried up to 10 times on {@link TopicNotFoundException},
     * with an exponential back-off starting at 100 ms and capped at 1000 ms.
     *
     * @param zkClient the Zookeeper client used to read the topic configuration
     * @throws Exception if the partition lookup fails
     * @throws BeanInitializationException if the topic has more than one partition or
     *         is not compacted
     */
    private void validateOffsetTopic(ZkClient zkClient) throws Exception {
        ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
        exponentialBackOff.setInitialInterval(100L);
        exponentialBackOff.setMaxInterval(1000L);
        exponentialBackOff.setMultiplier(2.0);

        RetryTemplate topicLookup = new RetryTemplate();
        topicLookup.setRetryPolicy(new SimpleRetryPolicy(10,
                Collections.<Class<? extends Throwable>, Boolean>singletonMap(TopicNotFoundException.class, true)));
        topicLookup.setBackOffPolicy(exponentialBackOff);

        RetryCallback<Collection<Partition>, Exception> fetchPartitions = new RetryCallback<Collection<Partition>, Exception>(){

            public Collection<Partition> doWithRetry(RetryContext context) throws Exception {
                return KafkaTopicOffsetManager.this.connectionFactory.getPartitions(KafkaTopicOffsetManager.this.topic);
            }
        };
        Collection<Partition> partitions = topicLookup.execute(fetchPartitions);
        if (partitions.size() > 1) {
            throw new BeanInitializationException("Offset management topic cannot have more than one partition");
        }

        Properties topicProperties = AdminUtils$.MODULE$.fetchTopicConfig(zkClient, this.topic);
        boolean compacted = topicProperties.containsKey(CLEANUP_POLICY)
                && CLEANUP_POLICY_COMPACT.equals(topicProperties.getProperty(CLEANUP_POLICY));
        if (!compacted) {
            throw new BeanInitializationException("Property 'cleanup.policy' must be set to 'compact' on offset topic");
        }
    }

    /**
     * Loads the offsets currently stored in the offset topic into the in-memory map.
     * <p>
     * The partition is read from its earliest to its latest offset in batches of at
     * most {@code maxSize}; every message is applied through
     * {@link #checkAndAddData(KafkaMessage)}.
     *
     * @param offsetManagementPartition the partition of the offset topic to read
     * @param leader the broker that leads that partition
     * @throws BeanInitializationException if the earliest or latest offset cannot be
     *         read, if a fetch reports an error, or if a fetch makes no progress
     */
    private void readOffsetData(Partition offsetManagementPartition, BrokerAddress leader) {
        Result<Long> earliestOffsetResult = this.connectionFactory.connect(leader).fetchInitialOffset(OffsetRequest.EarliestTime(), offsetManagementPartition);
        if (earliestOffsetResult.getErrors().size() > 0) {
            throw new BeanInitializationException("Cannot initialize offset manager, unable to read earliest offset", ErrorMapping$.MODULE$.exceptionFor(earliestOffsetResult.getError(offsetManagementPartition)));
        }
        Result<Long> latestOffsetResult = this.connectionFactory.connect(leader).fetchInitialOffset(OffsetRequest.LatestTime(), offsetManagementPartition);
        if (latestOffsetResult.getErrors().size() > 0) {
            // Report the underlying cause, consistently with the earliest-offset failure above.
            throw new BeanInitializationException("Cannot initialize offset manager, unable to read latest offset", ErrorMapping$.MODULE$.exceptionFor(latestOffsetResult.getError(offsetManagementPartition)));
        }
        long initialOffset = earliestOffsetResult.getResult(offsetManagementPartition);
        long finalOffset = latestOffsetResult.getResult(offsetManagementPartition);
        long readingOffset = initialOffset;
        while (readingOffset < finalOffset) {
            FetchRequest fetchRequest = new FetchRequest(offsetManagementPartition, readingOffset, this.maxSize);
            Result<KafkaMessageBatch> receive = this.kafkaTemplate.receive(Collections.singleton(fetchRequest));
            if (receive.getErrors().size() > 0) {
                throw new BeanInitializationException("Error while fetching initial offsets:", ErrorMapping$.MODULE$.exceptionFor(receive.getError(offsetManagementPartition)));
            }
            KafkaMessageBatch result = receive.getResult(offsetManagementPartition);
            long offsetBeforeBatch = readingOffset;
            for (KafkaMessage kafkaMessage : result.getMessages()) {
                this.checkAndAddData(kafkaMessage);
                readingOffset = kafkaMessage.getMetadata().getNextOffset();
            }
            if (readingOffset <= offsetBeforeBatch) {
                // A batch that does not advance the read position would make this loop spin
                // forever, so fail fast instead.
                throw new BeanInitializationException("Cannot initialize offset manager, no progress while reading topic '"
                        + this.topic + "' at offset " + readingOffset
                        + "; the fetch size 'maxSize' (" + this.maxSize + ") may be too small for the next message");
            }
        }
        // Summarize once, after the whole topic has been read, rather than after every batch.
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.data.size() + " entries in the final map");
        }
        if (this.log.isTraceEnabled()) {
            for (Map.Entry<Partition, Long> entry : this.data.entrySet()) {
                this.log.trace(String.format("Final value for %s : %s", entry.getKey().toString(), String.valueOf(entry.getValue())));
            }
        }
    }

    /**
     * Applies one message read from the offset topic to the in-memory map.
     * <p>
     * Messages whose key cannot be decoded, or whose key belongs to a different
     * consumer id, are ignored. A message with a value stores that value for the
     * key's partition; a message without a value removes the partition's entry.
     *
     * @param kafkaMessage the message to apply
     */
    private void checkAndAddData(KafkaMessage kafkaMessage) {
        Key decodedKey = MessageUtils.decodeKey(kafkaMessage, KEY_CODEC);
        Long decodedOffset = MessageUtils.decodePayload(kafkaMessage, VALUE_CODEC);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Loading key " + decodedKey + " with value " + decodedOffset);
        }
        if (decodedKey == null || !ObjectUtils.nullSafeEquals(this.consumerId, decodedKey.getConsumerId())) {
            return;
        }
        Partition partition = decodedKey.getPartition();
        if (decodedOffset == null) {
            // Removing an absent key is a no-op, so no prior containsKey check is needed.
            this.data.remove(partition);
            return;
        }
        this.data.put(partition, decodedOffset);
    }

    /**
     * Creates the producer used to publish offset updates, targeting the given broker
     * and configured with the batch size, compression type, {@code linger.ms} and
     * {@code acks} values held by this manager.
     *
     * @param leader the broker whose string form is passed to the producer factory
     * @throws Exception if the producer cannot be created
     */
    private void initializeProducer(BrokerAddress leader) throws Exception {
        ProducerMetadata<Key, Long> metadata = new ProducerMetadata<Key, Long>(this.topic, Key.class, Long.class, KEY_CODEC, VALUE_CODEC);
        metadata.setBatchBytes(this.batchBytes);
        metadata.setCompressionType(this.compressionType);

        Properties producerOverrides = new Properties();
        producerOverrides.setProperty("linger.ms", String.valueOf(this.maxQueueBufferingTime));
        producerOverrides.setProperty("acks", String.valueOf(this.requiredAcks));

        this.producer = new ProducerFactoryBean<Key, Long>(metadata, leader.toString(), producerOverrides).getObject();
    }

    /**
     * Reads a big-endian 32-bit integer from four consecutive bytes.
     *
     * @param bytes the source array
     * @param start the index of the most significant byte
     * @return the decoded integer
     */
    private static int intFromBytes(byte[] bytes, int start) {
        int decoded = 0;
        for (int i = 0; i < 4; i++) {
            // Shift in one unsigned byte at a time, most significant first.
            decoded = (decoded << 8) | (bytes[start + i] & 0xFF);
        }
        return decoded;
    }

    /**
     * Encodes an integer as four bytes in big-endian order.
     *
     * @param message the integer to encode; must not be {@code null}
     * @return a new four-byte array, most significant byte first
     */
    private static byte[] intToBytes(Integer message) {
        int remaining = message;
        byte[] encoded = new byte[4];
        for (int i = 3; i >= 0; i--) {
            // Fill from the least significant byte backwards.
            encoded[i] = (byte)remaining;
            remaining >>>= 8;
        }
        return encoded;
    }

    public static class KeySerializerDecoder
    implements Serializer<Key>,
    Decoder<Key> {
        private static final Log log = LogFactory.getLog(KeySerializerDecoder.class);

        public Key fromBytes(byte[] bytes) {
            if (bytes == null || bytes.length <= 0) {
                return null;
            }
            try {
                int consumerIdSize = KafkaTopicOffsetManager.intFromBytes(bytes, 0);
                int topicIdSize = KafkaTopicOffsetManager.intFromBytes(bytes, consumerIdSize + 4);
                return new Key(new String(bytes, 4, consumerIdSize), new Partition(new String(bytes, consumerIdSize + 8, topicIdSize), KafkaTopicOffsetManager.intFromBytes(bytes, consumerIdSize + topicIdSize + 8)));
            }
            catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Cannot decode key:" + LoggingUtils.asCommaSeparatedHexDump(bytes)));
                }
                return null;
            }
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        public byte[] serialize(String topic, Key data) {
            if (data == null) {
                return null;
            }
            try {
                byte[] consumerIdBytes = data.consumerId.getBytes("UTF-8");
                byte[] topicNameBytes = data.partition.getTopic().getBytes("UTF-8");
                byte[] partitionIdBytes = KafkaTopicOffsetManager.intToBytes(data.partition.getId());
                byte[] result = new byte[4 + consumerIdBytes.length + 4 + topicNameBytes.length + 4];
                System.arraycopy(KafkaTopicOffsetManager.intToBytes(consumerIdBytes.length), 0, result, 0, 4);
                System.arraycopy(consumerIdBytes, 0, result, 4, consumerIdBytes.length);
                System.arraycopy(KafkaTopicOffsetManager.intToBytes(topicNameBytes.length), 0, result, consumerIdBytes.length + 4, 4);
                System.arraycopy(topicNameBytes, 0, result, consumerIdBytes.length + 8, topicNameBytes.length);
                System.arraycopy(partitionIdBytes, 0, result, consumerIdBytes.length + topicNameBytes.length + 8, 4);
                return result;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
        }
    }

    /**
     * Immutable key of a record in the offset topic: a consumer id together with the
     * partition whose offset is tracked.
     */
    public static class Key {
        final String consumerId;
        final Partition partition;

        /**
         * @param consumerID the consumer id; must not be {@code null}
         * @param partition the tracked partition; must not be {@code null}
         */
        public Key(String consumerID, Partition partition) {
            Assert.notNull(consumerID, "Consumer Id cannot be null");
            Assert.notNull(partition, "Partition cannot be null");
            this.consumerId = consumerID;
            this.partition = partition;
        }

        /**
         * @return the consumer id
         */
        public String getConsumerId() {
            return this.consumerId;
        }

        /**
         * @return the tracked partition
         */
        public Partition getPartition() {
            return this.partition;
        }

        /**
         * Two keys are equal when they are of the same class and both their consumer
         * id and partition are equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Key key = (Key)o;
            return this.consumerId.equals(key.consumerId) && this.partition.equals(key.partition);
        }

        @Override
        public int hashCode() {
            int result = this.consumerId.hashCode();
            result = 31 * result + this.partition.hashCode();
            return result;
        }

        /**
         * Readable form for log output, which otherwise shows only an identity hash.
         */
        @Override
        public String toString() {
            return "Key[consumerId=" + this.consumerId + ", partition=" + this.partition + "]";
        }
    }
}

