/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil;
import org.apache.flink.runtime.scheduler.strategy.TestingInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link VertexwiseSchedulingStrategy}.
 *
 * <p>Every test drives the strategy against a {@link TestingSchedulingTopology} and a
 * {@link TestingInputConsumableDecider}, then inspects what was handed to the
 * {@link TestingSchedulerOperations}. All expectations in this class are expressed as one
 * single-element group per scheduled vertex.
 */
class VertexwiseSchedulingStrategyTest {
    /** Parallelism used for every job vertex created by these tests. */
    private static final int PARALLELISM = 2;

    private TestingSchedulerOperations testingSchedulerOperation;
    private TestingSchedulingTopology testingSchedulingTopology;
    private TestingInputConsumableDecider inputConsumableDecider;
    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map;
    private List<TestingSchedulingExecutionVertex> sink;

    /** Creates fresh scheduler operations, a fresh decider and the default topology. */
    @BeforeEach
    void setUp() {
        testingSchedulerOperation = new TestingSchedulerOperations();
        inputConsumableDecider = new TestingInputConsumableDecider();
        buildTopology();
    }

    /**
     * Builds the default topology: {@code source -(pointwise)-> map -(all-to-all)-> sink}.
     * Both edges use BLOCKING result partitions in state CREATED.
     */
    private void buildTopology() {
        testingSchedulingTopology = new TestingSchedulingTopology();

        source = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        map = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        sink = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();

        testingSchedulingTopology
                .connectPointwise(source, map)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
        testingSchedulingTopology
                .connectAllToAll(map, sink)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
    }

    /** Starting the strategy is expected to schedule exactly the source vertices. */
    @Test
    void testStartScheduling() {
        VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(testingSchedulingTopology);
        inputConsumableDecider.addSourceVertices(new HashSet<>(source));

        schedulingStrategy.startScheduling();

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(eachInOwnGroup(source), testingSchedulerOperation);
    }

    /** Restarting every vertex of the topology is expected to schedule only the source vertices. */
    @Test
    void testRestartTasks() {
        VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(testingSchedulingTopology);
        inputConsumableDecider.addSourceVertices(new HashSet<>(source));

        Set<ExecutionVertexID> verticesToRestart =
                Stream.of(source, map, sink)
                        .flatMap(Collection::stream)
                        .map(TestingSchedulingExecutionVertex::getId)
                        .collect(Collectors.toSet());
        schedulingStrategy.restartTasks(verticesToRestart);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(eachInOwnGroup(source), testingSchedulerOperation);
    }

    /**
     * Walks the default topology stage by stage: a downstream vertex is expected to be scheduled
     * only after it was marked consumable in the decider and an upstream vertex reported FINISHED.
     */
    @Test
    void testOnExecutionStateChangeToFinished() {
        VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(testingSchedulingTopology);

        // Starting the strategy schedules both sources.
        inputConsumableDecider.addSourceVertices(new HashSet<>(source));
        schedulingStrategy.startScheduling();
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);

        // First source finishes while the first map vertex is consumable: one more vertex scheduled.
        TestingSchedulingExecutionVertex source1 = source.get(0);
        inputConsumableDecider.setInputConsumable(map.get(0));
        schedulingStrategy.onExecutionStateChange(source1.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);

        // Second source finishes while the second map vertex is consumable: one more vertex scheduled.
        TestingSchedulingExecutionVertex source2 = source.get(1);
        inputConsumableDecider.setInputConsumable(map.get(1));
        schedulingStrategy.onExecutionStateChange(source2.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);

