/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.SystemClock;

public class AbstractStreamOperatorTestHarness<OUT>
implements AutoCloseable {
    protected StreamOperator<OUT> operator;
    protected final StreamOperatorFactory<OUT> factory;
    protected final ConcurrentLinkedQueue<Object> outputList;
    protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
    protected final StreamConfig config;
    protected final ExecutionConfig executionConfig;
    protected final TestProcessingTimeService processingTimeService;
    protected final MockTtlTimeProvider ttlTimeProvider;
    protected final MockStreamTask<OUT, ?> mockTask;
    protected final TestTaskStateManager taskStateManager;
    final MockEnvironment environment;
    private final Optional<MockEnvironment> internalEnvironment;
    protected StreamTaskStateInitializer streamTaskStateInitializer;
    private final TaskMailbox taskMailbox;
    protected StateBackend stateBackend = new MemoryStateBackend();
    private CheckpointStorageAccess checkpointStorageAccess = new MemoryStateBackend().createCheckpointStorage(new JobID());
    private final Object checkpointLock;
    private static final OperatorStateRepartitioner<OperatorStateHandle> operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
    private InternalTimeServiceManagerImpl<?> timeServiceManager;
    private InternalTimeServiceManager.Provider timeServiceManagerProvider = new InternalTimeServiceManager.Provider(){

        public <K> InternalTimeServiceManager<K> create(TaskIOMetricGroup taskIOMetricGroup, CheckpointableKeyedStateBackend<K> keyedStatedBackend, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates, StreamTaskCancellationContext cancellationContext) throws Exception {
            InternalTimeServiceManagerImpl typedTimeServiceManager = InternalTimeServiceManagerImpl.create((TaskIOMetricGroup)taskIOMetricGroup, keyedStatedBackend, (ClassLoader)userClassloader, (KeyContext)keyContext, (ProcessingTimeService)processingTimeService, rawKeyedStates, (StreamTaskCancellationContext)cancellationContext);
            AbstractStreamOperatorTestHarness.this.timeServiceManager = typedTimeServiceManager;
            return typedTimeServiceManager;
        }
    };
    private boolean setupCalled = false;
    private boolean initializeCalled = false;
    private volatile boolean wasFailedExternally = false;

    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
        this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID());
    }

    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex, OperatorID operatorID) throws Exception {
        this(operator, (StreamOperatorFactory<OUT>)SimpleOperatorFactory.of(operator), new MockEnvironmentBuilder().setTaskName("MockTask").setManagedMemorySize(0x300000L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(1024).setMaxParallelism(maxParallelism).setParallelism(parallelism).setSubtaskIndex(subtaskIndex).build(), true, operatorID);
    }

    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> factory, MockEnvironment env) throws Exception {
        this(null, factory, env, false, new OperatorID());
    }

    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> factory, int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
        this(factory, maxParallelism, parallelism, subtaskIndex, new OperatorID());
    }

    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> factory, int maxParallelism, int parallelism, int subtaskIndex, OperatorID operatorID) throws Exception {
        this(null, factory, new MockEnvironmentBuilder().setTaskName("MockTask").setManagedMemorySize(0x300000L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(1024).setMaxParallelism(maxParallelism).setParallelism(parallelism).setSubtaskIndex(subtaskIndex).build(), true, operatorID);
    }

    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, MockEnvironment env) throws Exception {
        this(operator, (StreamOperatorFactory<OUT>)SimpleOperatorFactory.of(operator), env, false, new OperatorID());
    }

    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, String taskName, OperatorID operatorID) throws Exception {
        this(operator, (StreamOperatorFactory<OUT>)SimpleOperatorFactory.of(operator), new MockEnvironmentBuilder().setTaskName(taskName).setManagedMemorySize(0x300000L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(1024).setMaxParallelism(1).setParallelism(1).setSubtaskIndex(0).build(), false, operatorID);
    }

    private AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, StreamOperatorFactory<OUT> factory, MockEnvironment env, boolean environmentIsInternal, OperatorID operatorID) throws Exception {
        this.operator = operator;
        this.factory = factory;
        this.outputList = new ConcurrentLinkedQueue();
        this.sideOutputLists = new HashMap();
        Configuration underlyingConfig = env.getTaskConfiguration();
        this.config = new StreamConfig(underlyingConfig);
        this.config.setCheckpointingEnabled(true);
        this.config.setOperatorID(operatorID);
        this.config.setStateBackendUsesManagedMemory(true);
        this.config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
        this.config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 1.0);
        this.executionConfig = env.getExecutionConfig();
        this.checkpointLock = new Object();
        this.environment = (MockEnvironment)Preconditions.checkNotNull((Object)env);
        this.taskStateManager = (TestTaskStateManager)env.getTaskStateManager();
        this.internalEnvironment = environmentIsInternal ? Optional.of(this.environment) : Optional.empty();
        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);
        this.ttlTimeProvider = new MockTtlTimeProvider();
        this.ttlTimeProvider.setCurrentTimestamp(0L);
        if (operator instanceof AsyncStateProcessing || factory instanceof SimpleOperatorFactory && ((SimpleOperatorFactory)factory).getOperator() instanceof AsyncStateProcessing) {
            this.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend)new HashMapStateBackend()));
        }
        this.streamTaskStateInitializer = this.createStreamTaskStateManager((Environment)this.environment, this.stateBackend, (TtlTimeProvider)this.ttlTimeProvider, this.timeServiceManagerProvider);
        BiConsumer<String, Throwable> handleAsyncException = (message, t) -> {
            this.wasFailedExternally = true;
        };
        this.taskMailbox = new TaskMailboxImpl();
        try {
            this.checkpointStorageAccess = this.environment.getCheckpointStorageAccess();
        }
        catch (NullPointerException | UnsupportedOperationException runtimeException) {
            // empty catch block
        }
        this.mockTask = new MockStreamTaskBuilder((Environment)env).setCheckpointLock(this.checkpointLock).setConfig(this.config).setExecutionConfig(this.executionConfig).setStreamTaskStateInitializer(this.streamTaskStateInitializer).setCheckpointStorage(this.checkpointStorageAccess).setTimerService((TimerService)this.processingTimeService).setHandleAsyncException(handleAsyncException).setTaskMailbox(this.taskMailbox).build();
    }

    private StreamTaskStateInitializer createStreamTaskStateManager(Environment env, StateBackend stateBackend, TtlTimeProvider ttlTimeProvider, InternalTimeServiceManager.Provider timeServiceManagerProvider) {
        return new StreamTaskStateInitializerImpl(env, stateBackend, new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis()), ttlTimeProvider, timeServiceManagerProvider, StreamTaskCancellationContext.alwaysRunning());
    }

    public void setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        if (stateBackend instanceof CheckpointStorage) {
            this.setCheckpointStorage((CheckpointStorage)stateBackend);
        }
    }

    public void setCheckpointStorage(CheckpointStorage storage) {
        if (this.stateBackend instanceof CheckpointStorage) {
            return;
        }
        try {
            this.checkpointStorageAccess = storage.createCheckpointStorage(new JobID());
        }
        catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Deprecated
    public Object getCheckpointLock() {
        return this.mockTask.getCheckpointLock();
    }

    public MockEnvironment getEnvironment() {
        return this.environment;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public StreamConfig getStreamConfig() {
        return this.config;
    }

    public ConcurrentLinkedQueue<Object> getOutput() {
        return this.outputList;
    }

    public Collection<StreamRecord<OUT>> getRecordOutput() {
        return this.outputList.stream().filter(element -> element instanceof StreamRecord).map(element -> (StreamRecord)element).collect(Collectors.toList());
    }

    public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> tag) {
        return this.sideOutputLists.get(tag);
    }

    public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
        LinkedList<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
        for (Object e : this.getOutput()) {
            if (!(e instanceof StreamRecord)) continue;
            resultElements.add((StreamRecord)e);
        }
        return resultElements;
    }

    public List<OUT> extractOutputValues() {
        List<StreamRecord<OUT>> streamRecords = this.extractOutputStreamRecords();
        ArrayList<Object> outputValues = new ArrayList<Object>();
        for (StreamRecord<OUT> streamRecord : streamRecords) {
            outputValues.add(streamRecord.getValue());
        }
        return outputValues;
    }

    public void setup() {
        this.setup(null);
    }

    public void setup(TypeSerializer<OUT> outputSerializer) {
        if (!this.setupCalled) {
            this.streamTaskStateInitializer = this.createStreamTaskStateManager((Environment)this.environment, this.stateBackend, (TtlTimeProvider)this.ttlTimeProvider, this.timeServiceManagerProvider);
            this.mockTask.setStreamTaskStateInitializer(this.streamTaskStateInitializer);
            if (this.operator == null) {
                this.operator = (StreamOperator)StreamOperatorFactoryUtil.createOperator(this.factory, this.mockTask, (StreamConfig)this.config, (Output)(AbstractStreamOperatorTestHarness)this.new MockOutput(outputSerializer), (OperatorEventDispatcher)new OperatorEventDispatcherImpl((ClassLoader)this.getClass().getClassLoader(), (TaskOperatorEventGateway)new NoOpTaskOperatorEventGateway())).f0;
            } else {
                if (this.operator instanceof AbstractStreamOperator) {
                    ((AbstractStreamOperator)this.operator).setProcessingTimeService((ProcessingTimeService)this.processingTimeService);
                }
                if (this.operator instanceof SetupableStreamOperator) {
                    ((SetupableStreamOperator)this.operator).setup(this.mockTask, this.config, (Output)new MockOutput(outputSerializer));
                }
            }
            this.setupCalled = true;
            this.mockTask.init();
        }
    }

    public void initializeState(OperatorSubtaskState operatorStateHandles) throws Exception {
        this.initializeState(operatorStateHandles, null);
    }

    public void initializeState(String operatorStateSnapshotPath) throws Exception {
        this.initializeState(OperatorSnapshotUtil.readStateHandle(operatorStateSnapshotPath));
    }

    public void initializeEmptyState() throws Exception {
        this.initializeState((OperatorSubtaskState)null);
    }

    public static OperatorSubtaskState repartitionOperatorState(OperatorSubtaskState operatorStateHandles, int numKeyGroups, int oldParallelism, int newParallelism, int subtaskIndex) {
        Collection<Object> localRawOperatorState;
        Collection<Object> localManagedOperatorState;
        Preconditions.checkNotNull((Object)operatorStateHandles, (String)"the previous operatorStateHandles should not be null.");
        List keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions((int)numKeyGroups, (int)newParallelism);
        KeyGroupRange localKeyGroupRange = (KeyGroupRange)keyGroupPartitions.get(subtaskIndex);
        ArrayList localManagedKeyGroupState = new ArrayList();
        StateAssignmentOperation.extractIntersectingState((Collection)operatorStateHandles.getManagedKeyedState(), (KeyGroupRange)localKeyGroupRange, localManagedKeyGroupState);
        ArrayList localRawKeyGroupState = new ArrayList();
        StateAssignmentOperation.extractIntersectingState((Collection)operatorStateHandles.getRawKeyedState(), (KeyGroupRange)localKeyGroupRange, localRawKeyGroupState);
        StateObjectCollection managedOperatorStates = operatorStateHandles.getManagedOperatorState();
        if (!managedOperatorStates.isEmpty()) {
            List managedOperatorState = managedOperatorStates.stream().map(Collections::singletonList).collect(Collectors.toList());
            localManagedOperatorState = (Collection)operatorStateRepartitioner.repartitionState(managedOperatorState, oldParallelism, newParallelism).get(subtaskIndex);
        } else {
            localManagedOperatorState = Collections.emptyList();
        }
        StateObjectCollection rawOperatorStates = operatorStateHandles.getRawOperatorState();
        if (!rawOperatorStates.isEmpty()) {
            List rawOperatorState = rawOperatorStates.stream().map(Collections::singletonList).collect(Collectors.toList());
            localRawOperatorState = (Collection)operatorStateRepartitioner.repartitionState(rawOperatorState, oldParallelism, newParallelism).get(subtaskIndex);
        } else {
            localRawOperatorState = Collections.emptyList();
        }
        return OperatorSubtaskState.builder().setManagedOperatorState(new StateObjectCollection(AbstractStreamOperatorTestHarness.nullToEmptyCollection(localManagedOperatorState))).setRawOperatorState(new StateObjectCollection(AbstractStreamOperatorTestHarness.nullToEmptyCollection(localRawOperatorState))).setManagedKeyedState(new StateObjectCollection(AbstractStreamOperatorTestHarness.nullToEmptyCollection(localManagedKeyGroupState))).setRawKeyedState(new StateObjectCollection(AbstractStreamOperatorTestHarness.nullToEmptyCollection(localRawKeyGroupState))).build();
    }

    public void initializeState(OperatorSubtaskState jmOperatorStateHandles, OperatorSubtaskState tmOperatorStateHandles) throws Exception {
        Preconditions.checkState((!this.initializeCalled ? 1 : 0) != 0, (Object)"TestHarness has already been initialized. Have you opened this harness before initializing it?");
        if (!this.setupCalled) {
            this.setup();
        }
        if (jmOperatorStateHandles != null) {
            TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot();
            jmTaskStateSnapshot.putSubtaskStateByOperatorID(this.operator.getOperatorID(), jmOperatorStateHandles);
            this.taskStateManager.setReportedCheckpointId(0L);
            this.taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L, jmTaskStateSnapshot));
            if (tmOperatorStateHandles != null) {
                TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
                tmTaskStateSnapshot.putSubtaskStateByOperatorID(this.operator.getOperatorID(), tmOperatorStateHandles);
                this.taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L, tmTaskStateSnapshot));
            }
        }
        this.operator.initializeState(this.mockTask.createStreamTaskStateInitializer(new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis())));
        this.initializeCalled = true;
    }

    private static <T> Collection<T> nullToEmptyCollection(Collection<T> collection) {
        return collection != null ? collection : Collections.emptyList();
    }

    public static OperatorSubtaskState repackageState(OperatorSubtaskState ... handles) throws Exception {
        if (handles.length < 1) {
            return null;
        }
        if (handles.length == 1) {
            return handles[0];
        }
        ArrayList mergedManagedOperatorState = new ArrayList(handles.length);
        ArrayList mergedRawOperatorState = new ArrayList(handles.length);
        ArrayList mergedManagedKeyedState = new ArrayList(handles.length);
        ArrayList mergedRawKeyedState = new ArrayList(handles.length);
        for (OperatorSubtaskState handle : handles) {
            StateObjectCollection managedOperatorState = handle.getManagedOperatorState();
            StateObjectCollection rawOperatorState = handle.getRawOperatorState();
            StateObjectCollection managedKeyedState = handle.getManagedKeyedState();
            StateObjectCollection rawKeyedState = handle.getRawKeyedState();
            mergedManagedOperatorState.addAll(managedOperatorState);
            mergedRawOperatorState.addAll(rawOperatorState);
            mergedManagedKeyedState.addAll(managedKeyedState);
            mergedRawKeyedState.addAll(rawKeyedState);
        }
        return OperatorSubtaskState.builder().setManagedOperatorState(new StateObjectCollection(mergedManagedOperatorState)).setRawOperatorState(new StateObjectCollection(mergedRawOperatorState)).setManagedKeyedState(new StateObjectCollection(mergedManagedKeyedState)).setRawKeyedState(new StateObjectCollection(mergedRawKeyedState)).build();
    }

    public void open() throws Exception {
        if (!this.initializeCalled) {
            this.initializeEmptyState();
        }
        this.operator.open();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.operator.prepareSnapshotPreBarrier(checkpointId);
    }

    public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
        return this.snapshotWithLocalState(checkpointId, timestamp).getJobManagerOwnedState();
    }

    public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp) throws Exception {
        return this.snapshotWithLocalState(checkpointId, timestamp, (SnapshotType)CheckpointType.CHECKPOINT);
    }

    public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp, SnapshotType checkpointType) throws Exception {
        CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
        OperatorSnapshotFutures operatorStateResult = this.operator.snapshotState(checkpointId, timestamp, new CheckpointOptions(checkpointType, locationReference), this.checkpointStorageAccess.resolveCheckpointStorageLocation(checkpointId, locationReference));
        return new OperatorSnapshotFinalizer(operatorStateResult);
    }

    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        this.operator.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void close() throws Exception {
        if (this.processingTimeService != null) {
            this.processingTimeService.shutdownService();
        }
        this.setupCalled = false;
        this.operator.finish();
        this.operator.close();
        if (this.internalEnvironment.isPresent()) {
            this.internalEnvironment.get().close();
        }
        this.mockTask.cleanUpInternal();
    }

    public AbstractStreamOperator<OUT> getOperator() {
        return (AbstractStreamOperator)this.operator;
    }

    public StreamOperatorFactory<OUT> getOperatorFactory() {
        return this.factory;
    }

    public void setProcessingTime(long time) throws Exception {
        this.processingTimeService.setCurrentTime(time);
    }

    public void setStateTtlProcessingTime(long timeStamp) {
        this.ttlTimeProvider.setCurrentTimestamp(timeStamp);
    }

    public long getProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.config.setTimeCharacteristic(timeCharacteristic);
    }

    public TimeCharacteristic getTimeCharacteristic() {
        return this.config.getTimeCharacteristic();
    }

    public boolean wasFailedExternally() {
        return this.wasFailedExternally;
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (this.timeServiceManager != null) {
            return this.timeServiceManager.numProcessingTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        if (this.timeServiceManager != null) {
            return this.timeServiceManager.numEventTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    @VisibleForTesting
    public TestProcessingTimeService getProcessingTimeService() {
        return this.processingTimeService;
    }

    @VisibleForTesting
    public TaskMailbox getTaskMailbox() {
        return this.taskMailbox;
    }

    public void setTimeServiceManagerProvider(InternalTimeServiceManager.Provider timeServiceManagerProvider) {
        this.timeServiceManagerProvider = timeServiceManagerProvider;
    }

    class MockOutput
    implements Output<StreamRecord<OUT>> {
        private TypeSerializer<OUT> outputSerializer;
        private TypeSerializer sideOutputSerializer;

        MockOutput() {
            this(null);
        }

        MockOutput(TypeSerializer<OUT> outputSerializer) {
            this.outputSerializer = outputSerializer;
        }

        public void emitWatermark(Watermark mark) {
            AbstractStreamOperatorTestHarness.this.outputList.add(mark);
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            AbstractStreamOperatorTestHarness.this.outputList.add(watermarkStatus);
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            AbstractStreamOperatorTestHarness.this.outputList.add(latencyMarker);
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            AbstractStreamOperatorTestHarness.this.outputList.add(recordAttributes);
        }

        public void collect(StreamRecord<OUT> element) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject((Object)element.getValue()).createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig.getSerializerConfig());
            }
            if (element.hasTimestamp()) {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(element.getValue()), element.getTimestamp()));
            } else {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(element.getValue())));
            }
        }

        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            this.sideOutputSerializer = outputTag.getTypeInfo().createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig.getSerializerConfig());
            ConcurrentLinkedQueue<Object> sideOutputList = AbstractStreamOperatorTestHarness.this.sideOutputLists.get(outputTag);
            if (sideOutputList == null) {
                sideOutputList = new ConcurrentLinkedQueue();
                AbstractStreamOperatorTestHarness.this.sideOutputLists.put(outputTag, sideOutputList);
            }
            if (record.hasTimestamp()) {
                sideOutputList.add(new StreamRecord(this.sideOutputSerializer.copy(record.getValue()), record.getTimestamp()));
            } else {
                sideOutputList.add(new StreamRecord(this.sideOutputSerializer.copy(record.getValue())));
            }
        }

        public void close() {
        }
    }
}

