/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.graph;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorNodeHashTest
extends TestLogger {
    @Test
    public void testNodeHashIsDeterministic() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        SingleOutputStreamOperator src0 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src0").map((MapFunction)new NoOpMapFunction()).filter((FilterFunction)new NoOpFilterFunction()).keyBy((KeySelector)new NoOpKeySelector()).reduce((ReduceFunction)new NoOpReduceFunction()).name("reduce");
        SingleOutputStreamOperator src1 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src1").filter((FilterFunction)new NoOpFilterFunction());
        SingleOutputStreamOperator src2 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src2").filter((FilterFunction)new NoOpFilterFunction());
        src0.map((MapFunction)new NoOpMapFunction()).union(new DataStream[]{src1, src2}).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink");
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Map<JobVertexID, String> ids = this.rememberIds(jobGraph);
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        src0 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src0").map((MapFunction)new NoOpMapFunction()).filter((FilterFunction)new NoOpFilterFunction()).keyBy((KeySelector)new NoOpKeySelector()).reduce((ReduceFunction)new NoOpReduceFunction()).name("reduce");
        src1 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src1").filter((FilterFunction)new NoOpFilterFunction());
        src2 = env.addSource((SourceFunction)new NoOpSourceFunction(), "src2").filter((FilterFunction)new NoOpFilterFunction());
        src0.map((MapFunction)new NoOpMapFunction()).union(new DataStream[]{src1, src2}).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink");
        jobGraph = env.getStreamGraph().getJobGraph();
        this.verifyIdsEqual(jobGraph, ids);
    }

    @Test
    public void testNodeHashIdenticalSources() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();
        DataStreamSource src0 = env.addSource((SourceFunction)new NoOpSourceFunction());
        DataStreamSource src1 = env.addSource((SourceFunction)new NoOpSourceFunction());
        src0.union(new DataStream[]{src1}).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertTrue((boolean)((JobVertex)vertices.get(0)).isInputVertex());
        Assert.assertTrue((boolean)((JobVertex)vertices.get(1)).isInputVertex());
        Assert.assertNotNull((Object)((JobVertex)vertices.get(0)).getID());
        Assert.assertNotNull((Object)((JobVertex)vertices.get(1)).getID());
        Assert.assertNotEquals((Object)((JobVertex)vertices.get(0)).getID(), (Object)((JobVertex)vertices.get(1)).getID());
    }

    @Test
    public void testNodeHashAfterSourceUnchaining() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).filter((FilterFunction)new NoOpFilterFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobVertexID sourceId = ((JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0)).getID();
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).startNewChain().filter((FilterFunction)new NoOpFilterFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        jobGraph = env.getStreamGraph().getJobGraph();
        JobVertexID unchainedSourceId = ((JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0)).getID();
        Assert.assertNotEquals((Object)sourceId, (Object)unchainedSourceId);
    }

    @Test
    public void testNodeHashAfterIntermediateUnchaining() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).name("map").startNewChain().filter((FilterFunction)new NoOpFilterFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobVertex chainedMap = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertTrue((boolean)chainedMap.getName().startsWith("map"));
        JobVertexID chainedMapId = chainedMap.getID();
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).name("map").startNewChain().filter((FilterFunction)new NoOpFilterFunction()).startNewChain().sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        jobGraph = env.getStreamGraph().getJobGraph();
        JobVertex unchainedMap = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertEquals((Object)"map", (Object)unchainedMap.getName());
        JobVertexID unchainedMapId = unchainedMap.getID();
        Assert.assertNotEquals((Object)chainedMapId, (Object)unchainedMapId);
    }

    @Test
    public void testNodeHashIdenticalNodes() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();
        DataStreamSource src = env.addSource((SourceFunction)new NoOpSourceFunction());
        src.map((MapFunction)new NoOpMapFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        src.map((MapFunction)new NoOpMapFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        HashSet<JobVertexID> vertexIds = new HashSet<JobVertexID>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            Assert.assertTrue((boolean)vertexIds.add(vertex.getID()));
        }
    }

    @Test
    public void testChangedOperatorName() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.addSource((SourceFunction)new NoOpSourceFunction(), "A").map((MapFunction)new NoOpMapFunction());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobVertexID expected = jobGraph.getVerticesAsArray()[0].getID();
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.addSource((SourceFunction)new NoOpSourceFunction(), "B").map((MapFunction)new NoOpMapFunction());
        jobGraph = env.getStreamGraph().getJobGraph();
        JobVertexID actual = jobGraph.getVerticesAsArray()[0].getID();
        Assert.assertEquals((Object)expected, (Object)actual);
    }

    @Test
    public void testManualHashAssignment() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();
        SingleOutputStreamOperator src = env.addSource((SourceFunction)new NoOpSourceFunction()).name("source").uid("source");
        src.map((MapFunction)new NoOpMapFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink0").uid("sink0");
        src.map((MapFunction)new NoOpMapFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink1").uid("sink1");
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        HashSet<JobVertexID> ids = new HashSet<JobVertexID>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            Assert.assertTrue((boolean)ids.add(vertex.getID()));
        }
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();
        src = env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).name("source").uid("source");
        src.map((MapFunction)new NoOpMapFunction()).keyBy((KeySelector)new NoOpKeySelector()).reduce((ReduceFunction)new NoOpReduceFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink0").uid("sink0");
        src.map((MapFunction)new NoOpMapFunction()).keyBy((KeySelector)new NoOpKeySelector()).reduce((ReduceFunction)new NoOpReduceFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).name("sink1").uid("sink1");
        JobGraph newJobGraph = env.getStreamGraph().getJobGraph();
        Assert.assertNotEquals((Object)jobGraph.getJobID(), (Object)newJobGraph.getJobID());
        for (JobVertex vertex : newJobGraph.getVertices()) {
            if (!vertex.getName().endsWith("source") && !vertex.getName().endsWith("sink0") && !vertex.getName().endsWith("sink1")) continue;
            Assert.assertTrue((boolean)ids.contains(vertex.getID()));
        }
    }

    @Test(expected=IllegalArgumentException.class)
    public void testManualHashAssignmentCollisionThrowsException() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();
        env.addSource((SourceFunction)new NoOpSourceFunction()).uid("source").map((MapFunction)new NoOpMapFunction()).uid("source").sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        env.getStreamGraph().getJobGraph();
    }

    @Test
    public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).map((MapFunction)new NoOpMapFunction()).uid("map").sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        env.getStreamGraph().getJobGraph();
    }

    @Test
    public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new NoOpSourceFunction()).uid("source").map((MapFunction)new NoOpMapFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        env.getStreamGraph().getJobGraph();
    }

    @Test
    public void testUserProvidedHashingOnChainSupported() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.addSource((SourceFunction)new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").map((MapFunction)new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").filter((FilterFunction)new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc").keyBy((KeySelector)new NoOpKeySelector()).reduce((ReduceFunction)new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
        env.getStreamGraph().getJobGraph();
    }

    @Test(expected=IllegalStateException.class)
    public void testDisablingAutoUidsFailsStreamGraphCreation() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();
        env.addSource((SourceFunction)new NoOpSourceFunction()).sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink());
        env.getStreamGraph();
    }

    @Test
    public void testDisablingAutoUidsAcceptsManuallySetId() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();
        env.addSource((SourceFunction)new NoOpSourceFunction()).uid("uid1").sinkTo((Sink)new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink()).uid("uid2");
        env.getStreamGraph();
    }

    @Test
    public void testDisablingAutoUidsAcceptsManuallySetHash() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();
        env.addSource((SourceFunction)new NoOpSourceFunction()).setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").addSink((SinkFunction)new DiscardingSink()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
        env.getStreamGraph();
    }

    @Test
    public void testDisablingAutoUidsWorksWithKeyBy() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();
        env.addSource((SourceFunction)new NoOpSourceFunction()).setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").keyBy((KeySelector & Serializable)o -> o).addSink((SinkFunction)new DiscardingSink()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
        env.getStreamGraph();
    }

    private Map<JobVertexID, String> rememberIds(JobGraph jobGraph) {
        HashMap<JobVertexID, String> ids = new HashMap<JobVertexID, String>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            ids.put(vertex.getID(), vertex.getName());
        }
        return ids;
    }

    private void verifyIdsEqual(JobGraph jobGraph, Map<JobVertexID, String> ids) {
        Assert.assertEquals((long)jobGraph.getNumberOfVertices(), (long)ids.size());
        for (JobVertex vertex : jobGraph.getVertices()) {
            String expectedName = ids.get(vertex.getID());
            Assert.assertNotNull((Object)expectedName);
            Assert.assertEquals((Object)expectedName, (Object)vertex.getName());
        }
    }

    private void verifyIdsNotEqual(JobGraph jobGraph, Map<JobVertexID, String> ids) {
        Assert.assertEquals((long)jobGraph.getNumberOfVertices(), (long)ids.size());
        for (JobVertex vertex : jobGraph.getVertices()) {
            Assert.assertFalse((boolean)ids.containsKey(vertex.getID()));
        }
    }

    private static class NoOpReduceFunction
    implements ReduceFunction<String> {
        private static final long serialVersionUID = -8775747640749256372L;

        private NoOpReduceFunction() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1;
        }
    }

    private static class NoOpKeySelector
    implements KeySelector<String, String> {
        private static final long serialVersionUID = -96127515593422991L;

        private NoOpKeySelector() {
        }

        public String getKey(String value) throws Exception {
            return value;
        }
    }

    private static class NoOpFilterFunction
    implements FilterFunction<String> {
        private static final long serialVersionUID = 500005424900187476L;

        private NoOpFilterFunction() {
        }

        public boolean filter(String value) throws Exception {
            return true;
        }
    }

    private static class NoOpMapFunction
    implements MapFunction<String, String> {
        private static final long serialVersionUID = 6584823409744624276L;

        private NoOpMapFunction() {
        }

        public String map(String value) throws Exception {
            return value;
        }
    }

    private static class NoOpSourceFunction
    implements ParallelSourceFunction<String> {
        private static final long serialVersionUID = -5459224792698512636L;

        private NoOpSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        }

        public void cancel() {
        }
    }
}

