/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link RegionPartitionGroupReleaseStrategy}.
 *
 * <p>Every test assembles a small {@link TestingSchedulingTopology}, builds the strategy on top
 * of it, reports vertices as finished (or unfinished) and checks which result partitions the
 * strategy hands back as releasable.
 */
public class RegionPartitionGroupReleaseStrategyTest extends TestLogger {

    /** Topology under construction; replaced with a fresh instance before each test. */
    private TestingSchedulingTopology testingSchedulingTopology;

    /** Creates a new, empty topology so that tests do not share vertices or partitions. */
    @Before
    public void setUp() throws Exception {
        testingSchedulingTopology = new TestingSchedulingTopology();
    }

    /**
     * Connects producers to consumers pointwise and reports the first consumer as finished; the
     * first produced partition is then expected to be the only releasable one.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionIsFinished() {
        List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingResultPartition> resultPartitions =
                testingSchedulingTopology.connectPointwise(producers, consumers).finish();

        ExecutionVertexID onlyConsumerVertexId = consumers.get(0).getId();
        IntermediateResultPartitionID onlyResultPartitionId = resultPartitions.get(0).getId();

        RegionPartitionGroupReleaseStrategy strategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(strategy, onlyConsumerVertexId);

        MatcherAssert.assertThat(partitionsToRelease, Matchers.contains(onlyResultPartitionId));
    }

    /**
     * Builds source -> intermediate -> sink, where the intermediate -> sink edge is explicitly
     * {@link ResultPartitionType#PIPELINED}. The source partition is expected to become releasable
     * only once both the intermediate and the sink vertex have been reported as finished.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
        List<TestingSchedulingExecutionVertex> sourceVertices =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> intermediateVertices =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> sinkVertices =
                testingSchedulingTopology.addExecutionVertices().finish();

        List<TestingSchedulingResultPartition> sourceResultPartitions =
                testingSchedulingTopology
                        .connectAllToAll(sourceVertices, intermediateVertices)
                        .finish();
        // NOTE(review): the pipelined edge presumably places the intermediate and sink vertices
        // in the same region - confirm against the region-building logic.
        testingSchedulingTopology
                .connectAllToAll(intermediateVertices, sinkVertices)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();

        ExecutionVertexID onlyIntermediateVertexId = intermediateVertices.get(0).getId();
        ExecutionVertexID onlySinkVertexId = sinkVertices.get(0).getId();
        IntermediateResultPartitionID onlySourceResultPartitionId =
                sourceResultPartitions.get(0).getId();

        RegionPartitionGroupReleaseStrategy strategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        // The result of this first notification is deliberately ignored; only the partitions
        // reported after the sink finishes are asserted on.
        strategy.vertexFinished(onlyIntermediateVertexId);
        List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(strategy, onlySinkVertexId);

        MatcherAssert.assertThat(
                partitionsToRelease, Matchers.contains(onlySourceResultPartitionId));
    }

    /**
     * With two consumer subtasks of which only the first is reported as finished, no partition is
     * expected to be releasable.
     */
    @Test
    public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
        List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        ExecutionVertexID consumerVertex1 = consumers.get(0).getId();

        RegionPartitionGroupReleaseStrategy strategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(strategy, consumerVertex1);

        MatcherAssert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }

    /**
     * Marks both consumers as finished, flips the second one back to unfinished and then reports
     * the first one as finished again; nothing is expected to be releasable afterwards.
     */
    @Test
    public void toggleVertexFinishedUnfinished() {
        List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        ExecutionVertexID consumerVertex1 = consumers.get(0).getId();
        ExecutionVertexID consumerVertex2 = consumers.get(1).getId();

        RegionPartitionGroupReleaseStrategy strategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        strategy.vertexFinished(consumerVertex1);
        strategy.vertexFinished(consumerVertex2);
        strategy.vertexUnfinished(consumerVertex2);

        List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(strategy, consumerVertex1);

        MatcherAssert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }

    /**
     * Adds a second vertex and a {@link ResultPartitionType#BLOCKING} edge after the strategy was
     * created, notifies the strategy about the topology update and checks that the finished
     * state of both vertices' regions and the releasable partitions reflect the new vertex.
     */
    @Test
    public void updateStrategyOnTopologyUpdate() {
        TestingSchedulingExecutionVertex ev1 = testingSchedulingTopology.newExecutionVertex();

        RegionPartitionGroupReleaseStrategy strategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);
        strategy.vertexFinished(ev1.getId());

        // Extend the topology only after ev1 has already been reported as finished.
        TestingSchedulingExecutionVertex ev2 = testingSchedulingTopology.newExecutionVertex();
        testingSchedulingTopology.connect(ev1, ev2, ResultPartitionType.BLOCKING);
        strategy.notifySchedulingTopologyUpdated(
                testingSchedulingTopology, Collections.singletonList(ev2.getId()));

        // ev1's region keeps its finished state; the newly added ev2 starts out unfinished.
        MatcherAssert.assertThat(strategy.isRegionOfVertexFinished(ev1.getId()), Matchers.is(true));
        MatcherAssert.assertThat(
                strategy.isRegionOfVertexFinished(ev2.getId()), Matchers.is(false));

        List<IntermediateResultPartitionID> releasablePartitions =
                getReleasablePartitions(strategy, ev2.getId());

        MatcherAssert.assertThat(strategy.isRegionOfVertexFinished(ev2.getId()), Matchers.is(true));
        MatcherAssert.assertThat(
                releasablePartitions,
                Matchers.contains(ev1.getProducedResults().iterator().next().getId()));
    }

    /**
     * Reports {@code finishedVertex} as finished to the strategy and flattens the returned
     * partition groups into a single list of partition IDs.
     *
     * @param regionPartitionGroupReleaseStrategy strategy to notify
     * @param finishedVertex vertex to report as finished
     * @return IDs of all partitions contained in the groups returned by the strategy
     */
    private static List<IntermediateResultPartitionID> getReleasablePartitions(
            RegionPartitionGroupReleaseStrategy regionPartitionGroupReleaseStrategy,
            ExecutionVertexID finishedVertex) {
        return regionPartitionGroupReleaseStrategy.vertexFinished(finishedVertex).stream()
                .flatMap(IterableUtils::toStream)
                .collect(Collectors.toList());
    }
}

