/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.adaptivebatch.PointwiseBlockingResultInfo;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DefaultVertexParallelismAndInputInfosDeciderTest {
    private static final long BYTE_256_MB = 0x10000000L;
    private static final long BYTE_512_MB = 0x20000000L;
    private static final long BYTE_1_GB = 0x40000000L;
    private static final long BYTE_8_GB = 0x200000000L;
    private static final long BYTE_1_TB = 0x10000000000L;
    private static final int MAX_PARALLELISM = 100;
    private static final int MIN_PARALLELISM = 3;
    private static final int VERTEX_MAX_PARALLELISM = 256;
    private static final int DEFAULT_SOURCE_PARALLELISM = 10;
    private static final long DATA_VOLUME_PER_TASK = 0x40000000L;

    DefaultVertexParallelismAndInputInfosDeciderTest() {
    }

    @Test
    void testDecideParallelism() {
        BlockingResultInfo resultInfo1 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromBroadcastResult(0x10000000L);
        BlockingResultInfo resultInfo2 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromNonBroadcastResult(0x210000000L);
        int parallelism = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelism).isEqualTo(9);
    }

    @Test
    void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() {
        BlockingResultInfo resultInfo1 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromBroadcastResult(0x10000000L);
        BlockingResultInfo resultInfo2 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromNonBroadcastResult(0x10200000000L);
        int parallelism = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelism).isEqualTo(100);
    }

    @Test
    void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() {
        BlockingResultInfo resultInfo1 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromBroadcastResult(0x10000000L);
        BlockingResultInfo resultInfo2 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromNonBroadcastResult(0x20000000L);
        int parallelism = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelism).isEqualTo(3);
    }

    @Test
    void testNonBroadcastBytesCanNotDividedEvenly() {
        BlockingResultInfo resultInfo1 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromBroadcastResult(0x20000000L);
        BlockingResultInfo resultInfo2 = DefaultVertexParallelismAndInputInfosDeciderTest.createFromNonBroadcastResult(0x210000000L);
        int parallelism = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelism).isEqualTo(9);
    }

    @Test
    void testDecideParallelismWithMaxSubpartitionLimitation() {
        TestingBlockingResultInfo resultInfo1 = new TestingBlockingResultInfo(false, 1L, 1024, 1024);
        TestingBlockingResultInfo resultInfo2 = new TestingBlockingResultInfo(false, 1L, 512, 512);
        int parallelism = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(1, 100, 0x10000000L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelism).isEqualTo(32);
    }

    @Test
    void testAllEdgesAllToAll() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        AllToAllBlockingResultInfo resultInfo2 = this.createAllToAllBlockingResultInfo(new long[]{8L, 12L, 21L, 9L, 13L, 7L, 19L, 13L, 14L, 5L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(5);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        List<IndexRange> subpartitionRanges = Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 3), new IndexRange(4, 6), new IndexRange(7, 8), new IndexRange(9, 9));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), subpartitionRanges);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), subpartitionRanges);
    }

    @Test
    void testAllEdgesAllToAllAndDecidedParallelismIsMaxParallelism() {
        AllToAllBlockingResultInfo resultInfo = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 2, 10L, Collections.singletonList(resultInfo));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(2);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), Arrays.asList(new IndexRange(0, 5), new IndexRange(6, 9)));
    }

    @Test
    void testAllEdgesAllToAllAndDecidedParallelismIsMinParallelism() {
        AllToAllBlockingResultInfo resultInfo = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(4, 10, 1000L, Collections.singletonList(resultInfo));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(4);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 5), new IndexRange(6, 7), new IndexRange(8, 9)));
    }

    @Test
    void testFallBackToEvenlyDistributeSubpartitions() {
        AllToAllBlockingResultInfo resultInfo = this.createAllToAllBlockingResultInfo(new long[]{10L, 1L, 10L, 1L, 10L, 1L, 10L, 1L, 10L, 1L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(8, 8, 10L, Collections.singletonList(resultInfo));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(8);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), Arrays.asList(new IndexRange(0, 0), new IndexRange(1, 1), new IndexRange(2, 2), new IndexRange(3, 4), new IndexRange(5, 5), new IndexRange(6, 6), new IndexRange(7, 7), new IndexRange(8, 9)));
    }

    @Test
    void testAllEdgesAllToAllAndOneIsBroadcast() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        AllToAllBlockingResultInfo resultInfo2 = this.createAllToAllBlockingResultInfo(new long[]{10L}, true);
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(3);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Arrays.asList(new IndexRange(0, 4), new IndexRange(5, 8), new IndexRange(9, 9)));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), Arrays.asList(new IndexRange(0, 0), new IndexRange(0, 0), new IndexRange(0, 0)));
    }

    @Test
    void testAllEdgesBroadcast() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L}, true);
        AllToAllBlockingResultInfo resultInfo2 = this.createAllToAllBlockingResultInfo(new long[]{10L}, true);
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isOne();
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Collections.singletonList(new IndexRange(0, 0)));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), Collections.singletonList(new IndexRange(0, 0)));
    }

    @Test
    void testHavePointwiseEdges() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        PointwiseBlockingResultInfo resultInfo2 = this.createPointwiseBlockingResultInfo({8L, 12L, 21L, 9L, 13L}, {7L, 19L, 13L, 14L, 5L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(4);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 4), new IndexRange(5, 6), new IndexRange(7, 9)));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkPointwiseJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), Arrays.asList(new IndexRange(0, 0), new IndexRange(0, 0), new IndexRange(1, 1), new IndexRange(1, 1)), Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 4), new IndexRange(0, 1), new IndexRange(2, 4)));
    }

    @Test
    void testParallelismAlreadyDecided() {
        DefaultVertexParallelismAndInputInfosDecider decider = DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(3, 100, 0x40000000L);
        AllToAllBlockingResultInfo allToAllBlockingResultInfo = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        ParallelismAndInputInfos parallelismAndInputInfos = decider.decideParallelismAndInputInfosForVertex(new JobVertexID(), Collections.singletonList(allToAllBlockingResultInfo), 3, 3, 100);
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(3);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), Arrays.asList(new IndexRange(0, 2), new IndexRange(3, 5), new IndexRange(6, 9)));
    }

    @Test
    void testSourceJobVertex() {
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(3, 100, 0x40000000L, Collections.emptyList());
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(10);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).isEmpty();
    }

    @Test
    void testDynamicSourceParallelismWithUpstreamInputs() {
        DefaultVertexParallelismAndInputInfosDecider decider = DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(3, 100, 0x40000000L);
        AllToAllBlockingResultInfo allToAllBlockingResultInfo = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        int dynamicSourceParallelism = 4;
        ParallelismAndInputInfos parallelismAndInputInfos = decider.decideParallelismAndInputInfosForVertex(new JobVertexID(), Collections.singletonList(allToAllBlockingResultInfo), -1, dynamicSourceParallelism, 100);
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(4);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(1);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 5), new IndexRange(6, 7), new IndexRange(8, 9)));
    }

    @Test
    void testEvenlyDistributeDataWithMaxSubpartitionLimitation() {
        long[] subpartitionBytes = new long[1024];
        Arrays.fill(subpartitionBytes, 1L);
        AllToAllBlockingResultInfo resultInfo = new AllToAllBlockingResultInfo(new IntermediateDataSetID(), 1024, 1024, false);
        for (int i = 0; i < 1024; ++i) {
            resultInfo.recordPartitionInfo(i, new ResultPartitionBytes(subpartitionBytes));
        }
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 100, 0x10000000L, Collections.singletonList(resultInfo));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(32);
        ArrayList<IndexRange> subpartitionRanges = new ArrayList<IndexRange>();
        for (int i = 0; i < 32; ++i) {
            subpartitionRanges.add(new IndexRange(i * 32, (i + 1) * 32 - 1));
        }
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)Iterables.getOnlyElement(parallelismAndInputInfos.getJobVertexInputInfos().values()), new IndexRange(0, 1023), subpartitionRanges);
    }

    @Test
    void testComputeSourceParallelismUpperBound() {
        Configuration configuration = new Configuration();
        configuration.setInteger(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, 10);
        DefaultVertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = DefaultVertexParallelismAndInputInfosDecider.from((int)100, (Configuration)configuration);
        Assertions.assertThat((int)vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(new JobVertexID(), 256)).isEqualTo(10);
    }

    @Test
    void testComputeSourceParallelismUpperBoundFallback() {
        Configuration configuration = new Configuration();
        DefaultVertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = DefaultVertexParallelismAndInputInfosDecider.from((int)100, (Configuration)configuration);
        Assertions.assertThat((int)vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(new JobVertexID(), 256)).isEqualTo(100);
    }

    @Test
    void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() {
        Configuration configuration = new Configuration();
        configuration.setInteger(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, 512);
        DefaultVertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = DefaultVertexParallelismAndInputInfosDecider.from((int)100, (Configuration)configuration);
        Assertions.assertThat((int)vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(new JobVertexID(), 256)).isEqualTo(256);
    }

    private static void checkAllToAllJobVertexInputInfo(JobVertexInputInfo jobVertexInputInfo, List<IndexRange> subpartitionRanges) {
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo(jobVertexInputInfo, new IndexRange(0, 0), subpartitionRanges);
    }

    private static void checkAllToAllJobVertexInputInfo(JobVertexInputInfo jobVertexInputInfo, IndexRange indexRange, List<IndexRange> subpartitionRanges) {
        ArrayList<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<ExecutionVertexInputInfo>();
        for (int i = 0; i < subpartitionRanges.size(); ++i) {
            executionVertexInputInfos.add(new ExecutionVertexInputInfo(i, indexRange, subpartitionRanges.get(i)));
        }
        Assertions.assertThat((List)jobVertexInputInfo.getExecutionVertexInputInfos()).containsExactlyInAnyOrderElementsOf(executionVertexInputInfos);
    }

    private static void checkPointwiseJobVertexInputInfo(JobVertexInputInfo jobVertexInputInfo, List<IndexRange> partitionRanges, List<IndexRange> subpartitionRanges) {
        Assertions.assertThat(partitionRanges).hasSameSizeAs(subpartitionRanges);
        ArrayList<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<ExecutionVertexInputInfo>();
        for (int i = 0; i < subpartitionRanges.size(); ++i) {
            executionVertexInputInfos.add(new ExecutionVertexInputInfo(i, partitionRanges.get(i), subpartitionRanges.get(i)));
        }
        Assertions.assertThat((List)jobVertexInputInfo.getExecutionVertexInputInfos()).containsExactlyInAnyOrderElementsOf(executionVertexInputInfos);
    }

    static DefaultVertexParallelismAndInputInfosDecider createDecider(int minParallelism, int maxParallelism, long dataVolumePerTask) {
        return DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(minParallelism, maxParallelism, dataVolumePerTask, 10);
    }

    static DefaultVertexParallelismAndInputInfosDecider createDecider(int minParallelism, int maxParallelism, long dataVolumePerTask, int defaultSourceParallelism) {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, (Object)minParallelism);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, (Object)new MemorySize(dataVolumePerTask));
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, (Object)defaultSourceParallelism);
        return DefaultVertexParallelismAndInputInfosDecider.from((int)maxParallelism, (Configuration)configuration);
    }

    private static int createDeciderAndDecideParallelism(List<BlockingResultInfo> consumedResults) {
        return DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelism(3, 100, 0x40000000L, consumedResults);
    }

    private static int createDeciderAndDecideParallelism(int minParallelism, int maxParallelism, long dataVolumePerTask, List<BlockingResultInfo> consumedResults) {
        DefaultVertexParallelismAndInputInfosDecider decider = DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(minParallelism, maxParallelism, dataVolumePerTask);
        return decider.decideParallelism(new JobVertexID(), consumedResults, minParallelism, maxParallelism);
    }

    private static ParallelismAndInputInfos createDeciderAndDecideParallelismAndInputInfos(int minParallelism, int maxParallelism, long dataVolumePerTask, List<BlockingResultInfo> consumedResults) {
        DefaultVertexParallelismAndInputInfosDecider decider = DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(minParallelism, maxParallelism, dataVolumePerTask);
        return decider.decideParallelismAndInputInfosForVertex(new JobVertexID(), consumedResults, -1, minParallelism, maxParallelism);
    }

    private AllToAllBlockingResultInfo createAllToAllBlockingResultInfo(long[] aggregatedSubpartitionBytes) {
        return this.createAllToAllBlockingResultInfo(aggregatedSubpartitionBytes, false);
    }

    private AllToAllBlockingResultInfo createAllToAllBlockingResultInfo(long[] aggregatedSubpartitionBytes, boolean isBroadcast) {
        AllToAllBlockingResultInfo resultInfo = new AllToAllBlockingResultInfo(new IntermediateDataSetID(), 1, aggregatedSubpartitionBytes.length, isBroadcast);
        resultInfo.recordPartitionInfo(0, new ResultPartitionBytes(aggregatedSubpartitionBytes));
        return resultInfo;
    }

    private PointwiseBlockingResultInfo createPointwiseBlockingResultInfo(long[] ... subpartitionBytesByPartition) {
        Set subpartitionNumSet = Arrays.stream(subpartitionBytesByPartition).map(array -> ((long[])array).length).collect(Collectors.toSet());
        Preconditions.checkState((subpartitionNumSet.size() == 1 ? 1 : 0) != 0);
        int numSubpartitions = (Integer)subpartitionNumSet.iterator().next();
        int numPartitions = subpartitionBytesByPartition.length;
        PointwiseBlockingResultInfo resultInfo = new PointwiseBlockingResultInfo(new IntermediateDataSetID(), numPartitions, numSubpartitions);
        int partitionIndex = 0;
        for (long[] subpartitionBytes : subpartitionBytesByPartition) {
            resultInfo.recordPartitionInfo(partitionIndex++, new ResultPartitionBytes(subpartitionBytes));
        }
        return resultInfo;
    }

    private static BlockingResultInfo createFromBroadcastResult(long producedBytes) {
        return new TestingBlockingResultInfo(true, producedBytes);
    }

    private static BlockingResultInfo createFromNonBroadcastResult(long producedBytes) {
        return new TestingBlockingResultInfo(false, producedBytes);
    }

    private static class TestingBlockingResultInfo
    implements BlockingResultInfo {
        private final boolean isBroadcast;
        private final long producedBytes;
        private final int numPartitions;
        private final int numSubpartitions;

        private TestingBlockingResultInfo(boolean isBroadcast, long producedBytes) {
            this(isBroadcast, producedBytes, 100, 100);
        }

        private TestingBlockingResultInfo(boolean isBroadcast, long producedBytes, int numPartitions, int numSubpartitions) {
            this.isBroadcast = isBroadcast;
            this.producedBytes = producedBytes;
            this.numPartitions = numPartitions;
            this.numSubpartitions = numSubpartitions;
        }

        public IntermediateDataSetID getResultId() {
            return new IntermediateDataSetID();
        }

        public boolean isBroadcast() {
            return this.isBroadcast;
        }

        public boolean isPointwise() {
            return false;
        }

        public int getNumPartitions() {
            return this.numPartitions;
        }

        public int getNumSubpartitions(int partitionIndex) {
            return this.numSubpartitions;
        }

        public long getNumBytesProduced() {
            return this.producedBytes;
        }

        public long getNumBytesProduced(IndexRange partitionIndexRange, IndexRange subpartitionIndexRange) {
            throw new UnsupportedOperationException();
        }

        public void recordPartitionInfo(int partitionIndex, ResultPartitionBytes partitionBytes) {
        }

        public void resetPartitionInfo(int partitionIndex) {
        }
    }
}

