/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.dispatcher.cleanup.CheckpointResourcesCleanupRunner;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class CheckpointResourcesCleanupRunnerTest {
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(Executors::newCachedThreadPool);
    private static final Time TIMEOUT_FOR_REQUESTS = Time.milliseconds((long)0L);
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> BEFORE_START = ignored -> {};
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> AFTER_START = CheckpointResourcesCleanupRunner::start;
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> AFTER_CLOSE = runner -> {
        runner.start();
        runner.close();
    };

    CheckpointResourcesCleanupRunnerTest() {
    }

    @Test
    void testIsInitializedBeforeStart() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testIsInitialized(BEFORE_START);
    }

    @Test
    void testIsInitializedAfterStart() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testIsInitialized(AFTER_START);
    }

    @Test
    void testIsInitializedAfterClose() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testIsInitialized(AFTER_CLOSE);
    }

    private static void testIsInitialized(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();
        preCheckLifecycleHandling.accept((Object)testInstance);
        Assertions.assertThat((boolean)testInstance.isInitialized()).isTrue();
    }

    @Test
    void testCloseAsyncBeforeStart() {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();
        Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).isNotCompleted();
    }

    @Test
    void testSuccessfulCloseAsyncAfterStart() throws Exception {
        CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<JobStatus>();
        CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture = new CompletableFuture<JobStatus>();
        HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(completedCheckpointStoreShutdownFuture, checkpointIdCounterShutdownFuture);
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withCheckpointRecoveryFactory(checkpointRecoveryFactory).withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        testInstance.start();
        ((CompletableFutureAssert)Assertions.assertThat(completedCheckpointStoreShutdownFuture).as("The CompletedCheckpointStore shouldn't have been shut down, yet.", new Object[0])).isNotCompleted();
        ((CompletableFutureAssert)Assertions.assertThat(checkpointIdCounterShutdownFuture).as("The CheckpointIDCounter shouldn't have been shut down, yet.", new Object[0])).isNotCompleted();
        ((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.", new Object[0])).isNotCompleted();
        checkpointRecoveryFactory.triggerCreation();
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(completedCheckpointStoreShutdownFuture).as("The CompletedCheckpointStore should have been shut down properly.", new Object[0])).eventuallySucceeds().isEqualTo((Object)JobStatus.FINISHED);
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(checkpointIdCounterShutdownFuture).as("The CheckpointIDCounter should have been shut down properly.", new Object[0])).eventuallySucceeds().isEqualTo((Object)JobStatus.FINISHED);
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.closeAsync()).eventuallySucceeds();
    }

    @Test
    void testCloseAsyncAfterStartAndErrorInCompletedCheckpointStoreShutdown() throws Exception {
        CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture = new CompletableFuture<JobStatus>();
        HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(TestingCompletedCheckpointStore.builder().withShutdownConsumer((ignoredJobStatus, ignoredCheckpointsCleaner) -> {
            throw new RuntimeException("Expected RuntimeException simulating an error during shutdown.");
        }).build(), TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(checkpointIdCounterShutdownFuture));
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withCheckpointRecoveryFactory(checkpointRecoveryFactory).withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        testInstance.start();
        ((CompletableFutureAssert)Assertions.assertThat(checkpointIdCounterShutdownFuture).as("The CheckpointIDCounter shouldn't have been shut down, yet.", new Object[0])).isNotCompleted();
        ((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.", new Object[0])).isNotCompleted();
        checkpointRecoveryFactory.triggerCreation();
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(checkpointIdCounterShutdownFuture).as("The CheckpointIDCounter should have been shut down properly.", new Object[0])).eventuallySucceeds().isEqualTo((Object)JobStatus.FINISHED);
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.closeAsync()).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(RuntimeException.class);
    }

    @Test
    void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exception {
        CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<JobStatus>();
        HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completedCheckpointStoreShutdownFuture), TestingCheckpointIDCounter.builder().withShutdownConsumer(ignoredJobStatus -> {
            throw new RuntimeException("Expected RuntimeException simulating an error during shutdown.");
        }).build());
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withCheckpointRecoveryFactory(checkpointRecoveryFactory).withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        testInstance.start();
        ((CompletableFutureAssert)Assertions.assertThat(completedCheckpointStoreShutdownFuture).as("The CompletedCheckpointStore shouldn't have been shut down, yet.", new Object[0])).isNotCompleted();
        ((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.", new Object[0])).isNotCompleted();
        checkpointRecoveryFactory.triggerCreation();
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(completedCheckpointStoreShutdownFuture).as("The CompletedCheckpointStore should have been shut down properly.", new Object[0])).eventuallySucceeds().isEqualTo((Object)JobStatus.FINISHED);
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.closeAsync()).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(RuntimeException.class);
    }

    @Test
    void testCancellationBeforeStart() throws Exception {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.cancel(TIMEOUT_FOR_REQUESTS)).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FlinkException.class);
        ((CompletableFutureAssert)((CompletableFutureAssert)((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("The closeAsync result shouldn't be completed, yet.", new Object[0])).isNotCompleted()).as("The closeAsync result shouldn't be cancelled.", new Object[0])).isNotCancelled();
    }

    @Test
    void testCancellationAfterStart() throws Exception {
        HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(new CompletableFuture<JobStatus>(), new CompletableFuture<JobStatus>());
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withCheckpointRecoveryFactory(checkpointRecoveryFactory).withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        AFTER_START.accept((Object)testInstance);
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.cancel(TIMEOUT_FOR_REQUESTS)).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FlinkException.class);
        ((CompletableFutureAssert)((CompletableFutureAssert)((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("The closeAsync result shouldn't be completed, yet.", new Object[0])).isNotCompleted()).as("The closeAsync result shouldn't be cancelled.", new Object[0])).isNotCancelled();
    }

    @Test
    void testCancellationAfterClose() throws Exception {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withExecutor(EXECUTOR_EXTENSION.getExecutor()).build();
        AFTER_CLOSE.accept((Object)testInstance);
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.cancel(TIMEOUT_FOR_REQUESTS)).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FlinkException.class);
        ((CompletableFutureAssert)((CompletableFutureAssert)((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)testInstance.closeAsync()).as("The closeAsync result should be completed by now.", new Object[0])).isCompleted()).as("The closeAsync result shouldn't be cancelled.", new Object[0])).isNotCancelled();
    }

    @Test
    void testResultFutureWithSuccessBeforeStart() throws Exception {
        Assertions.assertThat(CheckpointResourcesCleanupRunnerTest.getResultFutureFromTestInstance(CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult(), BEFORE_START)).isNotCompleted();
    }

    @Test
    void testResultFutureWithSuccessAfterStart() throws Exception {
        this.testResultFutureWithSuccessfulResultAfterStart(AFTER_START);
    }

    @Test
    void testResultFutureWithSuccessAfterClose() throws Exception {
        this.testResultFutureWithSuccessfulResultAfterStart(AFTER_CLOSE);
    }

    private void testResultFutureWithSuccessfulResultAfterStart(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        CompletableFuture<JobManagerRunnerResult> actualResult = CheckpointResourcesCleanupRunnerTest.getResultFutureFromTestInstance(CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult(), preCheckLifecycleHandling);
        Assertions.assertThat(actualResult).isCompletedWithValueMatching(JobManagerRunnerResult::isSuccess, "The JobManagerRunner should have succeeded.");
    }

    @Test
    void testResultFutureWithErrorBeforeStart() throws Exception {
        CompletableFuture<JobManagerRunnerResult> resultFuture = CheckpointResourcesCleanupRunnerTest.getResultFutureFromTestInstance(CheckpointResourcesCleanupRunnerTest.createJobResultWithFailure(new SerializedThrowable((Throwable)new Exception("Expected exception"))), BEFORE_START);
        Assertions.assertThat(resultFuture).isNotCompleted();
    }

    @Test
    void testResultFutureWithErrorAfterStart() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testResultFutureWithErrorAfterStart(AFTER_START);
    }

    @Test
    void testResultFutureWithErrorAfterClose() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testResultFutureWithErrorAfterStart(AFTER_CLOSE);
    }

    private static void testResultFutureWithErrorAfterStart(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        SerializedThrowable expectedError = new SerializedThrowable((Throwable)new Exception("Expected exception"));
        CompletableFuture<JobManagerRunnerResult> actualResult = CheckpointResourcesCleanupRunnerTest.getResultFutureFromTestInstance(CheckpointResourcesCleanupRunnerTest.createJobResultWithFailure(expectedError), preCheckLifecycleHandling);
        Assertions.assertThat(actualResult).isCompletedWithValueMatching(jobManagerRunnerResult -> Objects.requireNonNull(jobManagerRunnerResult.getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo()).getException().equals(expectedError), "JobManagerRunner should have failed with expected error");
    }

    private static CompletableFuture<JobManagerRunnerResult> getResultFutureFromTestInstance(JobResult jobResult, ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withJobResult(jobResult).build();
        preCheckLifecycleHandling.accept((Object)testInstance);
        return testInstance.getResultFuture();
    }

    @Test
    void testGetJobID() {
        JobID jobId = new JobID();
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withJobResult(CheckpointResourcesCleanupRunnerTest.createJobResult(jobId, ApplicationStatus.CANCELED)).build();
        Assertions.assertThat((Comparable)testInstance.getJobID()).isEqualTo((Object)jobId);
    }

    @Test
    void testGetJobMasterGatewayBeforeStart() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testGetJobMasterGateway(BEFORE_START);
    }

    @Test
    void testGetJobMasterGatewayAfterStart() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testGetJobMasterGateway(AFTER_START);
    }

    @Test
    void testGetJobMasterGatewayAfterClose() throws Exception {
        CheckpointResourcesCleanupRunnerTest.testGetJobMasterGateway(AFTER_CLOSE);
    }

    private static void testGetJobMasterGateway(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();
        preCheckLifecycleHandling.accept((Object)testInstance);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            JobMasterGateway cfr_ignored_0 = (JobMasterGateway)testInstance.getJobMasterGateway().get();
        }).isInstanceOf(ExecutionException.class)).hasCauseExactlyInstanceOf(UnavailableDispatcherOperationException.class);
    }

    @Test
    void testRequestJob_ExceptionHistory() {
        CheckpointResourcesCleanupRunnerTest.testRequestJob(CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult(), System.currentTimeMillis(), actualExecutionGraphInfo -> !actualExecutionGraphInfo.getExceptionHistory().iterator().hasNext());
    }

    @Test
    void testRequestJob_JobName() {
        CheckpointResourcesCleanupRunnerTest.testRequestJobExecutionGraph(CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult(), System.currentTimeMillis(), actualExecutionGraph -> actualExecutionGraph.getJobName().equals("unknown"));
    }

    @Test
    void testRequestJob_JobId() {
        JobResult jobResult = CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult();
        CheckpointResourcesCleanupRunnerTest.testRequestJobExecutionGraph(jobResult, System.currentTimeMillis(), actualExecutionGraph -> actualExecutionGraph.getJobID().equals((Object)jobResult.getJobId()));
    }

    @Test
    void testRequestJob_JobState() {
        JobResult jobResult = CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult();
        CheckpointResourcesCleanupRunnerTest.testRequestJobExecutionGraph(jobResult, System.currentTimeMillis(), actualExecutionGraph -> actualExecutionGraph.getState().equals((Object)jobResult.getApplicationStatus().deriveJobStatus()));
    }

    @Test
    void testRequestJob_InitiatizationTimestamp() {
        long initializationTimestamp = System.currentTimeMillis();
        CheckpointResourcesCleanupRunnerTest.testRequestJobExecutionGraph(CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult(), initializationTimestamp, actualExecutionGraph -> actualExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) == initializationTimestamp);
    }

    @Test
    void testRequestJobWithFailure() {
        SerializedThrowable expectedError = new SerializedThrowable((Throwable)new Exception("Expected exception"));
        JobResult jobResult = CheckpointResourcesCleanupRunnerTest.createJobResultWithFailure(expectedError);
        CheckpointResourcesCleanupRunnerTest.testRequestJobExecutionGraph(jobResult, System.currentTimeMillis(), actualExecutionGraph -> Objects.requireNonNull(actualExecutionGraph.getFailureInfo()).getException().equals(expectedError));
    }

    private static void testRequestJobExecutionGraph(JobResult jobResult, long initializationTimestamp, Function<AccessExecutionGraph, Boolean> assertion) {
        CheckpointResourcesCleanupRunnerTest.testRequestJob(jobResult, initializationTimestamp, actualExecutionGraphInfo -> (Boolean)assertion.apply((AccessExecutionGraph)actualExecutionGraphInfo.getArchivedExecutionGraph()));
    }

    private static void testRequestJob(JobResult jobResult, long initializationTimestamp, Function<ExecutionGraphInfo, Boolean> assertion) {
        CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withJobResult(jobResult).withInitializationTimestamp(initializationTimestamp).build();
        CompletableFuture response = testInstance.requestJob(TIMEOUT_FOR_REQUESTS);
        Assertions.assertThat((CompletableFuture)response).isCompletedWithValueMatching(assertion::apply);
    }

    private static JobResult createDummySuccessJobResult() {
        return CheckpointResourcesCleanupRunnerTest.createJobResult(new JobID(), ApplicationStatus.SUCCEEDED);
    }

    private static JobResult createJobResultWithFailure(SerializedThrowable throwable) {
        return new JobResult.Builder().jobId(new JobID()).applicationStatus(ApplicationStatus.FAILED).serializedThrowable(throwable).netRuntime(1L).build();
    }

    private static JobResult createJobResult(JobID jobId, ApplicationStatus applicationStatus) {
        return new JobResult.Builder().jobId(jobId).applicationStatus(applicationStatus).netRuntime(1L).build();
    }

    private static CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return new TestingCheckpointRecoveryFactory(TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(new CompletableFuture<JobStatus>()), TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(new CompletableFuture<JobStatus>()));
    }

    static /* synthetic */ JobResult access$100() {
        return CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult();
    }

    static /* synthetic */ CheckpointRecoveryFactory access$200() {
        return CheckpointResourcesCleanupRunnerTest.createCheckpointRecoveryFactory();
    }

    private static class HaltingCheckpointRecoveryFactory
    implements CheckpointRecoveryFactory {
        private final CompletedCheckpointStore completedCheckpointStore;
        private final CheckpointIDCounter checkpointIDCounter;
        private final OneShotLatch creationLatch = new OneShotLatch();

        public HaltingCheckpointRecoveryFactory(CompletableFuture<JobStatus> completableCheckpointStoreShutDownFuture, CompletableFuture<JobStatus> checkpointIDCounterShutDownFuture) {
            this(TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completableCheckpointStoreShutDownFuture), TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(checkpointIDCounterShutDownFuture));
        }

        public HaltingCheckpointRecoveryFactory(CompletedCheckpointStore completedCheckpointStore, CheckpointIDCounter checkpointIDCounter) {
            this.completedCheckpointStore = (CompletedCheckpointStore)Preconditions.checkNotNull((Object)completedCheckpointStore);
            this.checkpointIDCounter = (CheckpointIDCounter)Preconditions.checkNotNull((Object)checkpointIDCounter);
        }

        public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor, RestoreMode restoreMode) throws Exception {
            this.creationLatch.await();
            return this.completedCheckpointStore;
        }

        public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
            this.creationLatch.await();
            return this.checkpointIDCounter;
        }

        public void triggerCreation() {
            this.creationLatch.trigger();
        }
    }

    private static class TestInstanceBuilder {
        private JobResult jobResult = CheckpointResourcesCleanupRunnerTest.access$100();
        private CheckpointRecoveryFactory checkpointRecoveryFactory = CheckpointResourcesCleanupRunnerTest.access$200();
        private SharedStateRegistryFactory sharedStateRegistryFactory = SharedStateRegistry.DEFAULT_FACTORY;
        private Executor executor = org.apache.flink.util.concurrent.Executors.directExecutor();
        private Configuration configuration = new Configuration();
        private long initializationTimestamp = System.currentTimeMillis();

        private TestInstanceBuilder() {
        }

        public TestInstanceBuilder withJobResult(JobResult jobResult) {
            this.jobResult = jobResult;
            return this;
        }

        public TestInstanceBuilder withCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
            return this;
        }

        public TestInstanceBuilder withSharedStateRegistryFactory(SharedStateRegistryFactory sharedStateRegistryFactory) {
            this.sharedStateRegistryFactory = sharedStateRegistryFactory;
            return this;
        }

        public TestInstanceBuilder withExecutor(Executor executor) {
            this.executor = executor;
            return this;
        }

        public TestInstanceBuilder withConfiguration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        public TestInstanceBuilder withInitializationTimestamp(long initializationTimestamp) {
            this.initializationTimestamp = initializationTimestamp;
            return this;
        }

        public CheckpointResourcesCleanupRunner build() {
            return new CheckpointResourcesCleanupRunner(this.jobResult, this.checkpointRecoveryFactory, this.sharedStateRegistryFactory, this.configuration, this.executor, this.initializationTimestamp);
        }
    }
}