        // First map vertex finishes. No sink has been marked consumable in the decider yet, so the
        // decider must have been asked about a vertex of the sink job vertex, but nothing new may
        // be scheduled.
        TestingSchedulingExecutionVertex map1 = map.get(0);
        schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(
                        inputConsumableDecider
                                .getLastExecutionToDecideInputConsumable()
                                .getId()
                                .getJobVertexId())
                .isEqualTo(sink.get(0).getId().getJobVertexId());
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);

        // Second map vertex finishes with both sinks marked consumable: both sinks are scheduled.
        TestingSchedulingExecutionVertex map2 = map.get(1);
        inputConsumableDecider.setInputConsumable(sink.get(0));
        inputConsumableDecider.setInputConsumable(sink.get(1));
        schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                eachInOwnGroup(source, map, sink), testingSchedulerOperation);
    }

    /**
     * With a HYBRID_FULL all-to-all edge and consumers already marked consumable, starting the
     * strategy is expected to schedule producers and consumers without any producer finishing.
     */
    @Test
    void testScheduleDownstreamOfHybridEdge() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        topology.connectAllToAll(producers, consumers)
                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
                .finish();

        VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology);
        inputConsumableDecider.addSourceVertices(new HashSet<>(producers));
        inputConsumableDecider.setInputConsumable(consumers.get(0));
        inputConsumableDecider.setInputConsumable(consumers.get(1));

        schedulingStrategy.startScheduling();

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                eachInOwnGroup(producers, consumers), testingSchedulerOperation);
    }

    @Test
    void testUpdateStrategyWithAllToAll() {
        testUpdateStrategyOnTopologyUpdate(true);
    }

    @Test
    void testUpdateStrategyWithPointWise() {
        testUpdateStrategyOnTopologyUpdate(false);
    }

    /**
     * Adds consumers to the topology after scheduling has started, notifies the strategy about
     * them and verifies that they are scheduled once the remaining producer finishes.
     *
     * @param allToAll {@code true} to connect producers and consumers all-to-all, {@code false}
     *     to connect them pointwise
     */
    private void testUpdateStrategyOnTopologyUpdate(boolean allToAll) {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();

        VertexwiseSchedulingStrategy schedulingStrategy = createSchedulingStrategy(topology);
        inputConsumableDecider.addSourceVertices(new HashSet<>(producers));
        schedulingStrategy.startScheduling();

        // The consumers are created, and the first producer finishes, before any edge between
        // producers and consumers exists.
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        schedulingStrategy.onExecutionStateChange(producers.get(0).getId(), ExecutionState.FINISHED);

        if (allToAll) {
            topology.connectAllToAll(producers, consumers)
                    .withResultPartitionType(ResultPartitionType.BLOCKING)
                    .finish();
        } else {
            topology.connectPointwise(producers, consumers)
                    .withResultPartitionType(ResultPartitionType.BLOCKING)
                    .finish();
        }

        List<ExecutionVertexID> newVertexIds =
                consumers.stream().map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toList());
        schedulingStrategy.notifySchedulingTopologyUpdated(topology, newVertexIds);

        inputConsumableDecider.setInputConsumable(consumers.get(0));
        inputConsumableDecider.setInputConsumable(consumers.get(1));
        schedulingStrategy.onExecutionStateChange(producers.get(1).getId(), ExecutionState.FINISHED);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                eachInOwnGroup(producers, consumers), testingSchedulerOperation);
    }

    /**
     * Creates the strategy under test on top of this test's scheduler operations.
     *
     * @param schedulingTopology topology the strategy should operate on
     * @return a strategy whose decider factory ignores its arguments and always returns this
     *     test's {@link TestingInputConsumableDecider}
     */
    VertexwiseSchedulingStrategy createSchedulingStrategy(SchedulingTopology schedulingTopology) {
        return new VertexwiseSchedulingStrategy(
                testingSchedulerOperation,
                schedulingTopology,
                (ignore1, ignore2, ignore3) -> inputConsumableDecider);
    }

    /**
     * Builds the expected scheduling result: every vertex wrapped in its own single-element list,
     * in the order the vertices appear across the given lists.
     *
     * @param vertexLists vertex lists to flatten, in expected scheduling order
     * @return one singleton list per vertex
     */
    @SafeVarargs // the varargs array is only read, never stored or written to
    private static List<List<TestingSchedulingExecutionVertex>> eachInOwnGroup(
            List<TestingSchedulingExecutionVertex>... vertexLists) {
        List<List<TestingSchedulingExecutionVertex>> groups = new ArrayList<>();
        for (List<TestingSchedulingExecutionVertex> vertices : vertexLists) {
            for (TestingSchedulingExecutionVertex vertex : vertices) {
                groups.add(Collections.singletonList(vertex));
            }
        }
        return groups;
    }
}

