/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.reactive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.reactive.FluxSender;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import reactor.core.publisher.MonoProcessor;

/**
 * {@link StreamListenerParameterAdapter} that exposes a {@link MessageChannel}
 * binding target to a handler method as a {@link FluxSender}.
 *
 * <p>The returned {@link FluxSender} subscribes to the publisher it is given and
 * forwards every emitted element to the bound channel.
 */
public class MessageChannelToFluxSenderParameterAdapter
implements StreamListenerParameterAdapter<FluxSender, MessageChannel> {
    private static final Log log = LogFactory.getLog(MessageChannelToFluxSenderParameterAdapter.class);

    /**
     * Tells whether this adapter can handle the given binding target / parameter pair.
     *
     * @param bindingTargetType the type of the binding target
     * @param methodParameter the handler method parameter to be populated
     * @return {@code true} only when the binding target type is assignable to
     *         {@link MessageChannel} and the parameter's raw class is assignable to
     *         {@link FluxSender}; {@code false} otherwise, including when the
     *         parameter's raw class cannot be determined
     */
    public boolean supports(Class<?> bindingTargetType, MethodParameter methodParameter) {
        if (!MessageChannel.class.isAssignableFrom(bindingTargetType)) {
            return false;
        }
        Class<?> parameterClass = ResolvableType.forMethodParameter(methodParameter).getRawClass();
        // getRawClass() may yield null for an unresolvable type; treat that as
        // "not supported" rather than letting isAssignableFrom throw.
        return parameterClass != null && FluxSender.class.isAssignableFrom(parameterClass);
    }

    /**
     * Wraps the given channel in a {@link FluxSender}.
     *
     * <p>When the sender is invoked it subscribes to the supplied publisher right away.
     * Each element is sent to {@code bindingTarget}; elements that are not already a
     * {@link Message} are wrapped as the payload of a new one. Errors signalled by the
     * publisher are logged and followed by a re-subscription via {@code retry()}.
     *
     * @param bindingTarget the channel that receives the published elements
     * @param parameter the handler method parameter (not used by this implementation)
     * @return a sender whose result completes when the subscription completes and
     *         fails when the subscription's error callback is invoked
     */
    public FluxSender adapt(MessageChannel bindingTarget, MethodParameter parameter) {
        return resultPublisher -> {
            MonoProcessor<Void> sendResult = MonoProcessor.create();
            resultPublisher
                    // log the failure, then re-subscribe instead of terminating
                    .doOnError(e -> log.error("Error during processing: ", e))
                    .retry()
                    .subscribe(
                            // NOTE(review): the boolean returned by send(...) is ignored here
                            result -> bindingTarget.send(toMessage(result)),
                            e -> sendResult.onError(e),
                            () -> sendResult.onComplete());
            return sendResult;
        };
    }

    /**
     * Returns {@code candidate} itself when it is already a {@link Message};
     * otherwise builds a new message carrying it as payload.
     */
    private static Message<?> toMessage(Object candidate) {
        if (candidate instanceof Message) {
            return (Message<?>) candidate;
        }
        return MessageBuilder.withPayload(candidate).build();
    }
}

