/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.function.FunctionCatalogWrapper;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Adapts a user function looked up from the function catalog so that it can be
 * applied to a {@link Flux} of {@link Message}s.
 *
 * <p>For every incoming message the invoker resolves the function argument
 * (converting the message through the composite message converter when the
 * payload is not already of the function's input type), applies the user
 * function, retries with back-off according to the configured
 * {@link ConsumerProperties}, and finally turns each result back into a
 * {@link Message}. Errors that survive the retries are logged and, when an
 * error channel was supplied, sent to it as an {@link ErrorMessage}; the
 * failed element is then dropped from the output.
 *
 * @param <I> payload type of the incoming messages
 * @param <O> payload type of the outgoing messages
 */
class FunctionInvoker<I, O> implements Function<Flux<Message<I>>, Flux<Message<O>>> {
    private static final Log logger = LogFactory.getLog(FunctionInvoker.class);
    private final Class<?> inputClass;
    private final Class<?> outputClass;
    private final Function<Flux<?>, Flux<?>> userFunction;
    private final CompositeMessageConverter messageConverter;
    private final MessageChannel errorChannel;
    private final boolean isInputArgumentMessage;
    private final ConsumerProperties consumerProperties;

    /**
     * Creates an invoker without an error channel; failures are only logged.
     *
     * @param functionProperties supplies the function definition and consumer properties
     * @param functionCatalog catalog the user function is looked up from
     * @param functionInspector used to obtain the function's registration and type
     * @param compositeMessageConverterFactory source of the message converter
     */
    FunctionInvoker(StreamFunctionProperties functionProperties, FunctionCatalogWrapper functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory) {
        this(functionProperties, functionCatalog, functionInspector, compositeMessageConverterFactory, null);
    }

    /**
     * Creates an invoker that additionally publishes failures to {@code errorChannel}.
     *
     * @param functionProperties supplies the function definition and consumer properties
     * @param functionCatalog catalog the user function is looked up from
     * @param functionInspector used to obtain the function's registration and type
     * @param compositeMessageConverterFactory source of the message converter
     * @param errorChannel channel that receives an {@link ErrorMessage} per failure; may be {@code null}
     */
    FunctionInvoker(StreamFunctionProperties functionProperties, FunctionCatalogWrapper functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory, MessageChannel errorChannel) {
        this.userFunction = (Function)functionCatalog.lookup(functionProperties.getDefinition());
        // Also rejects a null lookup result, which the cast above lets through.
        Assert.isInstanceOf(Function.class, this.userFunction);
        this.messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        FunctionType functionType = functionInspector.getRegistration(this.userFunction).getType();
        this.isInputArgumentMessage = functionType.isMessage();
        this.inputClass = functionType.getInputType();
        this.outputClass = functionType.getOutputType();
        this.errorChannel = errorChannel;
        // Read the configured properties once and fall back to defaults when absent.
        ConsumerProperties configured = functionProperties.getConsumerProperties();
        this.consumerProperties = configured != null ? configured : new ConsumerProperties();
    }

    /**
     * Applies the user function to each incoming message, one message at a time.
     *
     * @param input the incoming messages
     * @return the function results converted back to messages; elements whose
     *         processing failed after the retries are omitted
     */
    @Override
    public Flux<Message<O>> apply(Flux<Message<I>> input) {
        // Remembers the message currently being processed so that error handling
        // and result conversion can refer back to it.
        AtomicReference<Message<I>> originalMessageRef = new AtomicReference<>();
        return input
                .concatMap(message -> Flux.just(message)
                        .doOnNext(originalMessageRef::set)
                        .map(this::resolveArgument)
                        .transform(this.userFunction::apply)
                        // NOTE(review): the first argument of retryBackoff is a retry count; if
                        // getMaxAttempts() counts the initial attempt as well, this performs one
                        // more invocation than configured -- confirm against ConsumerProperties.
                        .retryBackoff(this.consumerProperties.getMaxAttempts(),
                                Duration.ofMillis(this.consumerProperties.getBackOffInitialInterval()),
                                Duration.ofMillis(this.consumerProperties.getBackOffMaxInterval()))
                        .onErrorResume(e -> {
                            onError(e, originalMessageRef.get());
                            // Drop the failed element and continue with the next message.
                            return Mono.empty();
                        }))
                .log()
                .map(resultMessage -> toMessage(resultMessage, originalMessageRef.get()));
    }

    /**
     * Reports a processing failure: logs it (including the stack trace) and, when
     * an error channel is configured, sends an {@link ErrorMessage} to it.
     *
     * @param t the failure
     * @param originalMessage the message that was being processed
     */
    private void onError(Throwable t, Message<I> originalMessage) {
        if (this.errorChannel != null) {
            ErrorMessage em = new ErrorMessage(t, originalMessage);
            // Pass the throwable separately so the stack trace is not lost.
            logger.error(em, t);
            this.errorChannel.send(em);
        } else {
            logger.error("Failed to process message: " + originalMessage, t);
        }
    }

    /**
     * Converts a function result into an outgoing message.
     *
     * <p>A result that already is a {@link Message} is returned as is. Otherwise
     * the message converter is asked to build a message using the original
     * headers; if it returns {@code null} and the value is an instance of the
     * function's output type, the value is wrapped directly with the original
     * headers minus {@code contentType}.
     *
     * @param value the function result
     * @param originalMessage the message the result was produced from
     * @return the outgoing message, never {@code null}
     * @throws IllegalArgumentException if no message could be produced
     */
    @SuppressWarnings("unchecked")
    private <T> Message<O> toMessage(T value, Message<I> originalMessage) {
        if (logger.isDebugEnabled()) {
            logger.debug("Converting result back to message using the original message: " + originalMessage);
        }
        Message<O> returnMessage = (Message<O>) (value instanceof Message
                ? value
                : this.messageConverter.toMessage(value, originalMessage.getHeaders(), this.outputClass));
        // The value must be an instance of the declared output type (subtypes included);
        // the previous check had receiver and argument of isAssignableFrom swapped.
        if (returnMessage == null && this.outputClass.isInstance(value)) {
            returnMessage = (Message<O>) MessageBuilder.withPayload(value)
                    .copyHeaders(originalMessage.getHeaders())
                    .removeHeader("contentType")
                    .build();
        }
        Assert.notNull(returnMessage, "Failed to convert result value '" + value + "' to message.");
        return returnMessage;
    }

    /**
     * Derives the function argument from an incoming message.
     *
     * <p>The message is converted to the function's input type when
     * {@link #shouldConvertFromMessage(Message)} says so; otherwise the message
     * itself is used. Unless the function declares a {@link Message} input, a
     * message argument is unwrapped to its payload.
     *
     * @param message the incoming message
     * @return the argument to hand to the user function
     * @throws IllegalArgumentException if conversion yields {@code null}
     */
    @SuppressWarnings("unchecked")
    private <T> T resolveArgument(Message<I> message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Resolving input argument from message: " + message);
        }
        Object argument = shouldConvertFromMessage(message)
                ? this.messageConverter.fromMessage(message, this.inputClass)
                : message;
        Assert.notNull(argument, "Failed to resolve argument type '" + this.inputClass + "' from message: " + message);
        if (!this.isInputArgumentMessage && argument instanceof Message) {
            // The function wants the bare payload, not the message envelope.
            argument = ((Message<?>) argument).getPayload();
        }
        return (T) argument;
    }

    /**
     * Decides whether the message has to go through the message converter.
     *
     * @param message the incoming message
     * @return {@code true} unless the input type accepts {@link Message}, accepts
     *         the payload's class, or accepts {@link Object}
     */
    private boolean shouldConvertFromMessage(Message<?> message) {
        return !this.inputClass.isAssignableFrom(Message.class)
                && !this.inputClass.isAssignableFrom(message.getPayload().getClass())
                && !this.inputClass.isAssignableFrom(Object.class);
    }
}

