/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.BlocklistDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link BlocklistDeclarativeSlotPool}.
 *
 * <p>Each scenario builds a pool with a {@link BlockedTaskManagerChecker} and checks how slot
 * offers, slot registrations and the freeing of reserved slots are handled depending on whether
 * the checker reports the owning task manager as blocked.
 */
class BlocklistDeclarativeSlotPoolTest {

    /** Resource profile used for every slot that the tests in this class create directly. */
    private static final ResourceProfile RESOURCE_PROFILE =
            ResourceProfile.newBuilder().setCpuCores(1.7).build();

    @Test
    void testOfferSlotsFromBlockedTaskManager() throws Exception {
        testOfferSlots(true);
    }

    @Test
    void testOfferSlotsFromUnblockedTaskManager() throws Exception {
        testOfferSlots(false);
    }

    /**
     * Declares resource requirements, offers matching slots from a single task manager and checks
     * the outcome.
     *
     * <p>When the task manager is blocked, no offer may be accepted and no new slot may be
     * announced to the listener. Otherwise every offer must be accepted and every offer must be
     * announced as exactly one matching new slot.
     *
     * @param isBlocked whether the offering task manager is reported as blocked
     */
    private void testOfferSlots(boolean isBlocked) throws Exception {
        final LocalTaskManagerLocation taskManager = new LocalTaskManagerLocation();
        final DefaultDeclarativeSlotPoolTest.NewSlotsService notifyNewSlots =
                new DefaultDeclarativeSlotPoolTest.NewSlotsService();
        final BlocklistDeclarativeSlotPool slotPool = createSlotPool(taskManager, isBlocked);
        slotPool.registerNewSlotsListener(notifyNewSlots);

        final ResourceCounter resourceRequirements =
                DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        final Collection<SlotOffer> slotOffers =
                SlotPoolTestUtils.createSlotOffersForResourceRequirements(resourceRequirements);

        if (isBlocked) {
            Assertions.assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager))
                    .isEmpty();
            Assertions.assertThat(DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots))
                    .isEmpty();
        } else {
            Assertions.assertThat(SlotPoolTestUtils.offerSlots(slotPool, slotOffers, taskManager))
                    .containsExactlyInAnyOrderElementsOf(slotOffers);

            final Map<AllocationID, SlotOffer> offersByAllocationId =
                    slotOffers.stream()
                            .collect(
                                    Collectors.toMap(
                                            SlotOffer::getAllocationId, Function.identity()));
            final Collection<PhysicalSlot> newSlots =
                    DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);

            // The size check keeps allMatch from passing vacuously on an empty collection.
            // Removing each matched offer from the map makes a slot that is announced twice
            // fail the match the second time.
            Assertions.assertThat(newSlots)
                    .hasSameSizeAs(slotOffers)
                    .allMatch(
                            slot ->
                                    matchSlotToOffers(
                                            slot,
                                            offersByAllocationId.remove(slot.getAllocationId())));
        }
    }

    /**
     * Offers a slot, then blocks its task manager and offers the same slot again together with a
     * new one. Only the already known slot is expected to be accepted the second time.
     */
    @Test
    void testOfferDuplicateSlots() {
        final LocalTaskManagerLocation taskManager = new LocalTaskManagerLocation();
        final Collection<ResourceID> blockedTaskManagers = new ArrayList<>();
        final BlocklistDeclarativeSlotPool slotPool =
                BlocklistDeclarativeSlotPoolBuilder.builder()
                        .setBlockedTaskManagerChecker(blockedTaskManagers::contains)
                        .build();

        slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource(RESOURCE_PROFILE, 2));

        final SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);
        final SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);

        Assertions.assertThat(
                        SlotPoolTestUtils.offerSlots(
                                slotPool, Collections.singleton(slot1), taskManager))
                .containsExactly(slot1);

        // From here on the checker reports the task manager as blocked.
        blockedTaskManagers.add(taskManager.getResourceID());

        Assertions.assertThat(
                        SlotPoolTestUtils.offerSlots(
                                slotPool, Arrays.asList(slot1, slot2), taskManager))
                .containsExactly(slot1);
    }

    @Test
    void testRegisterSlotsFromBlockedTaskManager() {
        testRegisterSlots(true);
    }

    @Test
    void testRegisterSlotsFromUnblockedTaskManager() {
        testRegisterSlots(false);
    }

    /**
     * Registers a batch of slots from a single task manager and checks the outcome.
     *
     * <p>When the task manager is blocked, nothing may be accepted and the pool must report no
     * slots. Otherwise all offers must be accepted and the pool must report exactly their
     * allocation ids.
     *
     * @param isBlocked whether the registering task manager is reported as blocked
     */
    private void testRegisterSlots(boolean isBlocked) {
        final LocalTaskManagerLocation taskManager = new LocalTaskManagerLocation();
        final BlocklistDeclarativeSlotPool slotPool = createSlotPool(taskManager, isBlocked);

        final int numberSlots = 10;
        final Collection<SlotOffer> slotOffers =
                SlotPoolTestUtils.createSlotOffersForResourceRequirements(
                        ResourceCounter.withResource(RESOURCE_PROFILE, numberSlots));

        final Collection<SlotOffer> acceptedOffers =
                slotPool.registerSlots(
                        slotOffers,
                        taskManager,
                        SlotPoolTestUtils.createTaskManagerGateway(null),
                        0L);
        final Collection<? extends SlotInfo> allSlotsInformation =
                slotPool.getAllSlotsInformation();

        if (isBlocked) {
            Assertions.assertThat(acceptedOffers).isEmpty();
            Assertions.assertThat(allSlotsInformation).isEmpty();
        } else {
            Assertions.assertThat(acceptedOffers).containsExactlyInAnyOrderElementsOf(slotOffers);
            Assertions.assertThat(
                            allSlotsInformation.stream()
                                    .map(SlotInfo::getAllocationId)
                                    .collect(Collectors.toSet()))
                    .isEqualTo(
                            slotOffers.stream()
                                    .map(SlotOffer::getAllocationId)
                                    .collect(Collectors.toSet()));
        }
    }

    /**
     * Registers a slot, then blocks its task manager and registers the same slot again together
     * with a new one. Only the already known slot is expected to be accepted the second time.
     */
    @Test
    void testRegisterDuplicateSlots() {
        final LocalTaskManagerLocation taskManager = new LocalTaskManagerLocation();
        final Collection<ResourceID> blockedTaskManagers = new ArrayList<>();
        final BlocklistDeclarativeSlotPool slotPool =
                BlocklistDeclarativeSlotPoolBuilder.builder()
                        .setBlockedTaskManagerChecker(blockedTaskManagers::contains)
                        .build();

        final SlotOffer slot1 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);
        final SlotOffer slot2 = new SlotOffer(new AllocationID(), 1, RESOURCE_PROFILE);

        final Collection<SlotOffer> firstAccepted =
                slotPool.registerSlots(
                        Collections.singleton(slot1),
                        taskManager,
                        SlotPoolTestUtils.createTaskManagerGateway(null),
                        0L);
        Assertions.assertThat(firstAccepted).containsExactly(slot1);

        // From here on the checker reports the task manager as blocked.
        blockedTaskManagers.add(taskManager.getResourceID());

        final Collection<SlotOffer> secondAccepted =
                slotPool.registerSlots(
                        Arrays.asList(slot1, slot2),
                        taskManager,
                        SlotPoolTestUtils.createTaskManagerGateway(null),
                        0L);
        Assertions.assertThat(secondAccepted).containsExactly(slot1);
    }

    @Test
    void testFreeReservedSlotsOnBlockedTaskManager() throws Exception {
        testFreeReservedSlots(true);
    }

    @Test
    void testFreeReservedSlotsOnUnblockedTaskManager() throws Exception {
        testFreeReservedSlots(false);
    }

    /**
     * Offers one slot, reserves it, optionally blocks the owning task manager, and then frees the
     * reservation.
     *
     * <p>In both cases the freed reservation must report one previously fulfilled requirement of
     * {@link #RESOURCE_PROFILE}. When the task manager is blocked, the slot must not be announced
     * again, must be freed on the task executor gateway and must no longer be known to the pool.
     * Otherwise the slot must be announced again, must not be freed on the gateway and must
     * still be known to the pool.
     *
     * @param isBlocked whether the task manager is blocked before the reservation is freed
     */
    private void testFreeReservedSlots(boolean isBlocked) throws Exception {
        final DefaultDeclarativeSlotPoolTest.FreeSlotConsumer freeSlotConsumer =
                new DefaultDeclarativeSlotPoolTest.FreeSlotConsumer();
        final TestingTaskExecutorGateway testingTaskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setFreeSlotFunction(freeSlotConsumer)
                        .createTestingTaskExecutorGateway();
        final DefaultDeclarativeSlotPoolTest.NewSlotsService notifyNewSlots =
                new DefaultDeclarativeSlotPoolTest.NewSlotsService();

        final Collection<ResourceID> blockedTaskManagers = new HashSet<>();
        final BlocklistDeclarativeSlotPool slotPool =
                BlocklistDeclarativeSlotPoolBuilder.builder()
                        .setBlockedTaskManagerChecker(blockedTaskManagers::contains)
                        .build();
        slotPool.registerNewSlotsListener(notifyNewSlots);

        DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(
                slotPool,
                ResourceCounter.withResource(RESOURCE_PROFILE, 1),
                null,
                testingTaskExecutorGateway);

        final Collection<PhysicalSlot> newSlots =
                DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);
        final PhysicalSlot offeredSlot = Iterables.getOnlyElement(newSlots);
        final AllocationID allocationID = offeredSlot.getAllocationId();

        slotPool.reserveFreeSlot(allocationID, RESOURCE_PROFILE);

        if (isBlocked) {
            // Block the owner only after the slot has been accepted and reserved.
            blockedTaskManagers.add(offeredSlot.getTaskManagerLocation().getResourceID());
        }

        final ResourceCounter previouslyFulfilledRequirement =
                slotPool.freeReservedSlot(allocationID, null, 0L);
        final Collection<PhysicalSlot> recycledSlots =
                DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);

        Assertions.assertThat(previouslyFulfilledRequirement)
                .isEqualTo(ResourceCounter.withResource(RESOURCE_PROFILE, 1));

        if (isBlocked) {
            Assertions.assertThat(recycledSlots).isEmpty();
            Assertions.assertThat(Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()))
                    .isEqualTo(allocationID);
            Assertions.assertThat(slotPool.getAllSlotsInformation()).isEmpty();
        } else {
            Assertions.assertThat(Iterables.getOnlyElement(recycledSlots).getAllocationId())
                    .isEqualTo(allocationID);
            Assertions.assertThat(freeSlotConsumer.drainFreedSlots()).isEmpty();
            Assertions.assertThat(
                            Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
                                    .getAllocationId())
                    .isEqualTo(allocationID);
        }
    }

    /**
     * Builds a pool whose checker either blocks exactly the given task manager or blocks nothing.
     *
     * @param taskManager task manager whose resource id is blocked when {@code isBlocked} is set
     * @param isBlocked whether the given task manager should be reported as blocked
     * @return a new pool configured with the corresponding checker
     */
    private static BlocklistDeclarativeSlotPool createSlotPool(
            TaskManagerLocation taskManager, boolean isBlocked) {
        final BlockedTaskManagerChecker checker =
                isBlocked
                        ? resourceId -> taskManager.getResourceID().equals(resourceId)
                        : ignored -> false;
        return BlocklistDeclarativeSlotPoolBuilder.builder()
                .setBlockedTaskManagerChecker(checker)
                .build();
    }

    /**
     * Checks whether a slot corresponds to an offer by comparing allocation id, resource profile
     * and slot index.
     *
     * @param physicalSlot slot announced by the pool
     * @param slotOffer offer expected to have produced the slot; may be {@code null} when no
     *     offer is known for the slot, in which case there is no match
     * @return {@code true} if all three properties agree, {@code false} otherwise
     */
    private static boolean matchSlotToOffers(PhysicalSlot physicalSlot, SlotOffer slotOffer) {
        if (slotOffer == null) {
            // Unknown or already consumed offer: report a mismatch instead of throwing an NPE.
            return false;
        }
        return physicalSlot.getAllocationId().equals(slotOffer.getAllocationId())
                && physicalSlot.getResourceProfile().equals(slotOffer.getResourceProfile())
                && physicalSlot.getPhysicalSlotNumber() == slotOffer.getSlotIndex();
    }

    /**
     * Test builder for {@link BlocklistDeclarativeSlotPool}. Only the blocked task manager checker
     * is configurable; the remaining constructor arguments are fixed test defaults.
     */
    private static final class BlocklistDeclarativeSlotPoolBuilder {

        /** Checker handed to the pool; by default no task manager is reported as blocked. */
        private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false;

        private BlocklistDeclarativeSlotPoolBuilder() {}

        /**
         * Sets the checker that the built pool receives.
         *
         * @param blockedTaskManagerChecker checker to hand to the pool
         * @return this builder
         */
        public BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker(
                BlockedTaskManagerChecker blockedTaskManagerChecker) {
            this.blockedTaskManagerChecker = blockedTaskManagerChecker;
            return this;
        }

        /**
         * Creates the pool with a fresh {@link JobID}, a new {@link DefaultAllocatedSlotPool}, a
         * no-op callback, the configured checker and two 20-second {@link Time} values.
         *
         * @return the newly created pool
         */
        public BlocklistDeclarativeSlotPool build() {
            return new BlocklistDeclarativeSlotPool(
                    new JobID(),
                    new DefaultAllocatedSlotPool(),
                    ignored -> {},
                    blockedTaskManagerChecker,
                    Time.seconds(20),
                    Time.seconds(20));
        }

        /** Returns a new builder with default settings. */
        public static BlocklistDeclarativeSlotPoolBuilder builder() {
            return new BlocklistDeclarativeSlotPoolBuilder();
        }
    }
}

