/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.legacy.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTestBase;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.jupiter.api.Test;

class SourceStreamTaskTest
extends SourceStreamTaskTestBase {
    /** Creates the test instance; nothing is initialised beyond what the superclass constructor does. */
    SourceStreamTaskTest() {
        super();
    }

    /**
     * Runs a {@link SourceStreamTask} around an {@link OpenCloseTestSource} to completion, then
     * checks that the source recorded a {@code close()} call and that exactly ten raw elements
     * reached the harness output.
     */
    @Test
    void testOpenClose() throws Exception {
        StreamTaskTestHarness harness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        // Install the source under test as the task's only operator.
        StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new OpenCloseTestSource()));
        config.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskCompletion();

        // The source itself fails if open/run/close happen out of order; here only the final
        // "close was reached" flag is checked.
        boolean closed = OpenCloseTestSource.closeCalled;
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)closed).as("RichFunction methods where not called.", new Object[0])).isTrue();

        List emitted = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        int emittedCount = emitted.size();
        Assertions.assertThat((int)emittedCount).isEqualTo(10);
    }

    /**
     * Delegates to the three-argument {@code testMetrics} helper (not defined in the visible part
     * of this file; presumably inherited from {@link SourceStreamTaskTestBase}) with a
     * {@link SourceStreamTask} factory, an operator factory wrapping a {@link CancelTestSource}
     * constructed with the single element {@code 42}, and a busy-time check that evaluates
     * {@code busyTime.isNaN()}.
     */
    @Test
    void testMetrics() throws Exception {
        FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> taskFactory =
                (FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception>)((FunctionWithException)SourceStreamTask::new);
        StreamOperatorFactory<?> operatorFactory =
                (StreamOperatorFactory<?>)SimpleOperatorFactory.of((StreamOperator)new StreamSource(new CancelTestSource<Object>(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), 42)));
        testMetrics(taskFactory, operatorFactory, busyTime -> busyTime.isNaN());
    }

    /**
     * Runs a {@link MockSource} inside a {@link SourceStreamTask} while background
     * {@link Checkpointer} tasks keep triggering checkpoints, then checks that every element was
     * emitted.
     *
     * <p>{@link MockSource#snapshotState} fails the test itself when it detects a concurrent
     * snapshot or an element emitted during a snapshot; any exception thrown by a checkpointer is
     * rethrown here through {@link Future#get()}.
     */
    @Test
    void testCheckpointing() throws Exception {
        final int numElements = 100;
        final int numCheckpoints = 100;
        final int numCheckpointers = 1;
        final int checkpointInterval = 5; // ms slept between two checkpoint triggers
        final int sourceCheckpointDelay = 1000; // number of random values summed per snapshot
        final int sourceReadDelay = 1; // ms slept before each emitted element

        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            TupleTypeInfo typeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
            StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, typeInfo);
            testHarness.setupOutputForSingletonOperatorChain();

            StreamConfig streamConfig = testHarness.getStreamConfig();
            StreamSource sourceOperator = new StreamSource((SourceFunction)new MockSource(numElements, sourceCheckpointDelay, sourceReadDelay));
            streamConfig.setStreamOperator((StreamOperator)sourceOperator);
            streamConfig.setOperatorID(new OperatorID());

            Future[] checkpointerResults = new Future[numCheckpointers];

            // Start the task first so it is already running when checkpoints get triggered.
            testHarness.invoke();
            testHarness.waitForTaskRunning();
            StreamTask sourceTask = testHarness.getTask();

            for (int i = 0; i < numCheckpointers; ++i) {
                checkpointerResults[i] = executor.submit(new Checkpointer(numCheckpoints, checkpointInterval, sourceTask));
            }

            testHarness.waitForTaskCompletion();

            // Stop checkpointers that are still running; for the finished ones, get() rethrows
            // whatever they failed with.
            for (Future checkpointerResult : checkpointerResults) {
                if (!checkpointerResult.isDone()) {
                    checkpointerResult.cancel(true);
                }
                if (!checkpointerResult.isCancelled()) {
                    checkpointerResult.get();
                }
            }

            List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
            Assertions.assertThat((int)resultElements.size()).isEqualTo(numElements);
        }
        finally {
            executor.shutdown();
        }
    }

    /**
     * Chains an {@link OutputRecordInCloseTestSource} ("Source0") with a
     * {@link TestBoundedOneInputStreamOperator} ("Operator1"), runs the task to completion and
     * checks the exact output sequence: the source element, the source's end-of-input record, the
     * max watermark, the source's finish record, then the downstream operator's end-of-input and
     * finish records.
     *
     * <p>The record that the source's {@code finish()} schedules through a processing-time timer
     * ("Timer registered in close") is not part of the expected output.
     */
    @Test
    void testClosingAllOperatorsOnChainProperly() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>)new OutputRecordInCloseTestSource<FromElementsFunction>("Source0", new FromElementsFunction((TypeSerializer)StringSerializer.INSTANCE, (Object[])new String[]{"Hello"})))
                .chain(new OperatorID(), (OneInputStreamOperator)new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .finish();

        testHarness.invoke();
        testHarness.waitForTaskCompletion();

        Object[] expected = new Object[]{
            new StreamRecord((Object)"Hello"),
            new StreamRecord((Object)"[Source0]: End of input"),
            Watermark.MAX_WATERMARK,
            new StreamRecord((Object)"[Source0]: Finish"),
            new StreamRecord((Object)"[Operator1]: End of input"),
            new StreamRecord((Object)"[Operator1]: Finish")
        };
        Object[] output = testHarness.getOutput().toArray();
        ((ObjectArrayAssert)Assertions.assertThat((Object[])output).as("Output was not correct.", new Object[0])).isEqualTo((Object)expected);
    }

    /**
     * Cancels the task after the {@link CancelTestSource} has emitted its element and checks that
     * only that element ("Hello") is in the output, i.e. the chained
     * {@link TestBoundedOneInputStreamOperator} produced no end-of-input or finish records.
     *
     * <p>A failure of {@code waitForTaskCompletion()} is tolerated only when its cause chain
     * contains a {@link CancelTaskException}; anything else is rethrown.
     */
    @Test
    void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource(new CancelTestSource<Object>(BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), "Hello")))
                .chain(new OperatorID(), (OneInputStreamOperator)new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .finish();

        testHarness.invoke();
        // Wait until the source has emitted all of its elements before cancelling.
        CancelTestSource.getDataProcessing().await();
        testHarness.getTask().cancel();

        try {
            testHarness.waitForTaskCompletion();
        }
        catch (Throwable t) {
            if (!ExceptionUtils.findThrowable((Throwable)t, CancelTaskException.class).isPresent()) {
                throw t;
            }
        }

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"Hello"));
        TestHarnessUtil.assertOutputEquals((String)"Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /** Cancellation scenario without a pending mail and without {@code cancel()} throwing. */
    @Test
    void testCancellationWithSourceBlockedOnLock() throws Exception {
        final boolean withPendingMail = false;
        final boolean throwInCancel = false;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation scenario with a pending mail enqueued and without {@code cancel()} throwing. */
    @Test
    void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
        final boolean withPendingMail = true;
        final boolean throwInCancel = false;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation scenario without a pending mail where the source is told to throw in cancel. */
    @Test
    void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
        final boolean withPendingMail = false;
        final boolean throwInCancel = true;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /** Cancellation scenario with a pending mail enqueued and the source told to throw in cancel. */
    @Test
    void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError() throws Exception {
        final boolean withPendingMail = true;
        final boolean throwInCancel = true;
        testCancellationWithSourceBlockedOnLock(withPendingMail, throwInCancel);
    }

    /**
     * Shared body of the "source blocked on lock" cancellation tests.
     *
     * <p>Starts a task around a {@code CancelLockingSource} (declared outside the visible part of
     * this file), waits until that source reports it is running, optionally enqueues a mail, then
     * cancels the task and waits for it to complete.
     *
     * @param withPendingMail when {@code true}, a mail asserting that the task is no longer
     *     running is submitted to the task's mailbox before cancellation
     * @param throwInCancel passed to the source's constructor; an {@link ExpectedTestException}
     *     escaping {@code cancel()} is accepted only when this is {@code true}
     * @throws Exception any completion failure whose cause chain contains neither an
     *     {@link InterruptedException} nor a {@link CancelTaskException}
     */
    void testCancellationWithSourceBlockedOnLock(boolean withPendingMail, boolean throwInCancel) throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        CancelLockingSource.reset();
        testHarness
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new CancelLockingSource(throwInCancel)))
                .chain(new OperatorID(), (OneInputStreamOperator)new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .finish();

        testHarness.invoke();
        CancelLockingSource.awaitRunning();

        if (withPendingMail) {
            // If this mail ever runs, the task must already have stopped running.
            testHarness.getTask().getMailboxExecutorFactory().createExecutor(0).execute(() -> ((AbstractBooleanAssert)Assertions.assertThat((boolean)testHarness.getTask().isRunning()).as("This should never execute before task cancelation", new Object[0])).isFalse(), "Test");
        }

        try {
            testHarness.getTask().cancel();
        }
        catch (ExpectedTestException e) {
            // Only legitimate when the source was configured to throw from cancel().
            Preconditions.checkState((boolean)throwInCancel);
        }

        try {
            testHarness.waitForTaskCompletion();
        }
        catch (Throwable t) {
            boolean causedByCancellation = ExceptionUtils.findThrowable((Throwable)t, InterruptedException.class).isPresent()
                    || ExceptionUtils.findThrowable((Throwable)t, CancelTaskException.class).isPresent();
            if (!causedByCancellation) {
                throw t;
            }
        }
    }

    /** Runs the interruption scenario with a source that produces a bare {@link InterruptedException}. */
    @Test
    void testInterruptionExceptionNotSwallowed() throws Exception {
        InterruptedSource.ExceptionGenerator plainInterruption = InterruptedException::new;
        testInterruptionExceptionNotSwallowed(plainInterruption);
    }

    /**
     * Runs the interruption scenario with an {@link InterruptedException} wrapped first in a
     * {@link FlinkRuntimeException} and then in a {@link RuntimeException}.
     */
    @Test
    void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
        InterruptedSource.ExceptionGenerator wrappedInterruption =
                () -> new RuntimeException((Throwable)new FlinkRuntimeException((Throwable)new InterruptedException()));
        testInterruptionExceptionNotSwallowed(wrappedInterruption);
    }

    /**
     * Shared body of the interruption tests: runs a task around an {@link InterruptedSource} built
     * from the given generator and waits for the task to complete.
     *
     * <p>An exception from {@code waitForTaskCompletion()} is accepted when an
     * {@link InterruptedException} is found in its cause chain and rethrown otherwise. Note that
     * the method also returns normally when the task completes without throwing at all.
     *
     * @param exceptionGenerator handed to the {@link InterruptedSource} constructor
     */
    private void testInterruptionExceptionNotSwallowed(InterruptedSource.ExceptionGenerator exceptionGenerator) throws Exception {
        StreamTaskTestHarness harness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        CancelLockingSource.reset();
        harness
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>)new StreamSource((SourceFunction)new InterruptedSource(exceptionGenerator)))
                .chain(new OperatorID(), (OneInputStreamOperator)new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                .finish();
        harness.invoke();

        try {
            harness.waitForTaskCompletion();
        }
        catch (Exception e) {
            boolean interrupted = ExceptionUtils.findThrowable((Throwable)e, InterruptedException.class).isPresent();
            if (!interrupted) {
                throw e;
            }
        }
    }

    /**
     * Checks that the task thread stays alive after cancellation for as long as the
     * {@link NonStoppingSource} keeps running, even across ten further
     * {@code maybeInterruptOnCancel} calls, and that it finishes once the source is force-stopped.
     */
    @Test
    void testWaitsForSourceThreadOnCancel() throws Exception {
        StreamTaskTestHarness testHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig config = testHarness.getStreamConfig();
        config.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new NonStoppingSource()));

        testHarness.invoke();
        NonStoppingSource.waitForStart();

        // cancel() on this source is a no-op, so the task thread must still be alive afterwards.
        testHarness.getTask().cancel();
        testHarness.waitForTaskCompletion(50L, true); // presumably a 50 ms bounded wait - TODO confirm
        Assertions.assertThat((boolean)testHarness.taskThread.isAlive()).isTrue();

        // The source swallows interrupts, so repeated interruption must not end the thread either.
        for (int attempt = 1; attempt <= 10; attempt++) {
            testHarness.getTask().maybeInterruptOnCancel(testHarness.getTaskThread(), null, null);
            testHarness.waitForTaskCompletion(50L, true);
            Assertions.assertThat((boolean)testHarness.taskThread.isAlive()).isTrue();
        }

        // Only flipping the source's own flag lets the task complete.
        NonStoppingSource.forceCancel();
        testHarness.waitForTaskCompletion(Long.MAX_VALUE, true);
    }

    /**
     * Triggers checkpoint 2 after the task's completion future is already done and checks that
     * the trigger future completes and that each of the two additional result partitions ends up
     * with three queued buffers.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so the try/finally layout may differ from the original source.
     */
    @Test
    void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception {
        ResultPartition[] partitionWriters = new ResultPartition[2];
        // Network environment sized at two buffers per partition.
        try (NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(partitionWriters.length * 2).build();){
            for (int i = 0; i < partitionWriters.length; ++i) {
                partitionWriters[i] = PartitionTestUtils.createPartition((NettyShuffleEnvironment)env, (ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED, (int)1);
                partitionWriters[i].setup();
            }
            // Completed (with the checkpoint id) by the responder below once a checkpoint is acknowledged.
            final CompletableFuture checkpointCompleted = new CompletableFuture();
            // Harness with checkpointing enabled, the acknowledging responder, the two partitions as
            // additional outputs, and a MockSource configured with maxElements = 0.
            try (StreamTaskMailboxTestHarness<String> testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(config -> config.setCheckpointingEnabled(true)).setCheckpointResponder((CheckpointResponder)new TestCheckpointResponder(){

                public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
                    checkpointCompleted.complete(checkpointId);
                }
            }).addAdditionalOutput((ResultPartitionWriter[])partitionWriters).setupOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new MockSource(0, 0, 1))).finishForSingletonOperatorChain((TypeSerializer)StringSerializer.INSTANCE)).build();){
                testHarness.processAll();
                CompletableFuture taskFinished = testHarness.getStreamTask().getCompletionFuture();
                // Keep draining the mailbox until the task's completion future is done.
                do {
                    testHarness.processAll();
                } while (!taskFinished.isDone());
                // Only now, after completion, trigger checkpoint 2.
                CompletableFuture<Boolean> checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L);
                Preconditions.checkState((boolean)(checkpointFuture instanceof CompletableFuture), (Object)"The trigger future should  be also CompletableFuture.");
                // Once the trigger future completes, report subpartition 0 of every partition as fully processed.
                checkpointFuture.thenAccept(ignored -> {
                    for (ResultPartition resultPartition : partitionWriters) {
                        resultPartition.onSubpartitionAllDataProcessed(0);
                    }
                });
                // Once the checkpoint is acknowledged, notify the task that checkpoint 2 completed.
                checkpointCompleted.whenComplete((id, error) -> testHarness.getStreamTask().notifyCheckpointCompleteAsync(2L));
                testHarness.finishProcessing();
                Assertions.assertThat((boolean)checkpointFuture.isDone()).isTrue();
                // Three buffers per partition are expected; presumably one each for the barrier, the
                // final watermark and the end-of-data event - TODO confirm.
                for (ResultPartition resultPartition : partitionWriters) {
                    Assertions.assertThat((int)resultPartition.getNumberOfQueuedBuffers()).isEqualTo(3);
                }
            }
        }
        finally {
            // Close whichever partitions were actually created, even if setup failed part-way.
            for (ResultPartition writer : partitionWriters) {
                if (writer == null) continue;
                writer.close();
            }
        }
    }

    /**
     * Checks that building a {@link SourceStreamTask} harness with a result partition as an
     * additional output changes that partition's buffer pool from a positive
     * {@code getMaxOverdraftBuffersPerGate()} to zero.
     */
    @Test
    void testDisableOverdraftBuffer() throws Exception {
        try (NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(2).build();
             ResultPartition partitionWriter = PartitionTestUtils.createPartition((NettyShuffleEnvironment)env, (ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED, (int)1);){
            partitionWriter.setup();

            // Precondition: overdraft buffers are available before the task is built.
            org.junit.jupiter.api.Assertions.assertTrue(partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate() > 0);

            try (StreamTaskMailboxTestHarness testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                    .addAdditionalOutput(new ResultPartitionWriter[]{partitionWriter})
                    .setupOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new MockSource(0, 0, 1)))
                    .finishForSingletonOperatorChain((TypeSerializer)StringSerializer.INSTANCE)).build();){
                // After the harness is built, the same pool reports no overdraft buffers.
                org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate());
            }
        }
    }

    /**
     * Restores a task from {@link TaskStateSnapshot#FINISHED_ON_RESTORE} and checks that the
     * collected output is exactly the max watermark followed by an {@link EndOfData} event in
     * {@link StopMode#DRAIN} mode, and that the source's {@code open}, {@code run} and
     * {@code close} hooks were each counted zero times.
     */
    @Test
    void testClosedOnRestoreSourceSkipExecution() throws Exception {
        ArrayList output = new ArrayList();
        LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();

        // Collecting writer whose end-of-data notification is turned into a broadcast EndOfData
        // event, so that it shows up in `output`.
        ResultPartitionWriter collectingWriter = new RecordOrEventCollectingResultPartitionWriter<StreamElement>(output, (TypeSerializer)new StreamElementSerializer((TypeSerializer)IntSerializer.INSTANCE)){

            public void notifyEndOfData(StopMode mode) throws IOException {
                this.broadcastEvent((AbstractEvent)new EndOfData(mode), false);
            }
        };

        try (StreamTaskMailboxTestHarness harness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                .addAdditionalOutput(new ResultPartitionWriter[]{collectingWriter})
                .setupOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)testSource))
                .chain((OneInputStreamOperator)new TestFinishedOnRestoreStreamOperator(), (TypeSerializer)StringSerializer.INSTANCE)
                .finish()).build();){
            harness.getStreamTask().invoke();
            harness.processAll();
            harness.streamTask.getCompletionFuture().get();

            Assertions.assertThat(output).containsExactly(new Object[]{Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)});

            // Inspect the source instance the task actually holds rather than `testSource`.
            StreamSource mainOperator = (StreamSource)harness.getStreamTask().getMainOperator();
            LifeCycleMonitorSource taskSource = (LifeCycleMonitorSource)mainOperator.getUserFunction();
            LifeCycleMonitor.LifeCyclePhase[] skippedPhases = new LifeCycleMonitor.LifeCyclePhase[]{LifeCycleMonitor.LifeCyclePhase.OPEN, LifeCycleMonitor.LifeCyclePhase.PROCESS_ELEMENT, LifeCycleMonitor.LifeCyclePhase.CLOSE};
            taskSource.getLifeCycleMonitor().assertCallTimes(0, skippedPhases);
        }
    }

    /**
     * Triggers a terminating savepoint (id 2, canonical format) on a task restored from
     * {@link TaskStateSnapshot#FINISHED_ON_RESTORE} and checks that the trigger result is
     * {@code true} and that the checkpoint was acknowledged.
     */
    @Test
    void testTriggeringStopWithSavepointWithDrain() throws Exception {
        EmptySource idleSource = new EmptySource();

        // Completed by the responder as soon as any checkpoint is acknowledged.
        final CompletableFuture acknowledged = new CompletableFuture();
        TestCheckpointResponder responder = new TestCheckpointResponder(){

            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                super.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
                acknowledged.complete(null);
            }
        };

        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                .setCheckpointResponder((CheckpointResponder)responder)
                .setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)idleSource))
                .build();){
            CheckpointMetaData savepointMetaData = new CheckpointMetaData(2L, 2L);
            CheckpointOptions savepointOptions = CheckpointOptions.alignedNoTimeout((SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
            CompletableFuture triggerResult = harness.streamTask.triggerCheckpointAsync(savepointMetaData, savepointOptions);

            // After the acknowledgement, tell the task that checkpoint 2 completed.
            acknowledged.whenComplete((ignored, exception) -> harness.streamTask.notifyCheckpointCompleteAsync(2L));

            harness.streamTask.runMailboxLoop();
            harness.finishProcessing();

            Boolean triggered = (Boolean)triggerResult.get();
            Assertions.assertThat((Boolean)triggered).isTrue();
            Assertions.assertThat((boolean)acknowledged.isDone()).isTrue();
        }
    }

    /**
     * Source that never emits anything: {@link #run} sleeps in 100 ms steps until
     * {@link #cancel()} has been called.
     */
    private static class EmptySource
    implements SourceFunction<String> {
        /** Set by {@link #cancel()}; volatile because {@code run} polls it. */
        private volatile boolean isCanceled;

        private EmptySource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            for (;;) {
                if (isCanceled) {
                    return;
                }
                Thread.sleep(100L);
            }
        }

        public void cancel() {
            isCanceled = true;
        }
    }

    /**
     * {@link StreamSource} that emits marker records tagged with its name from
     * {@link #endInput()} and {@link #finish()}, so a test can observe when those hooks run.
     */
    private static final class OutputRecordInCloseTestSource<SRC extends SourceFunction<String>>
    extends StreamSource<String, SRC>
    implements BoundedOneInput {
        /** Tag placed in square brackets at the start of every emitted marker. */
        private final String name;

        public OutputRecordInCloseTestSource(String name, SRC sourceFunction) {
            super(sourceFunction);
            this.name = name;
        }

        public void endInput() {
            emitTagged("End of input");
        }

        /**
         * Registers a processing-time timer for the current time whose callback would emit a
         * "Timer registered in close" marker, emits the "Finish" marker, then delegates to the
         * superclass.
         */
        public void finish() throws Exception {
            ProcessingTimeService timers = getProcessingTimeService();
            long now = timers.getCurrentProcessingTime();
            timers.registerTimer(now, t -> emitTagged("Timer registered in close"));
            emitTagged("Finish");
            super.finish();
        }

        /** Emits {@code "[<name>]: <message>"} as a stream record on this operator's output. */
        private void emitTagged(String message) {
            String record = "[" + name + "]: " + message;
            this.output.collect((Object)new StreamRecord((Object)record));
        }
    }

    /**
     * {@link FromElementsFunction} variant that, after the superclass {@code run} returns,
     * signals {@link #getDataProcessing()} and then blocks until {@link #cancel()} is called.
     */
    private static class CancelTestSource<T>
    extends FromElementsFunction<T> {
        private static final long serialVersionUID = 8713065281092996067L;

        /** Triggered once the superclass {@code run} has returned. */
        private static final MultiShotLatch dataProcessing = new MultiShotLatch();

        /** Triggered by {@link #cancel()}; {@link #run} waits on it before returning. */
        private static final MultiShotLatch cancellationWaiting = new MultiShotLatch();

        public CancelTestSource(TypeSerializer<T> serializer, T ... elements) throws IOException {
            super(serializer, (Object[])elements);
        }

        public static MultiShotLatch getDataProcessing() {
            return dataProcessing;
        }

        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            super.run(ctx);
            // Let the test know the elements are out, then park until the task is cancelled.
            dataProcessing.trigger();
            cancellationWaiting.await();
        }

        public void cancel() {
            super.cancel();
            cancellationWaiting.trigger();
        }
    }

    /**
     * Source that emits {@code "Hello0"} to {@code "Hello9"} and fails the test when
     * {@code open}, {@code run} and {@code close} are observed out of order. The observed calls
     * are kept in static flags, which the constructor resets.
     */
    private static class OpenCloseTestSource
    extends RichSourceFunction<String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        OpenCloseTestSource() {
            // Start every new instance from a clean slate.
            closeCalled = false;
            openCalled = false;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            boolean closedTooEarly = closeCalled;
            if (closedTooEarly) {
                Assertions.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            boolean neverOpened = !openCalled;
            if (neverOpened) {
                Assertions.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            boolean neverOpened = !openCalled;
            if (neverOpened) {
                Assertions.fail((String)"Open was not called before run.");
            }
            int index = 0;
            while (index < 10) {
                ctx.collect((Object)("Hello" + index));
                index++;
            }
        }

        public void cancel() {
            // Nothing to cancel: run() finishes on its own after ten elements.
        }
    }

    /**
     * Source that ignores both {@link #cancel()} and thread interrupts: {@link #run} keeps
     * looping until {@link #forceCancel()} clears the static {@code running} flag.
     *
     * <p>The state is static, so it is shared by all instances and is not reset between uses.
     */
    private static class NonStoppingSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;

        /**
         * Loop condition of {@link #run}. Volatile because {@link #forceCancel()} is called from
         * a different thread than the one executing {@code run}; without it the write is not
         * guaranteed to become visible to the loop.
         */
        private static volatile boolean running = true;

        /** Completed when {@link #run} starts; awaited by {@link #waitForStart()}. */
        private static final CompletableFuture<Void> startFuture = new CompletableFuture<>();

        private NonStoppingSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            startFuture.complete(null);
            while (running) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ignored) {
                    // Deliberately swallowed: this source must not stop when interrupted.
                }
            }
        }

        public void cancel() {
            // Deliberately a no-op: only forceCancel() ends the loop.
        }

        /** Ends the loop in {@link #run}. */
        static void forceCancel() {
            running = false;
        }

        /** Blocks until {@link #run} has been entered. */
        static void waitForStart() {
            startFuture.join();
        }
    }

    /**
     * Callable that triggers a fixed number of checkpoints on a task, sleeping
     * {@code checkpointInterval} milliseconds after each trigger. Checkpoint ids start at 0 and
     * increase by one per trigger.
     */
    private static class Checkpointer
    implements Callable<Boolean> {
        private final int numCheckpoints;
        private final int checkpointInterval;
        private final AtomicLong checkpointId;
        private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;

        Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
            this.sourceTask = task;
            this.numCheckpoints = numCheckpoints;
            this.checkpointInterval = checkpointInterval;
            this.checkpointId = new AtomicLong(0L);
        }

        /**
         * @return {@code true} after all checkpoints were triggered, {@code false} as soon as a
         *     trigger is rejected with a {@link RejectedExecutionException}
         */
        @Override
        public Boolean call() throws Exception {
            int triggered = 0;
            while (triggered < numCheckpoints) {
                CheckpointMetaData metaData = new CheckpointMetaData(checkpointId.getAndIncrement(), 0L);
                try {
                    sourceTask.triggerCheckpointAsync(metaData, CheckpointOptions.forCheckpointWithDefaultLocation());
                }
                catch (RejectedExecutionException e) {
                    return false;
                }
                Thread.sleep(checkpointInterval);
                triggered++;
            }
            return true;
        }
    }

    /**
     * Checkpointed source that emits {@code (lastCheckpointId, count)} tuples under the
     * checkpoint lock and fails the test when a snapshot overlaps another snapshot or an emitted
     * element.
     */
    private static class MockSource
    implements SourceFunction<Tuple2<Long, Integer>>,
    ListCheckpointed<Serializable> {
        private static final long serialVersionUID = 1L;

        /** Number of elements to emit before {@link #run} returns. */
        private final int maxElements;
        /** Number of random values summed inside {@link #snapshotState}. */
        private final int checkpointDelay;
        /** Milliseconds slept before each emitted element. */
        private final int readDelay;
        /** Guards against concurrent {@link #snapshotState} invocations. */
        private final Semaphore semaphore;

        private volatile int count;
        private volatile long lastCheckpointId = -1L;
        private volatile boolean isRunning = true;

        public MockSource(int maxElements, int checkpointDelay, int readDelay) {
            this.maxElements = maxElements;
            this.checkpointDelay = checkpointDelay;
            this.readDelay = readDelay;
            this.semaphore = new Semaphore(1);
            this.count = 0;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Integer>> ctx) {
            while (isRunning && count < maxElements) {
                try {
                    Thread.sleep(readDelay);
                }
                catch (InterruptedException e) {
                    // Keep emitting, but preserve the interrupt status.
                    Thread.currentThread().interrupt();
                }
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect((Object)new Tuple2((Object)lastCheckpointId, (Object)count));
                    ++count;
                }
            }
        }

        public void cancel() {
            isRunning = false;
        }

        /**
         * Records the checkpoint id, sums {@code checkpointDelay} random longs and returns that
         * sum as the only state element. Fails when the semaphore is already held or when
         * {@code count} changed while the sum was being computed.
         */
        public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception {
            boolean exclusive = semaphore.tryAcquire();
            if (!exclusive) {
                Assertions.fail((String)"Concurrent invocation of snapshotState.");
            }

            int countAtStart = count;
            lastCheckpointId = checkpointId;

            long sum = 0L;
            int remaining = checkpointDelay;
            while (remaining-- > 0) {
                sum += new Random().nextLong();
            }

            // Compare first, release second, so the permit is returned on both paths.
            boolean countChanged = countAtStart != count;
            semaphore.release();
            if (countChanged) {
                Assertions.fail((String)"Count is different at start end end of snapshot.");
            }
            return Collections.singletonList(sum);
        }

        public void restoreState(List<Serializable> state) throws Exception {
            // Restored state is ignored.
        }
    }

    private static class LifeCycleMonitorSource
    extends RichParallelSourceFunction<String> {
        private final LifeCycleMonitor lifeCycleMonitor = new LifeCycleMonitor();

        private LifeCycleMonitorSource() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.OPEN);
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.PROCESS_ELEMENT);
        }

        public void close() throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.CLOSE);
        }

        public LifeCycleMonitor getLifeCycleMonitor() {
            return this.lifeCycleMonitor;
        }

        public void cancel() {
        }
    }

    public static class InterruptedSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;
        private ExceptionGenerator exceptionGenerator;

        public InterruptedSource(ExceptionGenerator exceptionGenerator) {
            this.exceptionGenerator = exceptionGenerator;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                Thread.currentThread().interrupt();
                throw (Exception)this.exceptionGenerator.get();
            }
        }

        public void cancel() {
        }

        static interface ExceptionGenerator
        extends CheckedSupplier<Exception>,
        Serializable {
        }
    }

    public static class CancelLockingSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 8713065281092996042L;
        private static CompletableFuture<Void> isRunning = new CompletableFuture();
        private final boolean throwOnCancel;
        private volatile boolean cancelled = false;

        public CancelLockingSource(boolean throwOnCancel) {
            this.throwOnCancel = throwOnCancel;
        }

        public static void reset() {
            isRunning = new CompletableFuture();
        }

        public static void awaitRunning() throws ExecutionException, InterruptedException {
            isRunning.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                while (!this.cancelled) {
                    isRunning.complete(null);
                    if (this.throwOnCancel) {
                        Thread.sleep(1000000000L);
                        continue;
                    }
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        public void cancel() {
            if (this.throwOnCancel) {
                throw new ExpectedTestException();
            }
            this.cancelled = true;
        }
    }
}

