/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, CD, PD>
extends AbstractBinder<MessageChannel, C, P> {
    protected static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
    private final EmbeddedHeadersMessageConverter embeddedHeadersMessageConverter = new EmbeddedHeadersMessageConverter();
    private final boolean supportsHeadersNatively;
    private final String[] headersToEmbed;

    public AbstractMessageChannelBinder(boolean supportsHeadersNatively, String[] headersToEmbed) {
        this.supportsHeadersNatively = supportsHeadersNatively;
        this.headersToEmbed = headersToEmbed;
    }

    @Override
    public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {
        MessageHandler producerMessageHandler;
        Assert.isInstanceOf(SubscribableChannel.class, (Object)outputChannel, (String)"Binding is supported only for SubscribableChannel instances");
        PD producerDestination = this.createProducerDestinationIfNecessary(destination, producerProperties);
        try {
            producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties);
            if (producerMessageHandler instanceof InitializingBean) {
                ((InitializingBean)producerMessageHandler).afterPropertiesSet();
            }
        }
        catch (Exception e) {
            if (e instanceof BinderException) {
                throw (BinderException)e;
            }
            throw new BinderException("Exception thrown while building outbound endpoint", e);
        }
        if (producerMessageHandler instanceof Lifecycle) {
            ((Lifecycle)producerMessageHandler).start();
        }
        ((SubscribableChannel)outputChannel).subscribe((MessageHandler)new SendingHandler(producerMessageHandler, !this.supportsHeadersNatively && HeaderMode.embeddedHeaders.equals((Object)((ProducerProperties)producerProperties).getHeaderMode()), this.headersToEmbed));
        return new DefaultBinding<MessageChannel>(destination, null, outputChannel, producerMessageHandler instanceof Lifecycle ? (Lifecycle)producerMessageHandler : null){

            @Override
            public void afterUnbind() {
                AbstractMessageChannelBinder.this.afterUnbindProducer(destination, producerProperties);
            }
        };
    }

    protected abstract PD createProducerDestinationIfNecessary(String var1, P var2);

    protected abstract MessageHandler createProducerMessageHandler(PD var1, P var2) throws Exception;

    protected void afterUnbindProducer(String destination, P producerProperties) {
    }

    @Override
    public final Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, final C properties) throws BinderException {
        MessageProducer consumerEndpoint = null;
        try {
            CD destination = this.createConsumerDestinationIfNecessary(name, group, properties);
            boolean extractEmbeddedHeaders = HeaderMode.embeddedHeaders.equals((Object)((ConsumerProperties)properties).getHeaderMode()) && !this.supportsHeadersNatively;
            ReceivingHandler rh = new ReceivingHandler(extractEmbeddedHeaders);
            rh.setOutputChannel(inputChannel);
            FixedSubscriberChannel bridge = new FixedSubscriberChannel((MessageHandler)rh);
            bridge.setBeanName("bridge." + name);
            consumerEndpoint = this.createConsumerEndpoint(name, group, destination, properties);
            consumerEndpoint.setOutputChannel((MessageChannel)bridge);
            if (consumerEndpoint instanceof InitializingBean) {
                ((InitializingBean)consumerEndpoint).afterPropertiesSet();
            }
            if (consumerEndpoint instanceof Lifecycle) {
                ((Lifecycle)consumerEndpoint).start();
            }
            MessageProducer endpoint = consumerEndpoint;
            EventDrivenConsumer edc = new EventDrivenConsumer((SubscribableChannel)bridge, (MessageHandler)rh);
            edc.setBeanName("inbound." + this.groupedName(name, group));
            edc.start();
            return new DefaultBinding<MessageChannel>(name, group, inputChannel, endpoint instanceof Lifecycle ? (Lifecycle)endpoint : null){

                @Override
                protected void afterUnbind() {
                    AbstractMessageChannelBinder.this.afterUnbindConsumer(this.name, this.group, properties);
                }
            };
        }
        catch (Exception e) {
            if (consumerEndpoint instanceof Lifecycle) {
                ((Lifecycle)consumerEndpoint).stop();
            }
            if (e instanceof BinderException) {
                throw (BinderException)e;
            }
            throw new BinderException("Exception thrown while starting consumer: ", e);
        }
    }

    protected abstract CD createConsumerDestinationIfNecessary(String var1, String var2, C var3);

    protected abstract MessageProducer createConsumerEndpoint(String var1, String var2, CD var3, C var4);

    protected void afterUnbindConsumer(String destination, String group, C consumerProperties) {
    }

    private final class SendingHandler
    extends AbstractMessageHandler
    implements Lifecycle {
        private final boolean embedHeaders;
        private final String[] embeddedHeaders;
        private final MessageHandler delegate;

        private SendingHandler(MessageHandler delegate, boolean embedHeaders, String[] headersToEmbed) {
            this.delegate = delegate;
            this.setBeanFactory((BeanFactory)AbstractMessageChannelBinder.this.getBeanFactory());
            this.embedHeaders = embedHeaders;
            this.embeddedHeaders = headersToEmbed;
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            byte[] payload;
            Object contentType;
            MessageValues transformed = AbstractMessageChannelBinder.this.serializePayloadIfNecessary(message);
            if (this.embedHeaders) {
                Object originalContentType;
                contentType = transformed.get("contentType");
                if (contentType instanceof MimeType) {
                    transformed.put("contentType", (Object)contentType.toString());
                }
                if ((originalContentType = transformed.get("originalContentType")) instanceof MimeType) {
                    transformed.put("originalContentType", (Object)originalContentType.toString());
                }
                payload = AbstractMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(transformed, this.embeddedHeaders);
            } else {
                payload = (byte[])transformed.getPayload();
            }
            if (!this.embedHeaders && !AbstractMessageChannelBinder.this.supportsHeadersNatively) {
                contentType = message.getHeaders().get((Object)"contentType");
                if (contentType != null && !contentType.toString().equals("application/octet-stream")) {
                    this.logger.error((Object)("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass()));
                }
                if (message.getPayload() instanceof byte[]) {
                    payload = (byte[])message.getPayload();
                } else {
                    throw new BinderException("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass());
                }
            }
            this.delegate.handleMessage(this.getMessageBuilderFactory().withPayload((Object)payload).copyHeaders(transformed.getHeaders()).build());
        }

        public void start() {
            if (this.delegate instanceof Lifecycle) {
                ((Lifecycle)this.delegate).start();
            }
        }

        public void stop() {
            if (this.delegate instanceof Lifecycle) {
                ((Lifecycle)this.delegate).stop();
            }
        }

        public boolean isRunning() {
            return this.delegate instanceof Lifecycle && ((Lifecycle)this.delegate).isRunning();
        }
    }

    private final class ReceivingHandler
    extends AbstractReplyProducingMessageHandler {
        private final boolean extractEmbeddedHeaders;

        private ReceivingHandler(boolean extractEmbeddedHeaders) {
            this.extractEmbeddedHeaders = extractEmbeddedHeaders;
        }

        protected Object handleRequestMessage(Message<?> requestMessage) {
            MessageValues messageValues;
            if (this.extractEmbeddedHeaders) {
                try {
                    messageValues = AbstractMessageChannelBinder.this.embeddedHeadersMessageConverter.extractHeaders(requestMessage, true);
                }
                catch (Exception e) {
                    AbstractMessageChannelBinder.this.logger.error((Object)EmbeddedHeadersMessageConverter.decodeExceptionMessage(requestMessage), (Throwable)e);
                    messageValues = new MessageValues(requestMessage);
                }
                messageValues = AbstractMessageChannelBinder.this.deserializePayloadIfNecessary(messageValues);
            } else {
                messageValues = AbstractMessageChannelBinder.this.deserializePayloadIfNecessary(requestMessage);
            }
            return messageValues.toMessage();
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }
}

