/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdaptiveSchedulerTest {
    private static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1L);
    private static final int PARALLELISM = 4;
    private static final JobVertex JOB_VERTEX = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSchedulerTest.class);
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorExtension(Executors::newSingleThreadScheduledExecutor);
    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor((ScheduledExecutorService)TEST_EXECUTOR_RESOURCE.getExecutor());
    private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();

    @Test
    void testInitialState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThat((Object)scheduler.getState()).isInstanceOf(Created.class);
    }

    @Test
    void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null));
        ArchivedExecutionGraph archivedExecutionGraph = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build().getArchivedExecutionGraph(JobStatus.INITIALIZING, null);
        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
    }

    @Test
    void testArchivedJobVerticesPresent() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null));
        ArchivedExecutionGraph archivedExecutionGraph = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build().getArchivedExecutionGraph(JobStatus.INITIALIZING, null);
        ArchivedExecutionJobVertex jobVertex = archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID());
        ((ObjectAssert)Assertions.assertThat((Object)jobVertex).isNotNull()).satisfies(new ThrowingConsumer[]{archived -> {
            Assertions.assertThat((int)archived.getParallelism()).isEqualTo(JOB_VERTEX.getParallelism());
            Assertions.assertThat((int)archived.getMaxParallelism()).isEqualTo(128);
        }});
        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
    }

    @Test
    void testIsState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        State state = scheduler.getState();
        Assertions.assertThat((boolean)scheduler.isState(state)).isTrue();
        Assertions.assertThat((boolean)scheduler.isState((State)new DummyState())).isFalse();
    }

    @Test
    void testRunIfState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        AtomicBoolean ran = new AtomicBoolean(false);
        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
        Assertions.assertThat((boolean)ran.get()).isTrue();
    }

    @Test
    void testRunIfStateWithStateMismatch() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        AtomicBoolean ran = new AtomicBoolean(false);
        scheduler.runIfState((State)new DummyState(), () -> ran.set(true));
        Assertions.assertThat((boolean)ran.get()).isFalse();
    }

    @Test
    void testHasEnoughResourcesReturnsFalseIfUnsatisfied() {
        ResourceCounter resourceRequirement = ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1);
        Assertions.assertThat((boolean)AdaptiveScheduler.hasDesiredResources((ResourceCounter)resourceRequirement, Collections.emptyList())).isFalse();
    }

    @Test
    void testHasEnoughResourcesReturnsTrueIfSatisfied() {
        ResourceCounter resourceRequirement = ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1);
        Collection<TestSlotInfo> freeSlots = this.createSlotInfosForResourceRequirements(resourceRequirement);
        Assertions.assertThat((boolean)AdaptiveScheduler.hasDesiredResources((ResourceCounter)resourceRequirement, freeSlots)).isTrue();
    }

    private Collection<TestSlotInfo> createSlotInfosForResourceRequirements(ResourceCounter resourceRequirements) {
        ArrayList<TestSlotInfo> slotInfos = new ArrayList<TestSlotInfo>();
        for (Map.Entry resourceProfileCount : resourceRequirements.getResourcesWithCount()) {
            for (int i = 0; i < (Integer)resourceProfileCount.getValue(); ++i) {
                slotInfos.add(new TestSlotInfo((ResourceProfile)resourceProfileCount.getKey()));
            }
        }
        return slotInfos;
    }

    @Test
    void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() {
        boolean numRequiredSlots = true;
        ResourceCounter requiredResources = ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1);
        ResourceCounter providedResources = ResourceCounter.withResource((ResourceProfile)ResourceProfile.newBuilder().setCpuCores(1.0).build(), (int)1);
        Collection<TestSlotInfo> freeSlots = this.createSlotInfosForResourceRequirements(providedResources);
        Assertions.assertThat((boolean)AdaptiveScheduler.hasDesiredResources((ResourceCounter)requiredResources, freeSlots)).isTrue();
    }

    @Test
    void testExecutionGraphGenerationWithAvailableResources() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        int numAvailableSlots = 2;
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(2);
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)2)), (TaskManagerGateway)taskManagerGateway);
        });
        taskManagerGateway.waitForSubmissions(2);
        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> scheduler.requestJob().getArchivedExecutionGraph(), (Executor)this.singleThreadMainThreadExecutor).join();
        Assertions.assertThat((int)executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism()).isEqualTo(2);
        Assertions.assertThat((int)JacksonMapperFactory.createObjectMapper().readTree(executionGraph.getJsonPlan()).get("nodes").size()).isEqualTo(1);
    }

    @Test
    void testExecutionGraphGenerationSetsInitializationTimestamp() throws Exception {
        long initializationTimestamp = 42L;
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
        AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setInitializationTimestamp(42L).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        this.singleThreadMainThreadExecutor.execute(() -> {
            adaptiveScheduler.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
        });
        taskManagerGateway.waitForSubmissions(1);
        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(), (Executor)this.singleThreadMainThreadExecutor).join();
        Assertions.assertThat((long)executionGraph.getStatusTimestamp(JobStatus.INITIALIZING)).isEqualTo(42L);
    }

    @Test
    void testInitializationTimestampForwarding() throws Exception {
        long expectedInitializationTimestamp = 42L;
        AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setInitializationTimestamp(42L).build();
        long initializationTimestamp = adaptiveScheduler.requestJob().getArchivedExecutionGraph().getStatusTimestamp(JobStatus.INITIALIZING);
        Assertions.assertThat((long)initializationTimestamp).isEqualTo(42L);
    }

    @Test
    void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
        TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setFatalErrorHandler(fatalErrorHandler).build();
        RuntimeException exception = new RuntimeException();
        scheduler.runIfState(scheduler.getState(), () -> {
            throw exception;
        });
        Assertions.assertThat((Throwable)fatalErrorHandler.getException()).isEqualTo((Object)exception);
    }

    @Test
    void testResourceTimeout() throws Exception {
        ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
        Duration resourceTimeout = Duration.ofMinutes(1234L);
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)resourceTimeout);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).build();
        scheduler.startScheduling();
        boolean b = mainThreadExecutor.getActiveNonPeriodicScheduledTask().stream().anyMatch(scheduledTask -> scheduledTask.getDelay(TimeUnit.MINUTES) == resourceTimeout.toMinutes());
        Assertions.assertThat((boolean)b).isTrue();
    }

    @Test
    void testNumRestartsMetric() throws Exception {
        CompletableFuture numRestartsMetricFuture = new CompletableFuture();
        TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder().setRegisterConsumer((metric, name, group) -> {
            if ("numRestarts".equals(name)) {
                numRestartsMetricFuture.complete((Gauge)metric);
            }
        }).build();
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = new DefaultDeclarativeSlotPool(jobGraph.getJobID(), (AllocatedSlotPool)new DefaultAllocatedSlotPool(), ignored -> {}, Time.minutes((long)10L), Time.minutes((long)10L));
        Configuration configuration = AdaptiveSchedulerTest.createConfigurationWithNoTimeouts();
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, (Object)1);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup((MetricRegistry)metricRegistry, (String)"localhost").addJob(new JobID(), "jobName")).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).build();
        Gauge numRestartsMetric = (Gauge)numRestartsMetricFuture.get();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        taskManagerGateway.setCancelConsumer(this.createCancelConsumer((SchedulerNG)scheduler));
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            declarativeSlotPool.offerSlots(SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)), (TaskManagerLocation)new LocalTaskManagerLocation(), (TaskManagerGateway)taskManagerGateway, System.currentTimeMillis());
        });
        taskManagerGateway.waitForSubmissions(1);
        Assertions.assertThat((Long)((Long)numRestartsMetric.getValue())).isEqualTo(0L);
        this.singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway));
        taskManagerGateway.waitForSubmissions(4);
        Assertions.assertThat((Long)((Long)numRestartsMetric.getValue())).isEqualTo(1L);
    }

    @Test
    void testStatusMetrics() throws Exception {
        CompletableFuture upTimeMetricFuture = new CompletableFuture();
        CompletableFuture downTimeMetricFuture = new CompletableFuture();
        CompletableFuture restartTimeMetricFuture = new CompletableFuture();
        TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder().setRegisterConsumer((metric, name, group) -> {
            switch (name) {
                case "uptime": {
                    upTimeMetricFuture.complete((UpTimeGauge)metric);
                    break;
                }
                case "downtime": {
                    downTimeMetricFuture.complete((DownTimeGauge)metric);
                    break;
                }
                case "restartingTimeTotal": {
                    restartTimeMetricFuture.complete((Gauge)metric);
                }
            }
        }).build();
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = AdaptiveSchedulerTest.createConfigurationWithNoTimeouts();
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, (Object)1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(10L));
        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup((MetricRegistry)metricRegistry, (String)"localhost").addJob(new JobID(), "jobName")).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).build();
        UpTimeGauge upTimeGauge = (UpTimeGauge)upTimeMetricFuture.get();
        DownTimeGauge downTimeGauge = (DownTimeGauge)downTimeMetricFuture.get();
        Gauge restartTimeGauge = (Gauge)restartTimeMetricFuture.get();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        taskManagerGateway.setCancelConsumer(this.createCancelConsumer((SchedulerNG)scheduler));
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)), (TaskManagerGateway)taskManagerGateway);
        });
        taskManagerGateway.waitForSubmissions(1);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> upTimeGauge.getValue() > 0L));
        Assertions.assertThat((Long)downTimeGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat((Long)((Long)restartTimeGauge.getValue())).isEqualTo(0L);
        this.singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)), (TaskManagerGateway)taskManagerGateway));
        taskManagerGateway.waitForSubmissions(2);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> upTimeGauge.getValue() > 0L));
        Assertions.assertThat((Long)downTimeGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat((Long)((Long)restartTimeGauge.getValue())).isGreaterThanOrEqualTo(0L);
    }

    @Test
    void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        scheduler.startScheduling();
        Assertions.assertThat((Object)scheduler.getState()).isInstanceOf(WaitingForResources.class);
    }

    @Test
    void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).build();
        scheduler.startScheduling();
        Assertions.assertThat((Collection)declarativeSlotPool.getResourceRequirements()).contains((Object[])new ResourceRequirement[]{ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)});
    }

    @Test
    void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER_MODE, (Object)SchedulerExecutionMode.REACTIVE);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        scheduler.startScheduling();
        int expectedParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)4);
        Assertions.assertThat((Collection)declarativeSlotPool.getResourceRequirements()).contains((Object[])new ResourceRequirement[]{ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)expectedParallelism)});
    }

    @Test
    void testResourceAcquisitionTriggersJobExecution() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        CompletableFuture startingStateFuture = new CompletableFuture();
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            startingStateFuture.complete(scheduler.getState());
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
        });
        Assertions.assertThat(startingStateFuture.get()).isInstanceOf(WaitingForResources.class);
        taskManagerGateway.waitForSubmissions(4);
        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> scheduler.requestJob().getArchivedExecutionGraph(), (Executor)this.singleThreadMainThreadExecutor).get();
        Assertions.assertThat((int)executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism()).isEqualTo(4);
    }

    @Test
    void testGoToFinished() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        ArchivedExecutionGraph archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
        scheduler.goToFinished(archivedExecutionGraph);
        Assertions.assertThat((Object)scheduler.getState()).isInstanceOf(Finished.class);
    }

    @Test
    void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
        AtomicInteger numStatusUpdates = new AtomicInteger();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobStatusListener((jobId, newJobStatus, timestamp) -> numStatusUpdates.incrementAndGet()).build();
        ((AbstractComparableAssert)Assertions.assertThat((Comparable)scheduler.requestJobStatus()).withFailMessage("Assumption about job status for Scheduler@Created is incorrect.", new Object[0])).isEqualTo((Object)JobStatus.INITIALIZING);
        scheduler.transitionToState((StateFactory)new DummyState.Factory(JobStatus.INITIALIZING));
        Assertions.assertThat((int)numStatusUpdates.get()).isEqualTo(0);
    }

    @Test
    void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
        CompletableFuture jobCreatedNotification = new CompletableFuture();
        CompletableFuture jobRunningNotification = new CompletableFuture();
        CompletableFuture jobFinishedNotification = new CompletableFuture();
        CompletableFuture unexpectedJobStatusNotification = new CompletableFuture();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).setJobStatusListener((jobId, newJobStatus, timestamp) -> {
            switch (newJobStatus) {
                case CREATED: {
                    jobCreatedNotification.complete(null);
                    break;
                }
                case RUNNING: {
                    jobRunningNotification.complete(null);
                    break;
                }
                case FINISHED: {
                    jobFinishedNotification.complete(null);
                    break;
                }
                default: {
                    unexpectedJobStatusNotification.complete(newJobStatus);
                }
            }
        }).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)), (TaskManagerGateway)taskManagerGateway);
        });
        TaskDeploymentDescriptor submittedTask = taskManagerGateway.submittedTasks.take();
        this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateTaskExecutionState(new TaskExecutionState(submittedTask.getExecutionAttemptId(), ExecutionState.FINISHED)));
        jobCreatedNotification.get();
        jobRunningNotification.get();
        jobFinishedNotification.get();
        Assertions.assertThat((boolean)unexpectedJobStatusNotification.isDone()).isFalse();
    }

    @Test
    void testCloseShutsDownCheckpointingComponents() throws Exception {
        CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<JobStatus>();
        TestingCompletedCheckpointStore completedCheckpointStore = TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completedCheckpointStoreShutdownFuture);
        CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture = new CompletableFuture<JobStatus>();
        TestingCheckpointIDCounter checkpointIdCounter = TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(checkpointIdCounterShutdownFuture);
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null));
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(completedCheckpointStore, checkpointIdCounter)).build();
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            scheduler.handleGlobalFailure((Throwable)new FlinkException("Test exception"));
            scheduler.closeAsync();
        });
        Assertions.assertThat((Comparable)((Comparable)completedCheckpointStoreShutdownFuture.get())).isEqualTo((Object)JobStatus.FAILED);
        Assertions.assertThat((Comparable)((Comparable)checkpointIdCounterShutdownFuture.get())).isEqualTo((Object)JobStatus.FAILED);
    }

    @Test
    void testTransitionToStateCallsOnLeave() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        LifecycleMethodCapturingState firstState = new LifecycleMethodCapturingState();
        scheduler.transitionToState((StateFactory)new StateInstanceFactory(firstState));
        firstState.reset();
        scheduler.transitionToState((StateFactory)new DummyState.Factory());
        Assertions.assertThat((boolean)firstState.onLeaveCalled).isTrue();
        Assertions.assertThat((boolean)firstState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
    }

    @Test
    void testConsistentMaxParallelism() throws Exception {
        int parallelism = 240;
        int expectedMaxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)240);
        JobVertex vertex = ExecutionGraphTestUtils.createNoOpVertex(240);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(AdaptiveSchedulerTest.createConfigurationWithNoTimeouts()).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(241);
        taskManagerGateway.setCancelConsumer(this.createCancelConsumer((SchedulerNG)scheduler));
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)), (TaskManagerGateway)taskManagerGateway);
        });
        taskManagerGateway.waitForSubmissions(1);
        ArchivedExecutionGraph executionGraph = this.getArchivedExecutionGraphForRunningJob((SchedulerNG)scheduler).get();
        ArchivedExecutionJobVertex archivedVertex = executionGraph.getJobVertex(vertex.getID());
        Assertions.assertThat((int)archivedVertex.getParallelism()).isEqualTo(1);
        Assertions.assertThat((int)archivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
        this.singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)240)), (TaskManagerGateway)taskManagerGateway));
        taskManagerGateway.waitForSubmissions(240);
        ArchivedExecutionGraph resubmittedExecutionGraph = this.getArchivedExecutionGraphForRunningJob((SchedulerNG)scheduler).get();
        ArchivedExecutionJobVertex resubmittedArchivedVertex = resubmittedExecutionGraph.getJobVertex(vertex.getID());
        Assertions.assertThat((int)resubmittedArchivedVertex.getParallelism()).isEqualTo(240);
        Assertions.assertThat((int)resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
    }

    @Test
    void testRequirementIncreaseTriggersScaleUp() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = this.createSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool);
        int scaledUpParallelism = 8;
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(8, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 4);
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 4);
        JobResourceRequirements newJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithUpperParallelism(8);
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.updateJobResourceRequirements(newJobResourceRequirements);
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
        });
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 8);
    }

    @Test
    void testRequirementDecreaseTriggersScaleDown() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = this.createSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool);
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(4, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 4);
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 4);
        int scaledDownParallelism = 3;
        JobResourceRequirements newJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithUpperParallelism(scaledDownParallelism);
        this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateJobResourceRequirements(newJobResourceRequirements));
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, scaledDownParallelism);
    }

    @Test
    void testRequirementLowerBoundIncreaseBelowCurrentParallelismDoesNotTriggerRescale() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = this.createSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool);
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(4, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 4);
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 4);
        JobResourceRequirements newJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithEqualLowerAndUpperParallelism(4);
        CompletableFuture<Void> asyncAssertion = CompletableFuture.runAsync(() -> {
            State state = scheduler.getState();
            scheduler.updateJobResourceRequirements(newJobResourceRequirements);
            Assertions.assertThat((Object)scheduler.getState()).isSameAs((Object)state);
            Assertions.assertThat(taskManagerGateway.submittedTasks).isEmpty();
        }, (Executor)this.singleThreadMainThreadExecutor);
        FlinkAssertions.assertThatFuture(asyncAssertion).eventuallySucceeds();
    }

    @Test
    void testRequirementLowerBoundIncreaseBeyondCurrentParallelismKeepsJobRunning() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = this.createSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool);
        int scaledUpParallelism = 40;
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 4);
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 4);
        JobResourceRequirements newJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithEqualLowerAndUpperParallelism(scaledUpParallelism);
        FlinkAssertions.assertThatFuture(CompletableFuture.runAsync(() -> {
            State originalState = scheduler.getState();
            scheduler.updateJobResourceRequirements(newJobResourceRequirements);
            Assertions.assertThat((Object)scheduler.getState()).isSameAs((Object)originalState);
        }, (Executor)this.singleThreadMainThreadExecutor)).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(CompletableFuture.runAsync(() -> {
            State originalState = scheduler.getState();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
            Assertions.assertThat((Object)scheduler.getState()).isSameAs((Object)originalState);
        }, (Executor)this.singleThreadMainThreadExecutor)).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(CompletableFuture.runAsync(() -> SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)32)), (TaskManagerGateway)taskManagerGateway), (Executor)this.singleThreadMainThreadExecutor)).eventuallySucceeds();
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, scaledUpParallelism);
    }

    @Test
    void testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        boolean availableSlots = true;
        JobResourceRequirements initialJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithEqualLowerAndUpperParallelism(4);
        AdaptiveScheduler scheduler = this.prepareSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool).withConfigurationOverride(conf -> {
            conf.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
            return conf;
        }).setJobResourceRequirements(initialJobResourceRequirements).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(4, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 1);
        FlinkAssertions.assertThatFuture((CompletableFuture)scheduler.getJobTerminationFuture()).eventuallySucceeds().isEqualTo((Object)JobStatus.FAILED);
        Assertions.assertThat(taskManagerGateway.submittedTasks).isEmpty();
    }

    @Test
    void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        boolean availableSlots = true;
        JobResourceRequirements initialJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithEqualLowerAndUpperParallelism(4);
        AdaptiveScheduler scheduler = this.prepareSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool).setJobResourceRequirements(initialJobResourceRequirements).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = this.createSubmissionBufferingTaskManagerGateway(4, (SchedulerNG)scheduler);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 1);
        JobResourceRequirements newJobResourceRequirements = AdaptiveSchedulerTest.createRequirementsWithLowerAndUpperParallelism(1, 4);
        this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateJobResourceRequirements(newJobResourceRequirements));
        this.awaitJobReachingParallelism(taskManagerGateway, (SchedulerNG)scheduler, 1);
    }

    private static Configuration createConfigurationWithNoTimeouts() {
        return new Configuration().set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(-1L)).set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, (Object)Duration.ofMillis(1L)).set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, (Object)Duration.ofMillis(1L));
    }

    private AdaptiveSchedulerBuilder prepareSchedulerWithNoTimeouts(JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) {
        return new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool(declarativeSlotPool).setJobMasterConfiguration(AdaptiveSchedulerTest.createConfigurationWithNoTimeouts());
    }

    private AdaptiveScheduler createSchedulerWithNoTimeouts(JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) throws Exception {
        return this.prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool).build();
    }

    private SubmissionBufferingTaskManagerGateway createSubmissionBufferingTaskManagerGateway(int parallelism, SchedulerNG scheduler) {
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(parallelism);
        taskManagerGateway.setCancelConsumer(executionAttemptID -> this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED))));
        return taskManagerGateway;
    }

    private void startJobWithSlotsMatchingParallelism(SchedulerNG scheduler, DeclarativeSlotPool declarativeSlotPool, TaskManagerGateway taskManagerGateway, int parallelism) {
        this.singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)parallelism)), taskManagerGateway);
        });
    }

    private void awaitJobReachingParallelism(SubmissionBufferingTaskManagerGateway taskManagerGateway, SchedulerNG scheduler, int parallelism) throws Exception {
        taskManagerGateway.waitForSubmissions(parallelism);
        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> scheduler.requestJob().getArchivedExecutionGraph(), (Executor)this.singleThreadMainThreadExecutor).get();
        Assertions.assertThat((int)executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism()).isEqualTo(parallelism);
    }

    private static JobResourceRequirements createRequirementsWithUpperParallelism(int parallelism) {
        return AdaptiveSchedulerTest.createRequirementsWithLowerAndUpperParallelism(1, parallelism);
    }

    private static JobResourceRequirements createRequirementsWithEqualLowerAndUpperParallelism(int parallelism) {
        return AdaptiveSchedulerTest.createRequirementsWithLowerAndUpperParallelism(parallelism, parallelism);
    }

    private static JobResourceRequirements createRequirementsWithLowerAndUpperParallelism(int lowerParallelism, int upperParallelism) {
        return new JobResourceRequirements(Collections.singletonMap(JOB_VERTEX.getID(), new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(lowerParallelism, upperParallelism))));
    }

    @Test
    void testHowToHandleFailureRejectedByStrategy() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setRestartBackoffTimeStrategy((RestartBackoffTimeStrategy)NoRestartBackoffTimeStrategy.INSTANCE).build();
        Assertions.assertThat((boolean)scheduler.howToHandleFailure((Throwable)new Exception("test")).canRestart()).isFalse();
    }

    @Test
    void testHowToHandleFailureAllowedByStrategy() throws Exception {
        TestRestartBackoffTimeStrategy restartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 1234L);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setRestartBackoffTimeStrategy(restartBackoffTimeStrategy).build();
        FailureResult failureResult = scheduler.howToHandleFailure((Throwable)new Exception("test"));
        Assertions.assertThat((boolean)failureResult.canRestart()).isTrue();
        Assertions.assertThat((long)failureResult.getBackoffTime().toMillis()).isEqualTo(restartBackoffTimeStrategy.getBackoffTime());
    }

    @Test
    void testHowToHandleFailureUnrecoverableFailure() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThat((boolean)scheduler.howToHandleFailure((Throwable)new SuppressRestartsException((Throwable)new Exception("test"))).canRestart()).isFalse();
    }

    @Test
    void testExceptionHistoryWithGlobalFailureLabels() throws Exception {
        Exception expectedException = new Exception("Global Exception to label");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> scheduler.handleGlobalFailure((Throwable)expectedException);
        TestingFailureEnricher failureEnricher = new TestingFailureEnricher();
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).withFailureEnrichers(Collections.singletonList(failureEnricher)).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Object)failure.getTaskManagerLocation()).isNull();
        Assertions.assertThat((String)failure.getFailingTaskName()).isNull();
        Assertions.assertThat((Collection)failureEnricher.getSeenThrowables()).containsExactly((Object[])new Throwable[]{expectedException});
        Assertions.assertThat((Map)failure.getFailureLabels()).isEqualTo((Object)failureEnricher.getFailureLabels());
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException);
    }

    @Test
    void testExceptionHistoryWithGlobalFailure() throws Exception {
        Exception expectedException = new Exception("Expected Global Exception");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> scheduler.handleGlobalFailure((Throwable)expectedException);
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Object)failure.getTaskManagerLocation()).isNull();
        Assertions.assertThat((String)failure.getFailingTaskName()).isNull();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException);
    }

    @Test
    void testExceptionHistoryWithTaskFailureLabels() throws Exception {
        Exception taskException = new Exception("Task Exception");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            ExecutionAttemptID attemptId = (ExecutionAttemptID)attemptIds.get(1);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)taskException)));
        };
        TestingFailureEnricher failureEnricher = new TestingFailureEnricher();
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withFailureEnrichers(Collections.singletonList(failureEnricher)).withTestLogic(testLogic).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)taskException);
        Assertions.assertThat((Map)failure.getFailureLabels()).isEqualTo((Object)failureEnricher.getFailureLabels());
    }

    @Test
    void testExceptionHistoryWithTaskFailure() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            ExecutionAttemptID attemptId = (ExecutionAttemptID)attemptIds.get(1);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)expectedException)));
        };
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException);
    }

    @Test
    void testExceptionHistoryWithTaskFailureWithRestart() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");
        Consumer<AdaptiveSchedulerBuilder> setupScheduler = builder -> builder.setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(1, 100L).create());
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            ExecutionAttemptID attemptId = (ExecutionAttemptID)attemptIds.get(1);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)expectedException)));
        };
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).withModifiedScheduler(setupScheduler).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException);
    }

    @Test
    void testExceptionHistoryWithTaskFailureFromStopWithSavepoint() throws Exception {
        Exception expectedException = new Exception("Expected Local Exception");
        Consumer<JobGraph> setupJobGraph = jobGraph -> jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(Long.MAX_VALUE).build(), null));
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
        TestingCheckpointRecoveryFactory checkpointRecoveryFactory = new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)checkpointIDCounter);
        Consumer<AdaptiveSchedulerBuilder> setupScheduler = builder -> builder.setCheckpointRecoveryFactory(checkpointRecoveryFactory).setCheckpointCleaner(checkpointCleaner);
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (arg_0, arg_1) -> AdaptiveSchedulerTest.lambda$testExceptionHistoryWithTaskFailureFromStopWithSavepoint$47(expectedException, (CheckpointIDCounter)checkpointIDCounter, arg_0, arg_1);
        Iterable<RootExceptionHistoryEntry> actualExceptionHistory = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).withModifiedScheduler(setupScheduler).withModifiedJobGraph(setupJobGraph).run();
        Assertions.assertThat(actualExceptionHistory).hasSize(1);
        RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException);
    }

    @Test
    void testExceptionHistoryWithTaskConcurrentGlobalFailure() throws Exception {
        Exception expectedException1 = new Exception("Expected Global Exception 1");
        Exception expectedException2 = new Exception("Expected Global Exception 2");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            scheduler.handleGlobalFailure((Throwable)expectedException1);
            scheduler.handleGlobalFailure((Throwable)expectedException2);
        };
        Iterable<RootExceptionHistoryEntry> entries = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).run();
        Assertions.assertThat(entries).hasSize(1);
        RootExceptionHistoryEntry failure = entries.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException1);
        Iterable concurrentExceptions = failure.getConcurrentExceptions();
        List foundExceptions = IterableUtils.toStream((Iterable)concurrentExceptions).map(ErrorInfo::getException).map(exception -> exception.deserializeError(this.classLoader)).collect(Collectors.toList());
        Assertions.assertThat(foundExceptions).containsExactly((Object[])new Throwable[]{expectedException2});
    }

    @Test
    void testExceptionHistoryWithTaskConcurrentFailure() throws Exception {
        Exception expectedException1 = new Exception("Expected Local Exception 1");
        Exception expectedException2 = new Exception("Expected Local Exception 2");
        BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attemptIds) -> {
            ExecutionAttemptID attemptId = (ExecutionAttemptID)attemptIds.remove(0);
            ExecutionAttemptID attemptId2 = (ExecutionAttemptID)attemptIds.remove(0);
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)expectedException1)));
            scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId2, ExecutionState.FAILED, (Throwable)expectedException2)));
        };
        Iterable<RootExceptionHistoryEntry> entries = new ExceptionHistoryTester(this.singleThreadMainThreadExecutor).withTestLogic(testLogic).run();
        Assertions.assertThat(entries).hasSize(1);
        RootExceptionHistoryEntry failure = entries.iterator().next();
        Assertions.assertThat((Throwable)failure.getException().deserializeError(this.classLoader)).isEqualTo((Object)expectedException1);
        Iterable concurrentExceptions = failure.getConcurrentExceptions();
        List foundExceptions = IterableUtils.toStream((Iterable)concurrentExceptions).map(ErrorInfo::getException).map(exception -> exception.deserializeError(this.classLoader)).collect(Collectors.toList());
        Assertions.assertThat(foundExceptions).isEmpty();
    }

    @Test
    void testRepeatedTransitionIntoCurrentStateFails() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        State state = scheduler.getState();
        Assertions.assertThat((Object)state).isInstanceOf(Created.class);
        Assertions.assertThatThrownBy(() -> {
            Created cfr_ignored_0 = (Created)scheduler.transitionToState((StateFactory)new Created.Factory((Created.Context)scheduler, LOG));
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testTriggerSavepointFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        FlinkAssertions.assertThatFuture((CompletableFuture)scheduler.triggerSavepoint("some directory", false, SavepointFormatType.CANONICAL)).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(CheckpointException.class);
    }

    @Test
    void testStopWithSavepointFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        FlinkAssertions.assertThatFuture((CompletableFuture)scheduler.triggerSavepoint("some directory", false, SavepointFormatType.CANONICAL)).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(CheckpointException.class);
    }

    @Test
    void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThatThrownBy(() -> scheduler.deliverOperatorEventToCoordinator(ExecutionGraphTestUtils.createExecutionAttemptId(), new OperatorID(), (OperatorEvent)new TestOperatorEvent())).isInstanceOf(TaskNotRunningException.class);
    }

    @Test
    void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        FlinkAssertions.assertThatFuture((CompletableFuture)scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), new CoordinationRequest(){})).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FlinkException.class);
    }

    @Test
    void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThat((boolean)scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(ExecutionGraphTestUtils.createExecutionAttemptId(), ExecutionState.FAILED)))).isFalse();
    }

    @Test
    void testRequestNextInputSplitFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThatThrownBy(() -> scheduler.requestNextInputSplit(JOB_VERTEX.getID(), ExecutionGraphTestUtils.createExecutionAttemptId())).isInstanceOf(IOException.class);
    }

    @Test
    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).build();
        Assertions.assertThatThrownBy(() -> scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID())).isInstanceOf(PartitionProducerDisposedException.class);
    }

    @Test
    void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception {
        TestingSlotAllocator slotAllocator = TestingSlotAllocator.newBuilder().setTryReserveResourcesFunction(ignored -> Optional.empty()).build();
        AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setSlotAllocator(slotAllocator).build();
        CreatingExecutionGraph.AssignmentResult assignmentResult = adaptiveScheduler.tryToAssignSlots(CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create((ExecutionGraph)new StateTrackingMockExecutionGraph(), (JobSchedulingPlan)JobSchedulingPlan.empty()));
        Assertions.assertThat((boolean)assignmentResult.isSuccess()).isFalse();
    }

    @Test
    void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2);
        VertexParallelismStore parallelismStore = AdaptiveScheduler.computeVertexParallelismStoreForExecution((JobGraph)graph, (SchedulerExecutionMode)SchedulerExecutionMode.REACTIVE, SchedulerBase::getDefaultMaxParallelism);
        for (JobVertex vertex : graph.getVertices()) {
            VertexParallelismInformation info = parallelismStore.getParallelismInfo(vertex.getID());
            Assertions.assertThat((int)info.getParallelism()).isEqualTo(vertex.getParallelism());
            Assertions.assertThat((int)info.getMaxParallelism()).isEqualTo(vertex.getMaxParallelism());
        }
    }

    @Test
    void testComputeVertexParallelismStoreForExecutionInDefaultMode() {
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2);
        VertexParallelismStore parallelismStore = AdaptiveScheduler.computeVertexParallelismStoreForExecution((JobGraph)graph, null, SchedulerBase::getDefaultMaxParallelism);
        for (JobVertex vertex : graph.getVertices()) {
            VertexParallelismInformation info = parallelismStore.getParallelismInfo(vertex.getID());
            Assertions.assertThat((int)info.getParallelism()).isEqualTo(vertex.getParallelism());
            Assertions.assertThat((int)info.getMaxParallelism()).isEqualTo(vertex.getMaxParallelism());
        }
    }

    @Test
    void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.doTestCheckpointCleanerIsClosedAfterCheckpointServices((checkpointRecoveryFactory, checkpointCleaner) -> {
                JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
                SchedulerTestingUtils.enableCheckpointing(jobGraph);
                try {
                    return new AdaptiveSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(executorService), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setCheckpointRecoveryFactory((CheckpointRecoveryFactory)checkpointRecoveryFactory).setCheckpointCleaner((CheckpointsCleaner)checkpointCleaner).build();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executorService, LOG);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequirements() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        Duration slotIdleTimeout = Duration.ofMillis(10L);
        Configuration configuration = AdaptiveSchedulerTest.createConfigurationWithNoTimeouts();
        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, (Object)slotIdleTimeout.toMillis());
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID(), slotIdleTimeout);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.singleThreadMainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        try {
            int numInitialSlots = 4;
            int numSlotsAfterDownscaling = 2;
            SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
            taskManagerGateway.setCancelConsumer(this.createCancelConsumer((SchedulerNG)scheduler));
            this.singleThreadMainThreadExecutor.execute(() -> AdaptiveSchedulerTest.lambda$testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequirements$58(scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway));
            taskManagerGateway.waitForSubmissions(4);
            this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateJobResourceRequirements(JobResourceRequirements.newBuilder().setParallelismForJobVertex(JOB_VERTEX.getID(), 1, 2).build()));
            taskManagerGateway.waitForSubmissions(2);
            taskManagerGateway.waitForFreedSlots(2);
            CompletableFuture jobStatusFuture = new CompletableFuture();
            this.singleThreadMainThreadExecutor.execute(() -> jobStatusFuture.complete(scheduler.getState().getJobStatus()));
            FlinkAssertions.assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo((Object)JobStatus.RUNNING);
            Assertions.assertThat(taskManagerGateway.freedSlots).isEmpty();
        }
        finally {
            CompletableFuture closeFuture = new CompletableFuture();
            this.singleThreadMainThreadExecutor.execute(() -> FutureUtils.forward((CompletableFuture)scheduler.closeAsync(), (CompletableFuture)closeFuture));
            FlinkAssertions.assertThatFuture(closeFuture).eventuallySucceeds();
        }
    }

    @Test
    public void testUpdateResourceRequirementsInReactiveModeIsNotSupported() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER_MODE, (Object)SchedulerExecutionMode.REACTIVE);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(AdaptiveSchedulerTest.createJobGraph(), this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).build();
        Assertions.assertThatThrownBy(() -> scheduler.updateJobResourceRequirements(JobResourceRequirements.empty())).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    public void testRequestDefaultResourceRequirements() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        Configuration configuration = new Configuration();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).build();
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isEqualTo((Object)JobResourceRequirements.newBuilder().setParallelismForJobVertex(JOB_VERTEX.getID(), 1, JOB_VERTEX.getParallelism()).build());
    }

    @Test
    public void testRequestDefaultResourceRequirementsInReactiveMode() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER_MODE, (Object)SchedulerExecutionMode.REACTIVE);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).build();
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isEqualTo((Object)JobResourceRequirements.newBuilder().setParallelismForJobVertex(JOB_VERTEX.getID(), 1, SchedulerBase.getDefaultMaxParallelism((JobVertex)JOB_VERTEX)).build());
    }

    @Test
    public void testRequestUpdatedResourceRequirements() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        Configuration configuration = new Configuration();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).build();
        JobResourceRequirements newJobResourceRequirements = JobResourceRequirements.newBuilder().setParallelismForJobVertex(JOB_VERTEX.getID(), 1, 12).build();
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isNotEqualTo((Object)newJobResourceRequirements);
        scheduler.updateJobResourceRequirements(newJobResourceRequirements);
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isEqualTo((Object)newJobResourceRequirements);
        JobResourceRequirements newJobResourceRequirements2 = JobResourceRequirements.newBuilder().setParallelismForJobVertex(JOB_VERTEX.getID(), 4, 12).build();
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isNotEqualTo((Object)newJobResourceRequirements2);
        scheduler.updateJobResourceRequirements(newJobResourceRequirements2);
        Assertions.assertThat((Object)scheduler.requestJobResourceRequirements()).isEqualTo((Object)newJobResourceRequirements2);
    }

    @Test
    public void testScalingIntervalConfigurationIsRespected() throws Exception {
        JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
        Duration scalingIntervalMin = Duration.ofMillis(1337L);
        Duration scalingIntervalMax = Duration.ofMillis(7331L);
        Configuration configuration = AdaptiveSchedulerTest.createConfigurationWithNoTimeouts();
        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, (Object)scalingIntervalMin);
        configuration.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX, (Object)scalingIntervalMax);
        AdaptiveScheduler scheduler = this.prepareSchedulerWithNoTimeouts(jobGraph, (DeclarativeSlotPool)declarativeSlotPool).setJobMasterConfiguration(configuration).build();
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        this.startJobWithSlotsMatchingParallelism((SchedulerNG)scheduler, (DeclarativeSlotPool)declarativeSlotPool, taskManagerGateway, 4);
        taskManagerGateway.waitForSubmissions(4);
        CompletableFuture executingFuture = new CompletableFuture();
        this.singleThreadMainThreadExecutor.execute(() -> {
            Optional maybeExecuting = scheduler.getState().as(Executing.class);
            if (maybeExecuting.isPresent()) {
                executingFuture.complete(maybeExecuting.get());
            } else {
                executingFuture.completeExceptionally(new IllegalStateException(String.format("State is not [%s].", Executing.class)));
            }
        });
        FlinkAssertions.assertThatFuture(executingFuture).eventuallySucceeds().satisfies(new ThrowingConsumer[]{executing -> {
            Assertions.assertThat((Duration)executing.scalingIntervalMin).isEqualTo((Object)scalingIntervalMin);
            Assertions.assertThat((Duration)executing.scalingIntervalMax).isEqualTo((Object)scalingIntervalMax);
        }});
        CompletableFuture closeFuture = new CompletableFuture();
        this.singleThreadMainThreadExecutor.execute(() -> FutureUtils.forward((CompletableFuture)scheduler.closeAsync(), (CompletableFuture)closeFuture));
        FlinkAssertions.assertThatFuture(closeFuture).eventuallySucceeds();
    }

    private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob(SchedulerNG scheduler) {
        return CompletableFuture.supplyAsync(() -> {
            ArchivedExecutionGraph graph = null;
            while (graph == null || graph.getState() != JobStatus.RUNNING) {
                graph = scheduler.requestJob().getArchivedExecutionGraph();
            }
            return graph;
        }, (Executor)this.singleThreadMainThreadExecutor);
    }

    private Consumer<ExecutionAttemptID> createCancelConsumer(SchedulerNG scheduler) {
        return executionAttemptId -> this.singleThreadMainThreadExecutor.execute(() -> scheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptId, ExecutionState.CANCELED)));
    }

    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {
        return AdaptiveSchedulerTest.createDeclarativeSlotPool(jobId, DEFAULT_TIMEOUT);
    }

    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId, Duration idleSlotTimeout) {
        return new DefaultDeclarativeSlotPool(jobId, (AllocatedSlotPool)new DefaultAllocatedSlotPool(), ignored -> {}, Time.fromDuration((Duration)idleSlotTimeout), Time.fromDuration((Duration)DEFAULT_TIMEOUT));
    }

    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.streamingJobGraph(JOB_VERTEX);
    }

    private static /* synthetic */ void lambda$testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequirements$58(AdaptiveScheduler scheduler, DeclarativeSlotPool declarativeSlotPool, SubmissionBufferingTaskManagerGateway taskManagerGateway) {
        scheduler.startScheduling();
        SlotPoolTestUtils.offerSlots(declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
    }

    private static /* synthetic */ void lambda$testExceptionHistoryWithTaskFailureFromStopWithSavepoint$47(Exception expectedException, CheckpointIDCounter checkpointIDCounter, AdaptiveScheduler scheduler, List attemptIds) {
        ExecutionAttemptID attemptId = (ExecutionAttemptID)attemptIds.get(1);
        scheduler.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
        scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.FAILED, (Throwable)expectedException)));
        for (ExecutionAttemptID id : attemptIds) {
            scheduler.declineCheckpoint(new DeclineCheckpoint(scheduler.requestJob().getJobId(), id, checkpointIDCounter.get() - 1L, new CheckpointException(CheckpointFailureReason.IO_EXCEPTION)));
        }
    }

    private static class ExceptionHistoryTester {
        private final ComponentMainThreadExecutor mainThreadExecutor;
        private BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic = (scheduler, attempts) -> {};
        private Consumer<AdaptiveSchedulerBuilder> schedulerModifier = ignored -> {};
        private Consumer<JobGraph> jobGraphModifier = ignored -> {};
        private Collection<FailureEnricher> failureEnrichers = Collections.emptySet();

        ExceptionHistoryTester(ComponentMainThreadExecutor mainThreadExecutor) {
            this.mainThreadExecutor = mainThreadExecutor;
        }

        ExceptionHistoryTester withTestLogic(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic) {
            this.testLogic = testLogic;
            return this;
        }

        ExceptionHistoryTester withModifiedScheduler(Consumer<AdaptiveSchedulerBuilder> schedulerModifier) {
            this.schedulerModifier = schedulerModifier;
            return this;
        }

        ExceptionHistoryTester withModifiedJobGraph(Consumer<JobGraph> jobGraphModifier) {
            this.jobGraphModifier = jobGraphModifier;
            return this;
        }

        ExceptionHistoryTester withFailureEnrichers(Collection<FailureEnricher> failureEnrichers) {
            this.failureEnrichers = failureEnrichers;
            return this;
        }

        Iterable<RootExceptionHistoryEntry> run() throws Exception {
            JobGraph jobGraph = AdaptiveSchedulerTest.createJobGraph();
            this.jobGraphModifier.accept(jobGraph);
            StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
            StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
            CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
            TestingCheckpointRecoveryFactory checkpointRecoveryFactory = new TestingCheckpointRecoveryFactory((CompletedCheckpointStore)completedCheckpointStore, (CheckpointIDCounter)checkpointIDCounter);
            DefaultDeclarativeSlotPool declarativeSlotPool = AdaptiveSchedulerTest.createDeclarativeSlotPool(jobGraph.getJobID());
            Configuration configuration = new Configuration();
            configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, (Object)Duration.ofMillis(1L));
            AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).setDeclarativeSlotPool((DeclarativeSlotPool)declarativeSlotPool).setCheckpointRecoveryFactory(checkpointRecoveryFactory).setCheckpointCleaner(checkpointCleaner).setFailureEnrichers(this.failureEnrichers);
            this.schedulerModifier.accept(builder);
            AdaptiveScheduler scheduler = builder.build();
            SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
            taskManagerGateway.setCancelConsumer(attemptId -> this.mainThreadExecutor.execute(() -> scheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(attemptId, ExecutionState.CANCELED, null)))));
            this.mainThreadExecutor.execute(() -> {
                scheduler.startScheduling();
                SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)declarativeSlotPool, SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)4)), (TaskManagerGateway)taskManagerGateway);
            });
            taskManagerGateway.waitForSubmissions(4);
            CompletableFuture vertexFuture = new CompletableFuture();
            this.mainThreadExecutor.execute(() -> vertexFuture.complete(scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices()));
            Iterable executionVertices = (Iterable)vertexFuture.get();
            List attemptIds = IterableUtils.toStream((Iterable)executionVertices).map(ArchivedExecutionVertex::getCurrentExecutionAttempt).map(ArchivedExecution::getAttemptId).collect(Collectors.toList());
            CompletableFuture<Void> runTestLogicFuture = CompletableFuture.runAsync(() -> this.testLogic.accept(scheduler, attemptIds), (Executor)this.mainThreadExecutor);
            runTestLogicFuture.get();
            this.mainThreadExecutor.execute(() -> ((AdaptiveScheduler)scheduler).cancel());
            scheduler.getJobTerminationFuture().get();
            return scheduler.requestJob().getExceptionHistory();
        }
    }

    static class DummyState
    implements State {
        private final JobStatus jobStatus;

        public DummyState() {
            this(JobStatus.RUNNING);
        }

        public DummyState(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }

        public void cancel() {
        }

        public void suspend(Throwable cause) {
        }

        public JobStatus getJobStatus() {
            return this.jobStatus;
        }

        public ArchivedExecutionGraph getJob() {
            return null;
        }

        public void handleGlobalFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
        }

        public Logger getLogger() {
            return null;
        }

        private static class Factory
        implements StateFactory<DummyState> {
            private final JobStatus jobStatus;

            public Factory() {
                this(JobStatus.RUNNING);
            }

            public Factory(JobStatus jobStatus) {
                this.jobStatus = jobStatus;
            }

            public Class<DummyState> getStateClass() {
                return DummyState.class;
            }

            public DummyState getState() {
                return new DummyState(this.jobStatus);
            }
        }
    }

    private static class StateInstanceFactory
    implements StateFactory<LifecycleMethodCapturingState> {
        private final LifecycleMethodCapturingState instance;

        public StateInstanceFactory(LifecycleMethodCapturingState instance) {
            this.instance = instance;
        }

        public Class<LifecycleMethodCapturingState> getStateClass() {
            return LifecycleMethodCapturingState.class;
        }

        public LifecycleMethodCapturingState getState() {
            return this.instance;
        }
    }

    public static class SubmissionBufferingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;
        final BlockingQueue<AllocationID> freedSlots;

        public SubmissionBufferingTaskManagerGateway(int capacity) {
            this.submittedTasks = new ArrayBlockingQueue<TaskDeploymentDescriptor>(capacity);
            this.freedSlots = new ArrayBlockingQueue<AllocationID>(capacity);
            this.initializeFunctions();
        }

        @Override
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
            super.setSubmitConsumer(taskDeploymentDescriptor -> {
                Preconditions.checkState((boolean)this.submittedTasks.offer((TaskDeploymentDescriptor)taskDeploymentDescriptor));
                submitConsumer.accept((TaskDeploymentDescriptor)taskDeploymentDescriptor);
            });
        }

        @Override
        public void setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction) {
            super.setFreeSlotFunction((allocationID, throwable) -> {
                Preconditions.checkState((boolean)this.freedSlots.offer((AllocationID)allocationID));
                return (CompletableFuture)freeSlotFunction.apply((AllocationID)allocationID, (Throwable)throwable);
            });
        }

        public List<TaskDeploymentDescriptor> waitForSubmissions(int numSubmissions) throws InterruptedException {
            ArrayList<TaskDeploymentDescriptor> descriptors = new ArrayList<TaskDeploymentDescriptor>();
            for (int i = 0; i < numSubmissions; ++i) {
                descriptors.add(this.submittedTasks.take());
            }
            return descriptors;
        }

        public List<AllocationID> waitForFreedSlots(int numFreedSlots) throws InterruptedException {
            ArrayList<AllocationID> allocationIds = new ArrayList<AllocationID>();
            for (int i = 0; i < numFreedSlots; ++i) {
                allocationIds.add(this.freedSlots.take());
            }
            return allocationIds;
        }

        private void initializeFunctions() {
            this.setSubmitConsumer(ignored -> {});
            this.setFreeSlotFunction((allocationId, throwable) -> CompletableFuture.completedFuture(Acknowledge.get()));
        }
    }

    private static class LifecycleMethodCapturingState
    extends DummyState {
        boolean onLeaveCalled = false;
        @Nullable
        Class<? extends State> onLeaveNewStateArgument = null;

        private LifecycleMethodCapturingState() {
        }

        void reset() {
            this.onLeaveCalled = false;
            this.onLeaveNewStateArgument = null;
        }

        public void onLeave(Class<? extends State> newState) {
            this.onLeaveCalled = true;
            this.onLeaveNewStateArgument = newState;
        }
    }
}

