/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link Execution}, driven through a {@link DefaultScheduler}.
 *
 * <p>Every test builds a scheduler for a job graph containing a single no-op vertex and inspects
 * the current execution attempt of that vertex's first task vertex.
 */
class ExecutionTest {
    /** Supplies the {@link ScheduledExecutorService} handed to every scheduler built here. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Supplies the main-thread test executor used by the tests that schedule off-thread. */
    @RegisterExtension
    static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE =
            new TestingComponentMainThreadExecutor.Extension();

    private final TestingComponentMainThreadExecutor testMainThreadUtil =
            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /**
     * Cancels a scheduled vertex and checks that, by the time the termination future returned by
     * {@code cancel()} has completed, the slot future handed out by the slot provider is done.
     */
    @Test
    void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

        scheduler.startScheduling();

        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<TestingPhysicalSlot> returnedSlotFuture =
                physicalSlotProvider.getFirstResponseOrFail();
        CompletableFuture<?> terminationFuture = executionVertex.cancel();

        currentExecutionAttempt.completeCancelling();

        // The assertion runs as a dependent stage of the termination future, so a failure
        // surfaces through get() below.
        CompletableFuture<Boolean> restartFuture =
                terminationFuture.thenApply(
                        ignored -> {
                            Assertions.assertThat(returnedSlotFuture).isDone();
                            return true;
                        });

        restartFuture.get();
    }

    /**
     * Sets a task restore state on the execution before scheduling and checks that it is non-null
     * before {@code startScheduling()} and null afterwards.
     */
    @Test
    void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider
                                                .createWithLimitedAmountOfPhysicalSlots(1)))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        JobManagerTaskRestore taskRestoreState =
                new JobManagerTaskRestore(1L, new TaskStateSnapshot());
        execution.setInitialState(taskRestoreState);
        Assertions.assertThat(execution.getTaskRestore()).isNotNull();

        scheduler.startScheduling();

        Assertions.assertThat(execution.getTaskRestore()).isNull();
    }

    /**
     * Starts scheduling and then cancels the execution, both on the main-thread test executor, and
     * checks that the slot provider recorded a cancellation for exactly the set of requests it
     * received.
     */
    @Test
    void testCanceledExecutionReturnsSlot() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.create(
                        resourceProfile ->
                                CompletableFuture.completedFuture(
                                        TestingPhysicalSlot.builder()
                                                .withTaskManagerGateway(taskManagerGateway)
                                                .build()));
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        // Complete the cancellation as soon as the gateway is asked to cancel this attempt.
        taskManagerGateway.setCancelConsumer(
                executionAttemptID -> {
                    if (execution.getAttemptId().equals(executionAttemptID)) {
                        execution.completeCancelling();
                    }
                });

        testMainThreadUtil.execute(scheduler::startScheduling);
        testMainThreadUtil.execute(execution::cancel);

        Assertions.assertThat(physicalSlotProvider.getRequests().keySet())
                .isEqualTo(physicalSlotProvider.getCancellations().keySet());
    }

    /**
     * Releases the payload of the physical slot assigned to the execution and checks, within the
     * same main-thread action, that the execution's release future is already done.
     */
    @Test
    void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertex.getID())
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        testMainThreadUtil.execute(scheduler::startScheduling);

        // Wait for the slot request before fetching the provider's first response.
        physicalSlotProvider.awaitAllSlotRequests();
        TestingPhysicalSlot physicalSlot = physicalSlotProvider.getFirstResponseOrFail().get();

        testMainThreadUtil.execute(
                () -> {
                    Assertions.assertThat(execution.getAssignedAllocationID())
                            .isEqualTo(physicalSlot.getAllocationId());

                    physicalSlot.releasePayload(new FlinkException("Test exception"));

                    Assertions.assertThat(execution.getReleaseFuture()).isDone();
                });
    }

    /**
     * Creates a fresh job vertex named "Test vertex" with a new {@link JobVertexID} and
     * {@link NoOpInvokable} as its invokable class.
     *
     * @return the newly created job vertex
     */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }
}

