/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.util.Map;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.kafka.core.KafkaMessageMetadata;
import org.springframework.integration.kafka.listener.AbstractDecodingAcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.AbstractDecodingMessageListener;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/**
 * Message-driven channel adapter that receives records from a
 * {@link KafkaMessageListenerContainer}, converts each one into a {@link Message}
 * carrying Kafka-specific headers, and forwards it through {@code sendMessage(...)}.
 *
 * <p>The adapter owns the container's lifecycle: the container's own auto-startup is
 * switched off in the constructor and it is started/stopped from {@link #doStart()},
 * {@link #doStop()} and {@link #beforeShutdown()}.
 *
 * <p>Configuration setters are read at different times:
 * <ul>
 *   <li>the key/payload decoders and {@code autoCommitOffset} are captured when the
 *       listener is created in {@link #onInit()}, so changing them afterwards does not
 *       reconfigure the installed listener;</li>
 *   <li>{@code generateMessageId}, {@code generateTimestamp},
 *       {@code useMessageBuilderFactory} and {@code autoCommitOffset} are also read for
 *       every message that is built.</li>
 * </ul>
 */
public class KafkaMessageDrivenChannelAdapter
extends MessageProducerSupport
implements OrderlyShutdownCapable {
    private final KafkaMessageListenerContainer messageListenerContainer;
    private Decoder<?> keyDecoder = new DefaultDecoder(null);
    private Decoder<?> payloadDecoder = new DefaultDecoder(null);
    private boolean generateMessageId = false;
    private boolean generateTimestamp = false;
    private boolean useMessageBuilderFactory = false;
    private boolean autoCommitOffset = true;

    /**
     * Creates an adapter on top of the given container.
     *
     * @param messageListenerContainer the container to receive from; must not be
     *        {@code null} and must not have a message listener installed yet, because
     *        this adapter installs its own listener in {@link #onInit()}
     * @throws IllegalArgumentException if either precondition is violated
     */
    public KafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer messageListenerContainer) {
        Assert.notNull(messageListenerContainer, "messageListenerContainer must not be null");
        Assert.isNull(messageListenerContainer.getMessageListener(),
                "messageListenerContainer must not already have a message listener");
        this.messageListenerContainer = messageListenerContainer;
        // The container is started explicitly from doStart(), never on its own.
        this.messageListenerContainer.setAutoStartup(false);
    }

    /**
     * Sets the decoder used for message keys. Defaults to a {@link DefaultDecoder}.
     * Only takes effect if called before {@link #onInit()}.
     *
     * @param keyDecoder the key decoder
     */
    public void setKeyDecoder(Decoder<?> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    /**
     * Sets the decoder used for message payloads. Defaults to a {@link DefaultDecoder}.
     * Only takes effect if called before {@link #onInit()}.
     *
     * @param payloadDecoder the payload decoder
     */
    public void setPayloadDecoder(Decoder<?> payloadDecoder) {
        this.payloadDecoder = payloadDecoder;
    }

    /**
     * Chooses between the two listener flavours. When {@code true} (the default) a
     * listener without an {@link Acknowledgment} is installed; when {@code false} an
     * acknowledging listener is installed and the {@link Acknowledgment} is exposed to
     * downstream consumers in the {@code kafka_acknowledgment} header.
     *
     * @param autoCommitOffset {@code false} to hand the acknowledgment to consumers
     */
    public void setAutoCommitOffset(boolean autoCommitOffset) {
        this.autoCommitOffset = autoCommitOffset;
    }

    /**
     * Controls whether produced messages get a generated id. Defaults to {@code false}.
     *
     * @param generateMessageId {@code true} to let the headers generate an id
     */
    public void setGenerateMessageId(boolean generateMessageId) {
        this.generateMessageId = generateMessageId;
    }

    /**
     * Controls whether produced messages get a generated timestamp. Defaults to
     * {@code false}.
     *
     * @param generateTimestamp {@code true} to let the headers generate a timestamp
     */
    public void setGenerateTimestamp(boolean generateTimestamp) {
        this.generateTimestamp = generateTimestamp;
    }

    /**
     * Controls how messages are built. When {@code true} the configured
     * {@link MessageBuilderFactory} is used; when {@code false} (the default) the
     * message is created directly from the payload and the prepared headers.
     *
     * @param useMessageBuilderFactory {@code true} to build through the factory
     */
    public void setUseMessageBuilderFactory(boolean useMessageBuilderFactory) {
        this.useMessageBuilderFactory = useMessageBuilderFactory;
    }

    /**
     * Installs the listener matching {@code autoCommitOffset} on the container and,
     * when neither id nor timestamp generation is requested and the current factory is
     * a {@link DefaultMessageBuilderFactory}, replaces it with a
     * {@link MutableMessageBuilderFactory} before delegating to the superclass.
     */
    protected void onInit() {
        this.messageListenerContainer.setMessageListener(this.autoCommitOffset
                ? new AutoAcknowledgingChannelForwardingMessageListener()
                : new AcknowledgingChannelForwardingMessageListener());
        boolean noGeneratedHeaders = !this.generateMessageId && !this.generateTimestamp;
        if (noGeneratedHeaders && this.getMessageBuilderFactory() instanceof DefaultMessageBuilderFactory) {
            // NOTE(review): presumably the mutable factory is chosen so that building
            // through the factory does not add the id/timestamp that were switched off.
            this.setMessageBuilderFactory(new MutableMessageBuilderFactory());
        }
        super.onInit();
    }

    /** Starts the underlying listener container. */
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    /** Stops the underlying listener container. */
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    /**
     * @return the component type identifier, {@code kafka:message-driven-channel-adapter}
     */
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /**
     * Stops the listener container ahead of shutdown.
     *
     * @return the value of {@code getPhase()}
     */
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    /**
     * Performs no work after shutdown.
     *
     * @return the value of {@code getPhase()}
     */
    public int afterShutdown() {
        return this.getPhase();
    }

    /**
     * Builds the outbound message for one received record.
     *
     * <p>The headers always contain {@code kafka_messageKey}, {@code kafka_topic},
     * {@code kafka_partitionId}, {@code kafka_offset} and {@code kafka_nextOffset};
     * {@code kafka_acknowledgment} is added only when {@code autoCommitOffset} is
     * {@code false}.
     *
     * @param key the decoded message key
     * @param payload the decoded message payload
     * @param metadata partition and offset information for the record
     * @param acknowledgment the acknowledgment handle; {@code null} when called from
     *        the auto-acknowledging listener
     * @return the message to send
     */
    private Message<Object> toMessage(Object key, Object payload, KafkaMessageMetadata metadata, Acknowledgment acknowledgment) {
        KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);
        // Write straight into the backing map; the Map view of MessageHeaders itself is
        // not used for population here.
        Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
        rawHeaders.put("kafka_messageKey", key);
        rawHeaders.put("kafka_topic", metadata.getPartition().getTopic());
        rawHeaders.put("kafka_partitionId", metadata.getPartition().getId());
        rawHeaders.put("kafka_offset", metadata.getOffset());
        rawHeaders.put("kafka_nextOffset", metadata.getNextOffset());
        if (!this.autoCommitOffset) {
            rawHeaders.put("kafka_acknowledgment", acknowledgment);
        }
        if (this.useMessageBuilderFactory) {
            // MessageHeaders is already a Map<String, Object>, so it is passed as-is;
            // no raw-type cast is needed and the builder keeps its generic type.
            return this.getMessageBuilderFactory()
                    .withPayload(payload)
                    .copyHeaders(kafkaMessageHeaders)
                    .build();
        }
        return MessageBuilder.createMessage(payload, kafkaMessageHeaders);
    }

    /**
     * {@link MessageHeaders} subclass that can suppress id/timestamp generation and
     * that exposes the backing header map to the enclosing adapter.
     */
    private static class KafkaMessageHeaders
    extends MessageHeaders {
        /**
         * @param generateId when {@code false}, {@code ID_VALUE_NONE} is passed as the id
         * @param generateTimestamp when {@code false}, {@code -1} is passed as the timestamp
         */
        public KafkaMessageHeaders(boolean generateId, boolean generateTimestamp) {
            // Per the MessageHeaders constructor contract, null asks for a generated
            // value while ID_VALUE_NONE / -1 leave the header out.
            super(null, generateId ? null : ID_VALUE_NONE, generateTimestamp ? null : Long.valueOf(-1L));
        }

        /**
         * Widens the visibility of the superclass accessor.
         *
         * @return the header map returned by the superclass
         */
        public Map<String, Object> getRawHeaders() {
            return super.getRawHeaders();
        }
    }

    /**
     * Listener used when {@code autoCommitOffset} is {@code false}: forwards each
     * decoded record together with its {@link Acknowledgment}.
     */
    private class AcknowledgingChannelForwardingMessageListener
    extends AbstractDecodingAcknowledgingMessageListener {
        public AcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder, KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        /** Converts the record to a message and sends it through the enclosing adapter. */
        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata, Acknowledgment acknowledgment) {
            Message<Object> message = KafkaMessageDrivenChannelAdapter.this.toMessage(key, payload, metadata, acknowledgment);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }

    /**
     * Listener used when {@code autoCommitOffset} is {@code true}: forwards each
     * decoded record without an {@link Acknowledgment}.
     */
    private class AutoAcknowledgingChannelForwardingMessageListener
    extends AbstractDecodingMessageListener {
        public AutoAcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder, KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        /** Converts the record to a message and sends it through the enclosing adapter. */
        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata) {
            Message<Object> message = KafkaMessageDrivenChannelAdapter.this.toMessage(key, payload, metadata, null);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }
}

