/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
 * Message-driven channel adapter that installs a listener on a Kafka
 * {@link AbstractMessageListenerContainer} and forwards each received record
 * (or batch of records) as a {@link Message} via {@code sendMessage}.
 *
 * <p>The adapter disables the container's auto-startup and starts/stops the
 * container itself from {@link #doStart()} / {@link #doStop()}.
 *
 * @param <K> the Kafka record key type
 * @param <V> the Kafka record value type
 */
public class KafkaMessageDrivenChannelAdapter<K, V>
extends MessageProducerSupport
implements OrderlyShutdownCapable {
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final RecordMessagingMessageListenerAdapter<K, V> recordListener = new IntegrationRecordMessageListener();
    private final BatchMessagingMessageListenerAdapter<K, V> batchListener = new IntegrationBatchMessageListener();
    private final ListenerMode mode;
    private RecordFilterStrategy<K, V> recordFilterStrategy;
    private boolean ackDiscarded;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Void> recoveryCallback;
    private boolean filterInRetry;

    /**
     * Create an adapter in {@link ListenerMode#record} mode.
     *
     * @param messageListenerContainer the container to attach to; must not be
     *        {@code null} and must not already have a message listener
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer) {
        this(messageListenerContainer, ListenerMode.record);
    }

    /**
     * Create an adapter with an explicit listener mode.
     *
     * @param messageListenerContainer the container to attach to; must not be
     *        {@code null} and must not already have a message listener
     * @param mode whether records are delivered one at a time or as a batch;
     *        must not be {@code null}
     * @throws IllegalArgumentException if any precondition above is violated
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer, ListenerMode mode) {
        Assert.notNull(messageListenerContainer, "messageListenerContainer is required");
        Assert.isNull(messageListenerContainer.getContainerProperties().getMessageListener(), "Container must not already have a listener");
        // Fail fast: a null mode would otherwise surface later as an NPE in
        // setRetryTemplate() or onInit().
        Assert.notNull(mode, "mode is required");
        this.messageListenerContainer = messageListenerContainer;
        // The adapter controls the container lifecycle through doStart()/doStop().
        this.messageListenerContainer.setAutoStartup(false);
        this.mode = mode;
    }

    /**
     * Set a converter on the record or batch listener, chosen by the runtime
     * type of the argument.
     *
     * @param messageConverter a {@link RecordMessageConverter} or a
     *        {@link BatchMessageConverter}
     * @throws IllegalArgumentException if the converter is neither of those
     *         types (including {@code null})
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            this.recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
        } else if (messageConverter instanceof BatchMessageConverter) {
            this.batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
        } else {
            throw new IllegalArgumentException("Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
        }
    }

    /**
     * Set the converter used by the record-mode listener.
     *
     * @param messageConverter the record converter
     */
    public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
        this.recordListener.setMessageConverter(messageConverter);
    }

    /**
     * Set the converter used by the batch-mode listener.
     *
     * @param messageConverter the batch converter
     */
    public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
        this.batchListener.setBatchMessageConverter(messageConverter);
    }

    /**
     * Set a filter strategy; when present, the listener is wrapped in a
     * filtering adapter during {@link #onInit()}.
     *
     * @param recordFilterStrategy the filter strategy, or {@code null} for none
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    /**
     * Set the {@code ackDiscarded} flag that is passed to the filtering adapter.
     *
     * @param ackDiscarded value handed to the filtering adapter's constructor
     */
    public void setAckDiscarded(boolean ackDiscarded) {
        this.ackDiscarded = ackDiscarded;
    }

    /**
     * Set a retry template; when present, the record listener is wrapped in a
     * retrying adapter during {@link #onInit()}.
     *
     * @param retryTemplate the retry template, or {@code null} for none
     * @throws IllegalArgumentException if a non-null template is supplied
     *         while the adapter is in {@link ListenerMode#batch} mode
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.isTrue(retryTemplate == null || this.mode == ListenerMode.record, "Retry is not supported with mode=batch");
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the recovery callback handed to the retrying adapter. It is only
     * used when a retry template is also configured.
     *
     * @param recoveryCallback the recovery callback, or {@code null}
     */
    public void setRecoveryCallback(RecoveryCallback<Void> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Control the nesting order of the filter and retry adapters in record
     * mode. When {@code true} (and both a retry template and a filter strategy
     * are set) the retrying adapter wraps the filtering adapter; otherwise the
     * filtering adapter wraps the retrying adapter.
     *
     * @param filterInRetry {@code true} to place the filter inside the retry
     */
    public void setFilterInRetry(boolean filterInRetry) {
        this.filterInRetry = filterInRetry;
    }

    /**
     * Build the listener chain for the configured mode and install it on the
     * container's properties.
     */
    @Override
    protected void onInit() {
        super.onInit();
        Object listener = this.mode == ListenerMode.record
                ? buildRecordListener()
                : buildBatchListener();
        this.messageListenerContainer.getContainerProperties().setMessageListener(listener);
    }

    /** Wrap the record listener with the configured retry and/or filter adapters. */
    private AcknowledgingMessageListener<K, V> buildRecordListener() {
        AcknowledgingMessageListener<K, V> listener = this.recordListener;
        boolean wrapFilterWithRetry = this.filterInRetry
                && this.retryTemplate != null
                && this.recordFilterStrategy != null;
        if (wrapFilterWithRetry) {
            // Filter innermost, retry outermost.
            listener = new FilteringAcknowledgingMessageListenerAdapter<K, V>(listener, this.recordFilterStrategy, this.ackDiscarded);
            listener = new RetryingAcknowledgingMessageListenerAdapter<K, V>(listener, this.retryTemplate, this.recoveryCallback);
        } else {
            // Retry innermost (if any), filter outermost (if any).
            if (this.retryTemplate != null) {
                listener = new RetryingAcknowledgingMessageListenerAdapter<K, V>(listener, this.retryTemplate, this.recoveryCallback);
            }
            if (this.recordFilterStrategy != null) {
                listener = new FilteringAcknowledgingMessageListenerAdapter<K, V>(listener, this.recordFilterStrategy, this.ackDiscarded);
            }
        }
        return listener;
    }

    /** Return the batch listener, wrapped in a filtering adapter when a filter strategy is set. */
    private Object buildBatchListener() {
        if (this.recordFilterStrategy == null) {
            return this.batchListener;
        }
        return new FilteringBatchAcknowledgingMessageListenerAdapter<K, V>(this.batchListener, this.recordFilterStrategy, this.ackDiscarded);
    }

    /** Start the underlying listener container. */
    @Override
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    /** Stop the underlying listener container. */
    @Override
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    /**
     * @return the fixed component type string for this adapter
     */
    @Override
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /**
     * Stop the listener container ahead of shutdown.
     *
     * @return this adapter's phase
     */
    @Override
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    /**
     * No additional work is performed after shutdown.
     *
     * @return this adapter's phase
     */
    @Override
    public int afterShutdown() {
        return this.getPhase();
    }

    /**
     * Batch listener that converts the received records to a single message
     * and sends it through the enclosing adapter.
     */
    private class IntegrationBatchMessageListener
    extends BatchMessagingMessageListenerAdapter<K, V> {
        IntegrationBatchMessageListener() {
            // No target bean/method: onMessage is overridden below.
            super(null, null);
        }

        @Override
        public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment) {
            Message<?> message = this.toMessagingMessage(records, acknowledgment);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }

    /**
     * Record listener that converts each received record to a message and
     * sends it through the enclosing adapter.
     */
    private class IntegrationRecordMessageListener
    extends RecordMessagingMessageListenerAdapter<K, V> {
        IntegrationRecordMessageListener() {
            // No target bean/method: onMessage is overridden below.
            super(null, null);
        }

        @Override
        public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment) {
            Message<?> message = this.toMessagingMessage(record, acknowledgment);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }

    /**
     * Delivery mode of the adapter: one message per record, or one message
     * per batch of records.
     */
    public enum ListenerMode {
        record,
        batch;

    }
}

