/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.TestAnyModeMultipleInputStreamOperator;
import org.apache.flink.streaming.util.TestSequentialMultipleInputStreamOperator;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamTaskMultipleInputSelectiveReadingTest {
    /**
     * Records that {@code testInputSelection} sends to input index 0 (the input declared with
     * {@code STRING_TYPE_INFO}).
     *
     * <p>Java does not allow generic array creation, so the array itself is created with the raw
     * element type; the elements are typed through the diamond operator.
     */
    @SuppressWarnings("unchecked")
    private static final StreamRecord<String>[] INPUT1 =
            new StreamRecord[] {
                new StreamRecord<>("Hello-1"),
                new StreamRecord<>("Hello-2"),
                new StreamRecord<>("Hello-3")
            };

    /**
     * Records that {@code testInputSelection} sends to input index 1 (the input declared with
     * {@code INT_TYPE_INFO}).
     *
     * <p>Java does not allow generic array creation, so the array itself is created with the raw
     * element type; the elements are typed through the diamond operator.
     */
    @SuppressWarnings("unchecked")
    private static final StreamRecord<Integer>[] INPUT2 =
            new StreamRecord[] {
                new StreamRecord<>(1),
                new StreamRecord<>(2),
                new StreamRecord<>(3),
                new StreamRecord<>(4)
            };

    /** Package-private no-argument constructor; performs no initialization. */
    StreamTaskMultipleInputSelectiveReadingTest() {}

    /**
     * Runs the operator created by {@code TestAnyModeMultipleInputStreamOperator.Factory} with
     * auto-processing disabled and expects the records of the two inputs to be emitted
     * alternately, in exactly the listed order.
     */
    @Test
    void testAnyOrderedReading() throws Exception {
        final String[] expectedValues = {
            "[1]: Hello-1",
            "[2]: 1",
            "[1]: Hello-2",
            "[2]: 2",
            "[1]: Hello-3",
            "[2]: 3",
            "[2]: 4"
        };
        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // autoProcess = false, orderedCheck = true
        testInputSelection(
                new TestAnyModeMultipleInputStreamOperator.Factory(), false, expectedOutput, true);
    }

    /**
     * Runs the operator created by {@code TestAnyModeMultipleInputStreamOperator.Factory} with
     * auto-processing enabled and expects all records of both inputs to be emitted; the order of
     * the output is not checked.
     */
    @Test
    void testAnyUnorderedReading() throws Exception {
        final String[] expectedValues = {
            "[1]: Hello-1",
            "[2]: 1",
            "[1]: Hello-2",
            "[2]: 2",
            "[1]: Hello-3",
            "[2]: 3",
            "[2]: 4"
        };
        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // autoProcess = true, orderedCheck = false
        testInputSelection(
                new TestAnyModeMultipleInputStreamOperator.Factory(), true, expectedOutput, false);
    }

    /**
     * Runs the operator created by {@code TestSequentialMultipleInputStreamOperator.Factory} with
     * auto-processing enabled and expects every record of the first input to be emitted before
     * any record of the second input, in exactly the listed order.
     */
    @Test
    void testSequentialReading() throws Exception {
        final String[] expectedValues = {
            "[1]: Hello-1",
            "[1]: Hello-2",
            "[1]: Hello-3",
            "[2]: 1",
            "[2]: 2",
            "[2]: 3",
            "[2]: 4"
        };
        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // autoProcess = true, orderedCheck = true
        testInputSelection(
                new TestSequentialMultipleInputStreamOperator.Factory(),
                true,
                expectedOutput,
                true);
    }

    /**
     * Runs a {@code SpecialRuleReadingStreamOperatorFactory} configured with 3 records for input
     * 1, 4 records for input 2 and at most 2 continuously read records, and expects the output in
     * exactly the listed order: two records of input 1, two of input 2, the last record of input
     * 1 and finally the remaining records of input 2.
     */
    @Test
    void testSpecialRuleReading() throws Exception {
        final String[] expectedValues = {
            "[1]: Hello-1",
            "[1]: Hello-2",
            "[2]: 1",
            "[2]: 2",
            "[1]: Hello-3",
            "[2]: 3",
            "[2]: 4"
        };
        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // autoProcess = true, orderedCheck = true
        testInputSelection(
                new SpecialRuleReadingStreamOperatorFactory(3, 4, 2), true, expectedOutput, true);
    }

    /**
     * Runs the standard input sequence against the operator created by
     * {@code TestReadFinishedInputStreamOperatorFactory} and expects the run to fail with an
     * {@link IOException} reporting that all selected inputs are already finished.
     */
    @Test
    void testReadFinishedInput() throws Exception {
        Assertions.assertThatThrownBy(
                        () ->
                                testInputSelection(
                                        new TestReadFinishedInputStreamOperatorFactory(),
                                        true,
                                        new ArrayDeque<>(),
                                        true))
                .isInstanceOf(IOException.class)
                .hasMessageContaining(
                        "Can not make a progress: all selected inputs are already finished");
    }

    /**
     * Feeds {@code INPUT1} to input index 0 and {@code INPUT2} to input index 1 of a
     * {@link MultipleInputStreamTask} running the given operator, ends the input, waits for the
     * task to complete and compares the harness output with {@code expectedOutput}.
     *
     * @param streamOperatorFactory factory of the operator under test
     * @param autoProcess value passed to {@code setAutoProcess}; when {@code false},
     *     {@code processAll()} is called explicitly after the input has been ended
     * @param expectedOutput records the harness output is compared against
     * @param orderedCheck {@code true} to require exactly the given order, {@code false} to accept
     *     the same elements in any order
     * @throws Exception if building the harness, processing the input or the task itself fails
     */
    private void testInputSelection(StreamOperatorFactory<String> streamOperatorFactory, boolean autoProcess, ArrayDeque<Object> expectedOutput, boolean orderedCheck) throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(streamOperatorFactory)
                        .build()) {

            testHarness.setAutoProcess(autoProcess);
            for (StreamRecord<String> record : INPUT1) {
                testHarness.processElement(record, 0);
            }
            for (StreamRecord<Integer> record : INPUT2) {
                testHarness.processElement(record, 1);
            }

            testHarness.endInput();

            // Without auto-processing nothing has been consumed yet, so drain explicitly.
            if (!autoProcess) {
                testHarness.processAll();
            }
            testHarness.waitForTaskCompletion();

            if (orderedCheck) {
                Assertions.assertThat(testHarness.getOutput())
                        .containsExactlyElementsOf(expectedOutput);
            } else {
                Assertions.assertThat(testHarness.getOutput())
                        .containsExactlyInAnyOrderElementsOf(expectedOutput);
            }
        }
    }

    /**
     * Verifies that an input which becomes available later is not starved: with a three-input
     * operator whose selection covers inputs 2 and 3, a record that arrives on the third input
     * while the second input still has queued records must be emitted within the next two
     * processing steps.
     *
     * @throws Exception if building the harness or processing a step fails
     */
    @Test
    void testInputStarvation() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new TestInputStarvationMultipleInputOperatorFactory())
                        .build()) {

            // Initial drain before any element is queued
            // (presumably lets the task finish its start-up work -- TODO confirm).
            testHarness.processAll();
            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();

            testHarness.setAutoProcess(false);
            // A single step with no data queued must not emit anything.
            testHarness.processSingleStep();
            Assertions.assertThat(testHarness.getOutput()).isEmpty();

            // Harness index 0 feeds the operator's input 1, which its selection never includes.
            testHarness.processElement(new StreamRecord<>("NOT_SELECTED"), 0);

            testHarness.processElement(new StreamRecord<>("1"), 1);
            testHarness.processElement(new StreamRecord<>("2"), 1);
            testHarness.processElement(new StreamRecord<>("3"), 1);
            testHarness.processElement(new StreamRecord<>("4"), 1);

            // Two steps while only the second input has data: two of its records are emitted.
            testHarness.processSingleStep();
            expectedOutput.add(new StreamRecord<>("[2]: 1"));
            testHarness.processSingleStep();
            expectedOutput.add(new StreamRecord<>("[2]: 2"));
            Assertions.assertThat(testHarness.getOutput())
                    .containsExactlyElementsOf(expectedOutput);

            // The third input had no data so far; it now receives one record.
            testHarness.processElement(new StreamRecord<>("1"), 2);
            testHarness.processSingleStep();
            testHarness.processSingleStep();

            // One of the two steps must have served the third input. Which one is not
            // checked, hence the order-insensitive assertion.
            expectedOutput.add(new StreamRecord<>("[3]: 1"));
            expectedOutput.add(new StreamRecord<>("[2]: 3"));
            Assertions.assertThat(testHarness.getOutput())
                    .containsExactlyInAnyOrderElementsOf(expectedOutput);
        }
    }

    private static class TestInputStarvationMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private TestInputStarvationMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new TestInputStarvationMultipleInputOperator(parameters));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestInputStarvationMultipleInputOperator.class;
        }
    }

    private static class TestInputStarvationMultipleInputOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String>,
    InputSelectable {
        public TestInputStarvationMultipleInputOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        public InputSelection nextSelection() {
            return new InputSelection.Builder().select(2).select(3).build();
        }

        public List<Input> getInputs() {
            return Arrays.asList(new Input[]{new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 1), new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 2), new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 3)});
        }
    }

    private static class TestReadFinishedInputStreamOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private TestReadFinishedInputStreamOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new TestReadFinishedInputStreamOperator(parameters));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestReadFinishedInputStreamOperator.class;
        }
    }

    /**
     * Variant of {@link TestAnyModeMultipleInputStreamOperator} whose input selection is always
     * {@link InputSelection#FIRST}.
     */
    private static class TestReadFinishedInputStreamOperator
            extends TestAnyModeMultipleInputStreamOperator {

        /**
         * Creates the operator.
         *
         * @param operatorParameters parameters forwarded unchanged to the superclass constructor
         */
        TestReadFinishedInputStreamOperator(StreamOperatorParameters<String> operatorParameters) {
            super(operatorParameters);
        }

        /** Selects the first input on every call, independent of any state. */
        @Override
        public InputSelection nextSelection() {
            return InputSelection.FIRST;
        }
    }

    private static class SpecialRuleReadingStreamOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final int input1Records;
        private final int input2Records;
        private final int maxContinuousReadingRecords;

        public SpecialRuleReadingStreamOperatorFactory(int input1Records, int input2Records, int maxContinuousReadingRecords) {
            this.input1Records = input1Records;
            this.input2Records = input2Records;
            this.maxContinuousReadingRecords = maxContinuousReadingRecords;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SpecialRuleReadingStreamOperator(parameters, this.input1Records, this.input2Records, this.maxContinuousReadingRecords));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SpecialRuleReadingStreamOperator.class;
        }
    }

    private static class SpecialRuleReadingStreamOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String>,
    InputSelectable,
    BoundedMultiInput {
        private final int input1Records;
        private final int input2Records;
        private final int maxContinuousReadingRecords;
        private int input1ReadingRecords;
        private int input2ReadingRecords;
        private int continuousReadingRecords;
        private InputSelection inputSelection = InputSelection.FIRST;

        SpecialRuleReadingStreamOperator(StreamOperatorParameters<String> parameters, int input1Records, int input2Records, int maxContinuousReadingRecords) {
            super(parameters, 2);
            this.input1Records = input1Records;
            this.input2Records = input2Records;
            this.maxContinuousReadingRecords = maxContinuousReadingRecords;
        }

        public InputSelection nextSelection() {
            return this.inputSelection;
        }

        public void endInput(int inputId) {
            this.inputSelection = inputId == 1 ? InputSelection.SECOND : InputSelection.FIRST;
        }

        public List<Input> getInputs() {
            return Arrays.asList(new Input[]{new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 1){

                public void processElement(StreamRecord element) {
                    super.processElement(element);
                    input1ReadingRecords++;
                    continuousReadingRecords++;
                    if (continuousReadingRecords == maxContinuousReadingRecords) {
                        continuousReadingRecords = 0;
                        if (input2ReadingRecords < input2Records) {
                            inputSelection = InputSelection.SECOND;
                            return;
                        }
                    }
                    inputSelection = InputSelection.FIRST;
                }
            }, new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 2){

                public void processElement(StreamRecord element) {
                    super.processElement(element);
                    input2ReadingRecords++;
                    continuousReadingRecords++;
                    if (continuousReadingRecords == maxContinuousReadingRecords) {
                        continuousReadingRecords = 0;
                        if (input1ReadingRecords < input1Records) {
                            inputSelection = InputSelection.FIRST;
                            return;
                        }
                    }
                    inputSelection = InputSelection.SECOND;
                }
            }});
        }
    }
}

