/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link StreamTaskStateInitializerImpl}.
 *
 * <p>Each test builds a {@link StreamOperatorStateContext} through the initializer, once without
 * any state to restore and once with a {@link JobManagerTaskRestore}, and then checks what the
 * resulting context exposes and which closeables were registered with the task's
 * {@link CloseableRegistry}.
 */
class StreamTaskStateInitializerImplTest {

    /** Without a restore the context must be fresh and all raw state inputs must be empty. */
    @Test
    void testNoRestore() throws Exception {
        HashMapStateBackend stateBackend = Mockito.spy(new HashMapStateBackend());

        // createTimerServiceManager == true: the two-argument initializer constructor is used,
        // so the metrics builder passed here is not handed to the initializer.
        StreamTaskStateInitializer stateInitializer =
                streamTaskStateManager(
                        stateBackend,
                        null,
                        new SubTaskInitializationMetricsBuilder(
                                SystemClock.getInstance().absoluteTimeMillis()),
                        true);

        OperatorID operatorID = new OperatorID(47L, 11L);
        CloseableRegistry closeableRegistry = new CloseableRegistry();

        StreamOperatorStateContext stateContext =
                createStateContext(stateInitializer, operatorID, closeableRegistry);

        OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager<?> timeServiceManager =
                stateContext.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
                stateContext.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
                stateContext.rawOperatorStateInputs();

        Assertions.assertThat(stateContext.isRestored())
                .as("Expected the context to NOT be restored")
                .isFalse();
        Assertions.assertThat(operatorStateBackend).isNotNull();
        Assertions.assertThat(keyedStateBackend).isNotNull();
        Assertions.assertThat(timeServiceManager).isNotNull();
        Assertions.assertThat(keyedStateInputs).isNotNull();
        Assertions.assertThat(operatorStateInputs).isNotNull();

        checkCloseablesRegistered(
                closeableRegistry,
                operatorStateBackend,
                keyedStateBackend,
                keyedStateInputs,
                operatorStateInputs);

        Assertions.assertThat(keyedStateInputs.iterator()).isExhausted();
        Assertions.assertThat(operatorStateInputs.iterator()).isExhausted();
    }

    /**
     * With a {@link JobManagerTaskRestore} the context must report the restored checkpoint id,
     * expose the raw state partitions, and record the restored state sizes in the metrics.
     */
    @Test
    void testWithRestore() throws Exception {
        // Backend whose factory methods hand out mocks, so no real state is materialized.
        StateBackend mockingBackend =
                Mockito.spy(
                        new StateBackend() {

                            @Override
                            public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                                    StateBackend.KeyedStateBackendParameters<K> parameters)
                                    throws Exception {
                                return Mockito.mock(AbstractKeyedStateBackend.class);
                            }

                            @Override
                            public OperatorStateBackend createOperatorStateBackend(
                                    StateBackend.OperatorStateBackendParameters parameters)
                                    throws Exception {
                                return Mockito.mock(OperatorStateBackend.class);
                            }
                        });

        OperatorID operatorID = new OperatorID(47L, 11L);
        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();

