/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.FreeSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.IsSlotAvailableAndFreeFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReserveSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestJobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestVertexInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.topology.VertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SlotSharingSlotAllocatorTest {
    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) -> {};
    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION = (allocationId, resourceProfile) -> TestingPhysicalSlot.builder().withAllocationID(allocationId).withResourceProfile(resourceProfile).build();
    private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION = ignored -> true;
    private static final boolean DISABLE_LOCAL_RECOVERY = false;
    private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
    private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
    private static final JobInformation.VertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex3 = new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);

    SlotSharingSlotAllocatorTest() {
    }

    @Test
    void testCalculateRequiredSlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        ResourceCounter resourceCounter = slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
        Assertions.assertThat((Collection)resourceCounter.getResources()).contains((Object[])new ResourceProfile[]{ResourceProfile.UNKNOWN});
        Assertions.assertThat((int)resourceCounter.getResourceCount(ResourceProfile.UNKNOWN)).isEqualTo(Math.max(vertex1.getParallelism(), vertex2.getParallelism()) + vertex3.getParallelism());
    }

    @Test
    void testDetermineParallelismWithMinimumSlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
        VertexParallelism vertexParallelism = (VertexParallelism)slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(2)).get();
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex1.getJobVertexID())).isOne();
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex2.getJobVertexID())).isOne();
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex3.getJobVertexID())).isOne();
    }

    @Test
    void testDetermineParallelismWithManySlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
        VertexParallelism vertexParallelism = (VertexParallelism)slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(50)).get();
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex1.getJobVertexID())).isEqualTo(vertex1.getParallelism());
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex2.getJobVertexID())).isEqualTo(vertex2.getParallelism());
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex3.getJobVertexID())).isEqualTo(vertex3.getParallelism());
    }

    @Test
    void testDetermineParallelismWithVariedParallelism() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
        TestVertexInformation vertex11 = new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
        TestVertexInformation vertex12 = new TestVertexInformation(new JobVertexID(), 1, slotSharingGroup1);
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 2, new SlotSharingGroup());
        TestJobInformation testJobInformation = new TestJobInformation(Arrays.asList(vertex11, vertex12, vertex2));
        VertexParallelism vertexParallelism = (VertexParallelism)slotAllocator.determineParallelism((JobInformation)testJobInformation, SlotSharingSlotAllocatorTest.getSlots(vertex11.getParallelism() + vertex2.getParallelism())).get();
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex11.getJobVertexID())).isEqualTo(vertex11.getParallelism());
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex12.getJobVertexID())).isEqualTo(vertex12.getParallelism());
        Assertions.assertThat((int)vertexParallelism.getParallelism(vertex2.getJobVertexID())).isEqualTo(vertex2.getParallelism());
    }

    @Test
    void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(1));
        Assertions.assertThat((Optional)vertexParallelism).isNotPresent();
    }

    @Test
    void testDetermineParallelismWithPartiallyEqualLowerUpperBound() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestVertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 1, 8, new SlotSharingGroup());
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 10, 10, new SlotSharingGroup());
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(13));
        Assertions.assertThat((Optional)vertexParallelism).hasValueSatisfying(vertexParallelism1 -> {
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex1.getJobVertexID())).isEqualTo(3);
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex2.getJobVertexID())).isEqualTo(10);
        });
    }

    @Test
    void testDetermineParallelismWithLowerBoundsInsufficientSlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestVertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 10, 10, new SlotSharingGroup());
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(3));
        Assertions.assertThat((Optional)vertexParallelism).isNotPresent();
    }

    @Test
    void testDetermineParallelismWithLowerBoundsInsufficientSlotsForPartialVertices() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        TestVertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 2, 2, slotSharingGroup);
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 8, 8, slotSharingGroup);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(5));
        Assertions.assertThat((Optional)vertexParallelism).isNotPresent();
    }

    @Test
    void testDetermineParallelismWithAllEqualLowerUpperBoundFreSlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestVertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 4, 10, new SlotSharingGroup());
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 4, 10, new SlotSharingGroup());
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(vertex1.getMinParallelism() + vertex2.getMinParallelism()));
        Assertions.assertThat((Optional)vertexParallelism).hasValueSatisfying(vertexParallelism1 -> {
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex1.getJobVertexID())).isEqualTo(vertex1.getMinParallelism());
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex2.getJobVertexID())).isEqualTo(vertex2.getMinParallelism());
        });
    }

    @Test
    void testDetermineParallelismWithAllEqualLowerUpperBoundManySlots() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestVertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 4, 4, new SlotSharingGroup());
        TestVertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 10, 10, new SlotSharingGroup());
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2));
        Optional vertexParallelism = slotAllocator.determineParallelism((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(vertex1.getMinParallelism() + vertex2.getMinParallelism() + 12));
        Assertions.assertThat((Optional)vertexParallelism).hasValueSatisfying(vertexParallelism1 -> {
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex1.getJobVertexID())).isEqualTo(vertex1.getMinParallelism());
            Assertions.assertThat((int)vertexParallelism1.getParallelism(vertex2.getJobVertexID())).isEqualTo(vertex2.getMinParallelism());
        });
    }

    @Test
    void testReserveAvailableResources() {
        SlotSharingSlotAllocator slotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, (IsSlotAvailableAndFreeFunction)TEST_IS_SLOT_FREE_FUNCTION, (boolean)false);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
        JobSchedulingPlan jobSchedulingPlan = (JobSchedulingPlan)slotAllocator.determineParallelismAndCalculateAssignment((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(50), JobAllocationsInformation.empty()).get();
        ReservedSlots reservedSlots = (ReservedSlots)slotAllocator.tryReserveResources(jobSchedulingPlan).orElseThrow(() -> new RuntimeException("Expected that reservation succeeds."));
        HashMap<ExecutionVertexID, SlotInfo> expectedAssignments = new HashMap<ExecutionVertexID, SlotInfo>();
        for (JobSchedulingPlan.SlotAssignment slotAssignment : jobSchedulingPlan.getSlotAssignments()) {
            SlotSharingSlotAllocator.ExecutionSlotSharingGroup target = (SlotSharingSlotAllocator.ExecutionSlotSharingGroup)slotAssignment.getTargetAs(SlotSharingSlotAllocator.ExecutionSlotSharingGroup.class);
            for (ExecutionVertexID containedExecutionVertex : target.getContainedExecutionVertices()) {
                expectedAssignments.put(containedExecutionVertex, slotAssignment.getSlotInfo());
            }
        }
        for (Map.Entry entry : expectedAssignments.entrySet()) {
            LogicalSlot assignedSlot = reservedSlots.getSlotFor((ExecutionVertexID)entry.getKey());
            SlotInfo backingSlot = (SlotInfo)entry.getValue();
            Assertions.assertThat((Comparable)assignedSlot.getAllocationId()).isEqualTo((Object)backingSlot.getAllocationId());
        }
    }

    @Test
    void testReserveUnavailableResources() {
        SlotSharingSlotAllocator slotSharingSlotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator((ReserveSlotFunction)TEST_RESERVE_SLOT_FUNCTION, (FreeSlotFunction)TEST_FREE_SLOT_FUNCTION, ignored -> false, (boolean)false);
        TestJobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
        JobSchedulingPlan jobSchedulingPlan = (JobSchedulingPlan)slotSharingSlotAllocator.determineParallelismAndCalculateAssignment((JobInformation)jobInformation, SlotSharingSlotAllocatorTest.getSlots(50), JobAllocationsInformation.empty()).get();
        Optional reservedSlots = slotSharingSlotAllocator.tryReserveResources(jobSchedulingPlan);
        Assertions.assertThat((Optional)reservedSlots).isNotPresent();
    }

    @Test
    void testStickyAllocation() {
        HashMap<JobVertexID, List<JobAllocationsInformation.VertexAllocationInformation>> locality = new HashMap<JobVertexID, List<JobAllocationsInformation.VertexAllocationInformation>>();
        AllocationID allocation1 = new AllocationID();
        locality.put(vertex1.getJobVertexID(), Collections.singletonList(new JobAllocationsInformation.VertexAllocationInformation(allocation1, vertex1.getJobVertexID(), KeyGroupRange.of((int)1, (int)100))));
        locality.put(vertex2.getJobVertexID(), Collections.singletonList(new JobAllocationsInformation.VertexAllocationInformation(allocation1, vertex2.getJobVertexID(), KeyGroupRange.of((int)1, (int)100))));
        AllocationID allocation2 = new AllocationID();
        locality.put(vertex3.getJobVertexID(), Collections.singletonList(new JobAllocationsInformation.VertexAllocationInformation(allocation2, vertex3.getJobVertexID(), KeyGroupRange.of((int)1, (int)100))));
        ArrayList<TestSlotInfo> freeSlots = new ArrayList<TestSlotInfo>();
        IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestSlotInfo(new AllocationID())));
        freeSlots.add(new TestSlotInfo(allocation1));
        freeSlots.add(new TestSlotInfo(allocation2));
        JobSchedulingPlan schedulingPlan = (JobSchedulingPlan)SlotSharingSlotAllocator.createSlotSharingSlotAllocator((allocationId, resourceProfile) -> TestingPhysicalSlot.builder().build(), (allocationID, cause, ts) -> {}, id -> false, (boolean)true).determineParallelismAndCalculateAssignment((JobInformation)new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), freeSlots, new JobAllocationsInformation(locality)).get();
        HashMap<AllocationID, Set> allocated = new HashMap<AllocationID, Set>();
        for (JobSchedulingPlan.SlotAssignment assignment : schedulingPlan.getSlotAssignments()) {
            SlotSharingSlotAllocator.ExecutionSlotSharingGroup target = (SlotSharingSlotAllocator.ExecutionSlotSharingGroup)assignment.getTargetAs(SlotSharingSlotAllocator.ExecutionSlotSharingGroup.class);
            Set set = allocated.computeIfAbsent(assignment.getSlotInfo().getAllocationId(), ign -> new HashSet());
            for (ExecutionVertexID id2 : target.getContainedExecutionVertices()) {
                set.add(id2.getJobVertexId());
            }
        }
        Assertions.assertThat((Collection)((Collection)allocated.get(allocation1))).contains((Object[])new VertexID[]{vertex1.getJobVertexID()});
        Assertions.assertThat((Collection)((Collection)allocated.get(allocation1))).contains((Object[])new VertexID[]{vertex2.getJobVertexID()});
        Assertions.assertThat((Collection)((Collection)allocated.get(allocation2))).contains((Object[])new VertexID[]{vertex3.getJobVertexID()});
    }

    private static Collection<SlotInfo> getSlots(int count) {
        ArrayList<SlotInfo> slotInfo = new ArrayList<SlotInfo>();
        for (int i = 0; i < count; ++i) {
            slotInfo.add(new TestSlotInfo());
        }
        return slotInfo;
    }
}

