/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.NeverCompletingChannelFuture;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

@ExtendWith(value={ParameterizedTestExtension.class})
class PartitionRequestClientFactoryTest {
    private static final ResourceID RESOURCE_ID = ResourceID.generate();
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newFixedThreadPool(10));
    @Parameter
    private boolean connectionReuseEnabled;

    PartitionRequestClientFactoryTest() {
    }

    @Parameters(name="connectionReuseEnabled={0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testInterruptsNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        try {
            AwaitingNettyClient nettyClient = new AwaitingNettyClient(nettyServerAndClient.client());
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)nettyClient, this.connectionReuseEnabled);
            nettyClient.awaitForInterrupts = true;
            this.connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
            nettyClient.awaitForInterrupts = false;
            factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
        }
        finally {
            NettyTestUtil.shutdown(nettyServerAndClient);
        }
    }

    private void connectAndInterrupt(PartitionRequestClientFactory factory, ConnectionID connectionId) throws Exception {
        CompletableFuture started = new CompletableFuture();
        CompletableFuture interrupted = new CompletableFuture();
        Thread thread = new Thread(() -> {
            try {
                started.complete(null);
                factory.createPartitionRequestClient(connectionId);
            }
            catch (InterruptedException e) {
                interrupted.complete(null);
            }
            catch (Exception e) {
                interrupted.completeExceptionally(e);
            }
        });
        thread.start();
        started.get();
        thread.interrupt();
        interrupted.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testExceptionsAreNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)new UnstableNettyClient(nettyServerAndClient.client(), 1), this.connectionReuseEnabled);
            ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, 0);
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID)).withFailMessage("Expected the first request to fail.", new Object[0])).isInstanceOf(RemoteTransportException.class);
            factory.createPartitionRequestClient(connectionID);
        }
        finally {
            NettyTestUtil.shutdown(nettyServerAndClient);
        }
    }

    @TestTemplate
    void testReuseNettyPartitionRequestClient() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        try {
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 2);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
        }
        finally {
            NettyTestUtil.shutdown(nettyServerAndClient);
        }
    }

    private void checkReuseNettyPartitionRequestClient(NettyTestUtil.NettyServerAndClient nettyServerAndClient, int maxNumberOfConnections) throws Exception {
        HashSet<NettyPartitionRequestClient> set = new HashSet<NettyPartitionRequestClient>();
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(nettyServerAndClient.client(), 0, maxNumberOfConnections, this.connectionReuseEnabled);
        for (int i = 0; i < Math.max(100, maxNumberOfConnections); ++i) {
            ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, (int)(Math.random() * 2.147483647E9));
            set.add(factory.createPartitionRequestClient(connectionID));
        }
        Assertions.assertThat(set).hasSizeLessThanOrEqualTo(maxNumberOfConnections);
    }

    @TestTemplate
    void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
        final CompletableFuture inactiveFuture = new CompletableFuture();
        final CompletableFuture serverChannelFuture = new CompletableFuture();
        NettyProtocol protocol = new NettyProtocol(null, null){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        super.channelRegistered(ctx);
                        serverChannelFuture.complete(ctx.channel());
                    }
                }};
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{new ChannelInactiveFutureHandler(inactiveFuture)};
            }
        };
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol);
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(serverAndClient.client(), 2, 1, this.connectionReuseEnabled);
        ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0);
        NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
        Channel channel = (Channel)serverChannelFuture.get();
        channel.close();
        inactiveFuture.get();
        NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID);
        ((ObjectAssert)Assertions.assertThat((Object)newClient).as("Factory should create a new client.", new Object[0])).isNotSameAs((Object)oldClient);
        NettyTestUtil.shutdown(serverAndClient);
    }

    @TestTemplate
    void testNettyClientConnectRetry() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
        factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
        NettyTestUtil.shutdown(serverAndClient);
    }

    @TestTemplate
    void testFailureReportedToSubsequentRequests() {
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)new FailingNettyClient(), 2, 1, this.connectionReuseEnabled);
        Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(new ConnectionID(ResourceID.generate(), new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0)));
        Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(new ConnectionID(ResourceID.generate(), new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0))).isInstanceOf(IOException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testNettyClientConnectRetryFailure() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 3);
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
            Assertions.assertThatThrownBy(() -> factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0))).isInstanceOf(IOException.class);
        }
        finally {
            NettyTestUtil.shutdown(serverAndClient);
        }
    }

    @TestTemplate
    void testNettyClientConnectRetryMultipleThread() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = PartitionRequestClientFactoryTest.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
        ArrayList<CompletableFuture<NettyPartitionRequestClient>> futures = new ArrayList<CompletableFuture<NettyPartitionRequestClient>>();
        for (int i = 0; i < 10; ++i) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, EXECUTOR_EXTENSION.getExecutor()));
        }
        futures.forEach(runnableFuture -> {
            ObjectAssert cfr_ignored_0 = (ObjectAssert)FlinkAssertions.assertThatFuture((CompletableFuture)runnableFuture).eventuallySucceeds().isNotNull();
        });
        NettyTestUtil.shutdown(serverAndClient);
    }

    private static NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        return NettyTestUtil.initServerAndClient(new NettyProtocol(null, null){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[10];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{(ChannelHandler)Mockito.mock(NetworkClientHandler.class)};
            }
        });
    }

    private static class ChannelInactiveFutureHandler
    extends CreditBasedPartitionRequestClientHandler {
        private final CompletableFuture<Void> inactiveFuture;

        private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) {
            this.inactiveFuture = inactiveFuture;
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            this.inactiveFuture.complete(null);
        }

        public CompletableFuture<Void> getInactiveFuture() {
            return this.inactiveFuture;
        }
    }

    private static class AwaitingNettyClient
    extends NettyClient {
        private volatile boolean awaitForInterrupts;
        private final NettyClient client;

        AwaitingNettyClient(NettyClient client) {
            super(null);
            this.client = client;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.awaitForInterrupts) {
                return new NeverCompletingChannelFuture();
            }
            try {
                return this.client.connect(serverSocketAddress);
            }
            catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        }
    }

    private static class FailingNettyClient
    extends NettyClient {
        FailingNettyClient() {
            super(null);
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            throw new ChannelException("Simulate connect failure");
        }
    }

    private static class UnstableNettyClient
    extends NettyClient {
        private final NettyClient nettyClient;
        private int retry;

        UnstableNettyClient(NettyClient nettyClient, int retry) {
            super(null);
            this.nettyClient = nettyClient;
            this.retry = retry;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.retry > 0) {
                --this.retry;
                throw new ChannelException("Simulate connect failure");
            }
            return this.nettyClient.connect(serverSocketAddress);
        }
    }
}

