/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.tcp.reactor;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.MonoToListenableFutureAdapter;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpConnection;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.FutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.resources.LoopResources;
import reactor.ipc.netty.resources.PoolResources;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.ipc.netty.tcp.TcpResources;
import reactor.util.concurrent.QueueSupplier;

public class ReactorNettyTcpClient<P>
implements TcpOperations<P> {
    private final TcpClient tcpClient;
    private final ReactorNettyCodec<P> codec;
    private final ChannelGroup channelGroup;
    private final LoopResources loopResources;
    private final PoolResources poolResources;
    private final Scheduler scheduler = Schedulers.newParallel((String)"ReactorNettyTcpClient");
    private volatile boolean stopping = false;

    public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
        this(opts -> opts.connect(host, port), codec);
    }

    public ReactorNettyTcpClient(Consumer<ClientOptions> optionsConsumer, ReactorNettyCodec<P> codec) {
        Assert.notNull(optionsConsumer, (String)"Consumer<ClientOptions> is required");
        Assert.notNull(codec, (String)"ReactorNettyCodec is required");
        this.channelGroup = new DefaultChannelGroup((EventExecutor)ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create((String)"reactor-netty-tcp-client");
        this.poolResources = PoolResources.fixed((String)"reactor-netty-tcp-pool");
        Consumer<ClientOptions> builtInConsumer = opts -> {
            ClientOptions cfr_ignored_0 = (ClientOptions)((ClientOptions)((ClientOptions)opts.channelGroup(this.channelGroup)).loopResources(this.loopResources)).poolResources(this.poolResources).preferNative(false);
        };
        this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer));
        this.codec = codec;
    }

    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler) {
        Assert.notNull(handler, (String)"TcpConnectionHandler is required");
        if (this.stopping) {
            return this.handleShuttingDownConnectFailure(handler);
        }
        Mono connectMono = this.tcpClient.newHandler((BiFunction)new ReactorNettyHandler(handler)).doOnError(handler::afterConnectFailure).then();
        return new MonoToListenableFutureAdapter<Void>((Mono<Void>)connectMono);
    }

    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
        Assert.notNull(handler, (String)"TcpConnectionHandler is required");
        Assert.notNull((Object)strategy, (String)"ReconnectStrategy is required");
        if (this.stopping) {
            return this.handleShuttingDownConnectFailure(handler);
        }
        MonoProcessor connectMono = MonoProcessor.create();
        this.tcpClient.newHandler((BiFunction)new ReactorNettyHandler(handler)).doOnNext(this.updateConnectMono((MonoProcessor<Void>)connectMono)).doOnError(this.updateConnectMono((MonoProcessor<Void>)connectMono)).doOnError(handler::afterConnectFailure).flatMap(NettyContext::onClose).retryWhen(this.reconnectFunction(strategy)).repeatWhen(this.reconnectFunction(strategy)).subscribe();
        return new MonoToListenableFutureAdapter<Void>((Mono<Void>)connectMono);
    }

    private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
        IllegalStateException ex = new IllegalStateException("Shutting down.");
        handler.afterConnectFailure(ex);
        return new MonoToListenableFutureAdapter<Void>((Mono<Void>)Mono.error((Throwable)ex));
    }

    private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
        return o -> {
            if (!connectMono.isTerminated()) {
                if (o instanceof Throwable) {
                    connectMono.onError((Throwable)o);
                } else {
                    connectMono.onComplete();
                }
            }
        };
    }

    private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
        return flux -> flux.scan((Object)1, (count, element) -> {
            Integer n = count;
            Integer n2 = count = Integer.valueOf(count + 1);
            return n;
        }).flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt((int)attempt)).map(time -> Mono.delay((Duration)Duration.ofMillis(time))).orElse(Mono.empty()));
    }

    @Override
    public ListenableFuture<Void> shutdown() {
        if (this.stopping) {
            SettableListenableFuture future = new SettableListenableFuture();
            future.set(null);
            return future;
        }
        this.stopping = true;
        ChannelGroupFuture close = this.channelGroup.close();
        Mono completion = FutureMono.from((Future)close).doAfterTerminate((x, e) -> {
            this.shutdownGlobalResources();
            this.loopResources.dispose();
            this.poolResources.dispose();
            try {
                Thread.sleep(2000L);
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            this.scheduler.dispose();
        });
        return new MonoToListenableFutureAdapter<Void>((Mono<Void>)completion);
    }

    private void shutdownGlobalResources() {
        try {
            Method method = TcpResources.class.getDeclaredMethod("_dispose", new Class[0]);
            ReflectionUtils.makeAccessible((Method)method);
            ReflectionUtils.invokeMethod((Method)method, (Object)TcpResources.get());
        }
        catch (NoSuchMethodException noSuchMethodException) {
            // empty catch block
        }
    }

    private static class StompMessageDecoder<P>
    extends ByteToMessageDecoder {
        private final ReactorNettyCodec<P> codec;

        public StompMessageDecoder(ReactorNettyCodec<P> codec) {
            this.codec = codec;
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Collection<Message<P>> messages = this.codec.decode(in);
            out.addAll(messages);
        }
    }

    private class ReactorNettyHandler
    implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
        private final TcpConnectionHandler<P> connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler<P> handler) {
            this.connectionHandler = handler;
        }

        @Override
        public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
            DirectProcessor completion = DirectProcessor.create();
            ReactorNettyTcpConnection connection = new ReactorNettyTcpConnection(inbound, outbound, ReactorNettyTcpClient.this.codec, (DirectProcessor<Void>)completion);
            ReactorNettyTcpClient.this.scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
            inbound.context().addHandler(new StompMessageDecoder(ReactorNettyTcpClient.this.codec));
            inbound.receiveObject().cast(Message.class).publishOn(ReactorNettyTcpClient.this.scheduler, QueueSupplier.SMALL_BUFFER_SIZE).subscribe(this.connectionHandler::handleMessage, this.connectionHandler::handleFailure, this.connectionHandler::afterConnectionClosed);
            return completion;
        }
    }
}

