/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

public class DefaultKafkaProducerFactory<K, V>
implements ProducerFactory<K, V>,
ApplicationContextAware,
ApplicationListener<ContextStoppedEvent>,
DisposableBean {
    private static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);
    private final Map<String, Object> configs;
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<CloseSafeProducer<K, V>>();
    private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<String, CloseSafeProducer<K, V>>();
    private final AtomicInteger clientIdCounter = new AtomicInteger();
    private volatile CloseSafeProducer<K, V> producer;
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private String transactionIdPrefix;
    private ApplicationContext applicationContext;
    private boolean producerPerConsumerPartition = true;
    private String clientIdPrefix;

    /**
     * Create a factory from the given producer configuration. No serializer
     * instances are supplied: both are passed on as {@code null}.
     * @param configs the producer configuration properties.
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Serializer<K> keySerializer, @Nullable Serializer<V> valueSerializer) {
        this.configs = new HashMap<String, Object>(configs);
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        if (configs.get("client.id") instanceof String) {
            this.clientIdPrefix = (String)configs.get("client.id");
        }
    }

    /**
     * Remember the application context this factory lives in.
     * {@link #onApplicationEvent(ContextStoppedEvent)} compares stop events
     * against it.
     * @param applicationContext the owning application context.
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Set the key serializer instance handed to producers created from now on.
     * @param keySerializer the key serializer, or {@code null}.
     */
    public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
        this.keySerializer = keySerializer;
    }

    /**
     * Set the value serializer instance handed to producers created from now on.
     * @param valueSerializer the value serializer, or {@code null}.
     */
    public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    /**
     * Set how long to wait when a producer is physically closed.
     * The factory default is 30 seconds.
     * @param physicalCloseTimeout the timeout in seconds.
     */
    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
    }

    /**
     * Set the prefix for the {@code transactional.id} of created producers.
     * Setting a prefix makes this factory transaction capable. It also sets
     * {@code enable.idempotence} to {@code true} unless the configuration
     * already carries a value for that property.
     * @param transactionIdPrefix the prefix; must not be {@code null}.
     */
    public void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
        enableIdempotentBehaviour();
    }

    /**
     * Default {@code enable.idempotence} to {@code true} when the configuration
     * has no value for it. An existing value is left untouched; if that value is
     * {@link Boolean#FALSE}, a debug message warns about possible duplicates.
     */
    private void enableIdempotentBehaviour() {
        Object existing = this.configs.putIfAbsent("enable.idempotence", true);
        if (Boolean.FALSE.equals(existing) && logger.isDebugEnabled()) {
            logger.debug("The 'enable.idempotence' is set to false, may result in duplicate messages");
        }
    }

    /**
     * Choose whether transactional producers are kept per transaction id suffix
     * (the default, {@code true}) or always taken from the shared cache.
     * @param producerPerConsumerPartition {@code false} to always use the cache.
     */
    public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
        this.producerPerConsumerPartition = producerPerConsumerPartition;
    }

    /**
     * Report whether transactional producers are kept per transaction id suffix.
     * @return the current setting of the flag.
     */
    @Override
    public boolean isProducerPerConsumerPartition() {
        return this.producerPerConsumerPartition;
    }

    /**
     * Expose the producer configuration as a read-only view of the factory's
     * own map. The view is not a copy, so later changes made by this factory
     * are visible through it.
     * @return an unmodifiable view of the configuration properties.
     */
    public Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    /**
     * Report whether this factory creates transactional producers.
     * @return {@code true} when a transaction id prefix has been set.
     */
    @Override
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Physically close every producer this factory currently holds: the shared
     * non-transactional producer, all cached transactional producers, and all
     * producers kept per transaction id suffix.
     * <p>Each close is attempted independently. A failure is logged and the
     * remaining producers are still closed, so one broken producer cannot leak
     * the others.
     */
    public void destroy() {
        CloseSafeProducer<K, V> producerToClose;
        synchronized (this) {
            // Detach the shared producer under the factory monitor so that
            // createProducer() builds a fresh one afterwards.
            producerToClose = this.producer;
            this.producer = null;
        }
        if (producerToClose != null) {
            closeQuietly(producerToClose);
        }
        // Drain the transactional cache; poll() returns null once it is empty.
        producerToClose = this.cache.poll();
        while (producerToClose != null) {
            closeQuietly(producerToClose);
            producerToClose = this.cache.poll();
        }
        synchronized (this.consumerProducers) {
            this.consumerProducers.forEach((suffix, consumerProducer) -> closeQuietly(consumerProducer));
            this.consumerProducers.clear();
        }
    }

    /**
     * Close the wrapped delegate with the physical close timeout, logging
     * (rather than propagating) any failure.
     */
    private void closeQuietly(CloseSafeProducer<K, V> producerToClose) {
        try {
            producerToClose.delegate.close(this.physicalCloseTimeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            logger.error("Exception while closing producer", e);
        }
    }

    /**
     * Close all producers when the application context this factory belongs to
     * is stopped. Stop events from other contexts are ignored.
     * @param event the context stopped event.
     */
    public void onApplicationEvent(ContextStoppedEvent event) {
        boolean fromOwnContext = event.getApplicationContext().equals(this.applicationContext);
        if (fromOwnContext) {
            reset();
        }
    }

    /**
     * Does nothing.
     * @deprecated this method has no effect.
     */
    @Deprecated
    public void start() {
    }

    /**
     * Close all producers by delegating to {@link #reset()}.
     * @deprecated call {@link #reset()} directly.
     */
    @Deprecated
    public void stop() {
        reset();
    }

    /**
     * Close all producers held by this factory; new ones are created on demand
     * afterwards. Any exception escaping {@link #destroy()} is logged and not
     * rethrown.
     */
    public void reset() {
        try {
            destroy();
        }
        catch (Exception e) {
            logger.error("Exception while closing producer", e);
        }
    }

    /**
     * Always reports {@code true}.
     * @return {@code true}.
     * @deprecated the return value carries no information.
     */
    @Deprecated
    public boolean isRunning() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtain a producer.
     * <p>Without a transaction id prefix, a single shared producer is created
     * lazily under this factory's monitor and returned on every call. With a
     * prefix, a transactional producer is returned: one kept per transaction id
     * suffix when {@code producerPerConsumerPartition} is set, otherwise one
     * taken from (or created for) the cache.
     * @return the producer.
     */
    @Override
    public Producer<K, V> createProducer() {
        if (this.transactionIdPrefix == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new CloseSafeProducer<K, V>(createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout);
                }
                return this.producer;
            }
        }
        return this.producerPerConsumerPartition
                ? createTransactionalProducerForPartition()
                : createTransactionalProducer();
    }

    /**
     * Create a new physical, non-transactional Kafka producer.
     * <p>When a client id prefix is known, the producer gets its own copy of the
     * configuration with {@code client.id} set to {@code <prefix>-<n>}, where
     * {@code n} comes from an incrementing counter. Otherwise the factory
     * configuration is used as is.
     * @return the new producer.
     */
    protected Producer<K, V> createKafkaProducer() {
        Map<String, Object> producerConfigs = this.configs;
        if (this.clientIdPrefix != null) {
            producerConfigs = new HashMap<String, Object>(this.configs);
            producerConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        return new KafkaProducer<K, V>(producerConfigs, this.keySerializer, this.valueSerializer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtain the transactional producer for the transaction id suffix supplied
     * by {@link TransactionSupport#getTransactionIdSuffix()}.
     * <p>When no suffix is available, this falls back to
     * {@link #createTransactionalProducer()}. Otherwise the producer registered
     * for that suffix is returned, and created and registered first if absent.
     * @return the producer.
     */
    Producer<K, V> createTransactionalProducerForPartition() {
        String suffix = TransactionSupport.getTransactionIdSuffix();
        if (suffix == null) {
            return createTransactionalProducer();
        }
        synchronized (this.consumerProducers) {
            if (this.consumerProducers.containsKey(suffix)) {
                return this.consumerProducers.get(suffix);
            }
            CloseSafeProducer<K, V> created = doCreateTxProducer(suffix, this::removeConsumerProducer);
            this.consumerProducers.put(suffix, created);
            return created;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drop the first per-suffix registration whose producer equals the given
     * one. Does nothing when the producer is not registered.
     * @param producer the producer to unregister.
     */
    private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
        synchronized (this.consumerProducers) {
            for (Iterator<CloseSafeProducer<K, V>> registered = this.consumerProducers.values().iterator(); registered.hasNext();) {
                if (registered.next().equals(producer)) {
                    registered.remove();
                    return;
                }
            }
        }
    }

    /**
     * Forget the shared producer if it is the given one, so that the next
     * {@link #createProducer()} call builds a new one.
     * @param producerToRemove the producer to forget.
     */
    protected final synchronized void removeProducer(CloseSafeProducer<K, V> producerToRemove) {
        boolean isSharedProducer = producerToRemove.equals(this.producer);
        if (isSharedProducer) {
            this.producer = null;
        }
    }

    /**
     * Obtain a transactional producer from the cache. When the cache is empty,
     * a new producer is created using the next value of the incrementing
     * transaction id suffix counter.
     * @return the cached or newly created producer.
     */
    protected Producer<K, V> createTransactionalProducer() {
        Producer<K, V> pooled = this.cache.poll();
        return pooled != null
                ? pooled
                : doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), null);
    }

    /**
     * Create a new physical transactional producer, initialize its transactions,
     * and wrap it in a {@link CloseSafeProducer} that returns to the cache.
     * <p>The producer gets its own copy of the configuration with
     * {@code transactional.id} set to {@code <prefix><suffix>}. When a client id
     * prefix is known, {@code client.id} is set to {@code <prefix>-<n>}.
     * <p>If {@code initTransactions()} fails, the new producer is closed before
     * the original exception is rethrown, so its resources are not leaked. A
     * failure of that close is attached to the original exception as suppressed.
     * @param suffix the transaction id suffix.
     * @param remover callback handed to the wrapper; callers in this class may
     * pass {@code null}.
     * @return the wrapped producer.
     */
    private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
        Map<String, Object> newProducerConfigs = new HashMap<String, Object>(this.configs);
        String transactionalId = this.transactionIdPrefix + suffix;
        newProducerConfigs.put("transactional.id", transactionalId);
        if (this.clientIdPrefix != null) {
            newProducerConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        KafkaProducer<K, V> newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
        try {
            newProducer.initTransactions();
        }
        catch (RuntimeException initFailure) {
            // The producer is not yet visible to anyone else; release it here.
            try {
                newProducer.close(this.physicalCloseTimeout.getSeconds(), TimeUnit.SECONDS);
            }
            catch (RuntimeException closeFailure) {
                initFailure.addSuppressed(closeFailure);
            }
            throw initFailure;
        }
        return new CloseSafeProducer<K, V>(newProducer, this.cache, remover, transactionalId, this.physicalCloseTimeout);
    }

    /**
     * Expose the queue of idle transactional producers. This is the live queue,
     * not a copy.
     * @return the producer cache.
     */
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return this.cache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unregister and physically close the producer kept for the given
     * transaction id suffix. Does nothing when producers are not kept per suffix
     * or when no producer is registered for it.
     * @param transactionIdSuffix the transaction id suffix.
     */
    @Override
    public void closeProducerFor(String transactionIdSuffix) {
        if (!this.producerPerConsumerPartition) {
            return;
        }
        synchronized (this.consumerProducers) {
            CloseSafeProducer<K, V> evicted = this.consumerProducers.remove(transactionIdSuffix);
            if (evicted != null) {
                evicted.delegate.close(this.physicalCloseTimeout.getSeconds(), TimeUnit.SECONDS);
            }
        }
    }

    /**
     * A {@link Producer} wrapper whose {@code close} methods do not necessarily
     * close the underlying producer.
     * <p>Once a failure has been recorded (a transaction operation threw, or a
     * send completed with an {@link OutOfOrderSequenceException}), closing the
     * wrapper closes the delegate and invokes the remove callback, if any. With
     * no recorded failure, a wrapper that has a cache and no remove callback puts
     * itself back into the cache on close. In every other case closing is a
     * no-op.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     */
    protected static class CloseSafeProducer<K, V>
    implements Producer<K, V> {
        /** Zero timeout used to close the delegate after a {@link TimeoutException}. */
        private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0L);
        /** {@code KafkaProducer.close(Duration)} when looked up successfully, else {@code null}. */
        private static final Method CLOSE_WITH_DURATION;
        private final Producer<K, V> delegate;
        private final BlockingQueue<CloseSafeProducer<K, V>> cache;
        private final Consumer<CloseSafeProducer<K, V>> removeProducer;
        private final String txId;
        private final Duration closeTimeout;
        private volatile Exception producerFailed;
        private volatile boolean closed;

        /**
         * Wrap a producer that has no cache.
         * @param delegate the producer to wrap; must not itself be a wrapper.
         * @param removeProducer callback invoked after the delegate is closed.
         * @param closeTimeout timeout used when a failed send closes the producer.
         */
        CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer, Duration closeTimeout) {
            this(delegate, null, removeProducer, null, closeTimeout);
            Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
        }

        /**
         * Wrap a producer that returns to the given cache on close.
         * @param delegate the producer to wrap.
         * @param cache the cache to return to.
         * @param closeTimeout timeout used when a failed send closes the producer.
         */
        CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache, Duration closeTimeout) {
            this(delegate, cache, null, closeTimeout);
        }

        /**
         * Wrap a producer with a cache and an optional remove callback.
         * @param delegate the producer to wrap.
         * @param cache the cache.
         * @param removeConsumerProducer callback invoked after the delegate is closed, or {@code null}.
         * @param closeTimeout timeout used when a failed send closes the producer.
         */
        CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache, @Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, Duration closeTimeout) {
            this(delegate, cache, removeConsumerProducer, null, closeTimeout);
        }

        /**
         * Wrap a producer, supplying every collaborator explicitly.
         * @param delegate the producer to wrap.
         * @param cache the cache, or {@code null} (as passed by the cache-less constructor).
         * @param removeProducer callback invoked after the delegate is closed, or {@code null}.
         * @param txId the transactional id, only used by {@link #toString()}, or {@code null}.
         * @param closeTimeout timeout used when a failed send closes the producer.
         */
        CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache, @Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId, Duration closeTimeout) {
            this.delegate = delegate;
            this.cache = cache;
            this.removeProducer = removeProducer;
            this.txId = txId;
            this.closeTimeout = closeTimeout;
        }

        /**
         * Return the wrapped producer.
         * @return the delegate.
         */
        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        /** Delegate the send unchanged. */
        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return this.delegate.send(record);
        }

        /**
         * Send through the delegate, intercepting the completion callback.
         * <p>When the send completes with an {@link OutOfOrderSequenceException},
         * the failure is recorded and this wrapper is closed, which closes the
         * delegate. The user callback is then invoked, unless it is {@code null}.
         * @param record the record to send.
         * @param callback the user callback, or {@code null} for none.
         * @return the future returned by the delegate.
         */
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
            return this.delegate.send(record, (metadata, exception) -> {
                if (exception instanceof OutOfOrderSequenceException) {
                    this.producerFailed = exception;
                    // Close the wrapper itself; with a recorded failure this
                    // physically closes the delegate.
                    close(this.closeTimeout.getSeconds(), TimeUnit.SECONDS);
                }
                if (callback != null) {
                    callback.onCompletion(metadata, exception);
                }
            });
        }

        /** Delegate the flush unchanged. */
        public void flush() {
            this.delegate.flush();
        }

        /** Delegate the partition lookup unchanged. */
        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        /** Delegate the metrics lookup unchanged. */
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        /** Delegate transaction initialization unchanged. */
        public void initTransactions() {
            this.delegate.initTransactions();
        }

        /**
         * Begin a transaction on the delegate. A {@link RuntimeException} is
         * logged, recorded as this producer's failure, and rethrown.
         */
        public void beginTransaction() throws ProducerFencedException {
            if (logger.isDebugEnabled()) {
                logger.debug("beginTransaction: " + this);
            }
            try {
                this.delegate.beginTransaction();
            }
            catch (RuntimeException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("beginTransaction failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        /** Delegate sending the offsets unchanged. */
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        /**
         * Commit the transaction on the delegate. A {@link RuntimeException} is
         * logged, recorded as this producer's failure, and rethrown.
         */
        public void commitTransaction() throws ProducerFencedException {
            if (logger.isDebugEnabled()) {
                logger.debug("commitTransaction: " + this);
            }
            try {
                this.delegate.commitTransaction();
            }
            catch (RuntimeException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("commitTransaction failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        /**
         * Abort the transaction on the delegate. A {@link RuntimeException} is
         * logged, recorded as this producer's failure, and rethrown.
         */
        public void abortTransaction() throws ProducerFencedException {
            if (logger.isDebugEnabled()) {
                logger.debug("abortTransaction: " + this);
            }
            try {
                this.delegate.abortTransaction();
            }
            catch (RuntimeException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("Abort failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        /** Equivalent to {@code close(0L, null)}. */
        public void close() {
            this.close(0L, null);
        }

        /**
         * Close this wrapper; whether the delegate is closed depends on state.
         * <ul>
         * <li>Already closed: nothing happens.</li>
         * <li>A failure has been recorded: the delegate is closed, with a zero
         * timeout if the failure is a {@link TimeoutException} and the requested
         * timeout otherwise, and then the remove callback (if any) is invoked.</li>
         * <li>No failure, a cache, and no remove callback: the wrapper is offered
         * back to the cache unless it is already there. The delegate is closed
         * only if the cache refuses the offer.</li>
         * <li>Otherwise: nothing happens.</li>
         * </ul>
         * @param timeout the close timeout.
         * @param unit the unit of {@code timeout}; {@code null} means milliseconds.
         */
        public void close(long timeout, @Nullable TimeUnit unit) {
            if (this.closed) {
                return;
            }
            if (this.producerFailed != null) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: " + this);
                }
                this.closed = true;
                closeDelegate(this.producerFailed instanceof TimeoutException
                        ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
                        : toDuration(timeout, unit));
                if (this.removeProducer != null) {
                    this.removeProducer.accept(this);
                }
            }
            else if (this.cache != null && this.removeProducer == null) {
                synchronized (this) {
                    if (!this.cache.contains(this) && !this.cache.offer(this)) {
                        this.closed = true;
                        closeDelegate(toDuration(timeout, unit));
                    }
                }
            }
        }

        /** Convert a timeout to a {@link Duration}; a {@code null} unit means milliseconds. */
        private static Duration toDuration(long timeout, @Nullable TimeUnit unit) {
            return Duration.ofMillis(unit == null ? timeout : unit.toMillis(timeout));
        }

        /**
         * Physically close the delegate.
         * <p>{@code close(Duration)} is invoked reflectively only when that
         * method was resolved and the delegate is an instance of the class it was
         * resolved on. Invoking it on any other {@link Producer} implementation
         * would fail with an {@link IllegalArgumentException} and leave the
         * delegate open. In every other case {@code close(long, TimeUnit)} is
         * called directly. A failure of the reflective call is logged and not
         * propagated.
         * @param timeout how long to wait for the close.
         */
        private void closeDelegate(Duration timeout) {
            boolean useCloseWithDuration = CLOSE_WITH_DURATION != null
                    && CLOSE_WITH_DURATION.getDeclaringClass().isInstance(this.delegate);
            if (useCloseWithDuration) {
                try {
                    CLOSE_WITH_DURATION.invoke(this.delegate, timeout);
                }
                catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                    logger.error("Failed to invoke close(Duration) with reflection", e);
                }
            }
            else {
                this.delegate.close(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        }

        @Override
        public String toString() {
            String txIdPart = this.txId != null ? ", txId=" + this.txId : "";
            return "CloseSafeProducer [delegate=" + this.delegate + txIdPart + "]";
        }

        static {
            // Resolve KafkaProducer.close(Duration) unless the client version
            // string starts with "1.", "2.0." or "2.1.".
            Method closeWithDuration = null;
            String clientVersion = AppInfoParser.getVersion();
            boolean skipLookup = clientVersion.startsWith("1.")
                    || clientVersion.startsWith("2.0.")
                    || clientVersion.startsWith("2.1.");
            if (!skipLookup) {
                try {
                    closeWithDuration = KafkaProducer.class.getDeclaredMethod("close", Duration.class);
                }
                catch (NoSuchMethodException e) {
                    logger.error("Failed to get close(Duration) method for version: " + clientVersion, e);
                }
            }
            CLOSE_WITH_DURATION = closeWithDuration;
        }
    }
}

