/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptive.MockStateWithoutExecutionGraphContext;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.TestingStateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WaitingForResourcesTest {
    private static final Logger LOG = LoggerFactory.getLogger(WaitingForResourcesTest.class);
    private static final Duration DISABLED_RESOURCE_WAIT_TIMEOUT = Duration.ofSeconds(-1L);
    @RegisterExtension
    private MockContext ctx = new MockContext();

    WaitingForResourcesTest() {
    }

    @Test
    void testTransitionToCreatingExecutionGraph() {
        AtomicBoolean onTriggerCalled = new AtomicBoolean();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> new TestingStateTransitionManager(() -> {
            Assertions.assertThat((boolean)context.hasDesiredResources()).isTrue();
            Assertions.assertThat((boolean)context.hasSufficientResources()).isTrue();
            context.transitionToSubsequentState();
        }, () -> onTriggerCalled.set(true));
        this.ctx.setHasDesiredResources(() -> true);
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, stateTransitionManagerFactory);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((boolean)onTriggerCalled.get()).isTrue();
    }

    @Test
    void testNotEnoughResources() {
        AtomicBoolean onChangeCalled = new AtomicBoolean();
        AtomicBoolean onTriggerCalled = new AtomicBoolean();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> new TestingStateTransitionManager(() -> {
            onChangeCalled.set(true);
            Assertions.assertThat((boolean)context.hasDesiredResources()).isFalse();
            Assertions.assertThat((boolean)context.hasSufficientResources()).isFalse();
        }, () -> onTriggerCalled.set(true));
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, stateTransitionManagerFactory);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
        Assertions.assertThat((boolean)onChangeCalled.get()).isTrue();
        Assertions.assertThat((boolean)onTriggerCalled.get()).isTrue();
    }

    @Test
    void testNotifyNewResourcesAvailable() {
        AtomicInteger callsCounter = new AtomicInteger();
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withOnChangeEventOnly(() -> {
            if (callsCounter.incrementAndGet() == 0) {
                Assertions.assertThat((boolean)context.hasDesiredResources()).isFalse();
                Assertions.assertThat((boolean)context.hasSufficientResources()).isFalse();
            }
            if (context.hasDesiredResources() && context.hasSufficientResources()) {
                context.transitionToSubsequentState();
            }
        });
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, stateTransitionManagerFactory);
        this.ctx.runScheduledTasks();
        this.ctx.setHasDesiredResources(() -> true);
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        wfr.onNewResourcesAvailable();
    }

    @Test
    void testSchedulingWithSufficientResources() {
        Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory = context -> TestingStateTransitionManager.withOnChangeEventOnly(() -> {
            Assertions.assertThat((boolean)context.hasDesiredResources()).isFalse();
            if (context.hasSufficientResources()) {
                context.transitionToSubsequentState();
            }
        });
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, stateTransitionManagerFactory);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> true);
        this.ctx.setExpectCreatingExecutionGraph();
        wfr.onNewResourcesAvailable();
    }

    @Test
    void testNoStateTransitionOnNoResourceTimeout() {
        this.ctx.setHasDesiredResources(() -> false);
        this.ctx.setHasSufficientResources(() -> false);
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, DISABLED_RESOURCE_WAIT_TIMEOUT, context -> TestingStateTransitionManager.withNoOp());
        this.ctx.runScheduledTasks();
        Assertions.assertThat((boolean)this.ctx.hasStateTransition()).isFalse();
    }

    @Test
    void testStateTransitionOnResourceTimeout() {
        WaitingForResources wfr = new WaitingForResources((WaitingForResources.Context)this.ctx, LOG, Duration.ZERO, context -> TestingStateTransitionManager.withNoOp());
        this.ctx.setExpectCreatingExecutionGraph();
        this.ctx.runScheduledTasks();
    }

    @Test
    void testInternalRunScheduledTasks_correctExecutionOrder() {
        AtomicBoolean firstRun = new AtomicBoolean(false);
        AtomicBoolean secondRun = new AtomicBoolean(false);
        AtomicBoolean thirdRun = new AtomicBoolean(false);
        Runnable runFirstBecauseOfLowDelay = () -> firstRun.set(true);
        Runnable runSecondBecauseOfScheduleOrder = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)firstRun).as("order violated", new Object[0])).isTrue();
            secondRun.set(true);
        };
        Runnable runLastBecauseOfHighDelay = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)secondRun).as("order violated", new Object[0])).isTrue();
            thirdRun.set(true);
        };
        this.ctx.runIfState((State)new AdaptiveSchedulerTest.DummyState(this.ctx), runLastBecauseOfHighDelay, Duration.ofMillis(999L));
        this.ctx.runIfState((State)new AdaptiveSchedulerTest.DummyState(this.ctx), runFirstBecauseOfLowDelay, Duration.ZERO);
        this.ctx.runIfState((State)new AdaptiveSchedulerTest.DummyState(this.ctx), runSecondBecauseOfScheduleOrder, Duration.ZERO);
        this.ctx.runScheduledTasks();
        Assertions.assertThat((AtomicBoolean)thirdRun).isTrue();
    }

    @Test
    void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
        AtomicBoolean executed = new AtomicBoolean(false);
        Runnable executeOnce = () -> {
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)executed).as("Multiple executions", new Object[0])).isFalse();
            executed.set(true);
        };
        this.ctx.runIfState((State)new AdaptiveSchedulerTest.DummyState(this.ctx), executeOnce, Duration.ZERO);
        this.ctx.runScheduledTasks();
        this.ctx.runScheduledTasks();
        Assertions.assertThat((AtomicBoolean)executed).isTrue();
    }

    @Test
    void testInternalRunScheduledTasks_upperBoundRespected() {
        Runnable executeNever = () -> Assertions.fail((String)"Not expected");
        this.ctx.runIfState((State)new AdaptiveSchedulerTest.DummyState(this.ctx), executeNever, Duration.ofMillis(10L));
        this.ctx.runScheduledTasks(4L);
    }

    @Test
    void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
        AdaptiveSchedulerTest.DummyState state = new AdaptiveSchedulerTest.DummyState(this.ctx);
        AtomicBoolean executed = new AtomicBoolean(false);
        this.ctx.runIfState((State)state, () -> this.lambda$testInternalRunScheduledTasks_scheduleTaskFromRunnable$30((State)state, executed), Duration.ZERO);
        this.ctx.runScheduledTasks(10L);
        Assertions.assertThat((AtomicBoolean)executed).isTrue();
    }

    static <T> Consumer<T> assertNonNull() {
        return item -> Assertions.assertThat((Object)item).isNotNull();
    }

    private /* synthetic */ void lambda$testInternalRunScheduledTasks_scheduleTaskFromRunnable$30(State state, AtomicBoolean executed) {
        this.ctx.runIfState(state, () -> executed.set(true), Duration.ofMillis(4L));
    }

    private static class MockContext
    extends MockStateWithoutExecutionGraphContext
    implements WaitingForResources.Context {
        private static final Logger LOG = LoggerFactory.getLogger(MockContext.class);
        private final StateValidator<Void> creatingExecutionGraphStateValidator = new StateValidator("executing");
        private Supplier<Boolean> hasDesiredResourcesSupplier = () -> false;
        private Supplier<Boolean> hasSufficientResourcesSupplier = () -> false;
        private final Queue<ScheduledTask<Void>> scheduledTasks = new PriorityQueue<ScheduledTask>(Comparator.comparingLong(o -> o.getDelay(TimeUnit.MILLISECONDS)));
        private final ManualClock testingClock = new ManualClock();

        private MockContext() {
        }

        public void setHasDesiredResources(Supplier<Boolean> sup) {
            this.hasDesiredResourcesSupplier = sup;
        }

        public void setHasSufficientResources(Supplier<Boolean> sup) {
            this.hasSufficientResourcesSupplier = sup;
        }

        void setExpectCreatingExecutionGraph() {
            this.creatingExecutionGraphStateValidator.expectInput(none -> {});
        }

        void runScheduledTasks(long untilDelay) {
            LOG.info("Running scheduled tasks with a delay between 0 and {}ms:", (Object)untilDelay);
            while (this.scheduledTasks.peek() != null && this.scheduledTasks.peek().getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
                ScheduledTask<Void> scheduledTask = this.scheduledTasks.poll();
                LOG.info("Running task with delay {}", (Object)scheduledTask.getDelay(TimeUnit.MILLISECONDS));
                scheduledTask.execute();
                if (!scheduledTask.isPeriodic()) continue;
                this.scheduledTasks.add(scheduledTask);
            }
        }

        void runScheduledTasks() {
            this.runScheduledTasks(Long.MAX_VALUE);
        }

        @Override
        public void afterEach(ExtensionContext extensionContext) throws Exception {
            super.afterEach(extensionContext);
            this.creatingExecutionGraphStateValidator.close();
        }

        public boolean hasDesiredResources() {
            return this.hasDesiredResourcesSupplier.get();
        }

        public boolean hasSufficientResources() {
            return this.hasSufficientResourcesSupplier.get();
        }

        public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
            LOG.info("Scheduling work with delay {} for earliest execution at {}", (Object)delay.toMillis(), (Object)(this.testingClock.absoluteTimeMillis() + delay.toMillis()));
            ScheduledTask scheduledTask = new ScheduledTask(() -> {
                if (!this.hasStateTransition()) {
                    action.run();
                }
                return null;
            }, this.testingClock.absoluteTimeMillis() + delay.toMillis());
            this.scheduledTasks.add((ScheduledTask<Void>)scheduledTask);
            return scheduledTask;
        }

        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) {
            this.creatingExecutionGraphStateValidator.validateInput(null);
            this.registerStateTransition();
        }
    }
}

