/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.zeromq.inbound;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.ConvertingBytesMessageMapper;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * A {@link MessageProducerSupport} implementation that receives {@link ZMsg}s from a
 * ZeroMQ socket and publishes them downstream as Spring {@link Message}s.
 *
 * <p>Only {@link SocketType#PAIR}, {@link SocketType#PULL} and {@link SocketType#SUB}
 * sockets are accepted. The socket is either connected to the configured
 * {@code connectUrl} or bound to a TCP port ({@code bindPort}; {@code 0} means a random
 * port is selected at bind time).
 *
 * <p>All socket interactions are funnelled through a dedicated single-threaded
 * {@link Scheduler} via the cached {@code socketMono}.
 */
@ManagedResource
@IntegrationManagedResource
public class ZeroMqMessageProducer extends MessageProducerSupport {

    /** The default delay between receive attempts when no message was available. */
    public static final Duration DEFAULT_CONSUME_DELAY = Duration.ofSeconds(1L);

    private static final List<SocketType> VALID_SOCKET_TYPES =
            Arrays.asList(SocketType.PAIR, SocketType.PULL, SocketType.SUB);

    private final Scheduler consumerScheduler = Schedulers.newSingle("zeroMqMessageProducerScheduler");

    /** The port to bind to; {@code 0} until set explicitly or until a random port has been bound. */
    private final AtomicInteger bindPort = new AtomicInteger();

    private final ZContext context;

    private final SocketType socketType;

    private InboundMessageMapper<byte[]> messageMapper;

    private Consumer<ZMQ.Socket> socketConfigurer = socket -> { };

    private Duration consumeDelay = DEFAULT_CONSUME_DELAY;

    private String[] topics = { "" };

    private boolean receiveRaw;

    @Nullable
    private String connectUrl;

    /** Assigned in {@link #doStart()}; remains {@code null} until the producer is started. */
    private volatile Mono<ZMQ.Socket> socketMono;

    /**
     * Create a producer backed by a {@link SocketType#PAIR} socket.
     * @param context the ZeroMQ context used to create the socket; must not be null.
     */
    public ZeroMqMessageProducer(ZContext context) {
        this(context, SocketType.PAIR);
    }

    /**
     * Create a producer backed by a socket of the given type.
     * @param context the ZeroMQ context used to create the socket; must not be null.
     * @param socketType one of {@code PAIR}, {@code PULL} or {@code SUB}.
     * @throws IllegalArgumentException if {@code context} is null.
     * @throws IllegalStateException if {@code socketType} is not one of the supported types.
     */
    public ZeroMqMessageProducer(ZContext context, SocketType socketType) {
        Assert.notNull(context, "'context' must not be null");
        Assert.state(VALID_SOCKET_TYPES.contains(socketType),
                () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES);
        this.context = context;
        this.socketType = socketType;
    }

    /**
     * Set the delay applied before the next receive attempt when the previous attempt
     * produced no message. Defaults to {@link #DEFAULT_CONSUME_DELAY}.
     * @param consumeDelay the delay; must not be null.
     */
    public void setConsumeDelay(Duration consumeDelay) {
        Assert.notNull(consumeDelay, "'consumeDelay' must not be null");
        this.consumeDelay = consumeDelay;
    }

    /**
     * Set the mapper used to turn the last frame of a received {@link ZMsg} into a
     * {@link Message}. Not consulted when {@code receiveRaw} is enabled.
     * @param messageMapper the mapper; must not be null.
     */
    public void setMessageMapper(InboundMessageMapper<byte[]> messageMapper) {
        Assert.notNull(messageMapper, "'messageMapper' must not be null");
        this.messageMapper = messageMapper;
    }

    /**
     * Convenience for {@link #setMessageMapper(InboundMessageMapper)}: wraps the converter
     * into a {@link ConvertingBytesMessageMapper}.
     * @param messageConverter the converter to wrap.
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        setMessageMapper(new ConvertingBytesMessageMapper(messageConverter));
    }

    /**
     * Choose whether the received {@link ZMsg} is used as the message payload as is
     * ({@code true}) or converted through the message mapper ({@code false}, the default).
     * @param receiveRaw whether to skip the mapping.
     */
    public void setReceiveRaw(boolean receiveRaw) {
        this.receiveRaw = receiveRaw;
    }

    /**
     * Set a callback applied to the socket on start, before topic subscription and
     * before connecting or binding.
     * @param socketConfigurer the callback; must not be null.
     */
    public void setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) {
        Assert.notNull(socketConfigurer, "'socketConfigurer' must not be null");
        this.socketConfigurer = socketConfigurer;
    }

    /**
     * Set the topics the socket subscribes to on start. Applied only for
     * {@link SocketType#SUB} sockets. The array is copied defensively.
     * @param topics the topics; neither the array nor its elements may be null.
     */
    public void setTopics(String... topics) {
        Assert.notNull(topics, "'topics' cannot be null");
        Assert.noNullElements(topics, "'topics' must not contain null elements");
        this.topics = Arrays.copyOf(topics, topics.length);
    }

    /**
     * Set the URL the socket connects to on start. When left {@code null}, the socket is
     * bound to a TCP port instead. Mutually exclusive with {@link #setBindPort(int)}.
     * @param connectUrl the URL to connect to, or {@code null}.
     */
    public void setConnectUrl(@Nullable String connectUrl) {
        this.connectUrl = connectUrl;
    }

    /**
     * Set the TCP port the socket is bound to on start. Mutually exclusive with
     * {@link #setConnectUrl(String)}.
     * @param port the port; must be positive.
     */
    public void setBindPort(int port) {
        Assert.isTrue(port > 0, "'port' must not be zero or negative");
        this.bindPort.set(port);
    }

    /**
     * Return the port this producer binds to: the configured one, the randomly selected
     * one once the socket has been bound, or {@code 0} if neither has happened.
     * @return the bind port value.
     */
    public int getBoundPort() {
        return this.bindPort.get();
    }

    @Override
    public String getComponentType() {
        return "zeromq:inbound-channel-adapter";
    }

    /**
     * Validate that {@code connectUrl} and {@code bindPort} are not both configured and
     * fall back to a {@link ConfigurableCompositeMessageConverter}-based mapper when no
     * mapper was provided and raw reception is disabled.
     */
    @Override
    protected void onInit() {
        super.onInit();
        Assert.state(this.connectUrl == null || this.bindPort.get() == 0,
                "Only one of the 'connectUrl' or `bindPort` must be provided or none");
        if (this.messageMapper == null && !this.receiveRaw) {
            ConfigurableCompositeMessageConverter messageConverter = new ConfigurableCompositeMessageConverter();
            messageConverter.setBeanFactory(getBeanFactory());
            messageConverter.afterPropertiesSet();
            this.messageMapper = new ConvertingBytesMessageMapper(messageConverter);
        }
    }

    /**
     * Subscribe the socket to additional topics at runtime.
     * @param topics the topics to subscribe to.
     * @throws IllegalStateException if the socket is not a {@code SUB} one or this
     * producer is not active.
     */
    @ManagedOperation
    public void subscribeToTopics(String... topics) {
        Assert.state(SocketType.SUB.equals(this.socketType), "Only SUB socket can accept a subscription option.");
        Assert.state(isActive(), "This message producer is not active to accept a new subscription.");
        Flux.fromArray(topics)
                .flatMap(topic -> this.socketMono.doOnNext(socket -> socket.subscribe(topic)))
                .subscribe();
    }

    /**
     * Cancel the socket's subscription to the given topics at runtime.
     * @param topics the topics to unsubscribe from.
     * @throws IllegalStateException if the socket is not a {@code SUB} one or this
     * producer is not active.
     */
    @ManagedOperation
    public void unsubscribeFromTopics(String... topics) {
        Assert.state(SocketType.SUB.equals(this.socketType), "Only SUB socket can accept a unsubscription option.");
        Assert.state(isActive(), "This message producer is not active to cancel a subscription.");
        Flux.fromArray(topics)
                .flatMap(topic -> this.socketMono.doOnNext(socket -> socket.unsubscribe(topic)))
                .subscribe();
    }

    /**
     * Create the socket, prepare the cached {@code socketMono} which configures,
     * subscribes and connects/binds it on first subscription, and start the receive loop.
     */
    @Override
    protected void doStart() {
        this.socketMono =
                Mono.just(this.context.createSocket(this.socketType))
                        .publishOn(this.consumerScheduler)
                        .doOnNext(this.socketConfigurer)
                        .doOnNext(socket -> {
                            if (SocketType.SUB.equals(this.socketType)) {
                                for (String topic : this.topics) {
                                    socket.subscribe(topic);
                                }
                            }
                        })
                        .doOnNext(socket -> {
                            if (this.connectUrl != null) {
                                socket.connect(this.connectUrl);
                            }
                            else {
                                // Remember the actual port, which matters when a random one was selected.
                                this.bindPort.set(bindSocket(socket, this.bindPort.get()));
                            }
                        })
                        // The set-up callbacks above must run once only; later subscribers reuse the socket.
                        .cache()
                        .publishOn(this.consumerScheduler);

        Flux<? extends Message<?>> dataFlux =
                this.socketMono
                        .flatMap(socket -> {
                            if (isRunning()) {
                                // Non-waiting receive: an absent message yields an empty Mono.
                                ZMsg msg = ZMsg.recvMsg(socket, false);
                                if (msg != null) {
                                    return Mono.just(msg);
                                }
                            }
                            return Mono.empty();
                        })
                        .publishOn(Schedulers.boundedElastic())
                        .transform(msgMono -> this.receiveRaw ? mapRaw(msgMono) : convertMessage(msgMono))
                        .doOnError(error ->
                                this.logger.error(error, () -> "Error processing ZeroMQ message in the " + this))
                        // Nothing received: retry after the consume delay while still active.
                        .repeatWhenEmpty(repeat -> isActive() ? repeat.delayElements(this.consumeDelay) : repeat)
                        .repeat(this::isActive)
                        .doOnComplete(this.consumerScheduler::dispose);

        subscribeToPublisher(dataFlux);
    }

    /** Build a message whose payload is the received {@link ZMsg} itself. */
    private Mono<Message<?>> mapRaw(Mono<ZMsg> msgMono) {
        return msgMono.map(msg -> getMessageBuilderFactory().withPayload(msg).build());
    }

    /**
     * Map the last frame of the received {@link ZMsg} through the message mapper. For a
     * multi-frame message the unwrapped leading frame is passed along as the
     * {@code zeromq_topic} header.
     */
    private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
        return msgMono.map(msg -> {
            Map<String, String> headers = null;
            if (msg.size() > 1) {
                headers = Collections.singletonMap("zeromq_topic", msg.unwrap().getString(ZMQ.CHARSET));
            }
            return this.messageMapper.toMessage(msg.getLast().getData(), headers);
        });
    }

    /** Stop the producer and request the socket to be closed without waiting for it. */
    @Override
    protected void doStop() {
        super.doStop();
        this.socketMono.doOnNext(ZMQ.Socket::close).subscribe();
    }

    /**
     * Close the socket, waiting for the close to be performed. Does nothing beyond the
     * super call when this producer has never been started.
     */
    @Override
    public void destroy() {
        super.destroy();
        // socketMono is assigned only in doStart(): guard against destroy-without-start.
        Mono<ZMQ.Socket> socketToClose = this.socketMono;
        if (socketToClose != null) {
            socketToClose.doOnNext(ZMQ.Socket::close).block();
        }
    }

    /**
     * Bind the socket on all interfaces over TCP.
     * @param socket the socket to bind.
     * @param port the port to bind to, or {@code 0} to let a random port be selected.
     * @return the port the socket has been bound to.
     * @throws IllegalArgumentException if binding to the explicit port was not successful.
     */
    private static int bindSocket(ZMQ.Socket socket, int port) {
        if (port == 0) {
            return socket.bindToRandomPort("tcp://*");
        }
        if (!socket.bind("tcp://*:" + port)) {
            throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port);
        }
        return port;
    }

}