        // Fixed seed keeps the generated dummy handles reproducible between runs.
        Random random = new Random(66L);
        OperatorSubtaskState operatorSubtaskState = createRestoredSubtaskState(random);
        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);

        JobManagerTaskRestore jobManagerTaskRestore =
                new JobManagerTaskRestore(42L, taskStateSnapshot);
        SubTaskInitializationMetricsBuilder metricsBuilder =
                new SubTaskInitializationMetricsBuilder(
                        SystemClock.getInstance().absoluteTimeMillis());

        // createTimerServiceManager == false: the timer service provider returns null and the
        // metrics builder is passed through to the initializer.
        StreamTaskStateInitializer stateInitializer =
                streamTaskStateManager(
                        mockingBackend, jobManagerTaskRestore, metricsBuilder, false);

        CloseableRegistry closeableRegistry = new CloseableRegistry();

        StreamOperatorStateContext stateContext =
                createStateContext(stateInitializer, operatorID, closeableRegistry);

        OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
        CheckpointableKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
        InternalTimeServiceManager<?> timeServiceManager =
                stateContext.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs =
                stateContext.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs =
                stateContext.rawOperatorStateInputs();

        Assertions.assertThat(stateContext.isRestored())
                .as("Expected the context to be restored")
                .isTrue();
        Assertions.assertThat(stateContext.getRestoredCheckpointId()).hasValue(42L);
        Assertions.assertThat(operatorStateBackend).isNotNull();
        Assertions.assertThat(keyedStateBackend).isNotNull();
        // The provider installed by streamTaskStateManager(..., false) returns null.
        Assertions.assertThat(timeServiceManager).isNull();
        Assertions.assertThat(keyedStateInputs).isNotNull();
        Assertions.assertThat(operatorStateInputs).isNotNull();

        Assertions.assertThat(countElements(keyedStateInputs)).isOne();
        Assertions.assertThat(countElements(operatorStateInputs)).isEqualTo(3);

        long expectedSumLocalMemory =
                Stream.of(
                                operatorSubtaskState.getManagedOperatorState().stream(),
                                operatorSubtaskState.getManagedKeyedState().stream(),
                                operatorSubtaskState.getRawKeyedState().stream(),
                                operatorSubtaskState.getRawOperatorState().stream())
                        .flatMap(i -> i)
                        .mapToLong(StateObject::getStateSize)
                        .sum();
        long expectedSumUnknown =
                Stream.concat(
                                operatorSubtaskState.getInputChannelState().stream(),
                                operatorSubtaskState.getResultSubpartitionState().stream())
                        .mapToLong(StateObject::getStateSize)
                        .sum();

        SubTaskInitializationMetrics metrics = metricsBuilder.build();
        Assertions.assertThat(metrics.getDurationMetrics())
                .hasSize(2)
                .containsEntry(
                        "RestoredStateSizeBytes."
                                + StateObject.StateObjectLocation.LOCAL_MEMORY.name(),
                        expectedSumLocalMemory)
                .containsEntry(
                        "RestoredStateSizeBytes." + StateObject.StateObjectLocation.UNKNOWN.name(),
                        expectedSumUnknown);

        checkCloseablesRegistered(
                closeableRegistry,
                operatorStateBackend,
                keyedStateBackend,
                keyedStateInputs,
                operatorStateInputs);
    }

    /**
     * Builds the subtask state that {@link #testWithRestore()} restores from: managed and raw
     * operator state, managed and raw keyed state, plus one input channel and one result
     * subpartition handle, all filled with dummy data drawn from {@code random}.
     *
     * @param random source of randomness for the dummy handles
     * @return the assembled subtask state
     */
    private static OperatorSubtaskState createRestoredSubtaskState(Random random) {
        return OperatorSubtaskState.builder()
                .setManagedOperatorState(
                        new OperatorStreamStateHandle(
                                Collections.singletonMap(
                                        "a",
                                        new OperatorStateHandle.StateMetaInfo(
                                                new long[] {0L, 10L},
                                                OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)),
                                CheckpointTestUtils.createDummyStreamStateHandle(random, null)))
                .setRawOperatorState(
                        new OperatorStreamStateHandle(
                                Collections.singletonMap(
                                        "_default_",
                                        new OperatorStateHandle.StateMetaInfo(
                                                new long[] {0L, 20L, 30L},
                                                OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)),
                                CheckpointTestUtils.createDummyStreamStateHandle(random, null)))
                .setManagedKeyedState(
                        CheckpointTestUtils.createDummyKeyGroupStateHandle(random, null))
                .setRawKeyedState(CheckpointTestUtils.createDummyKeyGroupStateHandle(random, null))
                .setInputChannelState(
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.createNewInputChannelStateHandle(10, random)))
                .setResultSubpartitionState(
                        StateObjectCollection.singleton(
                                StateHandleDummyUtil.createNewResultSubpartitionStateHandle(
                                        10, random)))
                .build();
    }

    /**
     * Creates the operator state context for a mocked {@link AbstractStreamOperator} that reports
     * {@code operatorID} as its id.
     *
     * @param stateInitializer initializer under test
     * @param operatorID id the mocked operator returns from {@code getOperatorID()}
     * @param closeableRegistry registry passed to the initializer as the task's registry
     * @return the context produced by the initializer
     * @throws Exception if the initializer fails to create the context
     */
    private static StreamOperatorStateContext createStateContext(
            StreamTaskStateInitializer stateInitializer,
            OperatorID operatorID,
            CloseableRegistry closeableRegistry)
            throws Exception {
        AbstractStreamOperator<?> streamOperator = Mockito.mock(AbstractStreamOperator.class);
        Mockito.when(streamOperator.getOperatorID()).thenReturn(operatorID);

        TypeSerializer<?> typeSerializer = new IntSerializer();

        return stateInitializer.streamOperatorStateContext(
                streamOperator.getOperatorID(),
                streamOperator.getClass().getSimpleName(),
                new TestProcessingTimeService(),
                streamOperator,
                typeSerializer,
                closeableRegistry,
                new UnregisteredMetricsGroup(),
                1.0,
                false);
    }

    /**
     * Counts the elements of {@code elements} by iterating over it once.
     *
     * @param elements iterable to walk
     * @return number of elements produced by a single iteration
     */
    private static int countElements(Iterable<?> elements) {
        int count = 0;
        for (Object ignored : elements) {
            ++count;
        }
        return count;
    }

    /**
     * Asserts that every given closeable is currently registered with {@code closeableRegistry}.
     *
     * <p>The check unregisters each closeable as a side effect, so the registry no longer tracks
     * them afterwards.
     *
     * @param closeableRegistry registry expected to hold the closeables
     * @param closeables closeables that must be registered
     */
    private static void checkCloseablesRegistered(
            CloseableRegistry closeableRegistry, Closeable... closeables) {
        for (Closeable closeable : closeables) {
            Assertions.assertThat(closeableRegistry.unregisterCloseable(closeable)).isTrue();
        }
    }

    /**
     * Creates a {@link StreamTaskStateInitializerImpl} backed by a {@link DummyEnvironment} whose
     * task state manager is a {@link TaskStateManagerImpl} built from test doubles.
     *
     * @param stateBackend state backend handed to the initializer
     * @param jobManagerTaskRestore state to restore from, or {@code null} for no restore
     * @param metricsBuilder metrics builder passed to the initializer; only used when {@code
     *     createTimerServiceManager} is {@code false}
     * @param createTimerServiceManager if {@code true}, the two-argument initializer constructor
     *     is used; if {@code false}, the initializer is given a timer service provider that
     *     always returns {@code null}
     * @return the configured initializer
     */
    private StreamTaskStateInitializer streamTaskStateManager(
            StateBackend stateBackend,
            JobManagerTaskRestore jobManagerTaskRestore,
            SubTaskInitializationMetricsBuilder metricsBuilder,
            boolean createTimerServiceManager) {
        JobID jobID = new JobID(42L, 43L);
        ExecutionAttemptID executionAttemptID = ExecutionGraphTestUtils.createExecutionAttemptId();
        TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder();
        TestTaskLocalStateStore taskLocalStateStore = new TestTaskLocalStateStore();
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();

        TaskStateManagerImpl taskStateManager =
                new TaskStateManagerImpl(
                        jobID,
                        executionAttemptID,
                        taskLocalStateStore,
                        null,
                        changelogStorage,
                        new TaskExecutorStateChangelogStoragesManager(),
                        jobManagerTaskRestore,
                        checkpointResponderMock);

        DummyEnvironment dummyEnvironment =
                new DummyEnvironment(
                        "test-task",
                        1,
                        executionAttemptID.getExecutionVertexId().getSubtaskIndex());
        dummyEnvironment.setTaskStateManager(taskStateManager);

        if (createTimerServiceManager) {
            return new StreamTaskStateInitializerImpl(dummyEnvironment, stateBackend);
        }

        return new StreamTaskStateInitializerImpl(
                dummyEnvironment,
                stateBackend,
                metricsBuilder,
                TtlTimeProvider.DEFAULT,
                new InternalTimeServiceManager.Provider() {

                    @Override
                    public <K> InternalTimeServiceManager<K> create(
                            TaskIOMetricGroup taskIOMetricGroup,
                            CheckpointableKeyedStateBackend<K> keyedStatedBackend,
                            ClassLoader userClassloader,
                            KeyContext keyContext,
                            ProcessingTimeService processingTimeService,
                            Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates,
                            StreamTaskCancellationContext cancellationContext)
                            throws Exception {
                        // Deliberately no timer service manager for this configuration.
                        return null;
                    }
                },
                StreamTaskCancellationContext.alwaysRunning());
    }
}

