/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRequestDecider;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link CheckpointRequestDecider}.
 *
 * <p>Each test builds a decider around a {@link ManualClock} and a pair of counters that stand in
 * for the number of pending and currently-cleaning checkpoints, submits one or more
 * {@link CheckpointCoordinator.CheckpointTriggerRequest}s and checks which request (if any) the
 * decider hands back for execution, or with which {@link CheckpointFailureReason} a request's
 * completion future was failed.
 */
public class CheckpointRequestDeciderTest {
    /** Reschedule callback handed to the decider; these tests ignore rescheduling. */
    private static final LongConsumer NO_OP = unused -> {};

    /**
     * A forced periodic savepoint is chosen right away although the pending counter (1) already
     * equals the configured maximum (1).
     */
    @Test
    public void testForce() {
        CheckpointRequestDecider decider =
                decider(1, 1, Integer.MAX_VALUE, new AtomicInteger(1), new AtomicInteger(0));
        CheckpointCoordinator.CheckpointTriggerRequest request = periodicSavepoint();
        Assert.assertEquals(
                Optional.of(request), decider.chooseRequestToExecute(request, false, 123L));
    }

    /**
     * A regular checkpoint is not chosen while the pending counter is at the maximum; once the
     * counter drops to zero the same request is returned from the queue.
     */
    @Test
    public void testEnqueueOnTooManyPending() {
        final int maxPending = 1;
        final boolean isTriggering = false;
        AtomicInteger currentPending = new AtomicInteger(maxPending);
        CheckpointRequestDecider decider =
                decider(Integer.MAX_VALUE, maxPending, 1, currentPending, new AtomicInteger(0));
        CheckpointCoordinator.CheckpointTriggerRequest request = regularCheckpoint();

        Assert.assertFalse(decider.chooseRequestToExecute(request, isTriggering, 0L).isPresent());

        currentPending.set(0);
        Assert.assertEquals(
                Optional.of(request), decider.chooseQueuedRequestToExecute(isTriggering, 0L));
    }

    /**
     * Same scenario as {@link #testEnqueueOnTooManyPending()} but with a non-forced savepoint
     * instead of a regular checkpoint.
     */
    @Test
    public void testNonForcedEnqueueOnTooManyPending() {
        final int maxPending = 1;
        final boolean isTriggering = false;
        AtomicInteger currentPending = new AtomicInteger(maxPending);
        AtomicInteger currentCleaning = new AtomicInteger(0);
        CheckpointRequestDecider decider =
                decider(Integer.MAX_VALUE, maxPending, 1, currentPending, currentCleaning);
        CheckpointCoordinator.CheckpointTriggerRequest request = nonForcedSavepoint();

        Assert.assertFalse(decider.chooseRequestToExecute(request, isTriggering, 0L).isPresent());

        currentPending.set(0);
        Assert.assertEquals(
                Optional.of(request), decider.chooseQueuedRequestToExecute(isTriggering, 0L));
    }

    /**
     * A regular checkpoint is not chosen while the cleaning counter is one above the maximum
     * number of concurrent attempts; after a single decrement the queued request is returned.
     */
    @Test
    public void testEnqueueOnTooManyCleaning() {
        final boolean isTriggering = false;
        final int maxQueuedRequests = 10;
        final int maxConcurrentCheckpointAttempts = 10;
        AtomicInteger currentCleaning = new AtomicInteger(maxConcurrentCheckpointAttempts + 1);
        CheckpointRequestDecider decider =
                decider(
                        maxQueuedRequests,
                        maxConcurrentCheckpointAttempts,
                        1,
                        new AtomicInteger(0),
                        currentCleaning);
        CheckpointCoordinator.CheckpointTriggerRequest request = regularCheckpoint();

        Assert.assertFalse(decider.chooseRequestToExecute(request, isTriggering, 0L).isPresent());

        // a single decrement brings the cleaning counter back down to the configured maximum
        currentCleaning.decrementAndGet();
        Assert.assertEquals(
                Optional.of(request), decider.chooseQueuedRequestToExecute(isTriggering, 0L));
    }

    /** A non-periodic (user-submitted) forced savepoint is dequeued before a periodic one. */
    @Test
    public void testUserSubmittedPrioritized() {
        CheckpointCoordinator.CheckpointTriggerRequest userSubmitted = regularSavepoint();
        CheckpointCoordinator.CheckpointTriggerRequest periodic = periodicSavepoint();
        testRequestsOrdering(
                new CheckpointCoordinator.CheckpointTriggerRequest[] {periodic, userSubmitted},
                new CheckpointCoordinator.CheckpointTriggerRequest[] {userSubmitted, periodic});
    }

