/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.Lifecycle;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>>
extends AbstractBinder<MessageChannel, C, P> {
    /** SpEL expression parser instance, shared statically and visible to subclasses. */
    protected static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
    /**
     * When {@code true}, headers are never embedded into (producer side) or extracted from
     * (consumer side) the payload by this class, whatever header mode is configured.
     */
    private final boolean supportsHeadersNatively;
    /**
     * Header names passed to {@code EmbeddedHeaderUtils.embedHeaders} when embedding applies.
     * Never {@code null}: the constructor substitutes an empty array.
     */
    private final String[] headersToEmbed;
    /** Provisions the producer and consumer destinations at bind time. */
    protected final PP provisioningProvider;

    /**
     * Creates a binder.
     *
     * @param supportsHeadersNatively {@code true} if headers never need to be embedded into or
     *        extracted from the payload by this class
     * @param headersToEmbed names of the headers to embed when embedding applies; {@code null} is
     *        treated as an empty array. The array is copied, so later modifications made by the
     *        caller have no effect on this binder.
     * @param provisioningProvider provider used to provision producer and consumer destinations
     */
    public AbstractMessageChannelBinder(boolean supportsHeadersNatively, String[] headersToEmbed, PP provisioningProvider) {
        this.supportsHeadersNatively = supportsHeadersNatively;
        // Defensive copy: the array is mutable and must not be changeable from outside after construction.
        this.headersToEmbed = headersToEmbed == null ? new String[0] : headersToEmbed.clone();
        this.provisioningProvider = provisioningProvider;
    }

    /**
     * Binds an output channel to a producer destination.
     *
     * <p>Steps: provision the destination, create the producer message handler (calling
     * {@code afterPropertiesSet()} if it is an {@link InitializingBean}), start it if it is a
     * {@link Lifecycle}, and subscribe a {@code SendingHandler} wrapping it to the channel.
     * Headers are embedded by the sending handler only when this binder does not support headers
     * natively and the header mode is {@code embeddedHeaders}.
     *
     * <p>If starting the handler or subscribing to the channel fails, the handler is stopped
     * again (best effort) before the original exception propagates, so that a failed bind does
     * not leave a running handler behind.
     *
     * @param destination name of the destination to bind to
     * @param outputChannel channel to bind; must be a {@link SubscribableChannel}
     * @param producerProperties producer configuration
     * @return a binding whose lifecycle is the producer handler when that handler is a
     *         {@link Lifecycle}, and which calls {@code afterUnbindProducer} from its
     *         {@code afterUnbind()} callback
     * @throws BinderException if provisioning, handler creation or handler initialisation fails
     *         with anything other than a {@code BinderException} or {@code ProvisioningException}
     *         (those two are rethrown as-is)
     */
    @Override
    public final Binding<MessageChannel> doBindProducer(String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {
        Assert.isInstanceOf(SubscribableChannel.class, outputChannel, "Binding is supported only for SubscribableChannel instances");
        final ProducerDestination producerDestination;
        final MessageHandler producerMessageHandler;
        try {
            producerDestination = this.provisioningProvider.provisionProducerDestination(destination, producerProperties);
            producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties);
            if (producerMessageHandler instanceof InitializingBean) {
                ((InitializingBean) producerMessageHandler).afterPropertiesSet();
            }
        }
        catch (Exception e) {
            if (e instanceof BinderException) {
                throw (BinderException) e;
            }
            if (e instanceof ProvisioningException) {
                throw (ProvisioningException) ((Object) e);
            }
            throw new BinderException("Exception thrown while building outbound endpoint", e);
        }
        final Lifecycle handlerLifecycle = producerMessageHandler instanceof Lifecycle ? (Lifecycle) producerMessageHandler : null;
        boolean subscribed = false;
        try {
            if (handlerLifecycle != null) {
                handlerLifecycle.start();
            }
            boolean embedHeaders = !this.supportsHeadersNatively && HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode());
            SendingHandler sendingHandler = new SendingHandler(producerMessageHandler, embedHeaders, this.headersToEmbed, producerProperties.isUseNativeEncoding());
            ((SubscribableChannel) outputChannel).subscribe(sendingHandler);
            subscribed = true;
        }
        finally {
            // Only reached with subscribed == false when something above threw: undo the start so
            // the handler does not keep running without a binding that could ever stop it.
            if (!subscribed && handlerLifecycle != null) {
                try {
                    handlerLifecycle.stop();
                }
                catch (RuntimeException stopFailure) {
                    // Log instead of rethrowing so the exception that broke the bind is the one the caller sees.
                    this.logger.error("Failed to stop producer handler after unsuccessful bind to '" + destination + "'", stopFailure);
                }
            }
        }
        return new DefaultBinding<MessageChannel>(destination, null, outputChannel, handlerLifecycle){

            @Override
            public void afterUnbind() {
                AbstractMessageChannelBinder.this.afterUnbindProducer(producerDestination, producerProperties);
            }
        };
    }

    /**
     * Creates the handler that sends messages to the given producer destination.
     *
     * <p>Called by {@code doBindProducer} right after the destination has been provisioned. If
     * the returned handler is an {@link InitializingBean}, {@code afterPropertiesSet()} is invoked
     * on it; if it is a {@link Lifecycle}, it is started before the channel is subscribed.
     *
     * @param var1 the provisioned producer destination
     * @param var2 the producer properties passed to {@code doBindProducer}
     * @return the message handler to which outbound messages are delegated
     * @throws Exception on failure; {@code doBindProducer} wraps it in a {@link BinderException}
     *         unless it already is a {@code BinderException} or a {@code ProvisioningException}
     */
    protected abstract MessageHandler createProducerMessageHandler(ProducerDestination var1, P var2) throws Exception;

    /**
     * Extension hook invoked from the {@code afterUnbind()} callback of the binding returned by
     * {@code doBindProducer}. The default implementation does nothing.
     *
     * @param destination the producer destination that was provisioned for the binding
     * @param producerProperties the producer properties the binding was created with
     */
    protected void afterUnbindProducer(ProducerDestination destination, P producerProperties) {
    }

    /**
     * Binds an input channel to a consumer destination.
     *
     * <p>Steps: provision the destination, create a {@code ReceivingHandler} that forwards to
     * {@code inputChannel}, wrap it in a {@link FixedSubscriberChannel} named
     * {@code "bridge." + name}, create the consumer endpoint with that bridge as its output
     * channel, initialise and start the endpoint when it implements {@link InitializingBean} /
     * {@link Lifecycle}, and finally start an {@link EventDrivenConsumer} over the bridge.
     * Embedded headers are extracted only when the header mode is {@code embeddedHeaders} and this
     * binder does not support headers natively.
     *
     * <p>On any failure the consumer endpoint, if it is a {@link Lifecycle}, is stopped. A failure
     * of that cleanup is logged rather than thrown so that it cannot replace the exception that
     * actually caused the bind to fail.
     *
     * @param name name of the destination to bind to
     * @param group consumer group
     * @param inputChannel channel that receives the consumed messages
     * @param properties consumer configuration
     * @return a binding whose lifecycle is the consumer endpoint when that endpoint is a
     *         {@link Lifecycle}, and which calls {@code afterUnbindConsumer} from its
     *         {@code afterUnbind()} callback
     * @throws BinderException if binding fails with anything other than a {@code BinderException}
     *         or {@code ProvisioningException} (those two are rethrown as-is)
     */
    @Override
    public final Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, final C properties) throws BinderException {
        MessageProducer consumerEndpoint = null;
        try {
            final ConsumerDestination destination = this.provisioningProvider.provisionConsumerDestination(name, group, properties);
            boolean extractEmbeddedHeaders = HeaderMode.embeddedHeaders.equals(properties.getHeaderMode()) && !this.supportsHeadersNatively;
            ReceivingHandler receivingHandler = new ReceivingHandler(extractEmbeddedHeaders);
            receivingHandler.setOutputChannel(inputChannel);
            FixedSubscriberChannel bridge = new FixedSubscriberChannel(receivingHandler);
            bridge.setBeanName("bridge." + name);
            consumerEndpoint = this.createConsumerEndpoint(destination, group, properties);
            consumerEndpoint.setOutputChannel(bridge);
            if (consumerEndpoint instanceof InitializingBean) {
                ((InitializingBean) consumerEndpoint).afterPropertiesSet();
            }
            final Lifecycle endpointLifecycle = consumerEndpoint instanceof Lifecycle ? (Lifecycle) consumerEndpoint : null;
            if (endpointLifecycle != null) {
                endpointLifecycle.start();
            }
            EventDrivenConsumer edc = new EventDrivenConsumer(bridge, receivingHandler);
            edc.setBeanName("inbound." + this.groupedName(name, group));
            edc.start();
            return new DefaultBinding<MessageChannel>(name, group, inputChannel, endpointLifecycle){

                @Override
                protected void afterUnbind() {
                    AbstractMessageChannelBinder.this.afterUnbindConsumer(destination, this.group, properties);
                }
            };
        }
        catch (Exception e) {
            if (consumerEndpoint instanceof Lifecycle) {
                try {
                    ((Lifecycle) consumerEndpoint).stop();
                }
                catch (RuntimeException stopFailure) {
                    // Keep the root cause: a failing cleanup must not hide why the bind failed.
                    this.logger.error("Failed to stop consumer endpoint after unsuccessful bind to '" + name + "'", stopFailure);
                }
            }
            if (e instanceof BinderException) {
                throw (BinderException) e;
            }
            if (e instanceof ProvisioningException) {
                throw (ProvisioningException) ((Object) e);
            }
            throw new BinderException("Exception thrown while starting consumer: ", e);
        }
    }

    /**
     * Creates the endpoint that receives messages from the given consumer destination.
     *
     * <p>Called by {@code doBindConsumer} after the destination has been provisioned. The
     * returned endpoint gets its output channel set by the caller; if it is an
     * {@link InitializingBean}, {@code afterPropertiesSet()} is invoked on it, and if it is a
     * {@link Lifecycle}, it is started.
     *
     * @param var1 the provisioned consumer destination
     * @param var2 the consumer group passed to {@code doBindConsumer}
     * @param var3 the consumer properties passed to {@code doBindConsumer}
     * @return the message-producing endpoint for the destination
     * @throws Exception on failure; {@code doBindConsumer} wraps it in a {@link BinderException}
     *         unless it already is a {@code BinderException} or a {@code ProvisioningException}
     */
    protected abstract MessageProducer createConsumerEndpoint(ConsumerDestination var1, String var2, C var3) throws Exception;

    /**
     * Extension hook invoked from the {@code afterUnbind()} callback of the binding returned by
     * {@code doBindConsumer}. The default implementation does nothing.
     *
     * @param destination the consumer destination that was provisioned for the binding
     * @param group the group held by the binding
     * @param consumerProperties the consumer properties the binding was created with
     */
    protected void afterUnbindConsumer(ConsumerDestination destination, String group, C consumerProperties) {
    }

    private final class SendingHandler
    extends AbstractMessageHandler
    implements Lifecycle {
        private final boolean embedHeaders;
        private final String[] embeddedHeaders;
        private final MessageHandler delegate;
        private final boolean useNativeEncoding;

        private SendingHandler(MessageHandler delegate, boolean embedHeaders, String[] headersToEmbed, boolean useNativeEncoding) {
            this.delegate = delegate;
            this.setBeanFactory((BeanFactory)AbstractMessageChannelBinder.this.getBeanFactory());
            this.embedHeaders = embedHeaders;
            this.embeddedHeaders = headersToEmbed;
            this.useNativeEncoding = useNativeEncoding;
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            Message<?> messageToSend = this.useNativeEncoding ? message : this.serializeAndEmbedHeadersIfApplicable(message);
            this.delegate.handleMessage(messageToSend);
        }

        private Message<?> serializeAndEmbedHeadersIfApplicable(Message<?> message) throws Exception {
            byte[] payload;
            MessageValues transformed = AbstractMessageChannelBinder.this.serializePayloadIfNecessary(message);
            if (this.embedHeaders) {
                Object originalContentType;
                Object contentType = transformed.get("contentType");
                if (contentType instanceof MimeType) {
                    transformed.put("contentType", (Object)contentType.toString());
                }
                if ((originalContentType = transformed.get("originalContentType")) instanceof MimeType) {
                    transformed.put("originalContentType", (Object)originalContentType.toString());
                }
                payload = EmbeddedHeaderUtils.embedHeaders(transformed, this.embeddedHeaders);
            } else {
                payload = (byte[])transformed.getPayload();
            }
            return this.getMessageBuilderFactory().withPayload((Object)payload).copyHeaders(transformed.getHeaders()).build();
        }

        public void start() {
            if (this.delegate instanceof Lifecycle) {
                ((Lifecycle)this.delegate).start();
            }
        }

        public void stop() {
            if (this.delegate instanceof Lifecycle) {
                ((Lifecycle)this.delegate).stop();
            }
        }

        public boolean isRunning() {
            return this.delegate instanceof Lifecycle && ((Lifecycle)this.delegate).isRunning();
        }
    }

    private final class ReceivingHandler
    extends AbstractReplyProducingMessageHandler {
        private final boolean extractEmbeddedHeaders;

        private ReceivingHandler(boolean extractEmbeddedHeaders) {
            this.extractEmbeddedHeaders = extractEmbeddedHeaders;
        }

        protected Object handleRequestMessage(Message<?> requestMessage) {
            MessageValues messageValues;
            if (!(requestMessage.getPayload() instanceof byte[]) && !requestMessage.getHeaders().containsKey((Object)"originalContentType")) {
                return requestMessage;
            }
            if (this.extractEmbeddedHeaders) {
                try {
                    messageValues = EmbeddedHeaderUtils.extractHeaders(requestMessage, true);
                }
                catch (Exception e) {
                    AbstractMessageChannelBinder.this.logger.error((Object)EmbeddedHeaderUtils.decodeExceptionMessage(requestMessage), (Throwable)e);
                    messageValues = new MessageValues(requestMessage);
                }
                messageValues = AbstractMessageChannelBinder.this.deserializePayloadIfNecessary(messageValues);
            } else {
                messageValues = AbstractMessageChannelBinder.this.deserializePayloadIfNecessary(requestMessage);
            }
            return messageValues.toMessage();
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }
}

