/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Optional;
import java.util.concurrent.FutureTask;
import junit.framework.TestCase;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class StreamOperatorStateHandlerTest {
    @Test
    public void testFailingBackendSnapshotMethod() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        try (CloseableRegistry closeableRegistry = new CloseableRegistry();){
            StreamOperatorStateHandler stateHandler;
            StreamOperatorStateContext stateContext;
            TestStateSnapshotContextSynchronousImpl context;
            CancelableFuture resultSubpartitionStateFuture;
            CancelableFuture inputChannelStateFuture;
            CancelableFuture operatorStateRawFuture;
            CancelableFuture operatorStateManagedFuture;
            CancelableFuture keyedStateRawFuture;
            CancelableFuture keyedStateManagedFuture;
            block14: {
                keyedStateManagedFuture = new CancelableFuture();
                keyedStateRawFuture = new CancelableFuture();
                operatorStateManagedFuture = new CancelableFuture();
                operatorStateRawFuture = new CancelableFuture();
                inputChannelStateFuture = new CancelableFuture();
                resultSubpartitionStateFuture = new CancelableFuture();
                OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(keyedStateManagedFuture, keyedStateRawFuture, operatorStateManagedFuture, operatorStateRawFuture, inputChannelStateFuture, resultSubpartitionStateFuture);
                context = new TestStateSnapshotContextSynchronousImpl(42L, 1L, closeableRegistry);
                context.getRawKeyedOperatorStateOutput();
                context.getRawOperatorStateOutput();
                StreamTaskStateInitializerImpl stateInitializer = new StreamTaskStateInitializerImpl((Environment)new MockEnvironmentBuilder().build(), (StateBackend)new MemoryStateBackend());
                stateContext = stateInitializer.streamOperatorStateContext(new OperatorID(), "whatever", (ProcessingTimeService)new TestProcessingTimeService(), (KeyContext)new UnUsedKeyContext(), (TypeSerializer)IntSerializer.INSTANCE, closeableRegistry, (MetricGroup)new InterceptingOperatorMetricGroup(), 1.0, false);
                stateHandler = new StreamOperatorStateHandler(stateContext, new ExecutionConfig(), closeableRegistry);
                String keyedStateField = "keyedStateField";
                String operatorStateField = "operatorStateField";
                StreamOperatorStateHandler.CheckpointedStreamOperator checkpointedStreamOperator = new StreamOperatorStateHandler.CheckpointedStreamOperator(){

                    public void initializeState(StateInitializationContext context) throws Exception {
                        context.getKeyedStateStore().getState(new ValueStateDescriptor("keyedStateField", (TypeSerializer)LongSerializer.INSTANCE)).update((Object)42L);
                        context.getOperatorStateStore().getListState(new ListStateDescriptor("operatorStateField", (TypeSerializer)LongSerializer.INSTANCE)).add((Object)42L);
                    }

                    public void snapshotState(StateSnapshotContext context) throws Exception {
                        throw new ExpectedTestException();
                    }
                };
                stateHandler.setCurrentKey((Object)"44");
                stateHandler.initializeOperatorState(checkpointedStreamOperator);
                MatcherAssert.assertThat((Object)stateContext.operatorStateBackend().getRegisteredStateNames(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.not((Matcher)Matchers.empty())));
                MatcherAssert.assertThat((Object)((AbstractKeyedStateBackend)stateContext.keyedStateBackend()).numKeyValueStatesByName(), (Matcher)CoreMatchers.equalTo((Object)1));
                try {
                    stateHandler.snapshotState(checkpointedStreamOperator, Optional.of(stateContext.internalTimerServiceManager()), "42", 42L, 42L, CheckpointOptions.forCheckpointWithDefaultLocation(), (CheckpointStreamFactory)new MemCheckpointStreamFactory(1024), operatorSnapshotResult, (StateSnapshotContextSynchronousImpl)context, false);
                    Assert.fail((String)"Exception expected.");
                }
                catch (CheckpointException e) {
                    if (ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"Expected Test Exception").isPresent()) break block14;
                    throw e;
                }
            }
            TestCase.assertTrue((boolean)keyedStateManagedFuture.isCancelled());
            TestCase.assertTrue((boolean)keyedStateRawFuture.isCancelled());
            TestCase.assertTrue((boolean)context.getKeyedStateStreamFuture().isCancelled());
            TestCase.assertTrue((boolean)operatorStateManagedFuture.isCancelled());
            TestCase.assertTrue((boolean)operatorStateRawFuture.isCancelled());
            TestCase.assertTrue((boolean)context.getOperatorStateStreamFuture().isCancelled());
            TestCase.assertTrue((boolean)inputChannelStateFuture.isCancelled());
            TestCase.assertTrue((boolean)resultSubpartitionStateFuture.isCancelled());
            stateHandler.dispose();
            MatcherAssert.assertThat((Object)stateContext.operatorStateBackend().getRegisteredBroadcastStateNames(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
            MatcherAssert.assertThat((Object)stateContext.operatorStateBackend().getRegisteredStateNames(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
            MatcherAssert.assertThat((Object)((AbstractKeyedStateBackend)stateContext.keyedStateBackend()).numKeyValueStatesByName(), (Matcher)CoreMatchers.equalTo((Object)0));
        }
    }

    private static class UnUsedKeyContext
    implements KeyContext {
        private UnUsedKeyContext() {
        }

        public void setCurrentKey(Object key) {
            throw new UnsupportedOperationException();
        }

        public Object getCurrentKey() {
            throw new UnsupportedOperationException();
        }
    }

    private static class CancelableFuture<T>
    extends FutureTask<T> {
        public CancelableFuture() {
            super(() -> {
                throw new UnsupportedOperationException();
            });
        }
    }

    private static class TestStateSnapshotContextSynchronousImpl
    extends StateSnapshotContextSynchronousImpl {
        public TestStateSnapshotContextSynchronousImpl(long checkpointId, long timestamp, CloseableRegistry closeableRegistry) {
            super(checkpointId, timestamp, (CheckpointStreamFactory)new MemCheckpointStreamFactory(1024), new KeyGroupRange(0, 2), closeableRegistry);
            this.keyedStateCheckpointClosingFuture = new CancelableFuture();
            this.operatorStateCheckpointClosingFuture = new CancelableFuture();
        }
    }
}

