/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulk;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingPhysicalSlotRequestBulkBuilder;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class PhysicalSlotRequestBulkCheckerImplTest {
    private static final Duration TIMEOUT = Duration.ofMillis(50L);
    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;
    private final ManualClock clock = new ManualClock();
    private PhysicalSlotRequestBulkCheckerImpl bulkChecker;
    private Set<PhysicalSlot> slots;
    private Supplier<Set<SlotInfo>> slotsRetriever;

    PhysicalSlotRequestBulkCheckerImplTest() {
    }

    @BeforeAll
    private static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    @AfterAll
    private static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    @BeforeEach
    private void setup() {
        this.slots = new HashSet<PhysicalSlot>();
        this.slotsRetriever = () -> new HashSet<PhysicalSlot>(this.slots);
        this.bulkChecker = new PhysicalSlotRequestBulkCheckerImpl(this.slotsRetriever, (Clock)this.clock);
        this.bulkChecker.start(mainThreadExecutor);
    }

    @Test
    void testPendingBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<SlotRequestId>();
        PhysicalSlotRequestBulk bulk = this.createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new SlotRequestId());
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);
        PhysicalSlotRequestBulkCheckerImplTest.checkNotCancelledAfter(cancellationFuture, 2L * TIMEOUT.toMillis());
    }

    @Test
    void testFulfilledBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<SlotRequestId>();
        PhysicalSlotRequestBulk bulk = this.createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, new SlotRequestId());
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);
        PhysicalSlotRequestBulkCheckerImplTest.checkNotCancelledAfter(cancellationFuture, 2L * TIMEOUT.toMillis());
    }

    private static void checkNotCancelledAfter(CompletableFuture<?> cancellationFuture, long milli) throws ExecutionException, InterruptedException {
        mainThreadExecutor.schedule(() -> {}, milli, TimeUnit.MILLISECONDS).get();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            FlinkAssertions.assertThatFuture((CompletableFuture)cancellationFuture).isNotDone();
            cancellationFuture.get(milli, TimeUnit.MILLISECONDS);
        }).withFailMessage("The future must not have been cancelled", new Object[0])).isInstanceOf(TimeoutException.class);
        FlinkAssertions.assertThatFuture(cancellationFuture).isNotDone();
    }

    @Test
    void testUnfulfillableBulkIsCancelled() {
        CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<SlotRequestId>();
        SlotRequestId slotRequestId = new SlotRequestId();
        PhysicalSlotRequestBulk bulk = this.createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, slotRequestId);
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);
        this.clock.advanceTime(TIMEOUT.toMillis() + 1L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((Comparable)cancellationFuture.join()).isEqualTo((Object)slotRequestId);
    }

    @Test
    void testBulkFulfilledOnCheck() {
        SlotRequestId slotRequestId = new SlotRequestId();
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(slotRequestId);
        bulk.markRequestFulfilled(slotRequestId, new AllocationID());
        PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new PhysicalSlotRequestBulkWithTimestamp((PhysicalSlotRequestBulk)bulk);
        Assertions.assertThat((Comparable)this.checkBulkTimeout(bulkWithTimestamp)).isEqualTo((Object)PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.FULFILLED);
    }

    @Test
    void testBulkTimeoutOnCheck() {
        PhysicalSlotRequestBulkWithTimestamp bulk = this.createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
        this.clock.advanceTime(TIMEOUT.toMillis() + 1L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((Comparable)this.checkBulkTimeout(bulk)).isEqualTo((Object)PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.TIMEOUT);
    }

    @Test
    void testBulkPendingOnCheckIfFulfillable() {
        PhysicalSlotRequestBulkWithTimestamp bulk = this.createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
        PhysicalSlot slot = this.addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(slot, false);
        Assertions.assertThat((Comparable)this.checkBulkTimeout(bulk)).isEqualTo((Object)PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    @Test
    void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
        PhysicalSlotRequestBulkWithTimestamp bulk = this.createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());
        Assertions.assertThat((Comparable)this.checkBulkTimeout(bulk)).isEqualTo((Object)PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    @Test
    void testBulkFulfillable() {
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(new SlotRequestId());
        this.addOneSlot();
        Assertions.assertThat((boolean)this.isFulfillable((PhysicalSlotRequestBulk)bulk)).isTrue();
    }

    @Test
    void testBulkUnfulfillableWithInsufficientSlots() {
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
        this.addOneSlot();
        Assertions.assertThat((boolean)this.isFulfillable((PhysicalSlotRequestBulk)bulk)).isFalse();
    }

    @Test
    void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
        SlotRequestId slotRequestId = new SlotRequestId();
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(slotRequestId, new SlotRequestId());
        PhysicalSlot slot = this.addOneSlot();
        bulk.markRequestFulfilled(slotRequestId, slot.getAllocationId());
        Assertions.assertThat((boolean)this.isFulfillable((PhysicalSlotRequestBulk)bulk)).isFalse();
    }

    @Test
    void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
        PhysicalSlot slot1 = this.addOneSlot();
        this.addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(slot1, true);
        Assertions.assertThat((boolean)this.isFulfillable((PhysicalSlotRequestBulk)bulk)).isFalse();
    }

    @Test
    void testBulkFulfillableWithSlotOccupiedTemporarily() {
        PhysicalSlotRequestBulkImpl bulk = PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());
        PhysicalSlot slot1 = this.addOneSlot();
        this.addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(slot1, false);
        Assertions.assertThat((boolean)this.isFulfillable((PhysicalSlotRequestBulk)bulk)).isTrue();
    }

    private PhysicalSlotRequestBulkWithTimestamp createPhysicalSlotRequestBulkWithTimestamp(SlotRequestId ... slotRequestIds) {
        PhysicalSlotRequestBulkWithTimestamp bulk = new PhysicalSlotRequestBulkWithTimestamp((PhysicalSlotRequestBulk)PhysicalSlotRequestBulkCheckerImplTest.createPhysicalSlotRequestBulk(slotRequestIds));
        bulk.markUnfulfillable(this.clock.relativeTimeMillis());
        return bulk;
    }

    private static PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulk(SlotRequestId ... slotRequestIds) {
        TestingPhysicalSlotRequestBulkBuilder builder = TestingPhysicalSlotRequestBulkBuilder.newBuilder();
        for (SlotRequestId slotRequestId : slotRequestIds) {
            builder.addPendingRequest(slotRequestId, ResourceProfile.UNKNOWN);
        }
        return builder.buildPhysicalSlotRequestBulkImpl();
    }

    private PhysicalSlotRequestBulk createPhysicalSlotRequestBulkWithCancellationFuture(CompletableFuture<SlotRequestId> cancellationFuture, SlotRequestId slotRequestId) {
        return TestingPhysicalSlotRequestBulkBuilder.newBuilder().addPendingRequest(slotRequestId, ResourceProfile.UNKNOWN).setCanceller((id, t) -> cancellationFuture.complete((SlotRequestId)id)).buildPhysicalSlotRequestBulkImpl();
    }

    private PhysicalSlot addOneSlot() {
        PhysicalSlot slot = PhysicalSlotTestUtils.createPhysicalSlot();
        CompletableFuture.runAsync(() -> this.slots.add(slot), (Executor)mainThreadExecutor).join();
        return slot;
    }

    private PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult checkBulkTimeout(PhysicalSlotRequestBulkWithTimestamp bulk) {
        return this.bulkChecker.checkPhysicalSlotRequestBulkTimeout(bulk, TIMEOUT);
    }

    private boolean isFulfillable(PhysicalSlotRequestBulk bulk) {
        return PhysicalSlotRequestBulkCheckerImpl.isSlotRequestBulkFulfillable((PhysicalSlotRequestBulk)bulk, this.slotsRetriever);
    }
}

