/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.StateLocalitySlotAssigner;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestJobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestVertexInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

class StateLocalitySlotAssignerTest {
    StateLocalitySlotAssignerTest() {
    }

    @Test
    void testSlotsAreNotWasted() {
        JobInformation.VertexInformation vertex = StateLocalitySlotAssignerTest.createVertex(2);
        AllocationID alloc1 = new AllocationID();
        AllocationID alloc2 = new AllocationID();
        List<JobAllocationsInformation.VertexAllocationInformation> allocations = Arrays.asList(new JobAllocationsInformation.VertexAllocationInformation(alloc1, vertex.getJobVertexID(), KeyGroupRange.of((int)0, (int)9)), new JobAllocationsInformation.VertexAllocationInformation(alloc2, vertex.getJobVertexID(), KeyGroupRange.of((int)10, (int)19)));
        StateLocalitySlotAssignerTest.assign(vertex, Arrays.asList(alloc1, alloc2), allocations);
    }

    @Test
    void testUpScaling() {
        int oldParallelism = 3;
        int newParallelism = 7;
        int numFreeSlots = 100;
        JobInformation.VertexInformation vertex = StateLocalitySlotAssignerTest.createVertex(7);
        List<AllocationID> allocationIDs = StateLocalitySlotAssignerTest.createAllocationIDS(100);
        ArrayList<JobAllocationsInformation.VertexAllocationInformation> prevAllocations = new ArrayList<JobAllocationsInformation.VertexAllocationInformation>();
        Iterator<AllocationID> iterator = allocationIDs.iterator();
        for (int i = 0; i < 3; ++i) {
            prevAllocations.add(new JobAllocationsInformation.VertexAllocationInformation(iterator.next(), vertex.getJobVertexID(), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)vertex.getMaxParallelism(), (int)3, (int)i)));
        }
        Collection<JobSchedulingPlan.SlotAssignment> assignments = StateLocalitySlotAssignerTest.assign(vertex, allocationIDs, prevAllocations);
        StateLocalitySlotAssignerTest.verifyAssignments(assignments, 7, (AllocationID[])prevAllocations.stream().map(JobAllocationsInformation.VertexAllocationInformation::getAllocationID).toArray(AllocationID[]::new));
    }

    @Test
    void testDownScaling() {
        int oldParallelism = 5;
        boolean newParallelism = true;
        int numFreeSlots = 100;
        JobInformation.VertexInformation vertex = StateLocalitySlotAssignerTest.createVertex(1);
        List<AllocationID> allocationIDs = StateLocalitySlotAssignerTest.createAllocationIDS(100);
        Iterator<AllocationID> iterator = allocationIDs.iterator();
        AllocationID biggestAllocation = iterator.next();
        ArrayList<JobAllocationsInformation.VertexAllocationInformation> prevAllocations = new ArrayList<JobAllocationsInformation.VertexAllocationInformation>();
        int halfOfKeyGroupRange = vertex.getMaxParallelism() / 2;
        prevAllocations.add(new JobAllocationsInformation.VertexAllocationInformation(biggestAllocation, vertex.getJobVertexID(), KeyGroupRange.of((int)0, (int)(halfOfKeyGroupRange - 1))));
        for (int subtaskIdx = 1; subtaskIdx < 5; ++subtaskIdx) {
            int keyGroup = halfOfKeyGroupRange + subtaskIdx;
            prevAllocations.add(new JobAllocationsInformation.VertexAllocationInformation(iterator.next(), vertex.getJobVertexID(), KeyGroupRange.of((int)keyGroup, (int)keyGroup)));
        }
        Collection<JobSchedulingPlan.SlotAssignment> assignments = StateLocalitySlotAssignerTest.assign(vertex, allocationIDs, prevAllocations);
        StateLocalitySlotAssignerTest.verifyAssignments(assignments, 1, biggestAllocation);
    }

    private static void verifyAssignments(Collection<JobSchedulingPlan.SlotAssignment> assignments, int expectedSize, AllocationID ... mustHaveAllocationID) {
        MatcherAssert.assertThat(assignments, (Matcher)Matchers.hasSize((int)expectedSize));
        MatcherAssert.assertThat(assignments.stream().map(e -> e.getSlotInfo().getAllocationId()).collect(Collectors.toSet()), (Matcher)Matchers.hasItems((Object[])mustHaveAllocationID));
    }

    private static Collection<JobSchedulingPlan.SlotAssignment> assign(JobInformation.VertexInformation vertexInformation, List<AllocationID> allocationIDs, List<JobAllocationsInformation.VertexAllocationInformation> allocations) {
        return new StateLocalitySlotAssigner().assignSlots((JobInformation)new TestJobInformation(Collections.singletonList(vertexInformation)), (Collection)allocationIDs.stream().map(TestSlotInfo::new).collect(Collectors.toList()), new VertexParallelism(Collections.singletonMap(vertexInformation.getJobVertexID(), vertexInformation.getParallelism())), new JobAllocationsInformation(Collections.singletonMap(vertexInformation.getJobVertexID(), allocations)));
    }

    private static JobInformation.VertexInformation createVertex(int parallelism) {
        JobVertexID id = new JobVertexID();
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        slotSharingGroup.addVertexToGroup(id);
        return new TestVertexInformation(id, parallelism, slotSharingGroup);
    }

    private static List<AllocationID> createAllocationIDS(int numFreeSlots) {
        return IntStream.range(0, numFreeSlots).mapToObj(i -> new AllocationID()).collect(Collectors.toList());
    }
}

