/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.requestreply;

import io.micrometer.observation.Observation;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.requestreply.CorrelationKey;
import org.springframework.kafka.requestreply.KafkaReplyTimeoutException;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.requestreply.RequestReplyMessageFuture;
import org.springframework.kafka.requestreply.RequestReplyTypedMessageFuture;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;

public class ReplyingKafkaTemplate<K, V, R>
extends KafkaTemplate<K, V>
implements BatchMessageListener<K, R>,
InitializingBean,
SmartLifecycle,
DisposableBean,
ReplyingKafkaOperations<K, V, R>,
ConsumerSeekAware {
    private static final String WITH_CORRELATION_ID = " with correlationId: ";
    private static final int FIVE = 5;
    private static final Duration DEFAULT_REPLY_TIMEOUT = Duration.ofSeconds(5L);
    private final GenericMessageListenerContainer<K, R> replyContainer;
    private final ConcurrentMap<Object, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<Object, RequestReplyFuture<K, V, R>>();
    private final byte[] replyTopic;
    private final byte[] replyPartition;
    private TaskScheduler scheduler = new ThreadPoolTaskScheduler();
    private int phase;
    private boolean autoStartup = true;
    private Duration defaultReplyTimeout = DEFAULT_REPLY_TIMEOUT;
    private boolean schedulerSet;
    private boolean sharedReplyTopic;
    private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
    private boolean binaryCorrelation = true;
    private String correlationHeaderName = "kafka_correlationId";
    private String replyTopicHeaderName = "kafka_replyTopic";
    private String replyPartitionHeaderName = "kafka_replyPartition";
    private Function<ConsumerRecord<?, ?>, @Nullable Exception> replyErrorChecker = rec -> null;
    private CountDownLatch assignLatch = new CountDownLatch(1);
    private volatile boolean running;
    private volatile boolean schedulerInitialized;

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) {
        this(producerFactory, replyContainer, false);
    }

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) {
        super(producerFactory, autoFlush);
        Assert.notNull(replyContainer, (String)"'replyContainer' cannot be null");
        this.replyContainer = replyContainer;
        this.replyContainer.setupMessageListener(this);
        ContainerProperties properties = this.replyContainer.getContainerProperties();
        String tempReplyTopic = null;
        byte[] tempReplyPartition = null;
        TopicPartitionOffset[] topicPartitionsToAssign = properties.getTopicPartitions();
        String[] topics = properties.getTopics();
        if (topics != null && topics.length == 1) {
            tempReplyTopic = topics[0];
        } else if (topicPartitionsToAssign != null && topicPartitionsToAssign.length == 1) {
            TopicPartitionOffset topicPartitionOffset = topicPartitionsToAssign[0];
            Assert.notNull((Object)topicPartitionOffset, (String)"'topicPartitionsToAssign' must not be null");
            tempReplyTopic = topicPartitionOffset.getTopic();
            ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(topicPartitionOffset.getPartition());
            tempReplyPartition = buffer.array();
        }
        if (tempReplyTopic == null) {
            this.replyTopic = null;
            this.replyPartition = null;
            this.logger.debug(() -> "Could not determine container's reply topic/partition; senders must populate at least the kafka_replyTopic header, and optionally the kafka_replyPartition header");
        } else {
            this.replyTopic = tempReplyTopic.getBytes(StandardCharsets.UTF_8);
            this.replyPartition = tempReplyPartition;
        }
    }

    public void setTaskScheduler(TaskScheduler scheduler) {
        Assert.notNull((Object)scheduler, (String)"'scheduler' cannot be null");
        this.scheduler = scheduler;
        this.schedulerSet = true;
    }

    protected Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    public void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
        Assert.notNull((Object)defaultReplyTimeout, (String)"'defaultReplyTimeout' cannot be null");
        Assert.isTrue((defaultReplyTimeout.toMillis() >= 0L ? 1 : 0) != 0, (String)"'replyTimeout' must be >= 0");
        this.defaultReplyTimeout = defaultReplyTimeout;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public @Nullable Collection<TopicPartition> getAssignedReplyTopicPartitions() {
        return this.replyContainer.getAssignedPartitions();
    }

    public void setSharedReplyTopic(boolean sharedReplyTopic) {
        this.sharedReplyTopic = sharedReplyTopic;
    }

    public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {
        Assert.notNull(correlationStrategy, (String)"'correlationStrategy' cannot be null");
        this.correlationStrategy = correlationStrategy;
    }

    public void setCorrelationHeaderName(String correlationHeaderName) {
        Assert.notNull((Object)correlationHeaderName, (String)"'correlationHeaderName' cannot be null");
        this.correlationHeaderName = correlationHeaderName;
    }

    protected String getCorrelationHeaderName() {
        return this.correlationHeaderName;
    }

    public void setReplyTopicHeaderName(String replyTopicHeaderName) {
        Assert.notNull((Object)replyTopicHeaderName, (String)"'replyTopicHeaderName' cannot be null");
        this.replyTopicHeaderName = replyTopicHeaderName;
    }

    public void setReplyPartitionHeaderName(String replyPartitionHeaderName) {
        Assert.notNull((Object)replyPartitionHeaderName, (String)"'replyPartitionHeaderName' cannot be null");
        this.replyPartitionHeaderName = replyPartitionHeaderName;
    }

    public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, @Nullable Exception> replyErrorChecker) {
        Assert.notNull(replyErrorChecker, (String)"'replyErrorChecker' cannot be null");
        this.replyErrorChecker = replyErrorChecker;
    }

    public void setBinaryCorrelation(boolean binaryCorrelation) {
        this.binaryCorrelation = binaryCorrelation;
    }

    protected boolean isBinaryCorrelation() {
        return this.binaryCorrelation;
    }

    public void afterPropertiesSet() {
        if (!this.schedulerSet && !this.schedulerInitialized) {
            ((ThreadPoolTaskScheduler)this.scheduler).initialize();
            this.schedulerInitialized = true;
        }
    }

    public synchronized void start() {
        if (!this.running) {
            try {
                this.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Failed to initialize", e);
            }
            this.assignLatch = new CountDownLatch(1);
            this.replyContainer.start();
            this.running = true;
        }
    }

    public synchronized void stop() {
        if (this.running) {
            this.running = false;
            this.replyContainer.stop();
            this.futures.clear();
        }
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    @Override
    public void onFirstPoll() {
        this.assignLatch.countDown();
    }

    @Override
    public boolean waitForAssignment(Duration duration) throws InterruptedException {
        return this.assignLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message) {
        return this.sendAndReceive(message, this.defaultReplyTimeout, null);
    }

    @Override
    public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message, @Nullable Duration replyTimeout) {
        return this.sendAndReceive(message, replyTimeout, null);
    }

    @Override
    public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> returnType) {
        return this.sendAndReceive(message, this.defaultReplyTimeout, returnType);
    }

    @Override
    public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable Duration replyTimeout, @Nullable ParameterizedTypeReference<P> returnType) {
        RequestReplyFuture<?, ?, R> future = this.sendAndReceive(this.getMessageConverter().fromMessage(message, this.getDefaultTopic()), replyTimeout);
        RequestReplyTypedMessageFuture replyFuture = new RequestReplyTypedMessageFuture(future.getSendFuture());
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                try {
                    replyFuture.complete(this.getMessageConverter().toMessage((ConsumerRecord<?, ?>)result, null, null, returnType == null ? null : returnType.getType()));
                }
                catch (Exception ex2) {
                    replyFuture.completeExceptionally(ex2);
                }
            } else {
                replyFuture.completeExceptionally((Throwable)ex);
            }
        });
        return replyFuture;
    }

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
        return this.sendAndReceive(record, this.defaultReplyTimeout);
    }

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
        boolean hasReplyPartition;
        boolean hasReplyTopic;
        Assert.state((boolean)this.running, (String)"Template has not been started");
        Duration timeout = replyTimeout;
        if (timeout == null) {
            timeout = this.defaultReplyTimeout;
        }
        CorrelationKey correlationId = this.correlationStrategy.apply(record);
        Assert.notNull((Object)correlationId, (String)"the created 'correlationId' cannot be null");
        Headers headers = record.headers();
        boolean bl = hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null;
        if (!hasReplyTopic && this.replyTopic != null) {
            headers.add((Header)new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
        }
        boolean bl2 = hasReplyPartition = headers.lastHeader(this.replyPartitionHeaderName) != null;
        if (!hasReplyPartition && this.replyPartition != null) {
            headers.add((Header)new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
        }
        Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString();
        byte[] correlationValue = this.binaryCorrelation ? correlationId.getCorrelationId() : ((String)correlation).getBytes(StandardCharsets.UTF_8);
        headers.add((Header)new RecordHeader(this.correlationHeaderName, correlationValue));
        this.logger.debug(() -> "Sending: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + String.valueOf(correlationId));
        RequestReplyFuture future = new RequestReplyFuture();
        this.futures.put(correlation, future);
        try {
            future.setSendFuture(this.send(record));
        }
        catch (Exception e) {
            this.futures.remove(correlation);
            throw new KafkaException("Send failed", e);
        }
        this.scheduleTimeout(record, correlation, timeout);
        return future;
    }

    private void scheduleTimeout(ProducerRecord<K, V> record, Object correlationId, Duration replyTimeout) {
        this.scheduler.schedule(() -> {
            RequestReplyFuture removed = (RequestReplyFuture)this.futures.remove(correlationId);
            if (removed != null) {
                this.logger.warn(() -> "Reply timed out for: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + String.valueOf(correlationId));
                if (!this.handleTimeout(correlationId, removed)) {
                    removed.completeExceptionally((Throwable)((Object)new KafkaReplyTimeoutException("Reply timed out")));
                }
            }
        }, Instant.now().plus(replyTimeout));
    }

    protected boolean handleTimeout(Object correlationId, RequestReplyFuture<K, V, R> future) {
        return false;
    }

    protected boolean isPending(Object correlationId) {
        return this.futures.containsKey(correlationId);
    }

    @Override
    public void destroy() {
        if (!this.schedulerSet) {
            ((ThreadPoolTaskScheduler)this.scheduler).destroy();
        }
    }

    private static <K, V> CorrelationKey defaultCorrelationIdStrategy(ProducerRecord<K, V> record) {
        UUID uuid = UUID.randomUUID();
        byte[] bytes = new byte[16];
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return new CorrelationKey(bytes);
    }

    @Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(record -> {
            ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
            Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(containerProperties.getObservationConvention(), KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE, () -> {
                ReplyingKafkaTemplate rec$ = this;
                return new KafkaRecordReceiverContext((ConsumerRecord<?, ?>)record, this.replyContainer.getListenerId(), containerProperties.getClientId(), this.replyContainer.getGroupId(), () -> rec$.clusterId());
            }, this.getObservationRegistry());
            observation.observe(() -> this.handleReply((ConsumerRecord<K, R>)record));
        });
    }

    private void handleReply(ConsumerRecord<K, R> record) {
        Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
        Object correlationId = null;
        if (correlationHeader != null) {
            Object object = correlationId = this.binaryCorrelation ? new CorrelationKey(correlationHeader.value()) : new String(correlationHeader.value(), StandardCharsets.UTF_8);
        }
        if (correlationId == null) {
            this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record) + " - to use request/reply semantics, the responding server must return the correlation id  in the '" + this.correlationHeaderName + "' header");
        } else {
            RequestReplyFuture future = (RequestReplyFuture)this.futures.remove(correlationId);
            Object correlationKey = correlationId;
            if (future == null) {
                this.logLateArrival(record, correlationId);
            } else {
                boolean ok = true;
                Exception exception = this.checkForErrors(record);
                if (exception != null) {
                    ok = false;
                    future.completeExceptionally(exception);
                }
                if (ok) {
                    this.logger.debug(() -> "Received: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + String.valueOf(correlationKey));
                    future.complete(record);
                }
            }
        }
    }

    protected @Nullable Exception checkForErrors(ConsumerRecord<K, R> record) {
        DeserializationException de;
        if ((record.value() == null || record.key() == null) && (de = ReplyingKafkaTemplate.checkDeserialization(record, this.logger)) != null) {
            return de;
        }
        return this.replyErrorChecker.apply(record);
    }

    public static @Nullable DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
        DeserializationException exception = SerializationUtils.getExceptionFromHeader(record, "springDeserializerExceptionValue", logger);
        if (exception != null) {
            logger.error((Throwable)((Object)exception), () -> "Reply value deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset());
            return exception;
        }
        exception = SerializationUtils.getExceptionFromHeader(record, "springDeserializerExceptionKey", logger);
        if (exception != null) {
            logger.error((Throwable)((Object)exception), () -> "Reply key deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset());
            return exception;
        }
        return null;
    }

    protected void logLateArrival(ConsumerRecord<K, R> record, Object correlationId) {
        if (this.sharedReplyTopic) {
            this.logger.debug(() -> this.missingCorrelationLogMessage(record, correlationId));
        } else {
            this.logger.error(() -> this.missingCorrelationLogMessage(record, correlationId));
        }
    }

    private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, Object correlationId) {
        return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + String.valueOf(correlationId) + ", perhaps timed out, or using a shared reply topic";
    }
}

