/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.NestedRuntimeException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.util.Assert;

/**
 * Creates {@link DeadLetterPublishingRecoverer} instances whose destination topic,
 * template and headers are driven by a {@link DestinationTopicResolver}.
 *
 * <p>Exception classifications and an optional customizer can be registered on the
 * factory; they are applied to every recoverer returned by {@link #create()}.
 */
public class DeadLetterPublishingRecovererFactory {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));

    /** Header carrying the number of delivery attempts made so far. */
    private static final String ATTEMPTS_HEADER = "retry_topic-attempts";

    /** Header carrying the timestamp of the record as first seen. */
    private static final String ORIGINAL_TIMESTAMP_HEADER = "retry_topic-original-timestamp";

    /** Header carrying the timestamp at which the forwarded record is due. */
    private static final String BACKOFF_TIMESTAMP_HEADER = "retry_topic-backoff-timestamp";

    private final DestinationTopicResolver destinationTopicResolver;
    private final Set<Class<? extends Exception>> fatalExceptions = new LinkedHashSet<>();
    private final Set<Class<? extends Exception>> nonFatalExceptions = new HashSet<>();
    private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> {};
    private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;

    /**
     * Creates a factory backed by the given resolver.
     *
     * @param destinationTopicResolver used to look up destination topics and their templates
     */
    public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
        this.destinationTopicResolver = destinationTopicResolver;
    }

    /**
     * Sets an additional headers function that {@link #create()} registers on each
     * recoverer via {@code addHeadersFunction}; when {@code null}, none is registered.
     *
     * @param headersFunction the function, or {@code null}
     */
    public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
        this.headersFunction = headersFunction;
    }

    /**
     * Registers an exception type that {@link #create()} passes to the recoverer's
     * {@code addNotRetryableExceptions}.
     *
     * @param exceptionType the exception type; must not be {@code null}
     * @throws IllegalArgumentException if {@code exceptionType} is {@code null}
     */
    public final void addNotRetryableException(Class<? extends Exception> exceptionType) {
        Assert.notNull(exceptionType, "'exceptionType' cannot be null");
        this.fatalExceptions.add(exceptionType);
    }

    /**
     * Registers an exception type that {@link #create()} passes to the recoverer's
     * {@code removeClassification}. The removal is deferred until a recoverer is created.
     *
     * @param exceptionType the exception type
     * @return {@code true} if the type had not already been registered for removal
     */
    public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
        return this.nonFatalExceptions.add(exceptionType);
    }

    /**
     * Builds a new recoverer, configures it, runs the customizer and then applies the
     * registered exception classifications.
     *
     * @return the configured recoverer
     */
    public DeadLetterPublishingRecoverer create() {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(this::resolveTemplate, false, this::resolveDestination) {

            @Override
            protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
                return DeadLetterPublishingRecoverer.HeaderNames.Builder
                        .original()
                            .offsetHeader("kafka_original-offset")
                            .timestampHeader("kafka_original-timestamp")
                            .timestampTypeHeader("kafka_original-timestamp-type")
                            .topicHeader("kafka_original-topic")
                            .partitionHeader("kafka_original-partition")
                            .consumerGroupHeader("kafka_dlt-original-consumer-group")
                        .exception()
                            .keyExceptionFqcn("kafka_key-exception-fqcn")
                            .exceptionFqcn("kafka_exception-fqcn")
                            // The cause gets its own header name; it previously reused the
                            // exception FQCN name, so the two values collided.
                            .exceptionCauseFqcn("kafka_exception-cause-fqcn")
                            .keyExceptionMessage("kafka_key-exception-message")
                            .exceptionMessage("kafka_exception-message")
                            .keyExceptionStacktrace("kafka_key-exception-stacktrace")
                            .exceptionStacktrace("kafka_exception-stacktrace")
                        .build();
            }
        };
        recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
        if (this.headersFunction != null) {
            recoverer.addHeadersFunction(this.headersFunction);
        }
        recoverer.setFailIfSendResultIsError(true);
        recoverer.setAppendOriginalHeaders(false);
        recoverer.setThrowIfNoDestinationReturned(false);
        recoverer.setSkipSameTopicFatalExceptions(false);
        // The customizer runs before the classifications below are applied.
        this.recovererCustomizer.accept(recoverer);
        this.fatalExceptions.forEach(exceptionType -> recoverer.addNotRetryableExceptions(exceptionType));
        this.nonFatalExceptions.forEach(recoverer::removeClassification);
        return recoverer;
    }

    /**
     * Looks up the Kafka operations associated with the outgoing record's topic.
     *
     * @param outRecord the record about to be published
     * @return the operations registered for that destination topic
     */
    private KafkaOperations<?, ?> resolveTemplate(ProducerRecord<?, ?> outRecord) {
        return this.destinationTopicResolver.getDestinationTopicByName(outRecord.topic()).getKafkaOperations();
    }

    /**
     * Sets the callback invoked with each newly created recoverer in {@link #create()}.
     *
     * @param customizer the customizer
     */
    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
        this.recovererCustomizer = customizer;
    }

    /**
     * Determines where a failed record should be forwarded.
     *
     * @param cr the failed record
     * @param e the failure
     * @return the target topic/partition, or {@code null} when the resolved destination
     *         is a no-ops topic
     * @throws NestedRuntimeException the given exception itself, when
     *         {@link SeekUtils#isBackoffException} reports it as a back-off exception
     */
    private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
        if (SeekUtils.isBackoffException(e)) {
            throw (NestedRuntimeException) e;
        }
        DestinationTopic nextDestination = this.destinationTopicResolver.resolveDestinationTopic(
                cr.topic(), getAttempts(cr), e, getOriginalTimestampHeaderLong(cr));
        LOGGER.debug(() -> "Resolved topic: " + (nextDestination.isNoOpsTopic() ? "none" : nextDestination.getDestinationName()));
        if (nextDestination.isNoOpsTopic()) {
            return null;
        }
        return resolveTopicPartition(cr, nextDestination);
    }

    /**
     * Maps a resolved destination to a topic/partition, keeping the partition of the
     * failed record. Subclasses may override this to choose a different partition.
     *
     * @param cr the failed record
     * @param nextDestination the resolved destination topic
     * @return the destination name paired with the failed record's partition
     */
    protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
        return new TopicPartition(nextDestination.getDestinationName(), cr.partition());
    }

    /**
     * Reads the attempt count from the record's attempts header.
     *
     * <p>A one-byte value is returned as that byte; a four-byte value is read as an
     * {@code int} through {@link ByteBuffer}. A missing header, a {@code null} value or
     * any other length yields {@code 1}.
     *
     * @param consumerRecord the record to inspect
     * @return the attempt count, defaulting to {@code 1}
     */
    private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
        Header header = consumerRecord.headers().lastHeader(ATTEMPTS_HEADER);
        if (header == null) {
            return 1;
        }
        byte[] value = header.value();
        if (value == null) {
            // Header values may be null; treat that like an absent header instead of failing.
            return 1;
        }
        if (value.length == Byte.BYTES) {
            return value[0];
        }
        if (value.length == Integer.BYTES) {
            return ByteBuffer.wrap(value).getInt();
        }
        LOGGER.debug(() -> "Unexected size for " + ATTEMPTS_HEADER + " header: " + value.length);
        return 1;
    }

    /**
     * Builds the retry headers for the outgoing record: the original timestamp, the
     * incremented attempt count (as a four-byte int) and the next execution timestamp.
     *
     * @param consumerRecord the failed record
     * @param e the failure
     * @param attempts the attempt count read from the failed record
     * @return a new header collection containing the three retry headers
     */
    private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
        Headers headers = new RecordHeaders();
        byte[] originalTimestampHeader = getOriginalTimestampHeaderBytes(consumerRecord);
        headers.add(ORIGINAL_TIMESTAMP_HEADER, originalTimestampHeader);
        headers.add(ATTEMPTS_HEADER, ByteBuffer.wrap(new byte[Integer.BYTES]).putInt(attempts + 1).array());
        long nextExecutionTimestamp = getNextExecutionTimestamp(consumerRecord, e, originalTimestampHeader);
        headers.add(BACKOFF_TIMESTAMP_HEADER, BigInteger.valueOf(nextExecutionTimestamp).toByteArray());
        return headers;
    }

    /**
     * Computes when the forwarded record is due: the failure timestamp plus the delay
     * of the destination topic resolved for this record and exception.
     *
     * @param consumerRecord the failed record
     * @param e the failure
     * @param originalTimestampHeader the original timestamp encoded as {@link BigInteger} bytes
     * @return the next execution timestamp
     */
    private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exception e, byte[] originalTimestampHeader) {
        long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
        long failureTimestamp = getFailureTimestamp(e);
        long delay = this.destinationTopicResolver
                .resolveDestinationTopic(consumerRecord.topic(), getAttempts(consumerRecord), e, originalTimestamp)
                .getDestinationDelay();
        long nextExecutionTimestamp = failureTimestamp + delay;
        LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s", failureTimestamp, originalTimestamp, nextExecutionTimestamp));
        return nextExecutionTimestamp;
    }

    /**
     * Returns the timestamp carried by a {@link TimestampedException} in the chain of
     * {@code e} when {@code e} is a {@link NestedRuntimeException} that contains one;
     * otherwise returns the current time in epoch milliseconds.
     *
     * @param e the failure
     * @return the failure timestamp
     */
    private long getFailureTimestamp(Exception e) {
        if (e instanceof NestedRuntimeException && ((NestedRuntimeException) e).contains(TimestampedException.class)) {
            return getTimestampedException(e).getTimestamp();
        }
        return Instant.now().toEpochMilli();
    }

    /**
     * Finds the first {@link TimestampedException} in the cause chain starting at {@code e}.
     *
     * <p>Uses {@code instanceof}, so subclasses of {@code TimestampedException} match and
     * exceptions of a superclass type are skipped rather than wrongly cast.
     *
     * @param e the exception at which to start the search
     * @return the first {@code TimestampedException} found
     * @throws IllegalArgumentException if the chain contains no {@code TimestampedException}
     */
    private TimestampedException getTimestampedException(Throwable e) {
        for (Throwable current = e; current != null; current = current.getCause()) {
            if (current instanceof TimestampedException) {
                return (TimestampedException) current;
            }
        }
        throw new IllegalArgumentException("Provided exception does not contain a " + TimestampedException.class.getSimpleName() + " cause.");
    }

    /**
     * Returns the raw bytes of the original-timestamp header when present; otherwise the
     * record's own timestamp encoded as {@link BigInteger} bytes.
     *
     * @param consumerRecord the record to inspect
     * @return the original timestamp bytes
     */
    private byte[] getOriginalTimestampHeaderBytes(ConsumerRecord<?, ?> consumerRecord) {
        Header currentOriginalTimestampHeader = getOriginaTimeStampHeader(consumerRecord);
        if (currentOriginalTimestampHeader != null) {
            return currentOriginalTimestampHeader.value();
        }
        return BigInteger.valueOf(consumerRecord.timestamp()).toByteArray();
    }

    /**
     * Returns the original-timestamp header decoded as a {@code long} when present;
     * otherwise the record's own timestamp.
     *
     * @param consumerRecord the record to inspect
     * @return the original timestamp
     */
    private long getOriginalTimestampHeaderLong(ConsumerRecord<?, ?> consumerRecord) {
        Header currentOriginalTimestampHeader = getOriginaTimeStampHeader(consumerRecord);
        if (currentOriginalTimestampHeader != null) {
            return new BigInteger(currentOriginalTimestampHeader.value()).longValue();
        }
        return consumerRecord.timestamp();
    }

    /**
     * Returns the last original-timestamp header on the record.
     *
     * @param consumerRecord the record to inspect
     * @return the header, or {@code null} when the record has none
     */
    private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
        return consumerRecord.headers().lastHeader(ORIGINAL_TIMESTAMP_HEADER);
    }
}

