/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MockSubtaskCheckpointCoordinatorBuilder;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorChainTest;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestStateBackend;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SubtaskCheckpointCoordinatorTest {
    /**
     * Checkpoint storage shared by the tests in this class; used to create the checkpoint storage
     * access that some tests install on their environment.
     */
    private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage();

    /** Package-private no-argument constructor; performs no initialization of its own. */
    SubtaskCheckpointCoordinatorTest() {
    }

    /**
     * Checks the result of {@code initCheckpoint} for three configurations: it must be
     * {@code true} with unaligned checkpoints enabled, and {@code false} for an aligned checkpoint
     * as well as for a canonical savepoint.
     */
    @Test
    void testInitCheckpoint() throws IOException, CheckpointException {
        SnapshotType plainCheckpoint = (SnapshotType)CheckpointType.CHECKPOINT;

        boolean startedWhenUnaligned = this.initCheckpoint(true, plainCheckpoint);
        Assertions.assertThat(startedWhenUnaligned).isTrue();

        boolean startedWhenAligned = this.initCheckpoint(false, plainCheckpoint);
        Assertions.assertThat(startedWhenAligned).isFalse();

        SnapshotType canonicalSavepoint = (SnapshotType)SavepointType.savepoint(SavepointFormatType.CANONICAL);
        boolean startedForSavepoint = this.initCheckpoint(false, canonicalSavepoint);
        Assertions.assertThat(startedForSavepoint).isFalse();
    }

    /**
     * Calls {@code initInputsCheckpoint} for checkpoint id 1 on a coordinator that is backed by a
     * recording channel state writer, and reports whether that writer's {@code start} was invoked.
     *
     * <p>The coordinator is closed before returning so that repeated invocations do not leave
     * coordinators open, matching the try-with-resources usage of the other tests in this class.
     *
     * @param unalignedCheckpointEnabled if {@code true}, unaligned options of type
     *     {@code CheckpointType.CHECKPOINT} are used and {@code checkpointType} is ignored;
     *     otherwise aligned options without timeout are built from {@code checkpointType}
     * @param checkpointType snapshot type used for the aligned case only
     * @return {@code true} if {@code start} was called on the channel state writer
     * @throws IOException if creating or closing the coordinator fails
     * @throws CheckpointException if initializing the inputs checkpoint fails
     */
    private boolean initCheckpoint(boolean unalignedCheckpointEnabled, SnapshotType checkpointType) throws IOException, CheckpointException {
        class MockWriter
        extends ChannelStateWriter.NoOpChannelStateWriter {
            private boolean started;

            MockWriter() {
            }

            public void start(long checkpointId, CheckpointOptions checkpointOptions) {
                // Only record that the writer was asked to start.
                this.started = true;
            }
        }
        MockWriter writer = new MockWriter();
        try (SubtaskCheckpointCoordinator coordinator = SubtaskCheckpointCoordinatorTest.coordinator((ChannelStateWriter)writer)) {
            CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
            coordinator.initInputsCheckpoint(
                    1L,
                    unalignedCheckpointEnabled
                            ? CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)locationReference)
                            : CheckpointOptions.alignedNoTimeout((SnapshotType)checkpointType, (CheckpointStorageLocationReference)locationReference));
        }
        return writer.started;
    }

    /**
     * Notifies the coordinator of two completed checkpoints (once with an {@code isRunning}
     * supplier returning {@code true}, once returning {@code false}) and asserts after each call
     * that the state manager reports that checkpoint id as the notified completed one.
     */
    @Test
    void testNotifyCheckpointComplete() throws Exception {
        TestTaskStateManager taskStateManager = new TestTaskStateManager();
        MockEnvironment environment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)taskStateManager).build();
        try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)environment).build()) {
            OperatorChain<?, ?> chain = this.getOperatorChain(environment);

            final long firstCheckpointId = 42L;
            coordinator.notifyCheckpointComplete(firstCheckpointId, chain, () -> true);
            Assertions.assertThat((long)taskStateManager.getNotifiedCompletedCheckpointId()).isEqualTo(firstCheckpointId);

            final long secondCheckpointId = firstCheckpointId + 1L;
            coordinator.notifyCheckpointComplete(secondCheckpointId, chain, () -> false);
            Assertions.assertThat((long)taskStateManager.getNotifiedCompletedCheckpointId()).isEqualTo(secondCheckpointId);
        }
    }

    /**
     * Takes a canonical savepoint on a coordinator with unaligned checkpoints enabled and asserts
     * that the event broadcast through the operator chain was not flagged as a priority event.
     */
    @Test
    void testSavepointNotResultingInPriorityEvents() throws Exception {
        MockEnvironment environment = MockEnvironment.builder().build();
        try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment((Environment)environment).build()) {
            // Priority flag of the most recent broadcast; remains null if nothing was broadcast.
            final AtomicReference<Object> lastPriorityFlag = new AtomicReference<Object>(null);
            RegularOperatorChain chain = new RegularOperatorChain(new MockStreamTaskBuilder((Environment)environment).build(), (RecordWriterDelegate)new NonRecordWriter()){

                public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
                    super.broadcastEvent(event, isPriorityEvent);
                    lastPriorityFlag.set(isPriorityEvent);
                }
            };

            CheckpointMetaData metaData = new CheckpointMetaData(0L, 0L);
            CheckpointOptions savepointOptions = new CheckpointOptions((SnapshotType)SavepointType.savepoint(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault());
            coordinator.checkpointState(metaData, savepointOptions, new CheckpointMetricsBuilder(), (OperatorChain)chain, false, () -> true);

            Assertions.assertThat((Boolean)lastPriorityFlag.get()).isFalse();
        }
    }

    /**
     * Triggers checkpoint 42 with unaligned options marked via {@code withUnalignedUnsupported()}
     * and asserts that the event broadcast through the operator chain was flagged as a priority
     * event. The chain also writes some output data to the channel state writer on broadcast.
     */
    @Test
    void testForceAlignedCheckpointResultingInPriorityEvents() throws Exception {
        final long checkpointId = 42L;
        MockEnvironment environment = MockEnvironment.builder().build();
        environment.setCheckpointStorageAccess(CHECKPOINT_STORAGE.createCheckpointStorage(environment.getJobID()));
        try (final SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment((Environment)environment).build()) {
            // Priority flag of the most recent broadcast; remains null if nothing was broadcast.
            final AtomicReference<Object> lastPriorityFlag = new AtomicReference<Object>(null);
            RegularOperatorChain chain = new RegularOperatorChain(new MockStreamTaskBuilder((Environment)environment).build(), (RecordWriterDelegate)new NonRecordWriter()){

                public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
                    super.broadcastEvent(event, isPriorityEvent);
                    lastPriorityFlag.set(isPriorityEvent);
                    // Hand one 500-sized test buffer of output data to the channel state writer
                    // for the checkpoint under test.
                    coordinator.getChannelStateWriter().addOutputData(
                            checkpointId,
                            new ResultSubpartitionInfo(0, 0),
                            0,
                            new Buffer[]{BufferBuilderTestUtils.buildSomeBuffer((int)500)});
                }
            };

            CheckpointOptions forcedAligned = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()).withUnalignedUnsupported();
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, 0L);
            coordinator.checkpointState(metaData, forcedAligned, new CheckpointMetricsBuilder(), (OperatorChain)chain, false, () -> true);

            Assertions.assertThat((Boolean)lastPriorityFlag.get()).isTrue();
        }
    }

    /**
     * Takes a canonical savepoint on a coordinator whose prepare-input-snapshot callback fails the
     * test when invoked; the test therefore passes only if that callback is never called.
     */
    @Test
    void testSkipChannelStateForSavepoints() throws Exception {
        // Callback that must never run for a savepoint.
        BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> failingPrepare = (unusedWriter, unusedId) -> {
            Assertions.fail("should not prepare input snapshot for savepoint");
            return null;
        };
        try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setPrepareInputSnapshot(failingPrepare).build()) {
            CheckpointMetaData metaData = new CheckpointMetaData(0L, 0L);
            CheckpointOptions savepointOptions = new CheckpointOptions((SnapshotType)SavepointType.savepoint(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault());
            CheckpointMetricsBuilder metrics = new CheckpointMetricsBuilder();
            OperatorChain chain = (OperatorChain)new RegularOperatorChain(new StreamTaskITCase.NoOpStreamTask((Environment)new DummyEnvironment()), (RecordWriterDelegate)new NonRecordWriter());
            coordinator.checkpointState(metaData, savepointOptions, metrics, chain, false, () -> true);
        }
    }

    /**
     * Runs checkpoint 42 on a chain containing a single identity {@code StreamMap}, then notifies
     * the coordinator that checkpoint 43 was subsumed and asserts that the operator's keyed state
     * backend recorded 43 as the subsumed checkpoint id.
     */
    @Test
    void testNotifyCheckpointSubsumed() throws Exception {
        TestTaskStateManager taskStateManager = new TestTaskStateManager();
        MockEnvironment environment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)taskStateManager).build();
        try (SubtaskCheckpointCoordinatorImpl coordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)environment).setUnalignedCheckpointEnabled(true).build()) {
            // Build and open a one-operator chain, initialized with the test state backend.
            StreamMap identityMap = new StreamMap((MapFunction & Serializable)element -> element);
            identityMap.setProcessingTimeService((ProcessingTimeService)new TestProcessingTimeService());
            OperatorChain chain = this.operatorChain(new OneInputStreamOperator[]{identityMap});
            StreamTaskStateInitializerImpl initializer = new StreamTaskStateInitializerImpl((Environment)environment, (StateBackend)new TestStateBackend());
            chain.initializeStateAndOpenOperators((StreamTaskStateInitializer)initializer);

            // Start the channel state writer and take the checkpoint.
            final long checkpointId = 42L;
            coordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), chain, false, () -> false);

            // Subsume the following checkpoint id and check that it reached the state backend.
            final long subsumedCheckpointId = checkpointId + 1L;
            coordinator.notifyCheckpointSubsumed(subsumedCheckpointId, chain, () -> true);
            Assertions.assertThat((long)((TestStateBackend.TestKeyedStateBackend)identityMap.getKeyedStateBackend()).getSubsumeCheckpointId()).isEqualTo(subsumedCheckpointId);
        }
    }

    /**
     * Aborts checkpoints 1 through 297 on a coordinator configured to record at most 256 aborted
     * checkpoints, asserting after each abort that the recorded count equals the number of aborts
     * so far, capped at 256.
     */
    @Test
    void testNotifyCheckpointAbortedManyTimes() throws Exception {
        MockEnvironment environment = MockEnvironment.builder().build();
        final int maxRecordAbortedCheckpoints = 256;
        try (SubtaskCheckpointCoordinatorImpl coordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)environment).setMaxRecordAbortedCheckpoints(maxRecordAbortedCheckpoints).build()) {
            OperatorChain<?, ?> chain = this.getOperatorChain(environment);
            // Go past the configured limit so the cap is actually exercised.
            final long abortNotifications = maxRecordAbortedCheckpoints + 42;
            for (int id = 1; (long)id < abortNotifications; ++id) {
                coordinator.notifyCheckpointAborted((long)id, chain, () -> true);
                int expectedSize = Math.min(maxRecordAbortedCheckpoints, id);
                Assertions.assertThat((int)coordinator.getAbortedCheckpointSize()).isEqualTo(expectedSize);
            }
        }
    }

    /**
     * Aborts checkpoint 42 before it is triggered and then triggers it. Asserts that the operator
     * was never snapshotted, that the state manager's reported checkpoint id is still -1, and that
     * the coordinator holds neither aborted-checkpoint records nor async checkpoint runnables.
     */
    @Test
    void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
        TestTaskStateManager taskStateManager = new TestTaskStateManager();
        MockEnvironment environment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)taskStateManager).build();
        try (SubtaskCheckpointCoordinatorImpl coordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)environment).setUnalignedCheckpointEnabled(true).build()) {
            CheckpointOperator operator = new CheckpointOperator(new OperatorSnapshotFutures());
            OperatorChain chain = this.operatorChain(operator);
            final long checkpointId = 42L;

            // The abort arrives first and is remembered by the coordinator.
            coordinator.notifyCheckpointAborted(checkpointId, chain, () -> true);
            Assertions.assertThat((int)coordinator.getAbortedCheckpointSize()).isOne();

            // Triggering the already-aborted checkpoint afterwards.
            coordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), chain, false, () -> false);

            Assertions.assertThat((boolean)operator.isCheckpointed()).isFalse();
            Assertions.assertThat((long)taskStateManager.getReportedCheckpointId()).isEqualTo(-1L);
            Assertions.assertThat((int)coordinator.getAbortedCheckpointSize()).isZero();
            Assertions.assertThat((int)coordinator.getAsyncCheckpointRunnableSize()).isZero();
        }
    }

    /**
     * Runs a one-input task whose output is collected, aborts checkpoint 42 before triggering it,
     * and asserts that the only collected output element is a {@code CancelCheckpointMarker}
     * carrying checkpoint id 42.
     */
    @Test
    void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();
        StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator((StreamOperator)new MapOperator());
        StreamMockEnvironment environment = new StreamMockEnvironment(harness.jobConfig, harness.taskConfig, harness.executionConfig, harness.memorySize, new MockInputSplitProvider(), harness.bufferSize, (TaskStateManager)harness.taskStateManager);
        try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(environment).build()) {
            // Collect everything the task emits so it can be inspected below.
            ArrayList collectedOutput = new ArrayList();
            StreamElementSerializer elementSerializer = new StreamElementSerializer((TypeSerializer)StringSerializer.INSTANCE);
            RecordOrEventCollectingResultPartitionWriter collectingWriter = new RecordOrEventCollectingResultPartitionWriter(collectedOutput, (TypeSerializer)elementSerializer);
            environment.addOutput((ResultPartitionWriter)collectingWriter);

            harness.invoke(environment);
            harness.waitForTaskRunning();
            OneInputStreamTask runningTask = harness.getTask();
            OperatorChain chain = runningTask.operatorChain;

            // Abort first, then trigger the same checkpoint id.
            final long checkpointId = 42L;
            coordinator.notifyCheckpointAborted(checkpointId, chain, () -> true);
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), chain, false, () -> false);

            Assertions.assertThat(collectedOutput).hasSize(1);
            Object emitted = collectedOutput.get(0);
            Assertions.assertThat(emitted).isInstanceOf(CancelCheckpointMarker.class);
            Assertions.assertThat((long)((CancelCheckpointMarker)emitted).getCheckpointId()).isEqualTo(checkpointId);

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Triggers checkpoint 42 with a raw keyed state future that blocks once it runs, aborts the
     * checkpoint while that future is running, and asserts that the future ends up cancelled and
     * that no aborted-checkpoint record is left in the coordinator.
     *
     * <p>The thread pool given to the coordinator is owned by this test and is shut down in a
     * {@code finally} block, so its worker threads do not outlive the test even when an assertion
     * fails.
     */
    @Test
    void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        ExecutorService asyncExecutor = Executors.newFixedThreadPool(2);
        try (SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).setExecutor(asyncExecutor).setUnalignedCheckpointEnabled(true).build()) {
            BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture();
            OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)rawKeyedStateHandleFuture, (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()));
            OperatorChain operatorChain = this.operatorChain(new CheckpointOperator(operatorSnapshotResult));
            long checkpointId = 42L;
            subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
            subtaskCheckpointCoordinator.checkpointState(new CheckpointMetaData(checkpointId, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> false);

            // Wait until the blocking future has actually started running on the pool.
            rawKeyedStateHandleFuture.awaitRun();
            Assertions.assertThat((int)subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize()).isOne();
            Assertions.assertThat((boolean)rawKeyedStateHandleFuture.isCancelled()).isFalse();

            subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
            // Poll until the future completes after the abort.
            while (!rawKeyedStateHandleFuture.isDone()) {
                Thread.sleep(10L);
            }
            Assertions.assertThat((boolean)rawKeyedStateHandleFuture.isCancelled()).isTrue();
            Assertions.assertThat((int)subtaskCheckpointCoordinator.getAbortedCheckpointSize()).isZero();
        } finally {
            // Runs after the coordinator has been closed; releases the pool's worker threads.
            asyncExecutor.shutdownNow();
        }
    }

    /**
     * Triggers checkpoint 42 and aborts it only after {@code checkpointState} has returned.
     * Asserts that no aborted-checkpoint record is kept and that the state manager was notified
     * of the abort for that checkpoint id.
     */
    @Test
    void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception {
        TestTaskStateManager taskStateManager = new TestTaskStateManager();
        MockEnvironment environment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)taskStateManager).build();
        try (SubtaskCheckpointCoordinatorImpl coordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)environment).build()) {
            OperatorChain<?, ?> chain = this.getOperatorChain(environment);
            final long checkpointId = 42L;

            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), chain, false, () -> false);
            coordinator.notifyCheckpointAborted(checkpointId, chain, () -> true);

            Assertions.assertThat((int)coordinator.getAbortedCheckpointSize()).isZero();
            Assertions.assertThat((long)taskStateManager.getNotifiedAbortedCheckpointId()).isEqualTo(checkpointId);
        }
    }

    /**
     * Triggers checkpoint 66 with {@code ALIGNED} checkpoint options whose last constructor
     * argument is 200 (presumably the alignment timeout; confirm against
     * {@code CheckpointOptions}). Asserts that the broadcast event was not a priority event and
     * that a non-null channel state write result was passed to {@code snapshotState}.
     */
    @Test
    void testTimeoutableAlignedBarrierNotPriorityAndChannelStateResult() throws Exception {
        final long checkpointId = 66L;
        MockEnvironment environment = MockEnvironment.builder().build();
        try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment((Environment)environment).build()) {
            // Values captured from the operator chain callbacks; null until the callback runs.
            final AtomicReference<Object> lastPriorityFlag = new AtomicReference<Object>(null);
            final AtomicReference<Object> capturedWriteResult = new AtomicReference<Object>(null);
            RegularOperatorChain chain = new RegularOperatorChain(new StreamTaskITCase.NoOpStreamTask((Environment)new DummyEnvironment()), (RecordWriterDelegate)new NonRecordWriter()){

                public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
                    super.broadcastEvent(event, isPriorityEvent);
                    lastPriorityFlag.set(isPriorityEvent);
                }

                public void snapshotState(Map operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, Supplier isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage) {
                    // Replaces the regular snapshot: remember the write result and acknowledge.
                    capturedWriteResult.set(channelStateWriteResult);
                    this.sendAcknowledgeCheckpointEvent(checkpointMetaData.getCheckpointId());
                }
            };

            CheckpointOptions alignedWithTimeout = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), CheckpointOptions.AlignmentType.ALIGNED, 200L);
            coordinator.initInputsCheckpoint(checkpointId, alignedWithTimeout);
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, alignedWithTimeout, new CheckpointMetricsBuilder(), (OperatorChain)chain, false, () -> true);

            Assertions.assertThat((Boolean)lastPriorityFlag.get()).isFalse();
            Assertions.assertThat(capturedWriteResult.get()).isNotNull();
        }
    }

    /**
     * Aborts checkpoint 1, then initializes its inputs checkpoint with unaligned options and
     * triggers it. Asserts that the channel state writer holds a write result after
     * initialization, no longer holds one after {@code checkpointState}, and that the earlier
     * result is done with both of its handle futures completed exceptionally.
     */
    @Test
    void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
        final String taskName = "test";
        final long checkpointId = 1L;
        DummyEnvironment env = new DummyEnvironment();
        env.setCheckpointStorageAccess(CHECKPOINT_STORAGE.createCheckpointStorage(env.getJobID()));
        ChannelStateWriterImpl writer = new ChannelStateWriterImpl(env.getJobVertexId(), taskName, 0, () -> env.getCheckpointStorageAccess(), env.getChannelStateExecutorFactory(), 5);
        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
             SubtaskCheckpointCoordinatorImpl coordinator = new SubtaskCheckpointCoordinatorImpl((CheckpointStorageWorkerView)new TestCheckpointStorageWorkerView(100), taskName, StreamTaskActionExecutor.IMMEDIATE, (ExecutorService)MoreExecutors.newDirectExecutorService(), (Environment)env, (unused1, unused2) -> {}, (unused1, unused2) -> CompletableFuture.completedFuture(null), 1, (ChannelStateWriter)writer, true, (callable, duration) -> () -> {})) {
            OperatorChain<?, ?> chain = this.getOperatorChain(mockEnvironment);

            // Abort first; the write result is still created when inputs are initialized.
            coordinator.notifyCheckpointAborted(checkpointId, chain, () -> true);
            coordinator.initInputsCheckpoint(checkpointId, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
            ChannelStateWriter.ChannelStateWriteResult pendingResult = writer.getWriteResult(checkpointId);
            Assertions.assertThat((Object)pendingResult).isNotNull();

            // After triggering the aborted checkpoint the writer must have dropped the result.
            CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
            coordinator.checkpointState(metaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), chain, false, () -> true);
            Assertions.assertThat((Object)writer.getWriteResult(checkpointId)).isNull();

            pendingResult.waitForDone();
            Assertions.assertThat((boolean)pendingResult.isDone()).isTrue();
            Assertions.assertThat((boolean)pendingResult.getInputChannelStateHandles().isCompletedExceptionally()).isTrue();
            Assertions.assertThat((boolean)pendingResult.getResultSubpartitionStateHandles().isCompletedExceptionally()).isTrue();
        }
    }

    /**
     * Initializes checkpoint 42 with unaligned options, aborts it, and then initializes and
     * triggers checkpoint 43. Asserts that the write result of 42 is done and that getting its
     * input channel handles throws {@code CancellationException}, while the write result of 43
     * is done with neither handle future completed exceptionally.
     *
     * <p>The non-null check on the write result of checkpoint 43 is made right after it is
     * fetched, before its first use, so a missing result fails as an assertion rather than as a
     * {@code NullPointerException}.
     */
    @Test
    void testAbortOldAndStartNewCheckpoint() throws Exception {
        String taskName = "test";
        CheckpointOptions unalignedOptions = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
        DummyEnvironment env = new DummyEnvironment();
        env.setCheckpointStorageAccess(CHECKPOINT_STORAGE.createCheckpointStorage(env.getJobID()));
        ChannelStateWriterImpl writer = new ChannelStateWriterImpl(env.getJobVertexId(), taskName, 0, () -> env.getCheckpointStorageAccess(), env.getChannelStateExecutorFactory(), 5);
        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
             SubtaskCheckpointCoordinatorImpl coordinator = new SubtaskCheckpointCoordinatorImpl((CheckpointStorageWorkerView)new TestCheckpointStorageWorkerView(100), taskName, StreamTaskActionExecutor.IMMEDIATE, (ExecutorService)MoreExecutors.newDirectExecutorService(), (Environment)env, (unused1, unused2) -> {}, (unused1, unused2) -> CompletableFuture.completedFuture(null), 1, (ChannelStateWriter)writer, true, (callable, duration) -> () -> {})) {
            OperatorChain<?, ?> operatorChain = this.getOperatorChain(mockEnvironment);
            final long checkpoint42 = 42L;
            final long checkpoint43 = 43L;

            // Start the old checkpoint and abort it.
            coordinator.initInputsCheckpoint(checkpoint42, unalignedOptions);
            ChannelStateWriter.ChannelStateWriteResult result42 = writer.getWriteResult(checkpoint42);
            Assertions.assertThat((Object)result42).isNotNull();
            coordinator.notifyCheckpointAborted(checkpoint42, operatorChain, () -> true);

            // Start the new checkpoint; verify its write result exists before using it.
            coordinator.initInputsCheckpoint(checkpoint43, unalignedOptions);
            ChannelStateWriter.ChannelStateWriteResult result43 = writer.getWriteResult(checkpoint43);
            Assertions.assertThat((Object)result43).isNotNull();

            // The aborted checkpoint's result must be done and cancelled.
            result42.waitForDone();
            Assertions.assertThat((boolean)result42.isDone()).isTrue();
            Assertions.assertThatThrownBy(() -> {
                result42.getInputChannelStateHandles().get();
            }).isInstanceOf(CancellationException.class);

            // The new checkpoint can still be completed.
            coordinator.checkpointState(new CheckpointMetaData(checkpoint43, System.currentTimeMillis()), unalignedOptions, new CheckpointMetricsBuilder(), operatorChain, false, () -> true);
            result43.waitForDone();
            Assertions.assertThat((boolean)result43.isDone()).isTrue();
            Assertions.assertThat((boolean)result43.getInputChannelStateHandles().isCompletedExceptionally()).isFalse();
            Assertions.assertThat((boolean)result43.getResultSubpartitionStateHandles().isCompletedExceptionally()).isFalse();
        }
    }

    /**
     * Creates a {@code RegularOperatorChain} for a mock stream task built on the given
     * environment, using a {@code NonRecordWriter} as the record writer delegate.
     *
     * @param mockEnvironment environment the mock stream task is built with
     * @return the newly created operator chain
     * @throws Exception if building the mock task or the chain fails
     */
    private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception {
        StreamTask containingTask = (StreamTask)new MockStreamTaskBuilder((Environment)mockEnvironment).build();
        RecordWriterDelegate writerDelegate = (RecordWriterDelegate)new NonRecordWriter();
        return new RegularOperatorChain(containingTask, writerDelegate);
    }

    /**
     * Builds an operator chain from the given operators by delegating to
     * {@code OperatorChainTest.setupOperatorChain}.
     *
     * @param streamOperators operators to place in the chain
     * @return the chain produced by the delegate
     * @throws Exception if the delegate fails to set up the chain
     */
    private <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T> ... streamOperators) throws Exception {
        OperatorChain<T, AbstractStreamOperator<T>> chain = OperatorChainTest.setupOperatorChain(streamOperators);
        return chain;
    }

    /**
     * Creates a {@code SubtaskCheckpointCoordinatorImpl} named {@code "test"} around the given
     * channel state writer, using a direct executor service, a {@code DummyEnvironment}, and an
     * exception-handler callback that fails the test with the reported message.
     *
     * @param channelStateWriter writer the coordinator should use
     * @return the newly created coordinator
     * @throws IOException if the coordinator cannot be created
     */
    private static SubtaskCheckpointCoordinator coordinator(ChannelStateWriter channelStateWriter) throws IOException {
        CheckpointStorageWorkerView storageView = (CheckpointStorageWorkerView)new TestCheckpointStorageWorkerView(100);
        ExecutorService directExecutor = (ExecutorService)MoreExecutors.newDirectExecutorService();
        Environment environment = (Environment)new DummyEnvironment();
        return new SubtaskCheckpointCoordinatorImpl(
                storageView,
                "test",
                StreamTaskActionExecutor.IMMEDIATE,
                directExecutor,
                environment,
                (message, unused) -> Assertions.fail((String)message),
                (unused1, unused2) -> CompletableFuture.completedFuture(null),
                0,
                channelStateWriter,
                true,
                (callable, duration) -> () -> {});
    }

    /**
     * Minimal {@link OneInputStreamOperator} stub for checkpointing tests.
     *
     * <p>{@link #snapshotState} hands back the {@link OperatorSnapshotFutures} given at construction
     * time and records that it was called; every other operator callback is a no-op.
     */
    private static class CheckpointOperator
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        /** Futures returned unchanged from every {@link #snapshotState} call. */
        private final OperatorSnapshotFutures operatorSnapshotFutures;

        /** Set to {@code true} by {@link #snapshotState}; starts out {@code false}. */
        private boolean checkpointed;

        CheckpointOperator(OperatorSnapshotFutures operatorSnapshotFutures) {
            this.operatorSnapshotFutures = operatorSnapshotFutures;
        }

        /** Returns whether {@link #snapshotState} has been invoked on this instance. */
        boolean isCheckpointed() {
            return checkpointed;
        }

        // ---- snapshotting -------------------------------------------------

        /** Marks this operator as checkpointed and returns the pre-supplied futures. */
        public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
            checkpointed = true;
            return operatorSnapshotFutures;
        }

        public void prepareSnapshotPreBarrier(long checkpointId) {
            // nothing to prepare
        }

        public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
            // no state to initialize
        }

        public void notifyCheckpointComplete(long checkpointId) {
            // notification ignored
        }

        public void notifyCheckpointAborted(long checkpointId) {
            // notification ignored
        }

        // ---- lifecycle ----------------------------------------------------

        public void open() throws Exception {
            // no-op
        }

        public void finish() throws Exception {
            // no-op
        }

        public void close() throws Exception {
            // no-op
        }

        // ---- identity and metrics -----------------------------------------

        /** Returns a fresh unregistered operator metric group on every call. */
        public OperatorMetricGroup getMetricGroup() {
            return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }

        /** Returns a newly constructed {@link OperatorID} on every call. */
        public OperatorID getOperatorID() {
            return new OperatorID();
        }

        // ---- key context --------------------------------------------------

        public void setKeyContextElement1(StreamRecord<?> record) {
            // keys are not tracked
        }

        public void setKeyContextElement2(StreamRecord<?> record) {
            // keys are not tracked
        }

        public void setCurrentKey(Object key) {
            // keys are not tracked
        }

        /** Always returns {@code null}. */
        public Object getCurrentKey() {
            return null;
        }

        // ---- input handling -----------------------------------------------

        public void processElement(StreamRecord<String> element) throws Exception {
            // elements are dropped
        }

        public void processWatermark(Watermark mark) throws Exception {
            // watermarks are dropped
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
            // latency markers are dropped
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            // watermark statuses are dropped
        }
    }

    /**
     * {@link RunnableFuture} for tests whose {@link #run()} announces that it has started and then
     * blocks on an internal latch before completing with an empty {@link SnapshotResult}.
     *
     * <p>All {@code Future} queries are answered by an internal {@link CompletableFuture}, so the
     * instance can be cancelled while {@link #run()} is still blocked.
     */
    private static final class BlockingRunnableFuture
    implements RunnableFuture<SnapshotResult<KeyedStateHandle>> {
        /** Backing future that carries the completion or cancellation state. */
        private final CompletableFuture<SnapshotResult<KeyedStateHandle>> future = new CompletableFuture<>();

        /** Triggered as the first action of {@link #run()}; awaited by {@link #awaitRun()}. */
        private final OneShotLatch signalRunLatch = new OneShotLatch();

        /**
         * Starts at 2 while {@link #run()} counts down only once, so unless something else counts
         * the latch down, {@link #run()} stays blocked in {@code await()} until its thread is
         * interrupted.
         */
        private final CountDownLatch countDownLatch = new CountDownLatch(2);

        /** Result used to complete {@link #future} if {@link #run()} gets past the latch. */
        private final SnapshotResult<KeyedStateHandle> value = SnapshotResult.empty();

        private BlockingRunnableFuture() {
        }

        /**
         * Signals that execution has started, then waits on the latch and finally completes the
         * backing future. An interrupt while waiting is rethrown via {@code ExceptionUtils.rethrow}.
         */
        @Override
        public void run() {
            this.signalRunLatch.trigger();
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
            this.future.complete(this.value);
        }

        /**
         * Cancels the backing future. Always reports {@code true}, regardless of whether the
         * backing future was actually cancelled by this call.
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.future.cancel(mayInterruptIfRunning);
            return true;
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override
        public SnapshotResult<KeyedStateHandle> get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        /**
         * Waits at most the given time for the backing future to finish.
         *
         * @param timeout maximum time to wait
         * @param unit unit of {@code timeout}
         * @return the result the backing future completed with
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws ExecutionException if the backing future completed exceptionally
         * @throws java.util.concurrent.TimeoutException if the wait timed out
         */
        @Override
        public SnapshotResult<KeyedStateHandle> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
            // Honour the timeout; an unbounded get() here could block the caller forever.
            return this.future.get(timeout, unit);
        }

        /** Blocks until {@link #run()} has been entered. */
        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    /**
     * {@link StreamMap} over strings whose map function returns each value unchanged and whose
     * {@link #notifyCheckpointAborted(long)} is overridden to do nothing.
     */
    private static class MapOperator
    extends StreamMap<String, String> {
        private static final long serialVersionUID = 1L;

        /** Creates the operator with an identity map function. */
        public MapOperator() {
            super((MapFunction<String, String> & Serializable)element -> element);
        }

        /** Intentionally empty: abort notifications are ignored by this operator. */
        public void notifyCheckpointAborted(long checkpointId) throws Exception {
            // no-op
        }
    }
}

