/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.graph;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamGraphCoLocationConstraintTest {
    StreamGraphCoLocationConstraintTest() {
    }

    @Test
    void testSettingCoLocationConstraint() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(7);
        DataStreamSource source = env.fromSequence(1L, 10000000L);
        source.getTransformation().setCoLocationGroupKey("group1");
        SingleOutputStreamOperator step1 = source.keyBy((KeySelector & Serializable)v -> v).map((MapFunction & Serializable)v -> v);
        step1.getTransformation().setCoLocationGroupKey("group2");
        SingleOutputStreamOperator step2 = step1.keyBy((KeySelector & Serializable)v -> v).map((MapFunction & Serializable)v -> v);
        step2.getTransformation().setCoLocationGroupKey("group1");
        DataStreamSink result = step2.keyBy((KeySelector & Serializable)v -> v).sinkTo((Sink)new DiscardingSink());
        result.getTransformation().setCoLocationGroupKey("group2");
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Assertions.assertThat((int)jobGraph.getNumberOfVertices()).isEqualTo(4);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        for (JobVertex vertex : vertices) {
            Assertions.assertThat((Object)vertex.getCoLocationGroup()).isNotNull();
        }
        Assertions.assertThat((Object)((JobVertex)vertices.get(0)).getCoLocationGroup()).isEqualTo((Object)((JobVertex)vertices.get(2)).getCoLocationGroup());
        Assertions.assertThat((Object)((JobVertex)vertices.get(1)).getCoLocationGroup()).isEqualTo((Object)((JobVertex)vertices.get(3)).getCoLocationGroup());
    }

    @Test
    void testCoLocateDifferenSharingGroups() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(7);
        DataStreamSource source = env.fromSequence(1L, 10000000L);
        source.getTransformation().setSlotSharingGroup("ssg1");
        source.getTransformation().setCoLocationGroupKey("co1");
        SingleOutputStreamOperator step1 = source.keyBy((KeySelector & Serializable)v -> v).map((MapFunction & Serializable)v -> v);
        step1.getTransformation().setSlotSharingGroup("ssg2");
        step1.getTransformation().setCoLocationGroupKey("co2");
        SingleOutputStreamOperator step2 = step1.keyBy((KeySelector & Serializable)v -> v).map((MapFunction & Serializable)v -> v);
        step2.getTransformation().setSlotSharingGroup("ssg3");
        step2.getTransformation().setCoLocationGroupKey("co1");
        DataStreamSink result = step2.keyBy((KeySelector & Serializable)v -> v).sinkTo((Sink)new DiscardingSink());
        result.getTransformation().setSlotSharingGroup("ssg4");
        result.getTransformation().setCoLocationGroupKey("co2");
        Assertions.assertThatThrownBy(() -> env.getStreamGraph().getJobGraph()).isInstanceOf(IllegalStateException.class);
    }
}

