/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamExecutionEnvironmentTest {
    /** Explicit package-private no-arg constructor; it performs no setup of its own. */
    StreamExecutionEnvironmentTest() {
    }

    /**
     * Verifies that {@code fromData(Class, OUT...)} accepts a mix of base-class and subclass
     * instances when the base class is named explicitly.
     *
     * <p>The elements are passed as plain varargs on purpose. Casting the element array to
     * {@code Object[]} makes the {@code Class}-typed overload inapplicable, so the call would
     * bind to {@code fromData(OUT...)} and treat the class literal itself as the first element.
     */
    @Test
    void fromElementsWithBaseTypeTest1() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
    }

    /**
     * Asserts that {@code fromData} throws an {@link IllegalArgumentException} when it is handed
     * an array holding the {@code SubClass} literal, a {@code SubClass} instance and a
     * {@code ParentClass} instance.
     */
    @Test
    void fromElementsWithBaseTypeTest2() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Object[] mixedElements = {
            SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")
        };

        Assertions.assertThatThrownBy(() -> env.fromData(mixedElements))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Checks that a source created from two strings reports {@code STRING_TYPE_INFO} as its
     * produced type without any explicit type hint.
     */
    @Test
    void testFromElementsDeducedType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> letters = env.fromData("a", "b");

        DataGeneratorSource<String> generator = getSourceFromStream(letters);

        Assertions.assertThat(generator.getProducedType())
                .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
    }

    /**
     * Checks that a type set through {@code returns(...)} after the source was created is the
     * type the underlying generator source reports once the stream graph has been built.
     */
    @Test
    void testFromElementsPostConstructionType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> letters = env.fromData("a", "b");

        // Replace the deduced type after construction.
        TypeInformation<String> overriddenType = new GenericTypeInfo<>(String.class);
        letters.returns(overriddenType);

        // Grab the source before the graph is generated; the assertions run afterwards.
        DataGeneratorSource<String> generator = getSourceFromStream(letters);
        letters.sinkTo(new DiscardingSink<>());
        env.getStreamGraph();

        Assertions.assertThat(generator.getProducedType())
                .isNotEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
        Assertions.assertThat(generator.getProducedType()).isEqualTo(overriddenType);
    }

    /**
     * Checks that declaring an {@code Integer} type for a source built from strings makes stream
     * graph generation fail with an {@link IllegalArgumentException} that names the mismatch.
     */
    @Test
    void testFromElementsPostConstructionTypeIncompatible() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> letters = env.fromData("a", "b");

        // The raw cast is deliberate: the mismatching type must get past the compiler.
        letters.returns((TypeInformation) BasicTypeInfo.INT_TYPE_INFO);
        letters.sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(env::getStreamGraph)
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("not all subclasses of java.lang.Integer");
    }

    /**
     * Asserts that {@code fromData} throws an {@link IllegalArgumentException} mentioning the
     * null element when one of the given elements is {@code null}.
     */
    @Test
    void testFromElementsNullElement() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Assertions.assertThatThrownBy(() -> env.fromData("a", null, "c"))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("contains a null element");
    }

    /**
     * Checks that a parallelism of 4 set on a parallel-collection source is the parallelism of
     * the corresponding node in the generated stream graph.
     *
     * <p>Unexpected exceptions propagate to the test framework, which reports them with their
     * full stack trace and cause, rather than being reduced to their message.
     *
     * @throws Exception if building the pipeline or the stream graph fails unexpectedly
     */
    @Test
    void testFromCollectionParallelism() throws Exception {
        TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> parallelSource =
                env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo)
                        .setParallelism(4);
        parallelSource.sinkTo(new DiscardingSink<>());

        StreamGraph streamGraph = env.getStreamGraph();
        // Rendering the plan is part of the scenario; its result is not inspected.
        streamGraph.getStreamingPlanAsJSON();

        Assertions.assertThat(streamGraph.getStreamNode(parallelSource.getId()).getParallelism())
                .as("Parallelism of parallel collection source must be 4.")
                .isEqualTo(4);
    }

    /**
     * Checks which source implementation backs each factory method: {@code addSource} keeps the
     * given function, {@code fromSequence} uses a {@link NumberSequenceSource}, {@code fromData}
     * uses a {@link DataGeneratorSource} and {@code fromCollection} uses a
     * {@link FromElementsFunction}.
     *
     * <p>Statement order matters: the lookup helpers attach a sink and build the stream graph.
     */
    @Test
    void testSources() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SourceFunction<Integer> noOpSource = new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            @Override
            public void cancel() {
            }
        };

        DataStreamSource<Integer> functionSource = env.addSource(noOpSource);
        functionSource.sinkTo(new DiscardingSink<>());
        Assertions.assertThat(getFunctionFromDataSource(functionSource)).isEqualTo(noOpSource);

        List<Long> values = Arrays.asList(0L, 1L, 2L);

        DataStreamSource<Long> sequenceSource = env.fromSequence(0L, 2L);
        Object sequenceImpl = getSourceFromStream(sequenceSource);
        Assertions.assertThat(sequenceImpl).isInstanceOf(NumberSequenceSource.class);

        DataStreamSource<Long> dataSource = env.fromData(0L, 1L, 2L);
        Assertions.assertThat(getSourceFromDataSourceTyped(dataSource))
                .isInstanceOf(DataGeneratorSource.class);

        DataStreamSource<Long> collectionSource = env.fromCollection(values);
        Assertions.assertThat(getFunctionFromDataSource(collectionSource))
                .isInstanceOf(FromElementsFunction.class);
    }

    /** Checks that {@code fromSequence} yields a stream typed as {@code LONG_TYPE_INFO}. */
    @Test
    void testFromSequence() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> sequence = env.fromSequence(0L, 2L);
        TypeInformation<Long> elementType = sequence.getType();

        Assertions.assertThat(elementType).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO);
    }

    /**
     * Walks through the accepted and rejected values for parallelism and max parallelism, both
     * on a single operator and on the environment, and checks what the operator's transformation
     * reports after each job graph generation.
     */
    @Test
    void testParallelismBounds() {
        // 32768 is accepted below; 0 and 32769 are expected to be rejected.
        final int upperBound = 32768;
        // Value the transformation reports while no max parallelism has been applied.
        final int unsetMaxParallelism = -1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SourceFunction<Integer> noOpSource = new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            @Override
            public void cancel() {
            }
        };
        FlatMapFunction<Integer, Object> noOpFlatMap = new FlatMapFunction<Integer, Object>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Integer value, Collector<Object> out) throws Exception {
            }
        };

        SingleOutputStreamOperator<Object> operator =
                env.addSource(noOpSource).flatMap(noOpFlatMap);

        // Nothing configured yet.
        Assertions.assertThat(operator.getTransformation().getMaxParallelism())
                .isEqualTo(unsetMaxParallelism);

        // Operator parallelism: zero is rejected, 1 and the upper bound are accepted.
        Assertions.assertThatThrownBy(() -> operator.setParallelism(0))
                .isInstanceOf(IllegalArgumentException.class);
        operator.setParallelism(1);
        Assertions.assertThat(operator.getParallelism()).isOne();
        operator.setParallelism(upperBound);
        Assertions.assertThat(operator.getParallelism()).isEqualTo(upperBound);

        // Without an environment-wide max parallelism the transformation stays unset.
        env.getStreamGraph(false).getJobGraph();
        Assertions.assertThat(operator.getTransformation().getMaxParallelism())
                .isEqualTo(unsetMaxParallelism);

        // The environment-wide value shows up on the operator after generation.
        env.setMaxParallelism(42);
        env.getStreamGraph(false).getJobGraph();
        Assertions.assertThat(operator.getTransformation().getMaxParallelism()).isEqualTo(42);

        // Out-of-range max parallelism is rejected on both the environment and the operator.
        Assertions.assertThatThrownBy(() -> env.setMaxParallelism(0))
                .isInstanceOf(IllegalArgumentException.class);
        Assertions.assertThatThrownBy(() -> env.setMaxParallelism(upperBound + 1))
                .isInstanceOf(IllegalArgumentException.class);
        Assertions.assertThatThrownBy(() -> operator.setMaxParallelism(0))
                .isInstanceOf(IllegalArgumentException.class);
        Assertions.assertThatThrownBy(() -> operator.setMaxParallelism(upperBound + 1))
                .isInstanceOf(IllegalArgumentException.class);

        // Operator-level max parallelism: both ends of the valid range.
        operator.setMaxParallelism(1);
        Assertions.assertThat(operator.getTransformation().getMaxParallelism()).isOne();
        operator.setMaxParallelism(upperBound);
        Assertions.assertThat(operator.getTransformation().getMaxParallelism())
                .isEqualTo(upperBound);

        // The operator-level value is still reported after another generation.
        env.getStreamGraph(false).getJobGraph();
        Assertions.assertThat(operator.getTransformation().getMaxParallelism())
                .isEqualTo(upperBound);
    }

    /**
     * Registers three slot sharing groups and checks that the stream graph exposes the resource
     * profile of the two groups that specify resources, and none for the group built without.
     *
     * <p>The stream and the identity map function are fully typed. With a raw-typed lambda the
     * compiler emits an {@code Object -> Object} function, so the element type seen by the map
     * step would no longer be {@code Integer}.
     */
    @Test
    void testRegisterSlotSharingGroup() {
        SlotSharingGroup ssg1 =
                SlotSharingGroup.newBuilder("ssg1").setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
        SlotSharingGroup ssg2 =
                SlotSharingGroup.newBuilder("ssg2").setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
        SlotSharingGroup ssg3 = SlotSharingGroup.newBuilder("ssg3").build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerSlotSharingGroup(ssg1);
        env.registerSlotSharingGroup(ssg2);
        env.registerSlotSharingGroup(ssg3);

        // The source refers to its group by name, the map step by instance.
        SingleOutputStreamOperator<Integer> source = env.fromData(1).slotSharingGroup("ssg1");
        source.map(value -> value).slotSharingGroup(ssg2).sinkTo(new DiscardingSink<>());

        StreamGraph streamGraph = env.getStreamGraph();

        // hasValue reports an absent Optional as an assertion failure instead of letting
        // Optional.get() throw NoSuchElementException.
        Assertions.assertThat(streamGraph.getSlotSharingGroupResource("ssg1"))
                .hasValue(ResourceProfile.fromResources(1.0, 100));
        Assertions.assertThat(streamGraph.getSlotSharingGroupResource("ssg2"))
                .hasValue(ResourceProfile.fromResources(2.0, 200));
        Assertions.assertThat(streamGraph.getSlotSharingGroupResource("ssg3")).isNotPresent();
    }

    /**
     * Checks that stream graph generation fails with an {@link IllegalArgumentException} when an
     * operator uses a slot sharing group whose name is already registered with different
     * resources.
     *
     * <p>The stream and the identity map function are fully typed. With a raw-typed lambda the
     * compiler emits an {@code Object -> Object} function, so the element type seen by the map
     * step would no longer be {@code Integer}.
     */
    @Test
    void testRegisterSlotSharingGroupConflict() {
        SlotSharingGroup registered =
                SlotSharingGroup.newBuilder("ssg1").setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
        // Same name as the registered group, different resources.
        SlotSharingGroup conflicting =
                SlotSharingGroup.newBuilder("ssg1").setCpuCores(2.0).setTaskHeapMemoryMB(200).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerSlotSharingGroup(registered);

        SingleOutputStreamOperator<Integer> source = env.fromData(1).slotSharingGroup("ssg1");
        source.map(value -> value).slotSharingGroup(conflicting).sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(env::getStreamGraph)
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Checks how many nodes the stream graph contains as source/sink pipelines are added between
     * calls to {@code getStreamGraph()} and {@code getExecutionPlan()}.
     *
     * <p>The expected counts (2, 2, then 4) imply that {@code getStreamGraph()} starts from a
     * clean slate afterwards, while {@code getExecutionPlan()} keeps what was defined before it.
     */
    @Test
    void testGetStreamGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromData(1, 2, 3).sinkTo(new DiscardingSink<>());
        int nodesAfterFirstPipeline = env.getStreamGraph().getStreamNodes().size();
        Assertions.assertThat(nodesAfterFirstPipeline).isEqualTo(2);

        env.fromData(1, 2, 3).sinkTo(new DiscardingSink<>());
        int nodesAfterSecondPipeline = env.getStreamGraph().getStreamNodes().size();
        Assertions.assertThat(nodesAfterSecondPipeline).isEqualTo(2);

        env.fromData(1, 2, 3).sinkTo(new DiscardingSink<>());
        env.getExecutionPlan();

        env.fromData(1, 2, 3).sinkTo(new DiscardingSink<>());
        int nodesAfterExecutionPlan = env.getStreamGraph().getStreamNodes().size();
        Assertions.assertThat(nodesAfterExecutionPlan).isEqualTo(4);
    }

    /**
     * Checks the job name used when none is configured: one name for the environment as created,
     * another after the runtime mode has been switched to {@code BATCH}.
     */
    @Test
    void testDefaultJobName() {
        final String streamingDefault = "Flink Streaming Job";
        final String batchDefault = "Flink Batch Job";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        testJobName(streamingDefault, env);

        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        testJobName(batchDefault, env);
    }

    /**
     * Checks that a job name supplied through the configuration passed to
     * {@code getExecutionEnvironment(Configuration)} ends up as the stream graph's job name.
     *
     * <p>The value is passed to {@code Configuration.set} with its real type. Widening it to
     * {@code Object} is incompatible with the option's {@code String} type parameter.
     */
    @Test
    void testUserDefinedJobName() {
        String jobName = "MyTestJob";
        Configuration config = new Configuration();
        config.set(PipelineOptions.NAME, jobName);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        testJobName(jobName, env);
    }

    /**
     * Checks that a job name applied to an existing environment through {@code configure(...)}
     * ends up as the stream graph's job name.
     *
     * <p>The value is passed to {@code Configuration.set} with its real type. Widening it to
     * {@code Object} is incompatible with the option's {@code String} type parameter.
     */
    @Test
    void testUserDefinedJobNameWithConfigure() {
        String jobName = "MyTestJob";
        Configuration config = new Configuration();
        config.set(PipelineOptions.NAME, jobName);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.configure(config, getClass().getClassLoader());

        testJobName(jobName, env);
    }

    /**
     * Adds a small printing pipeline to {@code env}, builds the stream graph and asserts that
     * its job name matches.
     *
     * @param expectedJobName job name the generated stream graph must carry
     * @param env environment to build the pipeline and the stream graph on
     */
    private void testJobName(String expectedJobName, StreamExecutionEnvironment env) {
        env.fromData(1, 2, 3).print();

        String actualJobName = env.getStreamGraph().getJobName();

        Assertions.assertThat(actualJobName).isEqualTo(expectedJobName);
    }

    /**
     * Checks that type information passed to {@code addSource} is used as the stream type, and
     * that without it the stream type equals a generic type info for {@link Row}.
     */
    @Test
    void testAddSourceWithUserDefinedTypeInfo() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Type information handed to addSource explicitly.
        TypeInformation<Row> rowOfString = Types.ROW(Types.STRING);
        DataStreamSource<Row> explicitlyTyped = env.addSource(new RowSourceFunction(), rowOfString);
        Assertions.assertThat(explicitlyTyped.getType()).isEqualTo(Types.ROW(Types.STRING));

        // No type information handed to addSource.
        DataStreamSource<Row> implicitlyTyped = env.addSource(new RowSourceFunction());
        Assertions.assertThat(implicitlyTyped.getType())
                .isEqualTo(new GenericTypeInfo<>(Row.class));
    }

    /**
     * Checks that the periodic-materialization flag equals the option's default after
     * configuring with an empty configuration, and is {@code false} after explicitly disabling it.
     */
    @Test
    void testPeriodicMaterializeEnabled() {
        final ClassLoader classLoader = getClass().getClassLoader();
        Configuration config = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.configure(config, classLoader);
        Assertions.assertThat(env.getConfig().isPeriodicMaterializeEnabled())
                .isEqualTo(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED.defaultValue());

        config.setBoolean(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED.key(), false);
        env.configure(config, classLoader);
        Assertions.assertThat(env.getConfig().isPeriodicMaterializeEnabled()).isFalse();
    }

    /**
     * Checks the periodic-materialization interval: the option's default with an empty
     * configuration, 60000 ms after setting {@code "60s"}, and an
     * {@link IllegalArgumentException} when {@code "-1ms"} is configured.
     */
    @Test
    void testPeriodicMaterializeInterval() {
        final String intervalKey = StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL.key();
        final ClassLoader classLoader = getClass().getClassLoader();
        Configuration config = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.configure(config, classLoader);
        Assertions.assertThat(env.getConfig().getPeriodicMaterializeIntervalMillis())
                .isEqualTo(
                        StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL
                                .defaultValue()
                                .toMillis());

        config.setString(intervalKey, "60s");
        env.configure(config, classLoader);
        Assertions.assertThat(env.getConfig().getPeriodicMaterializeIntervalMillis())
                .isEqualTo(60000L);

        Assertions.assertThatThrownBy(
                        () -> {
                            config.setString(intervalKey, "-1ms");
                            env.configure(config, classLoader);
                        })
                .isInstanceOf(IllegalArgumentException.class);
    }

    /** Runs the shared buffer-timeout checks against an empty configuration. */
    @Test
    void testBufferTimeoutByDefault() {
        Configuration emptyConfig = new Configuration();

        testBufferTimeout(emptyConfig, StreamExecutionEnvironment.getExecutionEnvironment());
    }

    /**
     * Runs the shared buffer-timeout checks with the buffer timeout explicitly enabled.
     *
     * <p>The flag is passed to {@code Configuration.set} as a plain {@code boolean}. Widening it
     * to {@code Object} is incompatible with the option's {@code Boolean} type parameter.
     */
    @Test
    void testBufferTimeoutEnabled() {
        Configuration config = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, true);

        testBufferTimeout(config, env);
    }

    /**
     * Checks that with the buffer timeout disabled the environment reports {@code -1}, and keeps
     * reporting {@code -1} after {@code setBufferTimeout(0)} and {@code setBufferTimeout(-1)}.
     *
     * <p>The flag is passed to {@code Configuration.set} as a plain {@code boolean}. Widening it
     * to {@code Object} is incompatible with the option's {@code Boolean} type parameter.
     */
    @Test
    void testBufferTimeoutDisabled() {
        // Value the environment is expected to report while the timeout is disabled.
        final long disabledTimeout = -1L;

        Configuration config = new Configuration();
        config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.configure(config, getClass().getClassLoader());
        Assertions.assertThat(env.getBufferTimeout()).isEqualTo(disabledTimeout);

        // An explicit timeout must not take effect while the feature is disabled.
        env.setBufferTimeout(0L);
        Assertions.assertThat(env.getBufferTimeout()).isEqualTo(disabledTimeout);

        env.setBufferTimeout(-1L);
        Assertions.assertThat(env.getBufferTimeout()).isEqualTo(disabledTimeout);
    }

    /**
     * Shared buffer-timeout checks: the option's default after the first {@code configure},
     * zero after configuring {@code "0ms"}, and an {@link IllegalArgumentException} once
     * {@code "-1ms"} is configured and the timeout is read.
     *
     * @param config configuration to apply; this method modifies it
     * @param env environment the configuration is applied to
     */
    private void testBufferTimeout(Configuration config, StreamExecutionEnvironment env) {
        final ClassLoader classLoader = getClass().getClassLoader();
        final String timeoutKey = ExecutionOptions.BUFFER_TIMEOUT.key();

        env.configure(config, classLoader);
        Assertions.assertThat(env.getBufferTimeout())
                .isEqualTo(ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis());

        config.setString(timeoutKey, "0ms");
        env.configure(config, classLoader);
        Assertions.assertThat(env.getBufferTimeout()).isZero();

        Assertions.assertThatThrownBy(
                        () -> {
                            config.setString(timeoutKey, "-1ms");
                            env.configure(config, classLoader);
                            env.getBufferTimeout();
                        })
                .isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testConcurrentSetContext() throws Exception {
        int numThreads = 20;
        final CountDownLatch waitingThreadCount = new CountDownLatch(numThreads);
        final OneShotLatch latch = new OneShotLatch();
        ArrayList<4> threads = new ArrayList<4>();
        for (int x = 0; x < numThreads; ++x) {
            CheckedThread checkedThread = new CheckedThread(){

                public void go() {
                    StreamExecutionEnvironment preparedEnvironment = new StreamExecutionEnvironment();
                    StreamExecutionEnvironment.initializeContextEnvironment(configuration -> preparedEnvironment);
                    try {
                        waitingThreadCount.countDown();
                        latch.awaitQuietly();
                        Assertions.assertThat((Object)StreamExecutionEnvironment.getExecutionEnvironment()).isSameAs((Object)preparedEnvironment);
                    }
                    finally {
                        StreamExecutionEnvironment.resetContextEnvironment();
                    }
                }
            };
            checkedThread.start();
            threads.add(checkedThread);
        }
        waitingThreadCount.await();
        latch.trigger();
        for (CheckedThread checkedThread : threads) {
            checkedThread.sync();
        }
    }

    /**
     * Builds the stream graph of the stream's environment and returns the operator of the node
     * with the stream's id.
     *
     * @param dataStream stream whose operator is looked up
     * @return operator of the stream's node in the freshly generated stream graph
     */
    private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
        StreamGraph graph = dataStream.getExecutionEnvironment().getStreamGraph();
        return graph.getStreamNode(dataStream.getId()).getOperator();
    }

    /**
     * Attaches a discarding sink to the source and returns the user function of the source's
     * operator, looked up through the generated stream graph.
     *
     * @param dataStreamSource source stream backed by a {@link SourceFunction}
     * @param <T> element type of the stream
     * @return the source function held by the stream's operator
     */
    @SuppressWarnings("unchecked")
    private static <T> SourceFunction<T> getFunctionFromDataSource(
            DataStreamSource<T> dataStreamSource) {
        dataStreamSource.sinkTo(new DiscardingSink<>());

        StreamOperator<?> sourceOperator = getOperatorFromDataStream(dataStreamSource);
        AbstractUdfStreamOperator<?, ?> udfOperator =
                (AbstractUdfStreamOperator<?, ?>) sourceOperator;

        return (SourceFunction<T>) udfOperator.getUserFunction();
    }

    /**
     * Returns the source held by the stream's transformation, cast to the type the caller
     * expects. The transformation must be a {@link SourceTransformation}.
     *
     * @param stream stream whose transformation holds the source
     * @param <T> element type of the stream
     * @param <S> source type expected by the caller
     * @return the source of the stream's transformation
     */
    @SuppressWarnings("unchecked")
    private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) {
        SourceTransformation<T, ?, ?> transformation =
                (SourceTransformation<T, ?, ?>) stream.getTransformation();
        return (S) transformation.getSource();
    }

    /**
     * Attaches a discarding sink, builds the environment's stream graph and then returns the
     * source held by the stream's transformation.
     *
     * @param dataStreamSource source stream whose transformation is a {@link SourceTransformation}
     * @param <T> element type of the stream
     * @return the source of the stream's transformation
     */
    private static <T> Source<T, ?, ?> getSourceFromDataSourceTyped(
            DataStreamSource<T> dataStreamSource) {
        dataStreamSource.sinkTo(new DiscardingSink<>());
        dataStreamSource.getExecutionEnvironment().getStreamGraph();

        SourceTransformation<T, ?, ?> transformation =
                (SourceTransformation<T, ?, ?>) dataStreamSource.getTransformation();
        return transformation.getSource();
    }

    private static class RowSourceFunction
    implements SourceFunction<Row>,
    ResultTypeQueryable<Row> {
        private static final long serialVersionUID = 5216362688122691404L;

        private RowSourceFunction() {
        }

        public TypeInformation<Row> getProducedType() {
            return TypeInformation.of(Row.class);
        }

        public void run(SourceFunction.SourceContext<Row> ctx) throws Exception {
        }

        public void cancel() {
        }
    }

    /** Subclass of {@link ParentClass} that adds no state or behavior of its own. */
    private static class SubClass extends ParentClass {

        /**
         * @param num forwarded to the {@link ParentClass} constructor
         * @param string forwarded to the {@link ParentClass} constructor
         */
        public SubClass(int num, String string) {
            super(num, string);
        }
    }

    /** Simple value holder with one {@code int} and one {@code String} field. */
    private static class ParentClass {
        /** Numeric payload. */
        int num;

        /** Text payload. */
        String string;

        /**
         * @param num value stored in {@link #num}
         * @param string value stored in {@link #string}
         */
        public ParentClass(int num, String string) {
            this.string = string;
            this.num = num;
        }
    }

    private static class DummySplittableIterator<T>
    extends SplittableIterator<T> {
        private static final long serialVersionUID = 1312752876092210499L;

        private DummySplittableIterator() {
        }

        public Iterator<T>[] split(int numPartitions) {
            return new Iterator[0];
        }

        public int getMaximumNumberOfSplits() {
            return 0;
        }

        public boolean hasNext() {
            return false;
        }

        public T next() {
            throw new NoSuchElementException();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

