/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import com.gs.collections.impl.factory.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kafka.client.ClientUtils$;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.network.BlockingChannel;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.integration.kafka.core.Connection;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.PartitionNotFoundException;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.listener.AbstractOffsetManager;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

public class KafkaNativeOffsetManager
extends AbstractOffsetManager
implements InitializingBean {
    private static final String PARTITION_ATTRIBUTE = "partition";
    private final Map<Partition, BrokerAddress> offsetManagerBrokerAddressCache = new ConcurrentHashMap<Partition, BrokerAddress>();
    private final ZkClient zkClient;
    private RetryTemplate retryTemplate;

    public KafkaNativeOffsetManager(ConnectionFactory connectionFactory, ZookeeperConnect zookeeperConnect) {
        this(connectionFactory, zookeeperConnect, Collections.emptyMap());
    }

    public KafkaNativeOffsetManager(ConnectionFactory connectionFactory, ZookeeperConnect zookeeperConnect, Map<Partition, Long> initialOffsets) {
        super(connectionFactory, initialOffsets);
        Assert.notNull((Object)zookeeperConnect, (String)"'zookeeperConnect' must not be null.");
        this.zkClient = new ZkClient(zookeeperConnect.getZkConnect(), Integer.parseInt(zookeeperConnect.getZkSessionTimeout()), Integer.parseInt(zookeeperConnect.getZkConnectionTimeout()), (ZkSerializer)ZKStringSerializer$.MODULE$);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.retryTemplate == null) {
            this.retryTemplate = new RetryTemplate();
            this.retryTemplate.registerListener((RetryListener)new ResetOffsetManagerBrokerAddressRetryListener());
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
            retryPolicy.setMaxAttempts(5);
            this.retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(125L);
            backOffPolicy.setMaxInterval(5000L);
            backOffPolicy.setMultiplier(2.0);
            this.retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        }
    }

    @Override
    protected Long doGetOffset(final Partition partition) {
        Long offset = (Long)this.retryTemplate.execute((RetryCallback)new RetryCallback<Long, RuntimeException>(){

            public Long doWithRetry(RetryContext context) throws RuntimeException {
                context.setAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE, (Object)partition);
                Result<Long> result = KafkaNativeOffsetManager.this.getOffsetManagerConnection(partition).fetchStoredOffsetsForConsumer(KafkaNativeOffsetManager.this.getConsumerId(), partition);
                KafkaNativeOffsetManager.this.checkResultForErrors(result, partition);
                return result.getResult(partition);
            }

            public String toString() {
                return String.format("fetchStoredOffsetsForConsumer(%s, %s)", KafkaNativeOffsetManager.this.getConsumerId(), partition);
            }
        });
        if (offset != null && offset < 0L) {
            return null;
        }
        return offset;
    }

    @Override
    protected void doUpdateOffset(final Partition partition, final long offset) {
        this.retryTemplate.execute((RetryCallback)new RetryCallback<Void, RuntimeException>(){

            public Void doWithRetry(RetryContext context) throws RuntimeException {
                context.setAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE, (Object)partition);
                Result<Void> result = KafkaNativeOffsetManager.this.getOffsetManagerConnection(partition).commitOffsetsForConsumer(KafkaNativeOffsetManager.this.getConsumerId(), Maps.immutable.of((Object)partition, (Object)offset).castToMap());
                KafkaNativeOffsetManager.this.checkResultForErrors(result, partition);
                return null;
            }

            public String toString() {
                return String.format("commitOffsetsForConsumer(%s, %s, %s)", KafkaNativeOffsetManager.this.getConsumerId(), partition, offset);
            }
        });
    }

    @Override
    protected void doRemoveOffset(Partition partition) {
        this.doUpdateOffset(partition, OffsetAndMetadata.InvalidOffset());
    }

    @Override
    public void close() throws IOException {
        this.zkClient.close();
    }

    @Override
    public void flush() throws IOException {
    }

    private BrokerAddress getOffsetManagerBrokerAddress(Partition partition) {
        BrokerAddress brokerAddress = this.offsetManagerBrokerAddressCache.get(partition);
        if (brokerAddress == null) {
            int socketTimeoutMs = 30000;
            int retryBackOffMs = 1000;
            if (this.connectionFactory instanceof DefaultConnectionFactory) {
                Configuration configuration = ((DefaultConnectionFactory)this.connectionFactory).getConfiguration();
                socketTimeoutMs = configuration.getSocketTimeout();
                retryBackOffMs = configuration.getBackOff();
            }
            BlockingChannel channel = ClientUtils$.MODULE$.channelToOffsetManager(this.getConsumerId(), this.zkClient, socketTimeoutMs, retryBackOffMs);
            brokerAddress = new BrokerAddress(channel.host(), channel.port());
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)String.format("Offset manager for [%s] is at [%s].", partition, brokerAddress));
            }
            this.offsetManagerBrokerAddressCache.put(partition, brokerAddress);
            channel.disconnect();
        }
        return brokerAddress;
    }

    private Connection getOffsetManagerConnection(Partition partition) {
        return this.connectionFactory.connect(this.getOffsetManagerBrokerAddress(partition));
    }

    private void checkResultForErrors(Result<?> result, Partition partition) {
        if (result.getErrors().containsKey(partition)) {
            short errorCode = result.getError(partition);
            if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
                throw new PartitionNotFoundException(partition);
            }
            if (errorCode != ErrorMapping.NoError()) {
                throw new ConsumerException(ErrorMapping.exceptionFor((short)errorCode));
            }
        }
    }

    private class ResetOffsetManagerBrokerAddressRetryListener
    extends RetryListenerSupport {
        private ResetOffsetManagerBrokerAddressRetryListener() {
        }

        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable t) {
            Partition partition;
            if (KafkaNativeOffsetManager.this.log.isWarnEnabled()) {
                KafkaNativeOffsetManager.this.log.warn((Object)("Retrying kafka operation [" + callback + "] due to [" + t + "]"), t);
            }
            if ((partition = (Partition)context.getAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE)) != null) {
                KafkaNativeOffsetManager.this.offsetManagerBrokerAddressCache.remove(partition);
            }
        }
    }
}

