/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.reactive;

import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/**
 * Adapts a {@link SubscribableChannel} binding target to a {@link Flux} method
 * parameter.
 *
 * <p>The element type of the resulting {@code Flux} is taken from the first
 * generic argument of the declared parameter: when it is a {@link Message}
 * type the whole message is emitted, otherwise the payload is emitted,
 * converted through the configured {@link CompositeMessageConverter} when it
 * is not already an instance of the requested type.
 */
public class MessageChannelToInputFluxParameterAdapter
        implements StreamListenerParameterAdapter<Flux<?>, SubscribableChannel> {
    private final CompositeMessageConverter messageConverter;

    /**
     * Creates the adapter.
     *
     * @param messageConverter converter used to turn a message into the
     *        requested argument type; must not be {@code null}
     * @throws IllegalArgumentException if {@code messageConverter} is {@code null}
     */
    public MessageChannelToInputFluxParameterAdapter(CompositeMessageConverter messageConverter) {
        Assert.notNull(messageConverter, "messageConverter cannot be null");
        this.messageConverter = messageConverter;
    }

    /**
     * Tells whether this adapter can handle the given combination.
     *
     * @param bindingTargetType type of the binding target
     * @param methodParameter the listener method parameter to be fed
     * @return {@code true} when the binding target is a {@link SubscribableChannel}
     *         and the parameter type is a {@link Flux}
     */
    public boolean supports(Class<?> bindingTargetType, MethodParameter methodParameter) {
        return SubscribableChannel.class.isAssignableFrom(bindingTargetType)
                && Flux.class.isAssignableFrom(methodParameter.getParameterType());
    }

    /**
     * Exposes the messages delivered to {@code bindingTarget} as a {@link Flux}.
     *
     * <p>A {@link MessageHandler} is subscribed to the channel inside the
     * {@code Flux.create} callback and unsubscribed again through the emitter's
     * cancellation hook. The flux is shared via {@code publish().autoConnect()}.
     *
     * @param bindingTarget the channel to read messages from
     * @param parameter the listener method parameter whose first generic
     *        argument determines the emitted element type
     * @return a flux of messages, or of (possibly converted) payloads
     */
    public Flux<?> adapt(SubscribableChannel bindingTarget, MethodParameter parameter) {
        // Resolve the element type once; fall back to Object when the parameter
        // declares no resolvable generic argument (e.g. a raw Flux).
        Class<?> rawElementClass = ResolvableType.forMethodParameter(parameter).getGeneric(0).getRawClass();
        final Class<?> argumentClass = rawElementClass != null ? rawElementClass : Object.class;
        // Decided once per adaptation, not once per message.
        final boolean emitWholeMessage = Message.class.isAssignableFrom(argumentClass);
        final Object monitor = new Object();
        return Flux.create(emitter -> {
            MessageHandler messageHandler = message -> {
                // Serialize emissions (and conversion) in case the channel
                // dispatches to this handler from more than one thread.
                synchronized (monitor) {
                    emitter.next(emitWholeMessage ? message : extractArgument(message, argumentClass));
                }
            };
            bindingTarget.subscribe(messageHandler);
            emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler));
        }).publish().autoConnect();
    }

    /**
     * Returns the payload as-is when it already is an instance of
     * {@code argumentClass}, otherwise delegates to the message converter.
     */
    private Object extractArgument(Message<?> message, Class<?> argumentClass) {
        Object payload = message.getPayload();
        if (argumentClass.isInstance(payload)) {
            return payload;
        }
        return this.messageConverter.fromMessage(message, argumentClass);
    }
}

