/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.redis.outbound;

import java.util.function.Function;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

public class ReactiveRedisStreamMessageHandler
extends AbstractReactiveMessageHandler {
    private final Expression streamKeyExpression;
    private final ReactiveRedisConnectionFactory connectionFactory;
    private EvaluationContext evaluationContext;
    private boolean extractPayload = true;
    private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
    private RedisSerializationContext<String, ?> serializationContext = RedisSerializationContext.string();
    @Nullable
    private HashMapper<String, ?, ?> hashMapper;
    @Nullable
    private Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction;

    public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
        this(connectionFactory, (Expression)new LiteralExpression(streamKey));
    }

    public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, Expression streamKeyExpression) {
        Assert.notNull((Object)streamKeyExpression, (String)"'streamKeyExpression' must not be null");
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' must not be null");
        this.streamKeyExpression = streamKeyExpression;
        this.connectionFactory = connectionFactory;
    }

    public void setSerializationContext(RedisSerializationContext<String, ?> serializationContext) {
        Assert.notNull(serializationContext, (String)"'serializationContext' must not be null");
        this.serializationContext = serializationContext;
    }

    public void setHashMapper(@Nullable HashMapper<String, ?, ?> hashMapper) {
        this.hashMapper = hashMapper;
    }

    public void setExtractPayload(boolean extractPayload) {
        this.extractPayload = extractPayload;
    }

    public void setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) {
        this.addOptionsFunction = addOptionsFunction;
    }

    public String getComponentType() {
        return "redis:stream-outbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
        ReactiveRedisTemplate template = new ReactiveRedisTemplate(this.connectionFactory, this.serializationContext);
        this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : template.opsForStream(this.hashMapper);
    }

    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> {
            String streamKey = (String)this.streamKeyExpression.getValue(this.evaluationContext, (Object)message, String.class);
            Assert.notNull((Object)streamKey, (String)"'streamKey' must not be null");
            return streamKey;
        }).flatMap(streamKey -> {
            Object value = message;
            if (this.extractPayload) {
                value = message.getPayload();
            }
            ObjectRecord record = StreamRecords.objectBacked((Object)value).withStreamKey(streamKey);
            if (this.addOptionsFunction == null) {
                return this.reactiveStreamOperations.add((Record)record);
            }
            return this.reactiveStreamOperations.add((Record)record, this.addOptionsFunction.apply(message));
        }).then();
    }
}

