/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamGraphGeneratorTest;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class StreamGraphGeneratorExecutionModeDetectionTest
extends TestLogger {
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.fromSource((Source)new MockSource(Boundedness.BOUNDED, 100), WatermarkStrategy.noWatermarks(), "bounded-source").print();
        Assert.assertThat((Object)environment.getStreamGraph(), StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, true, true));
    }

    @Test
    public void testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.fromSource((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100), WatermarkStrategy.noWatermarks(), "unbounded-source").print();
        Assert.assertThat((Object)environment.getStreamGraph(), StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true));
    }

    @Test
    public void testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.AUTOMATIC);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        environment.fromSource((Source)new MockSource(Boundedness.BOUNDED, 100), WatermarkStrategy.noWatermarks(), "bounded-source").print();
        Assert.assertTrue((boolean)environment.isChainingEnabled());
        Assert.assertThat((Object)environment.getCheckpointInterval(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.equalTo((Object)100L)));
        StreamGraph streamGraph = environment.getStreamGraph();
        Assert.assertThat((Object)streamGraph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false));
    }

    @Test
    public void testExecutionModePropagationFromEnvWithBatchAndUnboundedSource() {
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        environment.fromSource((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100), WatermarkStrategy.noWatermarks(), "unbounded-source").print();
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("combination is not allowed");
        environment.getStreamGraph();
    }

    @Test
    public void testDetectionThroughTransitivePredecessors() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assert.assertEquals((Object)Boundedness.BOUNDED, (Object)bounded.getBoundedness());
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assert.assertEquals((Object)Boundedness.CONTINUOUS_UNBOUNDED, (Object)unbounded.getBoundedness());
        TwoInputTransformation resultTransform = new TwoInputTransformation(bounded, unbounded, "Test Two Input Transformation", (StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new StreamGraphGeneratorTest.OutputTypeConfigurableOperationWithTwoInputs()), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{resultTransform});
        Assert.assertThat((Object)graph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true));
    }

    @Test
    public void testBoundedDetection() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assert.assertEquals((Object)Boundedness.BOUNDED, (Object)bounded.getBoundedness());
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{bounded});
        Assert.assertThat((Object)graph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false));
    }

    @Test
    public void testUnboundedDetection() {
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assert.assertEquals((Object)Boundedness.CONTINUOUS_UNBOUNDED, (Object)unbounded.getBoundedness());
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{unbounded});
        Assert.assertThat((Object)graph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true));
    }

    @Test
    public void testMixedDetection() {
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assert.assertEquals((Object)Boundedness.CONTINUOUS_UNBOUNDED, (Object)unbounded.getBoundedness());
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assert.assertEquals((Object)Boundedness.BOUNDED, (Object)bounded.getBoundedness());
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{unbounded});
        Assert.assertThat((Object)graph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true));
    }

    @Test
    public void testExplicitOverridesDetectedMode() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assert.assertEquals((Object)Boundedness.BOUNDED, (Object)bounded.getBoundedness());
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{bounded});
        Assert.assertThat((Object)graph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false));
        StreamGraph streamingGraph = this.generateStreamGraph(RuntimeExecutionMode.STREAMING, new Transformation[]{bounded});
        Assert.assertThat((Object)streamingGraph, StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true));
    }

    private StreamGraph generateStreamGraph(RuntimeExecutionMode initMode, Transformation<?> ... transformations) {
        ArrayList registeredTransformations = new ArrayList();
        Collections.addAll(registeredTransformations, transformations);
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)initMode);
        return new StreamGraphGenerator(registeredTransformations, new ExecutionConfig(), new CheckpointConfig(), configuration).generate();
    }

    private SourceTransformation<Integer, ?, ?> getSourceTransformation(String name, Boundedness boundedness) {
        return new SourceTransformation(name, (Source)new MockSource(boundedness, 100), WatermarkStrategy.noWatermarks(), IntegerTypeInfo.of(Integer.class), 1);
    }

    private static TypeSafeMatcher<StreamGraph> hasProperties(final GlobalStreamExchangeMode exchangeMode, final JobType jobType, final boolean isCheckpointingEnabled, final boolean isAllVerticesInSameSlotSharingGroupByDefault) {
        return new TypeSafeMatcher<StreamGraph>(){

            protected boolean matchesSafely(StreamGraph actualStreamGraph) {
                return exchangeMode == actualStreamGraph.getGlobalStreamExchangeMode() && jobType == actualStreamGraph.getJobType() && actualStreamGraph.getCheckpointConfig().isCheckpointingEnabled() == isCheckpointingEnabled && actualStreamGraph.isAllVerticesInSameSlotSharingGroupByDefault() == isAllVerticesInSameSlotSharingGroupByDefault;
            }

            public void describeTo(Description description) {
                description.appendText("a StreamGraph with exchangeMode=").appendValue((Object)exchangeMode).appendText(", jobType=").appendValue((Object)jobType).appendText(", isCheckpointingEnabled=").appendValue((Object)isCheckpointingEnabled).appendText(", isAllVerticesInSameSlotSharingGroupByDefault=").appendValue((Object)isAllVerticesInSameSlotSharingGroupByDefault);
            }

            protected void describeMismatchSafely(StreamGraph item, Description mismatchDescription) {
                mismatchDescription.appendText("was ").appendText("a StreamGraph with exchangeMode=").appendValue((Object)exchangeMode).appendText(", jobType=").appendValue((Object)jobType).appendText(", isCheckpointingEnabled=").appendValue((Object)isCheckpointingEnabled).appendText(", isAllVerticesInSameSlotSharingGroupByDefault=").appendValue((Object)isAllVerticesInSameSlotSharingGroupByDefault);
            }
        };
    }
}