    /** A forced savepoint is dequeued before a regular checkpoint submitted ahead of it. */
    @Test
    public void testSavepointPrioritized() {
        CheckpointCoordinator.CheckpointTriggerRequest savepoint = regularSavepoint();
        CheckpointCoordinator.CheckpointTriggerRequest checkpoint = regularCheckpoint();
        testRequestsOrdering(
                new CheckpointCoordinator.CheckpointTriggerRequest[] {checkpoint, savepoint},
                new CheckpointCoordinator.CheckpointTriggerRequest[] {savepoint, checkpoint});
    }

    /** A non-periodic, non-forced savepoint is dequeued before a periodic non-forced one. */
    @Test
    public void testNonForcedUserSubmittedPrioritized() {
        CheckpointCoordinator.CheckpointTriggerRequest userSubmitted = nonForcedSavepoint();
        CheckpointCoordinator.CheckpointTriggerRequest periodic = nonForcedPeriodicSavepoint();
        testRequestsOrdering(
                new CheckpointCoordinator.CheckpointTriggerRequest[] {periodic, userSubmitted},
                new CheckpointCoordinator.CheckpointTriggerRequest[] {userSubmitted, periodic});
    }

    /** A non-forced savepoint is dequeued before a regular checkpoint submitted ahead of it. */
    @Test
    public void testNonForcedSavepointPrioritized() {
        CheckpointCoordinator.CheckpointTriggerRequest savepoint = nonForcedSavepoint();
        CheckpointCoordinator.CheckpointTriggerRequest checkpoint = regularCheckpoint();
        testRequestsOrdering(
                new CheckpointCoordinator.CheckpointTriggerRequest[] {checkpoint, savepoint},
                new CheckpointCoordinator.CheckpointTriggerRequest[] {savepoint, checkpoint});
    }

    /**
     * Submits one request more than the queue limit while a trigger is in progress and checks,
     * after every submission, that exactly the overflow (requests beyond the limit) has been
     * failed with {@link CheckpointFailureReason#TOO_MANY_CHECKPOINT_REQUESTS}.
     */
    @Test
    public void testQueueSizeLimit() {
        final int maxQueuedRequests = 10;
        final boolean isTriggering = true;
        CheckpointRequestDecider decider = decider(maxQueuedRequests);
        // rangeClosed yields maxQueuedRequests + 1 requests, i.e. one more than fits
        List<CheckpointCoordinator.CheckpointTriggerRequest> requests =
                IntStream.rangeClosed(0, maxQueuedRequests)
                        .mapToObj(i -> regularCheckpoint())
                        .collect(Collectors.toList());

        int numAdded = 0;
        for (CheckpointCoordinator.CheckpointTriggerRequest request : requests) {
            Assert.assertFalse(
                    decider.chooseRequestToExecute(request, isTriggering, 0L).isPresent());
            List<CheckpointCoordinator.CheckpointTriggerRequest> completed =
                    requests.stream()
                            .filter(r -> r.getOnCompletionFuture().isDone())
                            .collect(Collectors.toList());
            completed.forEach(
                    r -> assertFailed(r, CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS));
            numAdded++;
            Assert.assertEquals(Math.max(numAdded - maxQueuedRequests, 0), completed.size());
        }
    }

    /**
     * With a queue limit of one, submitting a savepoint after a checkpoint fails the checkpoint
     * with {@link CheckpointFailureReason#TOO_MANY_CHECKPOINT_REQUESTS} and leaves the savepoint's
     * future incomplete.
     */
    @Test
    public void testQueueSizeLimitPriority() {
        final int maxQueuedRequests = 1;
        final boolean isTriggering = true;
        CheckpointRequestDecider decider = decider(maxQueuedRequests);
        CheckpointCoordinator.CheckpointTriggerRequest checkpoint = regularCheckpoint();
        CheckpointCoordinator.CheckpointTriggerRequest savepoint = regularSavepoint();

        decider.chooseRequestToExecute(checkpoint, isTriggering, 0L);
        decider.chooseRequestToExecute(savepoint, isTriggering, 0L);

        assertFailed(checkpoint, CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS);
        Assert.assertFalse(savepoint.getOnCompletionFuture().isDone());
    }

