/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory;
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategy;
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;

public class RestClientTest
extends TestLogger {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private static final String unroutableIp = "240.0.0.0";
    private static final long TIMEOUT = 10L;

    @Test
    public void testConnectionTimeout() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.CONNECTION_TIMEOUT, 1L);
        try (RestClient restClient = new RestClient(config, Executors.directExecutor());){
            CompletableFuture future = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            Throwable cause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, future::get)).getCause();
            Assert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(ConnectTimeoutException.class));
            Assert.assertThat((Object)cause.getMessage(), (Matcher)Matchers.containsString((String)unroutableIp));
        }
    }

    @Test
    public void testInvalidVersionRejection() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            CompletableFuture invalidVersionResponse = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion)RuntimeRestAPIVersion.V0);
            Assert.fail((String)"The request should have been rejected due to a version mismatch.");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testConnectionClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
        try (ServerSocket serverSocket = new ServerSocket(0);
             RestClient restClient = new RestClient(config, (Executor)EXECUTOR_RESOURCE.getExecutor());){
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();
            CompletableFuture socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket)));
            CompletableFuture responseFuture = restClient.sendRequest("localhost", targetPort, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
            Socket connectionSocket = null;
            try {
                connectionSocket = (Socket)socketCompletableFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (TimeoutException ignored) {
                socketCompletableFuture.cancel(true);
            }
            if (connectionSocket != null) {
                connectionSocket.close();
            }
            try {
                responseFuture.get();
            }
            catch (ExecutionException ee) {
                if (!ExceptionUtils.findThrowable((Throwable)ee, IOException.class).isPresent()) {
                    throw ee;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestClientClosedHandling() throws Exception {
        Configuration config = new Configuration();
        config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
        try (Socket connectionSocket = null;
             ServerSocket serverSocket = new ServerSocket(0);
             RestClient restClient = new RestClient(config, (Executor)EXECUTOR_RESOURCE.getExecutor());){
            String targetAddress = "localhost";
            int targetPort = serverSocket.getLocalPort();
            CompletableFuture socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> NetUtils.acceptWithoutTimeout((ServerSocket)serverSocket)));
            CompletableFuture responseFuture = restClient.sendRequest("localhost", targetPort, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
            try {
                connectionSocket = (Socket)socketCompletableFuture.get(10L, TimeUnit.SECONDS);
            }
            catch (TimeoutException ignored) {
                socketCompletableFuture.cancel(true);
            }
            restClient.close();
            try {
                responseFuture.get();
            }
            catch (ExecutionException ee) {
                if (!ExceptionUtils.findThrowable((Throwable)ee, IOException.class).isPresent()) {
                    throw ee;
                }
            }
        }
    }

    @Test
    public void testCloseClientBeforeRequest() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            restClient.close();
            CompletableFuture future = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            ThrowingRunnable getFuture = () -> future.get(0L, TimeUnit.SECONDS);
            Throwable cause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, (ThrowingRunnable)getFuture)).getCause();
            Assert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(IllegalStateException.class));
            Assert.assertThat((Object)cause.getMessage(), (Matcher)Matchers.equalTo((Object)"RestClient is already closed"));
        }
    }

    @Test
    public void testCloseClientWhileProcessingRequest() throws Exception {
        OneShotLatch connectTriggered = new OneShotLatch();
        OneShotLatch closeTriggered = new OneShotLatch();
        SelectStrategy fallbackSelectStrategy = DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
        SelectStrategyFactory selectStrategyFactory = () -> (selectSupplier, hasTasks) -> {
            connectTriggered.trigger();
            closeTriggered.await();
            return fallbackSelectStrategy.calculateStrategy(selectSupplier, hasTasks);
        };
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor(), selectStrategyFactory);){
            Assert.assertThat((Object)restClient.getResponseChannelFutures(), (Matcher)Matchers.empty());
            CompletableFuture requestFuture = restClient.sendRequest(unroutableIp, 80, (MessageHeaders)new TestMessageHeaders(), (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance());
            Assert.assertThat((Object)restClient.getResponseChannelFutures(), (Matcher)Matchers.hasSize((int)1));
            connectTriggered.await();
            CompletableFuture closeFuture = restClient.closeAsync();
            closeTriggered.trigger();
            closeFuture.get();
            Throwable cause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> requestFuture.get(0L, TimeUnit.SECONDS))).getCause();
            Assert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(IllegalStateException.class));
            Assert.assertThat((Object)cause.getMessage(), (Matcher)Matchers.equalTo((Object)"executor not accepting a task"));
        }
    }

    @Test
    public void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
        try (RestClient restClient = new RestClient(new Configuration(), Executors.directExecutor());){
            CompletableFuture responseChannelFuture = new CompletableFuture();
            restClient.getResponseChannelFutures().add(responseChannelFuture);
            restClient.close();
            Assert.assertThat((Object)restClient.getResponseChannelFutures(), (Matcher)Matchers.empty());
            Throwable cause = ((ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
                Channel cfr_ignored_0 = (Channel)responseChannelFuture.get(0L, TimeUnit.SECONDS);
            })).getCause();
            Assert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(IllegalStateException.class));
            Assert.assertThat((Object)cause.getMessage(), (Matcher)Matchers.equalTo((Object)"RestClient closed before request completed"));
        }
    }

    private static class TestMessageHeaders
    implements RuntimeMessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private TestMessageHeaders() {
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/";
        }
    }
}

