/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExecutionGraphPartitionReleaseTest {
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    @RegisterExtension
    public static final TestingComponentMainThreadExecutor.Extension MAIN_THREAD_EXTENSION = new TestingComponentMainThreadExecutor.Extension();
    private final TestingComponentMainThreadExecutor mainThreadExecutor = MAIN_THREAD_EXTENSION.getComponentMainThreadTestExecutor();

    ExecutionGraphPartitionReleaseTest() {
    }

    @Test
    void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
        JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        ArrayDeque releasedPartitions = new ArrayDeque();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionIds -> releasedPartitions.add((ResultPartitionID)partitionIds.iterator().next()));
        SchedulerBase scheduler = this.createScheduler(partitionTracker, sourceVertex, operatorVertex, sinkVertex);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution sourceExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(sourceVertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(sourceExecution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution sourceExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(sourceVertex, executionGraph);
            Execution operatorExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operatorVertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(operatorExecution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).hasSize(1);
            Assertions.assertThat((Object)((ResultPartitionID)releasedPartitions.remove())).isEqualTo((Object)new ResultPartitionID((IntermediateResultPartitionID)sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(), sourceExecution.getAttemptId()));
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution operatorExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operatorVertex, executionGraph);
            Execution sinkExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(sinkVertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(sinkExecution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).hasSize(1);
            Assertions.assertThat((Object)((ResultPartitionID)releasedPartitions.remove())).isEqualTo((Object)new ResultPartitionID((IntermediateResultPartitionID)operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(), operatorExecution.getAttemptId()));
        }));
    }

    @Test
    void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
        JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1);
        JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1);
        JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1);
        operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        ArrayDeque releasedPartitions = new ArrayDeque();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionIds -> releasedPartitions.add((ResultPartitionID)partitionIds.iterator().next()));
        SchedulerBase scheduler = this.createScheduler(partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution sourceExecution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(sourceVertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(sourceExecution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution operator1Execution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operator1Vertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(operator1Execution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution operator2Execution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operator2Vertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(operator2Execution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution operator2Execution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operator2Vertex, executionGraph);
            operator2Execution.getVertex().resetForNewExecution();
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Execution operator3Execution = ExecutionGraphPartitionReleaseTest.getCurrentExecution(operator3Vertex, executionGraph);
            scheduler.updateTaskExecutionState(new TaskExecutionState(operator3Execution.getAttemptId(), ExecutionState.FINISHED));
            Assertions.assertThat((Collection)releasedPartitions).isEmpty();
        }));
    }

    private static Execution getCurrentExecution(JobVertex jobVertex, ExecutionGraph executionGraph) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
    }

    private SchedulerBase createScheduler(JobMasterPartitionTracker partitionTracker, JobVertex ... vertices) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, this.mainThreadExecutor.getMainThreadExecutor(), (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory()).setPartitionTracker(partitionTracker).build();
        this.mainThreadExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> ((SchedulerBase)scheduler).startScheduling()));
        return scheduler;
    }
}