    /** Every savepoint flavour is chosen immediately, without waiting for the minimum pause. */
    @Test
    public void testSavepointTiming() {
        testTiming(regularSavepoint(), TriggerExpectation.IMMEDIATELY);
        testTiming(periodicSavepoint(), TriggerExpectation.IMMEDIATELY);
        testTiming(nonForcedSavepoint(), TriggerExpectation.IMMEDIATELY);
    }

    /**
     * A periodic checkpoint submitted within the minimum pause is dropped, whereas a non-periodic
     * (manual) checkpoint is chosen immediately.
     */
    @Test
    public void testCheckpointTiming() {
        testTiming(regularCheckpoint(), TriggerExpectation.DROPPED);
        testTiming(manualCheckpoint(), TriggerExpectation.IMMEDIATELY);
    }

    /**
     * Submits {@code request} to a fresh decider with a 10 ms minimum pause, using the clock's
     * current relative time as the last completion timestamp, and verifies the outcome.
     *
     * @param request the request to submit
     * @param expectation how the decider is expected to treat the request
     * @throws IllegalArgumentException if {@code expectation} is not handled by this helper
     */
    private void testTiming(
            CheckpointCoordinator.CheckpointTriggerRequest request,
            TriggerExpectation expectation) {
        final long pause = 10L;
        final boolean isTriggering = false;
        ManualClock clock = new ManualClock();
        CheckpointRequestDecider decider =
                new CheckpointRequestDecider(
                        1, NO_OP, clock, pause, () -> 0, () -> 0, Integer.MAX_VALUE);
        long lastCompletionMs = clock.relativeTimeMillis();

        switch (expectation) {
            case IMMEDIATELY:
                Assert.assertTrue(
                        decider.chooseRequestToExecute(request, isTriggering, lastCompletionMs)
                                .isPresent());
                break;
            case AFTER_PAUSE:
                Assert.assertFalse(
                        decider.chooseRequestToExecute(request, isTriggering, lastCompletionMs)
                                .isPresent());
                clock.advanceTime(pause, TimeUnit.MILLISECONDS);
                Assert.assertTrue(
                        decider.chooseQueuedRequestToExecute(isTriggering, lastCompletionMs)
                                .isPresent());
                break;
            case DROPPED:
                Assert.assertFalse(
                        decider.chooseRequestToExecute(request, isTriggering, lastCompletionMs)
                                .isPresent());
                assertFailed(request, CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                break;
            default:
                throw new IllegalArgumentException("unknown expectation: " + expectation);
        }
    }

    /**
     * Enqueues {@code requests} while a trigger is in progress (so none is chosen at submission
     * time) and then checks that they are dequeued in {@code expectedExecutionOrder}.
     *
     * @param requests requests in submission order
     * @param expectedExecutionOrder the same requests in the order the decider should return them
     */
    private void testRequestsOrdering(
            CheckpointCoordinator.CheckpointTriggerRequest[] requests,
            CheckpointCoordinator.CheckpointTriggerRequest[] expectedExecutionOrder) {
        CheckpointRequestDecider decider = decider(10);
        for (CheckpointCoordinator.CheckpointTriggerRequest submitted : requests) {
            Assert.assertFalse(decider.chooseRequestToExecute(submitted, true, 123L).isPresent());
        }
        for (CheckpointCoordinator.CheckpointTriggerRequest expected : expectedExecutionOrder) {
            Assert.assertEquals(
                    Optional.of(expected), decider.chooseQueuedRequestToExecute(false, 123L));
        }
    }

    /**
     * Asserts that the request's completion future finished exceptionally with a
     * {@link CheckpointException} carrying the given failure reason.
     *
     * @param request the request whose completion future is inspected
     * @param reason the expected failure reason
     */
    private void assertFailed(
            CheckpointCoordinator.CheckpointTriggerRequest request,
            CheckpointFailureReason reason) {
        Assert.assertTrue(request.getOnCompletionFuture().isCompletedExceptionally());
        // handle() exposes the failure cause; join() rethrows any assertion error raised inside
        request.getOnCompletionFuture()
                .handle(
                        (checkpoint, throwable) -> {
                            Assert.assertNull(checkpoint);
                            Assert.assertNotNull(throwable);
                            Assert.assertTrue(throwable instanceof CheckpointException);
                            Assert.assertEquals(
                                    reason,
                                    ((CheckpointException) throwable)
                                            .getCheckpointFailureReason());
                            return null;
                        })
                .join();
    }

    /**
     * Creates a decider with the given queue limit, a maximum of one pending checkpoint, a
     * minimum pause of 1 and both counters fixed at zero.
     *
     * @param maxQueuedRequests the queue size limit passed to the decider
     * @return the configured decider
     */
    public CheckpointRequestDecider decider(int maxQueuedRequests) {
        return decider(maxQueuedRequests, 1, 1, new AtomicInteger(0), new AtomicInteger(0));
    }

    /**
     * Creates a decider whose pending/cleaning counts are read live from the supplied counters,
     * so a test can change them after construction. The decider's clock is advanced by one day
     * before use.
     *
     * @param maxQueued queue size limit
     * @param maxPending maximum number of concurrent checkpoint attempts
     * @param minPause minimum pause between checkpoints (presumably milliseconds; confirm against
     *     the {@link CheckpointRequestDecider} constructor)
     * @param currentPending counter backing the pending-checkpoints supplier
     * @param currentCleaning counter backing the cleaning-checkpoints supplier
     * @return the configured decider
     */
    private CheckpointRequestDecider decider(
            int maxQueued,
            int maxPending,
            int minPause,
            AtomicInteger currentPending,
            AtomicInteger currentCleaning) {
        ManualClock clock = new ManualClock();
        clock.advanceTime(1L, TimeUnit.DAYS);
        return new CheckpointRequestDecider(
                maxPending,
                NO_OP,
                clock,
                minPause,
                currentPending::get,
                currentCleaning::get,
                maxQueued);
    }

    /** Returns a periodic checkpoint request. */
    static CheckpointCoordinator.CheckpointTriggerRequest regularCheckpoint() {
        return checkpointRequest(true);
    }

    /** Returns a non-periodic checkpoint request. */
    private static CheckpointCoordinator.CheckpointTriggerRequest manualCheckpoint() {
        return checkpointRequest(false);
    }

    /** Returns a forced, non-periodic savepoint request. */
    private static CheckpointCoordinator.CheckpointTriggerRequest regularSavepoint() {
        return savepointRequest(true, false);
    }

    /** Returns a forced, periodic savepoint request. */
    private static CheckpointCoordinator.CheckpointTriggerRequest periodicSavepoint() {
        return savepointRequest(true, true);
    }

    /** Returns a non-forced, periodic savepoint request. */
    private static CheckpointCoordinator.CheckpointTriggerRequest nonForcedPeriodicSavepoint() {
        return savepointRequest(false, true);
    }

    /** Returns a non-forced, non-periodic savepoint request. */
    private static CheckpointCoordinator.CheckpointTriggerRequest nonForcedSavepoint() {
        return savepointRequest(false, false);
    }

    /**
     * Builds a savepoint request in {@link SavepointFormatType#CANONICAL} format with a
     * {@code null} second constructor argument.
     *
     * @param force forwarded to {@link CheckpointProperties#forSavepoint}
     * @param periodic whether the request is flagged as periodic
     * @return the new request
     */
    private static CheckpointCoordinator.CheckpointTriggerRequest savepointRequest(
            boolean force, boolean periodic) {
        return new CheckpointCoordinator.CheckpointTriggerRequest(
                CheckpointProperties.forSavepoint(force, SavepointFormatType.CANONICAL),
                null,
                periodic);
    }

    /**
     * Builds a checkpoint request using
     * {@link CheckpointRetentionPolicy#NEVER_RETAIN_AFTER_TERMINATION} and a {@code null} second
     * constructor argument.
     *
     * @param periodic whether the request is flagged as periodic
     * @return the new request
     */
    private static CheckpointCoordinator.CheckpointTriggerRequest checkpointRequest(
            boolean periodic) {
        return new CheckpointCoordinator.CheckpointTriggerRequest(
                CheckpointProperties.forCheckpoint(
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                null,
                periodic);
    }

    /** Outcome that {@link #testTiming} expects for a submitted request. */
    private enum TriggerExpectation {
        /** The request is returned by the submitting call itself. */
        IMMEDIATELY,
        /** The request is returned from the queue only after the minimum pause has elapsed. */
        AFTER_PAUSE,
        /** The request is rejected and its future is failed. */
        DROPPED
    }
}

