/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class DeclarativeSlotPoolBridgeResourceDeclarationTest {
    private static final JobMasterId jobMasterId = JobMasterId.generate();
    private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
    @Parameter
    private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
    private RequirementListener requirementListener;
    private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

    DeclarativeSlotPoolBridgeResourceDeclarationTest() {
    }

    @Parameters(name="RequestSlotMatchingStrategy: {0}")
    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
        return Arrays.asList(SimpleRequestSlotMatchingStrategy.INSTANCE, PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
    }

    @BeforeEach
    void setup() {
        this.requirementListener = new RequirementListener();
        TestingDeclarativeSlotPoolBuilder slotPoolBuilder = TestingDeclarativeSlotPool.builder().setIncreaseResourceRequirementsByConsumer(x$0 -> this.requirementListener.increaseRequirements(x$0)).setDecreaseResourceRequirementsByConsumer(x$0 -> this.requirementListener.decreaseRequirements(x$0)).setReserveFreeSlotFunction((allocationId, resourceProfile) -> DeclarativeSlotPoolBridgeTest.createAllocatedSlot(allocationId)).setFreeReservedSlotFunction((TriFunction<AllocationID, Throwable, Long, ResourceCounter>)((TriFunction)(allocationID, throwable, aLong) -> ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1))).setReleaseSlotFunction((allocationID, e) -> ResourceCounter.withResource((ResourceProfile)ResourceProfile.UNKNOWN, (int)1));
        TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(slotPoolBuilder);
        this.declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory, this.requestSlotMatchingStrategy);
    }

    @AfterEach
    void teardown() {
        if (this.declarativeSlotPoolBridge != null) {
            this.declarativeSlotPoolBridge.close();
        }
    }

    @TestTemplate
    void testRequirementsIncreasedOnNewAllocation() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        this.declarativeSlotPoolBridge.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes((long)5L));
        Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isOne();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        try {
            ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService);
            this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);
            CompletableFuture allocationFuture = CompletableFuture.supplyAsync(() -> this.declarativeSlotPoolBridge.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, Time.milliseconds((long)5L)), (Executor)mainThreadExecutor).get();
            FlinkAssertions.assertThatFuture((CompletableFuture)allocationFuture).failsWithin(Duration.ofMinutes(1L));
            CompletableFuture.runAsync(() -> Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero(), (Executor)mainThreadExecutor).join();
        }
        finally {
            scheduledExecutorService.shutdown();
        }
    }

    @TestTemplate
    void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot newSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
        Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }

    @TestTemplate
    void testRequirementsIncreasedOnSlotReservation() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot newSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
        SlotRequestId slotRequestId = new SlotRequestId();
        this.declarativeSlotPoolBridge.allocateAvailableSlot(slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isOne();
    }

    @TestTemplate
    void testRequirementsDecreasedOnSlotFreeing() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot newSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
        SlotRequestId slotRequestId = new SlotRequestId();
        this.declarativeSlotPoolBridge.allocateAvailableSlot(slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.declarativeSlotPoolBridge.releaseSlot(slotRequestId, (Throwable)new RuntimeException("Test exception"));
        Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }

    @TestTemplate
    void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot newSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
        this.declarativeSlotPoolBridge.allocateAvailableSlot(new SlotRequestId(), newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.declarativeSlotPoolBridge.failAllocation(newSlot.getTaskManagerLocation().getResourceID(), newSlot.getAllocationId(), (Exception)new RuntimeException("Test exception"));
        Assertions.assertThat((int)this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }

    private static final class RequirementListener {
        private ResourceCounter requirements = ResourceCounter.empty();

        private RequirementListener() {
        }

        private void increaseRequirements(ResourceCounter requirements) {
            this.requirements = this.requirements.add(requirements);
        }

        private void decreaseRequirements(ResourceCounter requirements) {
            this.requirements = this.requirements.subtract(requirements);
        }

        public ResourceCounter getRequirements() {
            return this.requirements;
        }
    }
}

