/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class RedisMessageChannelBinder
extends AbstractBinder<MessageChannel, ConsumerProperties, ProducerProperties> {
    private static final String ERROR_HEADER = "errorKey";
    static final String CONSUMER_GROUPS_KEY_PREFIX = "groups.";
    private static final SpelExpressionParser parser = new SpelExpressionParser();
    private final String[] headersToMap;
    private final RedisOperations<String, String> redisOperations;
    private final RedisConnectionFactory connectionFactory;
    private final RedisQueueOutboundChannelAdapter errorAdapter;

    public RedisMessageChannelBinder(RedisConnectionFactory connectionFactory) {
        this(connectionFactory, new String[0]);
    }

    public RedisMessageChannelBinder(RedisConnectionFactory connectionFactory, String ... headersToMap) {
        Assert.notNull((Object)connectionFactory, (String)"connectionFactory must not be null");
        this.connectionFactory = connectionFactory;
        StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
        template.afterPropertiesSet();
        this.redisOperations = template;
        if (headersToMap != null && headersToMap.length > 0) {
            String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + headersToMap.length);
            System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, headersToMap.length);
            this.headersToMap = combinedHeadersToMap;
        } else {
            this.headersToMap = BinderHeaders.STANDARD_HEADERS;
        }
        this.errorAdapter = new RedisQueueOutboundChannelAdapter(parser.parseExpression("headers['errorKey']"), connectionFactory);
    }

    public void onInit() {
        this.errorAdapter.setIntegrationEvaluationContext(this.evaluationContext);
        this.errorAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        this.errorAdapter.afterPropertiesSet();
    }

    protected Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel moduleInputChannel, ConsumerProperties properties) {
        if (!StringUtils.hasText((String)group)) {
            group = "anonymous." + UUID.randomUUID().toString();
        }
        String queueName = this.groupedName(name, group);
        if (properties.isPartitioned()) {
            queueName = queueName + "-" + properties.getInstanceIndex();
        }
        MessageProducerSupport adapter = this.createInboundAdapter(properties, queueName);
        return this.doRegisterConsumer(name, group, queueName, moduleInputChannel, adapter, properties);
    }

    private MessageProducerSupport createInboundAdapter(ConsumerProperties accessor, String queueName) {
        CompositeRedisQueueMessageDrivenEndpoint adapter;
        int concurrency = accessor.getConcurrency();
        int n = concurrency = concurrency > 0 ? concurrency : 1;
        if (concurrency == 1) {
            RedisQueueMessageDrivenEndpoint single = new RedisQueueMessageDrivenEndpoint(queueName, this.connectionFactory);
            single.setBeanFactory((BeanFactory)this.getBeanFactory());
            single.setSerializer(null);
            adapter = single;
        } else {
            adapter = new CompositeRedisQueueMessageDrivenEndpoint(queueName, concurrency);
        }
        return adapter;
    }

    private Binding<MessageChannel> doRegisterConsumer(String bindingName, String group, String channelName, MessageChannel moduleInputChannel, MessageProducerSupport adapter, ConsumerProperties properties) {
        DirectChannel bridgeToModuleChannel = new DirectChannel();
        bridgeToModuleChannel.setBeanFactory((BeanFactory)this.getBeanFactory());
        bridgeToModuleChannel.setBeanName(channelName + ".bridge");
        MessageChannel bridgeInputChannel = this.addRetryIfNeeded(channelName, bridgeToModuleChannel, properties);
        adapter.setOutputChannel(bridgeInputChannel);
        adapter.setBeanName("inbound." + channelName);
        adapter.afterPropertiesSet();
        DefaultBinding<MessageChannel> consumerBinding = new DefaultBinding<MessageChannel>(bindingName, group, moduleInputChannel, (AbstractEndpoint)adapter){

            protected void afterUnbind() {
                String key = RedisMessageChannelBinder.CONSUMER_GROUPS_KEY_PREFIX + this.getName();
                RedisMessageChannelBinder.this.redisOperations.boundZSetOps((Object)key).incrementScore((Object)this.getGroup(), -1.0);
            }
        };
        ReceivingHandler convertingBridge = new ReceivingHandler(properties);
        convertingBridge.setOutputChannel(moduleInputChannel);
        convertingBridge.setBeanName(channelName + ".bridge.handler");
        convertingBridge.afterPropertiesSet();
        bridgeToModuleChannel.subscribe((MessageHandler)convertingBridge);
        this.redisOperations.boundZSetOps((Object)(CONSUMER_GROUPS_KEY_PREFIX + bindingName)).incrementScore((Object)group, 1.0);
        adapter.start();
        return consumerBinding;
    }

    private MessageChannel addRetryIfNeeded(final String name, final DirectChannel bridgeToModuleChannel, ConsumerProperties properties) {
        final RetryTemplate retryTemplate = this.buildRetryTemplateIfRetryEnabled(properties);
        if (retryTemplate == null) {
            return bridgeToModuleChannel;
        }
        DirectChannel channel = new DirectChannel(){

            protected boolean doSend(final Message<?> message, final long timeout) {
                try {
                    return (Boolean)retryTemplate.execute((RetryCallback)new RetryCallback<Boolean, Exception>(){

                        public Boolean doWithRetry(RetryContext context) throws Exception {
                            return bridgeToModuleChannel.send(message, timeout);
                        }
                    }, (RecoveryCallback)new RecoveryCallback<Boolean>(){

                        public Boolean recover(RetryContext context) throws Exception {
                            logger.error((Object)("Failed to deliver message; retries exhausted; message sent to queue 'ERRORS:" + name + "' "), context.getLastThrowable());
                            RedisMessageChannelBinder.this.errorAdapter.handleMessage(this.getMessageBuilderFactory().fromMessage(message).setHeader(RedisMessageChannelBinder.ERROR_HEADER, (Object)("ERRORS:" + name)).build());
                            return true;
                        }
                    });
                }
                catch (Exception e) {
                    this.logger.error((Object)"Failed to deliver message", (Throwable)e);
                    return false;
                }
            }
        };
        channel.setBeanName(name + ".bridge");
        return channel;
    }

    protected Binding<MessageChannel> doBindProducer(String name, MessageChannel moduleOutputChannel, ProducerProperties properties) {
        Assert.isInstanceOf(SubscribableChannel.class, (Object)moduleOutputChannel);
        return this.doRegisterProducer(name, moduleOutputChannel, properties);
    }

    private RedisQueueOutboundChannelAdapter createProducerEndpoint(String name, ProducerProperties properties) {
        RedisQueueOutboundChannelAdapter queue = !properties.isPartitioned() ? new RedisQueueOutboundChannelAdapter(name, this.connectionFactory) : new RedisQueueOutboundChannelAdapter(parser.parseExpression(this.buildPartitionRoutingExpression(name)), this.connectionFactory);
        queue.setIntegrationEvaluationContext(this.evaluationContext);
        queue.setBeanFactory((BeanFactory)this.getBeanFactory());
        queue.afterPropertiesSet();
        return queue;
    }

    private Binding<MessageChannel> doRegisterProducer(String name, MessageChannel moduleOutputChannel, ProducerProperties properties) {
        Assert.isInstanceOf(SubscribableChannel.class, (Object)moduleOutputChannel);
        SendingHandler handler = new SendingHandler(name, properties);
        EventDrivenConsumer consumer = new EventDrivenConsumer((SubscribableChannel)moduleOutputChannel, (MessageHandler)handler);
        consumer.setBeanFactory((BeanFactory)this.getBeanFactory());
        consumer.setBeanName("outbound." + name);
        consumer.afterPropertiesSet();
        DefaultBinding producerBinding = new DefaultBinding(name, null, (Object)moduleOutputChannel, (AbstractEndpoint)consumer);
        Object[] requiredGroups = properties.getRequiredGroups();
        if (!ObjectUtils.isEmpty((Object[])requiredGroups)) {
            for (Object group : requiredGroups) {
                this.redisOperations.boundZSetOps((Object)(CONSUMER_GROUPS_KEY_PREFIX + name)).incrementScore(group, 1.0);
            }
        }
        consumer.start();
        return producerBinding;
    }

    private class CompositeRedisQueueMessageDrivenEndpoint
    extends MessageProducerSupport {
        private final List<RedisQueueMessageDrivenEndpoint> consumers = new ArrayList<RedisQueueMessageDrivenEndpoint>();

        public CompositeRedisQueueMessageDrivenEndpoint(String queueName, int concurrency) {
            for (int i = 0; i < concurrency; ++i) {
                RedisQueueMessageDrivenEndpoint adapter = new RedisQueueMessageDrivenEndpoint(queueName, RedisMessageChannelBinder.this.connectionFactory);
                adapter.setBeanFactory((BeanFactory)RedisMessageChannelBinder.this.getBeanFactory());
                adapter.setSerializer(null);
                adapter.setBeanName("inbound." + queueName + "." + i);
                this.consumers.add(adapter);
            }
            this.setBeanFactory((BeanFactory)RedisMessageChannelBinder.this.getBeanFactory());
        }

        protected void onInit() {
            for (RedisQueueMessageDrivenEndpoint consumer : this.consumers) {
                consumer.afterPropertiesSet();
            }
        }

        protected void doStart() {
            for (RedisQueueMessageDrivenEndpoint consumer : this.consumers) {
                consumer.start();
            }
        }

        protected void doStop() {
            for (RedisQueueMessageDrivenEndpoint consumer : this.consumers) {
                consumer.stop();
            }
        }

        public void setOutputChannel(MessageChannel outputChannel) {
            for (RedisQueueMessageDrivenEndpoint consumer : this.consumers) {
                consumer.setOutputChannel(outputChannel);
            }
        }

        public void setErrorChannel(MessageChannel errorChannel) {
            for (RedisQueueMessageDrivenEndpoint consumer : this.consumers) {
                consumer.setErrorChannel(errorChannel);
            }
        }
    }

    private class ReceivingHandler
    extends AbstractReplyProducingMessageHandler {
        private final ConsumerProperties consumerProperties;

        public ReceivingHandler(ConsumerProperties consumerProperties) {
            this.consumerProperties = consumerProperties;
            this.setBeanFactory((BeanFactory)RedisMessageChannelBinder.this.getBeanFactory());
        }

        protected Object handleRequestMessage(Message<?> requestMessage) {
            if (HeaderMode.embeddedHeaders.equals((Object)this.consumerProperties.getHeaderMode())) {
                return RedisMessageChannelBinder.this.extractMessageValues(requestMessage).toMessage(this.getMessageBuilderFactory());
            }
            return requestMessage;
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    private class SendingHandler
    extends AbstractMessageHandler {
        private final String bindingName;
        private final ProducerProperties producerProperties;
        private final Map<String, RedisQueueOutboundChannelAdapter> adapters = new HashMap<String, RedisQueueOutboundChannelAdapter>();
        private final PartitionHandler partitionHandler;

        private SendingHandler(String bindingName, ProducerProperties producerProperties) {
            this.bindingName = bindingName;
            this.producerProperties = producerProperties;
            ConfigurableListableBeanFactory beanFactory = RedisMessageChannelBinder.this.getBeanFactory();
            this.setBeanFactory((BeanFactory)beanFactory);
            this.partitionHandler = new PartitionHandler(beanFactory, RedisMessageChannelBinder.this.evaluationContext, RedisMessageChannelBinder.this.partitionSelector, producerProperties);
            this.refreshChannelAdapters();
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            MessageValues transformed = RedisMessageChannelBinder.this.serializePayloadIfNecessary(message);
            if (this.producerProperties.isPartitioned()) {
                transformed.put("partition", (Object)this.partitionHandler.determinePartition(message));
            }
            byte[] messageToSend = null;
            if (HeaderMode.embeddedHeaders.equals((Object)this.producerProperties.getHeaderMode())) {
                messageToSend = RedisMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(transformed, RedisMessageChannelBinder.this.headersToMap);
            } else {
                Object contentType = message.getHeaders().get((Object)"contentType");
                if (contentType != null && !contentType.equals("application/octet-stream")) {
                    this.logger.error((Object)("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass()));
                }
                if (message.getPayload() instanceof byte[]) {
                    messageToSend = (byte[])message.getPayload();
                } else {
                    this.logger.error((Object)("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass()));
                }
            }
            if (messageToSend != null) {
                this.refreshChannelAdapters();
                for (RedisQueueOutboundChannelAdapter adapter : this.adapters.values()) {
                    adapter.handleMessage(MessageBuilder.withPayload((Object)messageToSend).copyHeaders((Map)transformed).build());
                }
            }
        }

        private void refreshChannelAdapters() {
            Set groups = RedisMessageChannelBinder.this.redisOperations.boundZSetOps((Object)(RedisMessageChannelBinder.CONSUMER_GROUPS_KEY_PREFIX + this.bindingName)).rangeByScore(1.0, Double.MAX_VALUE);
            for (String group : groups) {
                if (this.adapters.containsKey(group)) continue;
                String channel = String.format("%s.%s", this.bindingName, group);
                this.adapters.put(group, RedisMessageChannelBinder.this.createProducerEndpoint(channel, this.producerProperties));
            }
        }
    }
}

