/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TestBoundedTwoInputOperator;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.OutputTag;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;

public class TwoInputStreamTaskTest {
    /**
     * Runs a {@link CoStreamMap} around {@link TestOpenCloseMapFunction} in a two-input task,
     * sends one timestamped record to each input and checks that
     *
     * <ul>
     *   <li>both records reach the output with their timestamps (the integer sent to the second
     *       input is expected as the string {@code "1337"}), and
     *   <li>{@code TestOpenCloseMapFunction.closeCalled} is set once the task has completed.
     * </ul>
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap(new TestOpenCloseMapFunction());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        long initialTime = 0L;
        // Declared as a queue of Object, exactly like the expected-output queues the other tests
        // in this class hand to TestHarnessUtil.assertOutputEquals.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        testHarness.processElement(new StreamRecord("Hello", initialTime + 1L), 0, 0);
        expectedOutput.add(new StreamRecord("Hello", initialTime + 1L));

        // Wait before feeding the other input so the order of the two output records is fixed.
        testHarness.waitForInputProcessing();

        testHarness.processElement(new StreamRecord(1337, initialTime + 2L), 1, 0);
        expectedOutput.add(new StreamRecord("1337", initialTime + 2L));
        testHarness.waitForInputProcessing();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Drives an identity {@link CoStreamMap} with watermarks, records and watermark statuses on
     * four distinct (gate, channel) combinations and checks, step by step, what is forwarded to
     * the output.
     *
     * <p>The two trailing {@code int} arguments of {@code processElement} select where the
     * element is injected (presumably input gate index, then channel index - confirm against
     * the harness).
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        // Watermark on three of the four (gate, channel) combinations: nothing is expected on
        // the output yet (expectedOutput is still empty).
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Once the fourth combination has received the watermark, it is expected on the output.
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Records are expected to pass through; the Integer from the second input shows up as
        // its string form.
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"Hello", initialTime));
        expectedOutput.add(new StreamRecord((Object)"42", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermarks 4, 3, 3 and 2 are injected; the expected output watermark is the minimum, 2.
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3L), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2L), 1, 1);
        expectedOutput.add(new Watermark(initialTime + 2L));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Advancing the slowest one (1, 1) to 4 raises the minimum to 3.
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Advancing the remaining two to 4 raises the minimum to 4.
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // (0, 1) and (1, 0) go idle. Watermark 6 arrives on (0, 0) and 5 on (1, 1); after that
        // (1, 1) goes idle as well. Expected output: watermark 5, then watermark 6.
        testHarness.processElement(WatermarkStatus.IDLE, 0, 1);
        testHarness.processElement(WatermarkStatus.IDLE, 1, 0);
        testHarness.processElement(new Watermark(initialTime + 6L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 5L), 1, 1);
        testHarness.processElement(WatermarkStatus.IDLE, 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 5L));
        expectedOutput.add(new Watermark(initialTime + 6L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Marking the last remaining one (0, 0) idle is expected to emit WatermarkStatus.IDLE.
        testHarness.processElement(WatermarkStatus.IDLE, 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(WatermarkStatus.IDLE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Two ACTIVE statuses are injected, but only a single ACTIVE is expected on the output.
        testHarness.processElement(WatermarkStatus.ACTIVE, 1, 0);
        testHarness.processElement(WatermarkStatus.ACTIVE, 0, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(WatermarkStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        // Only the two stream records ("Hello" and "42") count as raw elements.
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)2L, (long)resultElements.size());
    }

    /**
     * Sends the barrier of checkpoint 0 to a single (gate, channel) combination first, then
     * records on other combinations, and finally the same barrier to the remaining three
     * combinations.
     *
     * <p>Expected output: the three records first, then exactly one barrier for checkpoint 0 once
     * the task has completed.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        final TwoInputStreamTaskTestHarness harness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        final CoStreamMap identityOperator = new CoStreamMap(new IdentityMap());
        config.setStreamOperator(identityOperator);
        config.setOperatorID(new OperatorID());

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        final long timestamp = 0L;

        harness.invoke();
        harness.waitForTaskRunning();

        // First barrier of checkpoint 0 arrives on (0, 0) only.
        harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

        // A record on (0, 1) is still expected on the output.
        harness.processElement(new StreamRecord("Ciao-0-0", timestamp), 0, 1);
        expected.add(new StreamRecord("Ciao-0-0", timestamp));
        harness.waitForInputProcessing();

        // Both records go to the same (gate, channel) so their relative order is fixed.
        harness.processElement(new StreamRecord(11, timestamp), 1, 1);
        harness.processElement(new StreamRecord(111, timestamp), 1, 1);
        expected.add(new StreamRecord("11", timestamp));
        expected.add(new StreamRecord("111", timestamp));
        harness.waitForInputProcessing();

        // Poll (at most 20 x 100 ms) until the output has caught up with the expectation.
        int attempt = 0;
        while (attempt < 20 && harness.getOutput().size() < expected.size()) {
            Thread.sleep(100L);
            ++attempt;
        }

        // Only the records are expected so far; the barrier must not have been emitted yet.
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

        // Deliver the barrier to the three remaining combinations.
        harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();

        // Now the barrier is expected on the output, after the three records.
        expected.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());

        final List rawElements = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        Assert.assertEquals(3L, (long) rawElements.size());
    }

    /**
     * Checks what is emitted when the barriers of checkpoint 1 arrive on all four
     * (gate, channel) combinations while checkpoint 0 has only been seen on one of them.
     *
     * <p>Expected output: the two records, a {@link CancelCheckpointMarker} for checkpoint 0, and
     * the barrier of checkpoint 1. The late barriers of checkpoint 0 are expected to add nothing.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        // Barrier of checkpoint 0 arrives on (0, 0) only.
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        // Records on (1, 1) are still expected on the output; both use the same
        // (gate, channel) so their relative order is fixed.
        testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord((Object)1337, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"42", initialTime));
        expectedOutput.add(new StreamRecord((Object)"1337", initialTime));
        testHarness.waitForInputProcessing();
        // Only the two records are expected so far - no barrier.
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Barrier of the later checkpoint 1 arrives on all four combinations.
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        // Expected: checkpoint 0 is cancelled, then the barrier of checkpoint 1 is forwarded.
        expectedOutput.add(new CancelCheckpointMarker(0L));
        expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        // The remaining barriers of the older checkpoint 0 arrive late; expectedOutput is not
        // extended, i.e. they are expected to produce no further output.
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks the task-level {@code numRecordsIn} / {@code numRecordsOut} counters for a chain of
     * three duplicating operators (one two-input head followed by two chained one-input
     * operators).
     *
     * <p>The expected values are derived from the number of records fed into each input instead
     * of being hard-coded, so that changing the record counts keeps the assertions consistent.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testOperatorMetricReuse() throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOperatorChain(new OperatorID(), (StreamOperator<?>)new DuplicatingOperator()).chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();

        final TaskMetricGroup taskMetricGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup((MetricRegistry)NoOpMetricRegistry.INSTANCE, (String)"host", (ResourceID)ResourceID.generate()).addJob(new JobID(), "jobname").addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "task");

        // Environment that hands out the metric group created above, so that the counters read
        // below are the ones the task writes to.
        StreamMockEnvironment env = new StreamMockEnvironment(testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, (TaskStateManager)new TestTaskStateManager()){

            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        };

        Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

        testHarness.invoke(env);
        testHarness.waitForTaskRunning();

        final int numRecords1 = 5;
        final int numRecords2 = 3;

        for (int x = 0; x < numRecords1; ++x) {
            testHarness.processElement(new StreamRecord((Object)"hello"), 0, 0);
        }
        for (int x = 0; x < numRecords2; ++x) {
            testHarness.processElement(new StreamRecord((Object)"hello"), 1, 0);
        }
        testHarness.waitForInputProcessing();

        final long expectedRecordsIn = numRecords1 + numRecords2;
        // One factor of 2 per operator in the chain (presumably each DuplicatingOperator emits
        // every record twice - their definitions are outside this method).
        final long expectedRecordsOut = expectedRecordsIn * 2 * 2 * 2;

        Assert.assertEquals(expectedRecordsIn, numRecordsInCounter.getCount());
        Assert.assertEquals(expectedRecordsOut, numRecordsOutCounter.getCount());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Restores a two-input task from {@link TaskStateSnapshot#FINISHED_ON_RESTORE}, feeds
     * {@link Watermark#MAX_WATERMARK} into both inputs and verifies that, after the task has
     * completed, its output consists of the max watermark followed by
     * {@code EndOfData(StopMode.DRAIN)}.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testSkipExecutionsIfFinishedOnRestore() throws Exception {
        OperatorID nonSourceOperatorId = new OperatorID();

        // The diamond keeps the builder chain generic. On a raw builder the parameter of the
        // modifyStreamConfig lambda would only have the erased type, so
        // setCheckpointingEnabled could not be resolved on it.
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .setCollectNetworkEvents()
                        .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                        .setupOperatorChain(nonSourceOperatorId, (StreamOperator<?>) new TestFinishedOnRestoreStreamOperator())
                        .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                        .build()) {
            testHarness.processElement(Watermark.MAX_WATERMARK, 0);
            testHarness.processElement(Watermark.MAX_WATERMARK, 1);
            testHarness.waitForTaskCompletion();

            MatcherAssert.assertThat(
                    testHarness.getOutput(),
                    Matchers.contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)));
        }
    }

    /**
     * Verifies the watermark gauges registered for a two-input task with a chain of two
     * operators: an identity {@link CoStreamMap} head and a chained
     * {@code OneInputStreamTaskTest.WatermarkMetricOperator}.
     *
     * <p>Seven distinct gauges are looked up (one task-level input gauge, four on the head
     * operator, two on the chained operator) and their values are asserted after each injected
     * watermark.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testWatermarkMetrics() throws Exception {
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        CoStreamMap headOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        final OperatorID headOperatorId = new OperatorID();
        OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator();
        final OperatorID chainedOperatorId = new OperatorID();
        testHarness.setupOperatorChain(headOperatorId, (StreamOperator<?>)headOperator).chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
        final InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        // Task metric group that hands out the two intercepting operator groups above for the
        // two operator ids of the chain and defers to the superclass for any other id.
        final InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup(){

            public InternalOperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                if (id.equals((Object)headOperatorId)) {
                    return headOperatorMetricGroup;
                }
                if (id.equals((Object)chainedOperatorId)) {
                    return chainedOperatorMetricGroup;
                }
                return super.getOrAddOperator(id, name);
            }
        };
        // Environment that exposes the intercepting task metric group to the task.
        StreamMockEnvironment env = new StreamMockEnvironment(testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, (TaskStateManager)new TestTaskStateManager()){

            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        };
        testHarness.invoke(env);
        testHarness.waitForTaskRunning();
        // Look up the gauges by their metric names.
        Gauge taskInputWatermarkGauge = (Gauge)taskMetricGroup.get("currentInputWatermark");
        Gauge headInput1WatermarkGauge = (Gauge)headOperatorMetricGroup.get("currentInput1Watermark");
        Gauge headInput2WatermarkGauge = (Gauge)headOperatorMetricGroup.get("currentInput2Watermark");
        Gauge headInputWatermarkGauge = (Gauge)headOperatorMetricGroup.get("currentInputWatermark");
        Gauge headOutputWatermarkGauge = (Gauge)headOperatorMetricGroup.get("currentOutputWatermark");
        Gauge chainedInputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentInputWatermark");
        Gauge chainedOutputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentOutputWatermark");
        // Putting the seven gauges into a set must keep all seven, i.e. no gauge object is
        // shared between two metric names.
        Assert.assertEquals((String)"A metric was registered multiple times.", (long)7L, (long)new HashSet<Gauge>(Arrays.asList(taskInputWatermarkGauge, headInput1WatermarkGauge, headInput2WatermarkGauge, headInputWatermarkGauge, headOutputWatermarkGauge, chainedInputWatermarkGauge, chainedOutputWatermarkGauge)).size());
        // Before any watermark has been processed every gauge reports Long.MIN_VALUE.
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)taskInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headInput1WatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headInput2WatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headOutputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedOutputWatermarkGauge.getValue()));
        // Watermark 1 on the first input only: just the head operator's input-1 gauge moves.
        testHarness.processElement(new Watermark(1L), 0, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)taskInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)1L, (long)((Long)headInput1WatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headInput2WatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)headOutputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedOutputWatermarkGauge.getValue()));
        // Watermark 2 on the second input: the combined gauges are expected at 1 (the smaller of
        // the two inputs). The chained operator's output gauge is expected at 2, i.e. twice its
        // input (presumably WatermarkMetricOperator doubles watermarks - it is defined in
        // OneInputStreamTaskTest; confirm there).
        testHarness.processElement(new Watermark(2L), 1, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals((long)1L, (long)((Long)taskInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)1L, (long)((Long)headInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)1L, (long)((Long)headInput1WatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)headInput2WatermarkGauge.getValue()));
        Assert.assertEquals((long)1L, (long)((Long)headOutputWatermarkGauge.getValue()));
        Assert.assertEquals((long)1L, (long)((Long)chainedInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)chainedOutputWatermarkGauge.getValue()));
        // Watermark 3 on the first input: the smaller input is now the second one (2), so the
        // combined gauges are expected at 2 and the chained output gauge at 4.
        testHarness.processElement(new Watermark(3L), 0, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals((long)2L, (long)((Long)taskInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)headInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)3L, (long)((Long)headInput1WatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)headInput2WatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)headOutputWatermarkGauge.getValue()));
        Assert.assertEquals((long)2L, (long)((Long)chainedInputWatermarkGauge.getValue()));
        Assert.assertEquals((long)4L, (long)((Long)chainedOutputWatermarkGauge.getValue()));
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Runs a chain of a {@code TestBoundedTwoInputOperator} ("Operator0") followed by a
     * {@code TestBoundedOneInputStreamOperator} ("Operator1"), ends the two inputs one after the
     * other and compares the complete output against the expected sequence of data,
     * "End of input" and "Finish" records.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        final TwoInputStreamTaskTestHarness harness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOperatorChain(new OperatorID(), (StreamOperator<?>) new TestBoundedTwoInputOperator("Operator0"))
                .chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        harness.invoke();
        harness.waitForTaskRunning();

        // First input: one record, then end of input.
        harness.processElement(new StreamRecord("Hello-1"), 0, 0);
        harness.endInput(0, 0);
        harness.waitForInputProcessing();

        // Second input: one record, then end of input.
        harness.processElement(new StreamRecord("Hello-2"), 1, 0);
        harness.endInput(1, 0);
        harness.waitForTaskCompletion();

        final Object[] expectedRecords = {
            new StreamRecord("[Operator0-1]: Hello-1"),
            new StreamRecord("[Operator0-1]: End of input"),
            new StreamRecord("[Operator0-2]: Hello-2"),
            new StreamRecord("[Operator0-2]: End of input"),
            new StreamRecord("[Operator0]: Finish"),
            new StreamRecord("[Operator1]: End of input"),
            new StreamRecord("[Operator1]: Finish")
        };
        final Object[] actualRecords = harness.getOutput().toArray();

        Assert.assertArrayEquals("Output was not correct.", expectedRecords, actualRecords);
    }

    /**
     * Starts a two-input task with a task metric group that records every registered metric in
     * a map, and checks that the metrics {@code checkpointAlignmentTime} and
     * {@code checkpointStartDelayNanos} are present once the task is running.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        final TwoInputStreamTaskTestHarness harness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        final StreamConfig config = harness.getStreamConfig();
        final CoStreamMap identityOperator = new CoStreamMap(new IdentityMap());
        harness.setupOutputForSingletonOperatorChain();
        config.setStreamOperator(identityOperator);

        // Every metric registered on the task metric group ends up in this map.
        final ConcurrentHashMap<String, Metric> registeredMetrics = new ConcurrentHashMap<String, Metric>();
        final TaskMetricGroup metricGroup = StreamTaskTestHarness.createTaskMetricGroup(registeredMetrics);
        final StreamMockEnvironment mockEnvironment = harness.createEnvironment();
        mockEnvironment.setTaskMetricGroup(metricGroup);

        harness.invoke(mockEnvironment);
        harness.waitForTaskRunning();

        final String[] requiredMetricNames = {"checkpointAlignmentTime", "checkpointStartDelayNanos"};
        for (String metricName : requiredMetricNames) {
            MatcherAssert.assertThat(registeredMetrics, (Matcher) IsMapContaining.hasKey((Object) metricName));
        }

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Verifies that the task's {@code CanEmitBatchOfRecordsChecker} follows both the availability
     * of the additional result partition writer and the state of the mailbox.
     *
     * <p>The checker is expected to return {@code true} while the writer is available and no
     * mail is pending, and {@code false} as soon as the writer becomes unavailable or a mail has
     * been enqueued. Asserting {@code false} in every state would let a checker that always
     * answers {@code false} pass, so the "can emit" states are asserted with {@code assertTrue}.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testCanEmitBatchOfRecords() throws Exception {
        AvailabilityProvider.AvailabilityHelper availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addAdditionalOutput(new ResultPartitionWriter[]{new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper)}).setupOperatorChain((StreamOperator<?>)new DuplicatingOperator()).finishForSingletonOperatorChain(IntSerializer.INSTANCE).build()) {
            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker = testHarness.streamTask.getCanEmitBatchOfRecords();
            testHarness.processAll();

            // Writer available, mailbox drained: a batch may be emitted.
            availabilityHelper.resetAvailable();
            Assert.assertTrue(canEmitBatchOfRecordsChecker.check());

            // Writer unavailable: no batch may be emitted.
            availabilityHelper.resetUnavailable();
            Assert.assertFalse(canEmitBatchOfRecordsChecker.check());

            // Writer available again: emitting is allowed again.
            availabilityHelper.resetAvailable();
            Assert.assertTrue(canEmitBatchOfRecordsChecker.check());

            // A pending mail blocks batch emission even though the writer is available.
            testHarness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
            Assert.assertFalse(canEmitBatchOfRecordsChecker.check());

            // Once the mail has been processed, emitting is allowed again.
            testHarness.processAll();
            Assert.assertTrue(canEmitBatchOfRecordsChecker.check());
        }
    }

    /**
     * Checks the task-level {@code numRecordsIn} / {@code numRecordsOut} counters for an operator
     * chain that writes to three additional result partition writers: a pass-through head, an
     * {@code OddEvenOperator} with a side output tagged {@code "odd"} plus an untagged output,
     * and a {@code DuplicatingOperator} with one output.
     *
     * <p>The expected counter values are derived from the number of even and odd records sent,
     * rather than hard-coded. The partition writers are closed in a {@code finally} block, after
     * the harness itself has been closed by the try-with-resources statement.
     *
     * @throws Exception if the harness or the task fails
     */
    @Test
    public void testTaskSideOutputStatistics() throws Exception {
        TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
        for (int i = 0; i < partitionWriters.length; ++i) {
            partitionWriters[i] = new RecordOrEventCollectingResultPartitionWriter(new ArrayDeque(), (TypeSerializer)new StreamElementSerializer(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())));
            partitionWriters[i].setup();
        }
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(TwoInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addAdditionalOutput(partitionWriters).setupOperatorChain(new OperatorID(), (StreamOperator<?>)new PassThroughOperator()).chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())).setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.OddEvenOperator())).addNonChainedOutputsCount(new OutputTag("odd", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO), 2).addNonChainedOutputsCount(1).build().chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())).setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.DuplicatingOperator())).addNonChainedOutputsCount(1).build().finish().setTaskMetricGroup(taskMetricGroup).build()) {
            Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            final int numEvenRecords = 5;
            final int numOddRecords = 3;

            // Even values 0, 2, 4, ...
            for (int x = 0; x < numEvenRecords; ++x) {
                testHarness.processElement(new StreamRecord((Object)(2 * x)));
            }
            // Odd values 1, 3, 5, ...
            for (int x = 0; x < numOddRecords; ++x) {
                testHarness.processElement(new StreamRecord((Object)(2 * x + 1)));
            }

            // Expected contribution of each non-chained output to numRecordsOut: the "odd" tagged
            // output sees the odd records, the untagged output sees every record, and the
            // duplicating operator's output sees every record twice.
            final int oddEvenOperatorOutputsWithOddTag = numOddRecords;
            final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
            final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2;

            Assert.assertEquals((long)(numOddRecords + numEvenRecords), numRecordsInCounter.getCount());
            Assert.assertEquals((long)(oddEvenOperatorOutputsWithOddTag + oddEvenOperatorOutputsWithoutTag + duplicatingOperatorOutput), numRecordsOutCounter.getCount());
            testHarness.waitForTaskCompletion();
        }
        finally {
            for (ResultPartitionWriter partitionWriter : partitionWriters) {
                partitionWriter.close();
            }
        }
    }

    private static class IdentityMap
    implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        public String map1(String value) {
            return value;
        }

        public String map2(Integer value) {
            return value.toString();
        }
    }

    private static class TestOpenCloseMapFunction
    extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        TestOpenCloseMapFunction() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(String value) {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }

        public String map2(Integer value) {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value.toString();
        }
    }

    static class OddEvenOperator
    extends AbstractStreamOperator<Integer>
    implements TwoInputStreamOperator<Integer, Integer, Integer> {
        private final OutputTag<Integer> oddOutputTag = new OutputTag("odd", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        private final OutputTag<Integer> evenOutputTag = new OutputTag("even", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);

        OddEvenOperator() {
        }

        public void processElement1(StreamRecord<Integer> element) throws Exception {
            this.processElement(element);
        }

        public void processElement2(StreamRecord<Integer> element) throws Exception {
            this.processElement(element);
        }

        private void processElement(StreamRecord<Integer> element) {
            if ((Integer)element.getValue() % 2 == 0) {
                this.output.collect(this.evenOutputTag, element);
            } else {
                this.output.collect(this.oddOutputTag, element);
            }
            this.output.collect(element);
        }
    }

    static class PassThroughOperator<T>
    extends AbstractStreamOperator<T>
    implements TwoInputStreamOperator<T, T, T> {
        PassThroughOperator() {
        }

        public void processElement1(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
        }

        public void processElement2(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
        }
    }

    static class DuplicatingOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    InputSelectable {
        DuplicatingOperator() {
        }

        public void processElement1(StreamRecord<String> element) {
            this.output.collect(element);
            this.output.collect(element);
        }

        public void processElement2(StreamRecord<String> element) {
            this.output.collect(element);
            this.output.collect(element);
        }

        public InputSelection nextSelection() {
            return InputSelection.ALL;
        }
    }
}

