/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.JobMasterTester;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DispatcherCleanupITCase
extends AbstractDispatcherTest {
    private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<RpcEndpoint>();

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new PerJobCheckpointRecoveryFactory((maxCheckpoints, previous, sharedStateRegistryFactory, ioExecutor, recoveryClaimMode) -> {
            if (previous != null) {
                Assert.assertTrue((boolean)previous.getShutdownStatus().isPresent());
                Assert.assertTrue((boolean)previous.getAllCheckpoints().isEmpty());
                return new EmbeddedCompletedCheckpointStore(maxCheckpoints, (Collection)previous.getAllCheckpoints(), sharedStateRegistryFactory.create(ioExecutor, (Collection)previous.getAllCheckpoints(), recoveryClaimMode));
            }
            return new EmbeddedCompletedCheckpointStore(maxCheckpoints, Collections.emptyList(), sharedStateRegistryFactory.create(ioExecutor, Collections.emptyList(), RecoveryClaimMode.DEFAULT));
        }));
    }

    @Override
    @After
    public void tearDown() {
        while (!this.toTerminate.isEmpty()) {
            RpcEndpoint endpoint = (RpcEndpoint)this.toTerminate.poll();
            try {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{endpoint});
            }
            catch (Exception exception) {}
        }
    }

    @Test
    public void testCleanupThroughRetries() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
        OneShotLatch successfulCleanupLatch = new OneShotLatch();
        int numberOfErrors = 5;
        RuntimeException temporaryError = new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
        AtomicInteger failureCount = new AtomicInteger(5);
        TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder().setGlobalCleanupFunction((ignoredJobId, ignoredExecutor) -> {
            actualGlobalCleanupCallCount.incrementAndGet();
            if (failureCount.getAndDecrement() > 0) {
                return FutureUtils.completedExceptionally((Throwable)temporaryError);
            }
            successfulCleanupLatch.trigger();
            return FutureUtils.completedVoidFuture();
        }).build();
        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
        this.haServices.setJobGraphStore(jobGraphStore);
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.haServices.setJobMasterLeaderElection(jobId, leaderElection);
        DefaultJobManagerRunnerRegistry jobManagerRunnerRegistry = new DefaultJobManagerRunnerRegistry(2);
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().setResourceCleanerFactory((ResourceCleanerFactory)new DispatcherResourceCleanerFactory((Executor)ForkJoinPool.commonPool(), TestingRetryStrategies.createWithNumberOfRetries(5), (JobManagerRunnerRegistry)jobManagerRunnerRegistry, (JobGraphWriter)this.haServices.getJobGraphStore(), this.blobServer, (HighAvailabilityServices)this.haServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup())).build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CompletableFuture<LeaderInformation> confirmedLeaderInformation = leaderElection.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
        this.waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
        successfulCleanupLatch.await();
        MatcherAssert.assertThat((Object)actualGlobalCleanupCallCount.get(), (Matcher)IsEqual.equalTo((Object)6));
        MatcherAssert.assertThat((String)"The JobGraph should be removed from JobGraphStore.", (Object)this.haServices.getJobGraphStore().getJobIds(), (Matcher)IsEmptyCollection.empty());
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> (Boolean)this.haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()));
    }

    @Test
    public void testCleanupNotCancellable() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        EmbeddedJobResultStore jobResultStore = new EmbeddedJobResultStore();
        jobResultStore.createDirtyResultAsync(new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId))).get();
        this.haServices.setJobResultStore((JobResultStore)jobResultStore);
        CompletableFuture<Object> jobManagerRunnerCleanupFuture = new CompletableFuture<Object>();
        AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<JobManagerRunner>();
        TestingJobManagerRunnerRegistry jobManagerRunnerRegistry = TestingJobManagerRunnerRegistry.newSingleJobBuilder(jobManagerRunnerEntry).withLocalCleanupAsyncFunction((actualJobId, executor) -> jobManagerRunnerCleanupFuture).build();
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().setJobManagerRunnerRegistry(jobManagerRunnerRegistry).build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> jobManagerRunnerEntry.get() != null));
        MatcherAssert.assertThat((String)"The JobResultStore should have this job still marked as dirty.", (Object)((Boolean)this.haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get()), (Matcher)CoreMatchers.is((Object)true));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        try {
            dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
            Assert.fail((String)"Should fail because cancelling the cleanup is not allowed.");
        }
        catch (ExecutionException e) {
            MatcherAssert.assertThat((Object)e, (Matcher)FlinkMatchers.containsCause(JobCancellationFailedException.class));
        }
        jobManagerRunnerCleanupFuture.complete(null);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> (Boolean)this.haServices.getJobResultStore().hasCleanJobResultEntryAsync(jobId).get()));
    }

    @Test
    public void testCleanupAfterLeadershipChange() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
        OneShotLatch firstCleanupTriggered = new OneShotLatch();
        CompletableFuture successfulJobGraphCleanup = new CompletableFuture();
        TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder().setGlobalCleanupFunction((actualJobId, ignoredExecutor) -> {
            int callCount = actualGlobalCleanupCallCount.getAndIncrement();
            firstCleanupTriggered.trigger();
            if (callCount < 1) {
                return FutureUtils.completedExceptionally((Throwable)new RuntimeException("Expected RuntimeException: Unable to remove job graph."));
            }
            successfulJobGraphCleanup.complete(actualJobId);
            return FutureUtils.completedVoidFuture();
        }).build();
        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
        this.haServices.setJobGraphStore(jobGraphStore);
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.haServices.setJobMasterLeaderElection(jobId, leaderElection);
        this.configuration.set(CleanupOptions.CLEANUP_STRATEGY, (Object)((String)CleanupOptions.NONE_PARAM_VALUES.iterator().next()));
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CompletableFuture<LeaderInformation> confirmedLeaderInformation = leaderElection.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
        this.waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
        firstCleanupTriggered.await();
        MatcherAssert.assertThat((String)"The cleanup should have been triggered only once.", (Object)actualGlobalCleanupCallCount.get(), (Matcher)IsEqual.equalTo((Object)1));
        MatcherAssert.assertThat((String)"The cleanup should not have reached the successful cleanup code path.", (Object)successfulJobGraphCleanup.isDone(), (Matcher)IsEqual.equalTo((Object)false));
        MatcherAssert.assertThat((String)"The JobGraph is still stored in the JobGraphStore.", (Object)this.haServices.getJobGraphStore().getJobIds(), (Matcher)IsEqual.equalTo(Collections.singleton(jobId)));
        MatcherAssert.assertThat((String)"The JobResultStore should have this job marked as dirty.", this.haServices.getJobResultStore().getDirtyResults().stream().map(JobResult::getJobId).collect(Collectors.toSet()), (Matcher)IsEqual.equalTo(Collections.singleton(jobId)));
        TestingDispatcher secondDispatcher = this.createTestingDispatcherBuilder().setRecoveredDirtyJobs(this.haServices.getJobResultStore().getDirtyResults()).build(rpcService);
        secondDispatcher.start();
        this.toTerminate.add((RpcEndpoint)secondDispatcher);
        leaderElection.isLeader(UUID.randomUUID());
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> this.haServices.getJobResultStore().getDirtyResults().isEmpty()));
        MatcherAssert.assertThat((String)"The JobGraph is not stored in the JobGraphStore.", (Object)this.haServices.getJobGraphStore().getJobIds(), (Matcher)IsEmptyCollection.empty());
        Assert.assertTrue((String)"The JobResultStore has the job listed as clean.", (boolean)((Boolean)this.haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()));
        MatcherAssert.assertThat((Object)((JobID)successfulJobGraphCleanup.get()), (Matcher)IsEqual.equalTo((Object)jobId));
        MatcherAssert.assertThat((Object)actualGlobalCleanupCallCount.get(), (Matcher)IsEqual.equalTo((Object)2));
    }

    private void waitForJobToFinish(CompletableFuture<LeaderInformation> confirmedLeaderInformation, DispatcherGateway dispatcherGateway, JobID jobId) throws Exception {
        JobMasterGateway jobMasterGateway = DispatcherCleanupITCase.connectToLeadingJobMaster(confirmedLeaderInformation).get();
        try (JobMasterTester tester = new JobMasterTester(rpcService, jobId, jobMasterGateway);){
            CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = tester.deployVertices(2);
            DispatcherCleanupITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.INITIALIZING).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.RUNNING).get();
            tester.getCheckpointFuture(1L).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.FINISHED).get();
        }
        DispatcherCleanupITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
    }

    private JobGraph createJobGraph() {
        JobVertex firstVertex = new JobVertex("first");
        firstVertex.setInvokableClass(NoOpInvokable.class);
        firstVertex.setParallelism(1);
        JobVertex secondVertex = new JobVertex("second");
        secondVertex.setInvokableClass(NoOpInvokable.class);
        secondVertex.setParallelism(1);
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(20L).setMinPauseBetweenCheckpoints(20L).setCheckpointTimeout(10000L).build();
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(firstVertex).addJobVertex(secondVertex).setJobCheckpointingSettings(checkpointingSettings).build();
    }

    private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(CompletableFuture<LeaderInformation> confirmedLeaderInformation) {
        return confirmedLeaderInformation.thenCompose(leaderInformation -> rpcService.connect(leaderInformation.getLeaderAddress(), JobMasterId.fromUuidOrNull((UUID)leaderInformation.getLeaderSessionID()), JobMasterGateway.class));
    }
}

