/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.tcp.reactor;

import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter;
import org.springframework.messaging.tcp.reactor.PassThroughPromiseToListenableFutureAdapter;
import org.springframework.messaging.tcp.reactor.ReactorTcpConnection;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.configuration.ConfigurationReader;
import reactor.core.configuration.DispatcherConfiguration;
import reactor.core.configuration.ReactorConfiguration;
import reactor.function.Consumer;
import reactor.function.support.SingleUseConsumer;
import reactor.io.Buffer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;

public class ReactorTcpClient<P>
implements TcpOperations<P> {
    public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
    private static final Log logger = LogFactory.getLog(ReactorTcpClient.class);
    private final TcpClient<Message<P>, Message<P>> tcpClient;
    private final Environment environment;

    public ReactorTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
        this.environment = new Environment((ConfigurationReader)new SynchronousDispatcherConfigReader());
        this.tcpClient = (TcpClient)((TcpClientSpec)new TcpClientSpec(REACTOR_TCP_CLIENT_TYPE).env(this.environment)).codec(codec).connect(host, port).get();
        ReactorTcpClient.checkReactorVersion();
    }

    public ReactorTcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
        Assert.notNull(tcpClient, (String)"'tcpClient' must not be null");
        this.tcpClient = tcpClient;
        this.environment = null;
        ReactorTcpClient.checkReactorVersion();
    }

    private static void checkReactorVersion() {
        Class<?> type = null;
        try {
            type = ReactorTcpClient.class.getClassLoader().loadClass("reactor.event.dispatch.BaseDispatcher");
            Assert.isTrue((boolean)Modifier.isPublic(type.getModifiers()), (String)"Detected older version of reactor-tcp. Switch to 1.0.1.RELEASE or higher.");
        }
        catch (ClassNotFoundException classNotFoundException) {
            // empty catch block
        }
    }

    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
        Promise promise = this.tcpClient.open();
        this.composeConnectionHandling((Composable<TcpConnection<Message<P>, Message<P>>>)promise, connectionHandler);
        return new AbstractPromiseToListenableFutureAdapter<TcpConnection<Message<P>, Message<P>>, Void>(promise){

            @Override
            protected Void adapt(TcpConnection<Message<P>, Message<P>> result) {
                return null;
            }
        };
    }

    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, final ReconnectStrategy reconnectStrategy) {
        Assert.notNull((Object)reconnectStrategy, (String)"ReconnectStrategy must not be null");
        Stream stream = this.tcpClient.open(new Reconnect(){

            public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
                return Tuple.of((Object)address, (Object)reconnectStrategy.getTimeToNextAttempt(attempt));
            }
        });
        this.composeConnectionHandling((Composable<TcpConnection<Message<P>, Message<P>>>)stream, connectionHandler);
        return new PassThroughPromiseToListenableFutureAdapter<Void>(this.toPromise(stream));
    }

    private void composeConnectionHandling(Composable<TcpConnection<Message<P>, Message<P>>> composable, final TcpConnectionHandler<P> connectionHandler) {
        composable.when(Throwable.class, (Consumer)new Consumer<Throwable>(){

            public void accept(Throwable ex) {
                connectionHandler.afterConnectFailure(ex);
            }
        });
        composable.consume(new Consumer<TcpConnection<Message<P>, Message<P>>>(){

            public void accept(TcpConnection<Message<P>, Message<P>> connection) {
                connection.on().close(new Runnable(){

                    @Override
                    public void run() {
                        connectionHandler.afterConnectionClosed();
                    }
                });
                connection.consume(new Consumer<Message<P>>(){

                    public void accept(Message<P> message) {
                        connectionHandler.handleMessage(message);
                    }
                });
                connection.when(Throwable.class, (Consumer)new Consumer<Throwable>(){

                    public void accept(Throwable t) {
                        logger.error((Object)("Exception on connection " + connectionHandler), t);
                    }
                });
                connectionHandler.afterConnected(new ReactorTcpConnection(connection));
            }
        });
    }

    private Promise<Void> toPromise(Stream<TcpConnection<Message<P>, Message<P>>> stream) {
        final Deferred deferred = (Deferred)Promises.defer().get();
        stream.consume(SingleUseConsumer.once((Consumer)new Consumer<TcpConnection<Message<P>, Message<P>>>(){

            public void accept(TcpConnection<Message<P>, Message<P>> conn) {
                deferred.accept((Object)null);
            }
        }));
        stream.when(Throwable.class, SingleUseConsumer.once((Consumer)new Consumer<Throwable>(){

            public void accept(Throwable throwable) {
                deferred.accept(throwable);
            }
        }));
        return (Promise)deferred.compose();
    }

    @Override
    public ListenableFuture<Void> shutdown() {
        try {
            Promise promise = this.tcpClient.close();
            AbstractPromiseToListenableFutureAdapter<Void, Void> abstractPromiseToListenableFutureAdapter = new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise){

                @Override
                protected Void adapt(Void result) {
                    return result;
                }
            };
            return abstractPromiseToListenableFutureAdapter;
        }
        finally {
            this.environment.shutdown();
        }
    }

    private static class SynchronousDispatcherConfigReader
    implements ConfigurationReader {
        private SynchronousDispatcherConfigReader() {
        }

        public ReactorConfiguration read() {
            return new ReactorConfiguration(Arrays.asList(new DispatcherConfiguration[0]), "sync", new Properties());
        }
    }
}

