/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job.checkpoints;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.RestMatchers;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointHandlers;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class CheckpointHandlersTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private static final JobID JOB_ID = new JobID();
    private static final Long COMPLETED_CHECKPOINT_ID = 123456L;
    private static CheckpointHandlers.CheckpointTriggerHandler checkpointTriggerHandler;
    private static CheckpointHandlers.CheckpointStatusHandler checkpointStatusHandler;

    CheckpointHandlersTest() {
    }

    @BeforeAll
    static void setUp() throws Exception {
        GatewayRetriever leaderRetriever = () -> CompletableFuture.completedFuture(null);
        checkpointTriggerHandler = new CheckpointHandlers.CheckpointTriggerHandler(leaderRetriever, TIMEOUT, Collections.emptyMap());
        checkpointStatusHandler = new CheckpointHandlers.CheckpointStatusHandler(leaderRetriever, TIMEOUT, Collections.emptyMap());
    }

    @Test
    void testCheckpointTriggerCompletedSuccessfully() throws Exception {
        OperationResult successfulResult = OperationResult.success((Serializable)COMPLETED_CHECKPOINT_ID);
        CompletableFuture checkpointPropertiesFuture = new CompletableFuture();
        AtomicReference keyReference = new AtomicReference();
        TestingRestfulGateway testingRestfulGateway = ((TestingRestfulGateway.Builder)((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setTriggerCheckpointFunction((key, checkpointType) -> {
            keyReference.set(key);
            checkpointPropertiesFuture.complete(checkpointType);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).setGetCheckpointStatusFunction(operationKey -> {
            if (operationKey.equals(keyReference.get())) {
                return CompletableFuture.completedFuture(successfulResult);
            }
            throw new RuntimeException("Expected operation key " + keyReference.get() + ", but received " + operationKey);
        })).build();
        CheckpointType checkpointType2 = CheckpointType.FULL;
        TriggerId triggerId = ((TriggerResponse)checkpointTriggerHandler.handleRequest(CheckpointHandlersTest.triggerCheckpointRequest(checkpointType2, null), (RestfulGateway)testingRestfulGateway).get()).getTriggerId();
        AsynchronousOperationResult checkpointTriggerResponseBody = (AsynchronousOperationResult)checkpointStatusHandler.handleRequest(CheckpointHandlersTest.checkpointTriggerStatusRequest(triggerId), (RestfulGateway)testingRestfulGateway).get();
        Assertions.assertThat((Comparable)checkpointTriggerResponseBody.queueStatus().getId()).isEqualTo((Object)QueueStatus.Id.COMPLETED);
        Assertions.assertThat((Object)checkpointTriggerResponseBody.resource()).isNotNull();
        Assertions.assertThat((Long)((CheckpointInfo)checkpointTriggerResponseBody.resource()).getCheckpointId()).isEqualTo((Object)COMPLETED_CHECKPOINT_ID);
        Assertions.assertThat((Comparable)((Comparable)checkpointPropertiesFuture.get())).isEqualTo((Object)CheckpointType.FULL);
    }

    @Test
    void testTriggerCheckpointNoCheckpointType() throws Exception {
        OperationResult successfulResult = OperationResult.success((Serializable)COMPLETED_CHECKPOINT_ID);
        CompletableFuture checkpointTypeFuture = new CompletableFuture();
        AtomicReference keyReference = new AtomicReference();
        TestingRestfulGateway testingRestfulGateway = ((TestingRestfulGateway.Builder)((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setTriggerCheckpointFunction((key, checkpointType) -> {
            keyReference.set(key);
            checkpointTypeFuture.complete(checkpointType);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).setGetCheckpointStatusFunction(operationKey -> {
            if (operationKey.equals(keyReference.get())) {
                return CompletableFuture.completedFuture(successfulResult);
            }
            throw new RuntimeException("Expected operation key " + keyReference.get() + ", but received " + operationKey);
        })).build();
        TriggerId triggerId = ((TriggerResponse)checkpointTriggerHandler.handleRequest(CheckpointHandlersTest.triggerCheckpointRequest(null, null), (RestfulGateway)testingRestfulGateway).get()).getTriggerId();
        AsynchronousOperationResult checkpointTriggerResponseBody = (AsynchronousOperationResult)checkpointStatusHandler.handleRequest(CheckpointHandlersTest.checkpointTriggerStatusRequest(triggerId), (RestfulGateway)testingRestfulGateway).get();
        Assertions.assertThat((Comparable)checkpointTriggerResponseBody.queueStatus().getId()).isEqualTo((Object)QueueStatus.Id.COMPLETED);
        Assertions.assertThat((Object)checkpointTriggerResponseBody.resource()).isNotNull();
        Assertions.assertThat((Long)((CheckpointInfo)checkpointTriggerResponseBody.resource()).getCheckpointId()).isEqualTo((Object)COMPLETED_CHECKPOINT_ID);
        Assertions.assertThat((Comparable)((Comparable)checkpointTypeFuture.get())).isEqualTo((Object)CheckpointType.DEFAULT);
    }

    @Test
    void testCheckpointCompletedWithException() throws Exception {
        OperationResult failedResult = OperationResult.failure((Throwable)new RuntimeException("expected"));
        AtomicReference keyReference = new AtomicReference();
        TestingRestfulGateway testingRestfulGateway = ((TestingRestfulGateway.Builder)((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setTriggerCheckpointFunction((key, checkpointType) -> {
            keyReference.set(key);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).setGetCheckpointStatusFunction(operationKey -> {
            if (operationKey.equals(keyReference.get())) {
                return CompletableFuture.completedFuture(failedResult);
            }
            throw new RuntimeException("Expected operation key " + keyReference.get() + ", but received " + operationKey);
        })).build();
        TriggerId triggerId = ((TriggerResponse)checkpointTriggerHandler.handleRequest(CheckpointHandlersTest.triggerCheckpointRequest(null, null), (RestfulGateway)testingRestfulGateway).get()).getTriggerId();
        AsynchronousOperationResult checkpointTriggerResponseBody = (AsynchronousOperationResult)checkpointStatusHandler.handleRequest(CheckpointHandlersTest.checkpointTriggerStatusRequest(triggerId), (RestfulGateway)testingRestfulGateway).get();
        Assertions.assertThat((Comparable)checkpointTriggerResponseBody.queueStatus().getId()).isEqualTo((Object)QueueStatus.Id.COMPLETED);
        Assertions.assertThat((Object)checkpointTriggerResponseBody.resource()).isNotNull();
        Assertions.assertThat((Throwable)((CheckpointInfo)checkpointTriggerResponseBody.resource()).getFailureCause()).isNotNull();
        Throwable checkpointError = ((CheckpointInfo)checkpointTriggerResponseBody.resource()).getFailureCause().deserializeError(ClassLoader.getSystemClassLoader());
        Assertions.assertThat((String)checkpointError.getMessage()).matches((CharSequence)"expected");
        Assertions.assertThat((Throwable)checkpointError).isInstanceOf(RuntimeException.class);
    }

    @Test
    void testProvidedTriggerId() throws Exception {
        OperationResult successfulResult = OperationResult.success((Serializable)COMPLETED_CHECKPOINT_ID);
        AtomicReference keyReference = new AtomicReference();
        TestingRestfulGateway testingRestfulGateway = ((TestingRestfulGateway.Builder)((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setTriggerCheckpointFunction((key, checkpointType) -> {
            keyReference.set(key);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).setGetCheckpointStatusFunction(operationKey -> {
            if (operationKey.equals(keyReference.get())) {
                return CompletableFuture.completedFuture(successfulResult);
            }
            throw new RuntimeException("Expected operation key " + keyReference.get() + ", but received " + operationKey);
        })).build();
        TriggerId providedTriggerId = new TriggerId();
        TriggerId returnedTriggerId = ((TriggerResponse)checkpointTriggerHandler.handleRequest(CheckpointHandlersTest.triggerCheckpointRequest(CheckpointType.FULL, providedTriggerId), (RestfulGateway)testingRestfulGateway).get()).getTriggerId();
        Assertions.assertThat((Comparable)providedTriggerId).isEqualTo((Object)returnedTriggerId);
        AsynchronousOperationResult checkpointTriggerResponseBody = (AsynchronousOperationResult)checkpointStatusHandler.handleRequest(CheckpointHandlersTest.checkpointTriggerStatusRequest(providedTriggerId), (RestfulGateway)testingRestfulGateway).get();
        Assertions.assertThat((Comparable)checkpointTriggerResponseBody.queueStatus().getId()).isEqualTo((Object)QueueStatus.Id.COMPLETED);
        Assertions.assertThat((Object)checkpointTriggerResponseBody.resource()).isNotNull();
        Assertions.assertThat((Long)((CheckpointInfo)checkpointTriggerResponseBody.resource()).getCheckpointId()).isEqualTo((Object)COMPLETED_CHECKPOINT_ID);
    }

    @Test
    void testQueryStatusOfUnknownOperationReturnsError() throws HandlerRequestException, RestHandlerException {
        TestingRestfulGateway testingRestfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setGetCheckpointStatusFunction(key -> FutureUtils.completedExceptionally((Throwable)new UnknownOperationKeyException(key)))).build();
        CompletableFuture statusFuture = checkpointStatusHandler.handleRequest(CheckpointHandlersTest.checkpointTriggerStatusRequest(new TriggerId()), (RestfulGateway)testingRestfulGateway);
        Assertions.assertThat((CompletableFuture)statusFuture).matches(arg_0 -> RestMatchers.respondsWithError(HttpResponseStatus.NOT_FOUND).matches(arg_0));
    }

    private static HandlerRequest<CheckpointTriggerRequestBody> triggerCheckpointRequest(CheckpointType checkpointType, @Nullable TriggerId triggerId) throws HandlerRequestException {
        return HandlerRequest.resolveParametersAndCreate((RequestBody)new CheckpointTriggerRequestBody(checkpointType, triggerId), (MessageParameters)new CheckpointTriggerMessageParameters(), Collections.singletonMap("jobid", JOB_ID.toString()), Collections.emptyMap(), Collections.emptyList());
    }

    private static HandlerRequest<EmptyRequestBody> checkpointTriggerStatusRequest(TriggerId triggerId) throws HandlerRequestException {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", JOB_ID.toString());
        pathParameters.put("triggerid", triggerId.toString());
        return HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new CheckpointStatusMessageParameters(), pathParameters, Collections.emptyMap(), Collections.emptyList());
    }
}

