/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.CallSite;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformerTest;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.shaded.guava32.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
    private CheckpointStreamFactory checkpointStreamFactory;
    protected MockEnvironment env;

    @BeforeEach
    void before() throws Exception {
        this.env = this.buildMockEnv();
    }

    @AfterEach
    void after() {
        IOUtils.closeQuietly((AutoCloseable)this.env);
    }

    protected abstract ConfigurableStateBackend getStateBackend() throws Exception;

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        ConfigurableStateBackend stateBackend = this.getStateBackend();
        if (stateBackend instanceof CheckpointStorage) {
            return (CheckpointStorage)stateBackend;
        }
        throw new IllegalStateException("The state backend under test does not implement CheckpointStorage.Please override 'createCheckpointStorage' and provide an appropriatecheckpoint storage instance");
    }

    protected CheckpointStorageAccess getCheckpointStorageAccess() throws Exception {
        return this.getCheckpointStorage().createCheckpointStorage(new JobID());
    }

    protected abstract boolean isSerializerPresenceRequiredOnRestore();

    protected abstract boolean supportsAsynchronousSnapshots();

    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStreamFactory == null) {
            this.checkpointStreamFactory = this.getCheckpointStorage().createCheckpointStorage(new JobID()).resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        }
        return this.checkpointStreamFactory;
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, this.env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        env.setCheckpointStorageAccess(this.getCheckpointStorageAccess());
        CheckpointableKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, this.getMetricGroup(), this.getCustomInitializationMetrics(), Collections.emptyList(), new CloseableRegistry(), 1.0));
        return backend;
    }

    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
        return (name, value) -> {};
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, this.env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        return this.getStateBackend().createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, this.getMetricGroup(), this.getCustomInitializationMetrics(), state, new CloseableRegistry(), 1.0));
    }

    protected MetricGroup getMetricGroup() {
        return new UnregisteredMetricsGroup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testEnableStateLatencyTracking() throws Exception {
        ConfigurableStateBackend stateBackend = this.getStateBackend();
        Configuration config = new Configuration();
        config.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, (Object)true);
        StateBackend configuredBackend = stateBackend.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        KeyGroupRange groupRange = new KeyGroupRange(0, 1);
        JobID jobID = new JobID();
        int numberOfKeyGroups = groupRange.getNumberOfKeyGroups();
        TaskKvStateRegistry kvStateRegistry = this.env.getTaskKvStateRegistry();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        CheckpointableKeyedStateBackend keyedStateBackend = configuredBackend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)this.env, jobID, "test_op", (TypeSerializer)IntSerializer.INSTANCE, numberOfKeyGroups, groupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, this.getMetricGroup(), this.getCustomInitializationMetrics(), Collections.emptyList(), cancelStreamRegistry, 1.0));
        try {
            CheckpointableKeyedStateBackend nested = keyedStateBackend instanceof TestableKeyedStateBackend ? ((TestableKeyedStateBackend)keyedStateBackend).getDelegatedKeyedStateBackend(true) : keyedStateBackend;
            Assertions.assertThat((boolean)((AbstractKeyedStateBackend)nested).getLatencyTrackingStateConfig().isEnabled()).isTrue();
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)keyedStateBackend);
            keyedStateBackend.dispose();
        }
    }

    @TestTemplate
    void testIsSafeToReuseState() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            Assertions.assertThat((boolean)backend.isSafeToReuseKVState()).isEqualTo(this.isSafeToReuseKVState());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @TestTemplate
    void testKeyGroupedInternalPriorityQueue() throws Exception {
        this.testKeyGroupedInternalPriorityQueue(false);
    }

    @TestTemplate
    void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
        this.testKeyGroupedInternalPriorityQueue(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
        String fieldName = "key-grouped-priority-queue";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            KeyGroupedInternalPriorityQueue priorityQueue = backend.create(fieldName, (TypeSerializer)new TestType.V1TestTypeSerializer());
            TestType elementA42 = new TestType("a", 42);
            TestType elementA44 = new TestType("a", 44);
            TestType elementB1 = new TestType("b", 1);
            TestType elementB3 = new TestType("b", 3);
            TestType[] elements = new TestType[]{elementA44, elementB1, elementB1, elementB3, elementA42};
            if (addAll) {
                priorityQueue.addAll(Arrays.asList(elements));
            } else {
                Assertions.assertThat((boolean)priorityQueue.add((Object)elements[0])).isTrue();
                Assertions.assertThat((boolean)priorityQueue.add((Object)elements[1])).isTrue();
                Assertions.assertThat((boolean)priorityQueue.add((Object)elements[2])).isFalse();
                Assertions.assertThat((boolean)priorityQueue.add((Object)elements[3])).isFalse();
                Assertions.assertThat((boolean)priorityQueue.add((Object)elements[4])).isFalse();
            }
            Assertions.assertThat((boolean)priorityQueue.isEmpty()).isFalse();
            Assertions.assertThat((Collection)priorityQueue.getSubsetForKeyGroup(1)).containsExactlyInAnyOrder((Object[])new TestType[]{elementA42, elementA44});
            Assertions.assertThat((Collection)priorityQueue.getSubsetForKeyGroup(8)).containsExactlyInAnyOrder((Object[])new TestType[]{elementB1, elementB3});
            Assertions.assertThat((Object)((Object)((TestType)((Object)priorityQueue.peek())))).isEqualTo((Object)elementB1);
            Assertions.assertThat((Object)((Object)((TestType)((Object)priorityQueue.poll())))).isEqualTo((Object)elementB1);
            Assertions.assertThat((Object)((Object)((TestType)((Object)priorityQueue.peek())))).isEqualTo((Object)elementB3);
            ArrayList actualList = new ArrayList();
            try (CloseableIterator iterator = priorityQueue.iterator();){
                iterator.forEachRemaining(actualList::add);
            }
            Assertions.assertThat(actualList).containsExactlyInAnyOrder((Object[])new TestType[]{elementB3, elementA42, elementA44});
            Assertions.assertThat((int)priorityQueue.size()).isEqualTo(3);
            Assertions.assertThat((boolean)priorityQueue.remove((Object)elementB1)).isFalse();
            Assertions.assertThat((boolean)priorityQueue.remove((Object)elementB3)).isTrue();
            Assertions.assertThat((Object)((Object)((TestType)((Object)priorityQueue.peek())))).isEqualTo((Object)elementA42);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            ValueState keyedState1 = (ValueState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.update((Object)(key * 2));
            }
            String ns2 = "ns2";
            ValueState keyedState2 = (ValueState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.update((Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assertions.assertThat((Iterator)actualIterator).hasNext();
                    Assertions.assertThat((int)actualIterator.nextInt()).isEqualTo(expectedKey);
                }
                Assertions.assertThat((Iterator)actualIterator).isExhausted();
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assertions.assertThat((Iterator)actualIterator).hasNext();
                    Assertions.assertThat((int)actualIterator.nextInt()).isEqualTo(expectedKey);
                }
                Assertions.assertThat((Iterator)actualIterator).isExhausted();
            }
            finally {
                if (keysStream != null) {
                    keysStream.close();
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testGetKeysAndNamespaces() throws Exception {
        int elementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            String[] namespaces = new String[]{"ns1", "ns2"};
            InternalValueState keyedState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                for (String ns : namespaces) {
                    keyedState.setCurrentNamespace((Object)ns);
                    keyedState.update((Object)(key * 2));
                }
            }
            try (Stream stream = backend.getKeysAndNamespaces(fieldName);){
                HashMap keysByNamespace = new HashMap();
                stream.forEach(entry -> {
                    ((AbstractStringAssert)Assertions.assertThat((String)((String)entry.f1)).withFailMessage("Unexpected namespace", new Object[0])).isIn((Object[])namespaces);
                    ((AbstractIntegerAssert)Assertions.assertThat((Integer)((Integer)entry.f0)).withFailMessage("Unexpected key", new Object[0])).isGreaterThanOrEqualTo(0).isLessThan(1000);
                    Set keys = keysByNamespace.computeIfAbsent((String)entry.f1, k -> new HashSet());
                    ((AbstractBooleanAssert)Assertions.assertThat((boolean)keys.add((Integer)entry.f0)).withFailMessage("Duplicate key for namespace", new Object[0])).isTrue();
                });
                ((AbstractIntegerAssert)Assertions.assertThat((int)keysByNamespace.size()).withFailMessage("Unexpected namespaces count", new Object[0])).isEqualTo(namespaces.length);
            }
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, this.createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)new SharedStateRegistryImpl());
            IOUtils.closeQuietly(backend);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            try (Stream stream = backend.getKeysAndNamespaces(fieldName);){
                HashMap keysByNamespace = new HashMap();
                stream.forEach(entry -> {
                    ((AbstractStringAssert)Assertions.assertThat((String)((String)entry.f1)).withFailMessage("Unexpected namespace", new Object[0])).isIn((Object[])namespaces);
                    ((AbstractIntegerAssert)Assertions.assertThat((Integer)((Integer)entry.f0)).withFailMessage("Unexpected key", new Object[0])).isGreaterThanOrEqualTo(0).isLessThan(1000);
                    Set keys = keysByNamespace.computeIfAbsent((String)entry.f1, k -> new HashSet());
                    ((AbstractBooleanAssert)Assertions.assertThat((boolean)keys.add((Integer)entry.f0)).withFailMessage("Duplicate key for namespace", new Object[0])).isTrue();
                });
                ((AbstractIntegerAssert)Assertions.assertThat((int)keysByNamespace.size()).withFailMessage("Unexpected namespaces count", new Object[0])).isEqualTo(namespaces.length);
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)numExceptions).withFailMessage("Didn't see the expected Kryo exception.", new Object[0])).isGreaterThan(0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
            pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig());
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assertions.assertThat((Object)state).isInstanceOf(InternalValueState.class);
            ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)numExceptions).withFailMessage("Didn't see the expected Kryo exception.", new Object[0])).isGreaterThan(0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testBackendUsesRegisteredKryoSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)numExceptions).withFailMessage("Didn't see the expected Kryo exception.", new Object[0])).isGreaterThan(0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assertions.assertThat((Object)state).isInstanceOf(InternalValueState.class);
            ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            int numExceptions = 0;
            backend.setCurrentKey((Object)1);
            try {
                state.update((Object)new TestPojo("u1", 1));
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            try {
                StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            }
            catch (ExpectedKryoTestException e) {
                ++numExceptions;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExpectedKryoTestException) {
                    ++numExceptions;
                }
                throw e;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)numExceptions).withFailMessage("Didn't see the expected Kryo exception.", new Object[0])).isGreaterThan(0);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            IOUtils.closeQuietly(backend);
            backend.dispose();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerKryoType(TestPojo.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            snapshot.discardState();
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Object)((TestPojo)state.value())).isEqualTo((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((Object)((TestPojo)state.value())).isEqualTo((Object)new TestPojo("u2", 2));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.assertRestoreKeyedBackendFail(snapshot2, (ValueStateDescriptor<TestPojo>)kvId);
            snapshot2.discardState();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.assertRestoreKeyedBackendFail(snapshot2, (ValueStateDescriptor<TestPojo>)kvId);
        }
        finally {
            if (backend != null) {
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerKryoType(TestNestedPojoClassA.class);
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerKryoType(TestNestedPojoClassB.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(KryoSerializer.class);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState internalKvState = (InternalKvState)state;
            KryoSerializer kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
            int nestedPojoClassARegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
            int nestedPojoClassBRegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            this.env.close();
            this.env = this.buildMockEnv();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerKryoType(TestNestedPojoClassB.class);
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerKryoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            internalKvState = (InternalKvState)state;
            kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            Assertions.assertThat((int)kryoSerializer.getKryo().getRegistration(TestPojo.class).getId()).isEqualTo(mainPojoClassRegistrationId);
            Assertions.assertThat((int)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId()).isEqualTo(nestedPojoClassARegistrationId);
            Assertions.assertThat((int)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId()).isEqualTo(nestedPojoClassBRegistrationId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerPojoType(TestNestedPojoClassA.class);
        ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerPojoType(TestNestedPojoClassB.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            TypeInformation pojoType = TypeExtractor.getForClass(TestPojo.class);
            Assertions.assertThat((Object)pojoType.createSerializer(this.env.getExecutionConfig().getSerializerConfig())).isInstanceOf(PojoSerializer.class);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            this.env.close();
            this.env = this.buildMockEnv();
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerPojoType(TestNestedPojoClassB.class);
            ((SerializerConfigImpl)this.env.getExecutionConfig().getSerializerConfig()).registerPojoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            kvId = new ValueStateDescriptor("id", pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getSerializer();
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isNull();
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isNull();
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)state.value())).isNull();
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isNull();
            state.update((Object)"2");
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.update((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.update((Object)"u3");
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("u1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("u2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("u3");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ValueState restored1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)restored1.value())).isEqualTo("1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)restored1.value())).isEqualTo("2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ValueState restored2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)restored2.value())).isEqualTo("u1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)restored2.value())).isEqualTo("u2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)((String)restored2.value())).isEqualTo("u3");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateWorkWithTtl() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", MutableLong.class);
            kvId.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofSeconds(1L)).build());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new MutableLong());
            state.value();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateRace() throws Exception {
        Integer namespace = 1;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        IntSerializer namespaceSerializer = IntSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            final ValueState state = (ValueState)backend.getPartitionedState((Object)namespace, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)kvId);
            TypeSerializer valueSerializer = kvId.getSerializer();
            final InternalKvState kvState = (InternalKvState)state;
            boolean key1 = true;
            backend.setCurrentKey((Object)1);
            kvState.setCurrentNamespace((Object)2);
            state.update((Object)"2");
            Assertions.assertThat((String)((String)state.value())).isEqualTo("2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, namespace, IntSerializer.INSTANCE, valueSerializer))).isNull();
            Assertions.assertThat((String)((String)state.value())).isEqualTo("2");
            kvState.setCurrentNamespace((Object)namespace);
            int key2 = 10;
            backend.setCurrentKey((Object)10);
            Assertions.assertThat((String)((String)state.value())).isNull();
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 10, keySerializer, namespace, namespaceSerializer, valueSerializer))).isNull();
            state.update((Object)"1");
            final CheckedThread getter = new CheckedThread("State getter"){

                public void go() throws Exception {
                    while (!this.isInterrupted()) {
                        Assertions.assertThat((String)((String)state.value())).isEqualTo("1");
                    }
                }
            };
            final CheckedThread serializedGetter = new CheckedThread("Serialized state getter", (TypeSerializer)keySerializer, namespace, (TypeSerializer)namespaceSerializer, valueSerializer){
                final /* synthetic */ TypeSerializer val$keySerializer;
                final /* synthetic */ Integer val$namespace;
                final /* synthetic */ TypeSerializer val$namespaceSerializer;
                final /* synthetic */ TypeSerializer val$valueSerializer;
                {
                    this.val$keySerializer = typeSerializer;
                    this.val$namespace = n;
                    this.val$namespaceSerializer = typeSerializer2;
                    this.val$valueSerializer = typeSerializer3;
                    super(name);
                }

                public void go() throws Exception {
                    while (!this.isInterrupted() && getter.isAlive()) {
                        String serializedValue = (String)StateBackendTestBase.getSerializedValue(kvState, 10, this.val$keySerializer, this.val$namespace, this.val$namespaceSerializer, this.val$valueSerializer);
                        Assertions.assertThat((String)serializedValue).isEqualTo("1");
                    }
                }
            };
            getter.start();
            serializedGetter.start();
            Timer t = new Timer("stopper");
            t.schedule(new TimerTask(){

                @Override
                public void run() {
                    getter.interrupt();
                    serializedGetter.interrupt();
                    this.cancel();
                }
            }, 100L);
            serializedGetter.sync();
            getter.interrupt();
            getter.sync();
            t.cancel();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ValueStateDescriptor desc1 = new ValueStateDescriptor("a-string", (TypeSerializer)StringSerializer.INSTANCE);
        ValueStateDescriptor desc2 = new ValueStateDescriptor("an-integer", (TypeSerializer)IntSerializer.INSTANCE);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), this.env);
        try {
            ValueState state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
            ValueState state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state1.value())).isNull();
            Assertions.assertThat((Integer)((Integer)state2.value())).isNull();
            state1.update((Object)"1");
            Assertions.assertThat((String)((String)state1.value())).isEqualTo("1");
            Assertions.assertThat((Integer)((Integer)state2.value())).isNull();
            state2.update((Object)13);
            Assertions.assertThat((String)((String)state1.value())).isEqualTo("1");
            Assertions.assertThat((Integer)((Integer)state2.value())).isEqualTo(13);
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), this.env);
            snapshot1.discardState();
            backend.setCurrentKey((Object)1);
            state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
            state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
            Assertions.assertThat((String)((String)state1.value())).isEqualTo("1");
            Assertions.assertThat((Integer)((Integer)state2.value())).isEqualTo(13);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateNullUpdate() throws Exception {
        Assertions.assertThatThrownBy(() -> LongSerializer.INSTANCE.serialize(null, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStream()))).isInstanceOf(NullPointerException.class);
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)LongSerializer.INSTANCE, (Object)42L);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(42L);
            state.update((Object)1L);
            Assertions.assertThat((Long)((Long)state.value())).isOne();
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(42L);
            backend.setCurrentKey((Object)1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(42L);
            state.update((Object)17L);
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(17L);
            state.update(null);
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(42L);
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getElementSerializer();
            Joiner joiner = Joiner.on((String)",");
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThat(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)).isNull();
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThat(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)).isNull();
            state.update(Arrays.asList("2"));
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)joiner.join((Iterable)state.get())).isEqualTo("1");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.add((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)joiner.join((Iterable)state.get())).isEqualTo("1,u1");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1,u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)joiner.join((Iterable)state.get())).isEqualTo("2,u2");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2,u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)joiner.join((Iterable)state.get())).isEqualTo("u3");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ListState restored1 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)joiner.join((Iterable)restored1.get())).isEqualTo("1");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)joiner.join((Iterable)restored1.get())).isEqualTo("2");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ListState restored2 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)joiner.join((Iterable)restored2.get())).isEqualTo("1,u1");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1,u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)joiner.join((Iterable)restored2.get())).isEqualTo("2,u2");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2,u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)joiner.join((Iterable)restored2.get())).isEqualTo("u3");
            Assertions.assertThat((String)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateAddNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.add(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateAddAllNullEntries() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            Assertions.assertThatThrownBy(() -> state.addAll(adding)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateAddAllNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.addAll(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateUpdateNullEntries() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            Assertions.assertThatThrownBy(() -> state.update(adding)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateUpdateNull() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.update(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateAPIs() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{17L, 11L});
            state.update(Collections.emptyList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.update(Arrays.asList(10L, 16L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{16L, 10L});
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{16L, 10L});
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.addAll(Collections.emptyList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.addAll(Arrays.asList(3L, 4L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            state.addAll(new ArrayList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            state.addAll(Arrays.asList(5L, 6L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            state.addAll(new ArrayList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            state.update(Arrays.asList(1L, 2L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{1L, 2L});
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{10L, 16L});
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{1L, 2L, 3L, 2L, 1L});
            state.update(Arrays.asList(5L, 6L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{5L, 6L});
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateMerging() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testReducingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer valueSerializer = kvId.getSerializer();
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.get())).isNull();
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isNull();
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)state.get())).isNull();
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isNull();
            state.add((Object)"2");
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.get())).isEqualTo("1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            state.add((Object)"u1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"u2");
            backend.setCurrentKey((Object)3);
            state.add((Object)"u3");
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.get())).isEqualTo("1,u1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1,u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)state.get())).isEqualTo("2,u2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2,u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)((String)state.get())).isEqualTo("u3");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            ReducingState restored1 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)restored1.get())).isEqualTo("1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)restored1.get())).isEqualTo("2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2");
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            ReducingState restored2 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)restored2.get())).isEqualTo("1,u1");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("1,u1");
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((String)((String)restored2.get())).isEqualTo("2,u2");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("2,u2");
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((String)((String)restored2.get())).isEqualTo("u3");
            Assertions.assertThat((String)((String)StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))).isEqualTo("u3");
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(9L);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testReducingStateMerging() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(9L);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((long)((Long)state.get())).isEqualTo(9L);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            ((AbstractIntegerAssert)Assertions.assertThat((int)((TestableKeyedStateBackend)keyedBackend).numKeyValueStateEntries()).withFailMessage("State backend is not empty.", new Object[0])).isZero();
        }
        finally {
            IOUtils.closeQuietly(keyedBackend);
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, String.class);
        StringSerializer keySerializer = StringSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState kvState = (InternalKvState)state;
            TypeSerializer userKeySerializer = kvId.getKeySerializer();
            TypeSerializer userValueSerializer = kvId.getValueSerializer();
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((String)((String)state.get((Object)1))).isNull();
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isNull();
            state.put((Object)1, (Object)"1");
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((String)((String)state.get((Object)2))).isNull();
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isNull();
            state.put((Object)2, (Object)"2");
            backend.setCurrentKey((Object)"11");
            state.put((Object)11, (Object)"11");
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((boolean)state.contains((Object)1)).isTrue();
            Assertions.assertThat((String)((String)state.get((Object)1))).isEqualTo("1");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "1");
                }
            });
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(11, "11");
                }
            });
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)"1");
            state.put((Object)1, (Object)"101");
            backend.setCurrentKey((Object)"2");
            state.put((Object)102, (Object)"102");
            backend.setCurrentKey((Object)"3");
            state.put((Object)103, (Object)"103");
            state.putAll((Map)new HashMap<Integer, String>(){
                {
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            });
            KeyedStateHandle snapshot2 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((String)((String)state.get((Object)1))).isEqualTo("101");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "101");
                }
            });
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((String)((String)state.get((Object)102))).isEqualTo("102");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                    this.put(102, "102");
                }
            });
            backend.setCurrentKey((Object)"3");
            Assertions.assertThat((boolean)state.contains((Object)103)).isTrue();
            Assertions.assertThat((String)((String)state.get((Object)103))).isEqualTo("103");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(103, "103");
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            });
            ArrayList<Integer> keys = new ArrayList<Integer>();
            for (Integer key : state.keys()) {
                keys.add(key);
            }
            List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
            Assertions.assertThat(keys).hasSameSizeAs(expectedKeys);
            keys.removeAll(expectedKeys);
            ArrayList<String> values = new ArrayList<String>();
            for (String value : state.values()) {
                values.add(value);
            }
            List<String> expectedValues = Arrays.asList("103", "1031", "1032");
            Assertions.assertThat(values).hasSameSizeAs(expectedValues);
            values.removeAll(expectedValues);
            backend.setCurrentKey((Object)"1");
            state.clear();
            backend.setCurrentKey((Object)"2");
            state.remove((Object)102);
            backend.setCurrentKey((Object)"3");
            String updateSuffix = "_updated";
            Iterator iterator = state.iterator();
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                if (((String)entry.getValue()).length() != 4) {
                    iterator.remove();
                    continue;
                }
                entry.setValue((String)entry.getValue() + "_updated");
            }
            backend.setCurrentKey((Object)"1");
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((boolean)state.contains((Object)102)).isFalse();
            backend.setCurrentKey((Object)"3");
            for (Map.Entry entry : state.entries()) {
                Assertions.assertThat((String)((String)entry.getValue())).hasSize(4 + "_updated".length());
                Assertions.assertThat((boolean)((String)entry.getValue()).endsWith("_updated")).isTrue();
            }
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            MapState restored1 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState1 = (InternalKvState)restored1;
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((String)((String)restored1.get((Object)1))).isEqualTo("1");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "1");
                }
            });
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((String)((String)restored1.get((Object)2))).isEqualTo("2");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                }
            });
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot2);
            snapshot2.discardState();
            MapState restored2 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState restoredKvState2 = (InternalKvState)restored2;
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((String)((String)restored2.get((Object)1))).isEqualTo("101");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(1, "101");
                }
            });
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((String)((String)restored2.get((Object)102))).isEqualTo("102");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(2, "2");
                    this.put(102, "102");
                }
            });
            backend.setCurrentKey((Object)"3");
            Assertions.assertThat((String)((String)restored2.get((Object)103))).isEqualTo("103");
            Assertions.assertThat(StateBackendTestBase.getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)).isEqualTo((Object)new HashMap<Integer, String>(){
                {
                    this.put(103, "103");
                    this.put(1031, "1031");
                    this.put(1032, "1032");
                }
            });
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateIsEmpty() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int i;
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((boolean)state.isEmpty()).isTrue();
            int stateSize = 1024;
            for (i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
                Assertions.assertThat((boolean)state.isEmpty()).isFalse();
            }
            for (i = 0; i < stateSize; ++i) {
                Assertions.assertThat((boolean)state.isEmpty()).isFalse();
                state.remove((Object)i);
            }
            Assertions.assertThat((boolean)state.isEmpty()).isTrue();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateIteratorArbitraryAccess() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            int stateSize = 4096;
            for (int i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
            }
            Iterator iterator = state.iterator();
            int iteratorCount = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Assertions.assertThat((Integer)((Integer)entry.getKey())).isEqualTo(iteratorCount);
                switch (ThreadLocalRandom.current().nextInt() % 3) {
                    case 0: {
                        iterator.remove();
                        Assertions.assertThatThrownBy(iterator::remove).isInstanceOf(IllegalStateException.class);
                        break;
                    }
                    case 1: {
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    }
                }
                ++iteratorCount;
            }
            Assertions.assertThat((int)iteratorCount).isEqualTo(stateSize);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testMapStateEntryCompare() throws Exception {
        MapStateDescriptor stateDesc1 = new MapStateDescriptor("map-state-1", Integer.class, Long.class);
        MapStateDescriptor stateDesc2 = new MapStateDescriptor("map-state-2", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state1 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDesc1);
            MapState state2 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDesc2);
            AbstractMap.SimpleEntry<Integer, Long> expectedEntry = new AbstractMap.SimpleEntry<Integer, Long>(0, 10L);
            backend.setCurrentKey((Object)1);
            state1.put((Object)((Integer)expectedEntry.getKey()), (Object)((Long)expectedEntry.getValue()));
            state2.put((Object)((Integer)expectedEntry.getKey()), (Object)((Long)expectedEntry.getValue()));
            Assertions.assertThat((Object)((Map.Entry)state1.entries().iterator().next())).isEqualTo(expectedEntry);
            Assertions.assertThat((Object)((Map.Entry)state2.entries().iterator().next())).isEqualTo(expectedEntry);
            Assertions.assertThat((Object)((Map.Entry)state1.entries().iterator().next())).isEqualTo(state2.entries().iterator().next());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateNullAsDefaultValue() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isNull();
            state.update((Object)"Ciao");
            Assertions.assertThat((String)((String)state.value())).isEqualTo("Ciao");
            state.clear();
            Assertions.assertThat((String)((String)state.value())).isNull();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateDefaultValue() throws Exception {
        KeyedStateHandle keyedStateHandle;
        ValueState state;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("Hello");
            state.update((Object)"Ciao");
            Assertions.assertThat((String)((String)state.value())).isEqualTo("Ciao");
            state.clear();
            Assertions.assertThat((String)((String)state.value())).isEqualTo("Hello");
            keyedStateHandle = StateBackendTestBase.runSnapshot(backend.snapshot(1L, 1L, this.createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)new SharedStateRegistryImpl());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.value())).isEqualTo("Hello");
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testReducingStateDefaultValue() throws Exception {
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((String)((String)state.get())).isNull();
            state.add((Object)"Ciao");
            Assertions.assertThat((String)((String)state.get())).isEqualTo("Ciao");
            state.clear();
            Assertions.assertThat((String)((String)state.get())).isNull();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateDefaultValue() throws Exception {
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.update(Arrays.asList("Ciao", "Bello"));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new String[]{"Ciao", "Bello"});
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateDefaultValue() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Iterable)state.entries()).isNotNull();
            Assertions.assertThat(state.entries().iterator()).isExhausted();
            state.put((Object)"Ciao", (Object)"Hello");
            state.put((Object)"Bello", (Object)"Nice");
            Assertions.assertThat((Iterable)state.entries()).isNotNull();
            Assertions.assertThat((String)((String)state.get((Object)"Ciao"))).isEqualTo("Hello");
            Assertions.assertThat((String)((String)state.get((Object)"Bello"))).isEqualTo("Nice");
            state.clear();
            Assertions.assertThat((Iterable)state.entries()).isNotNull();
            Assertions.assertThat(state.entries().iterator()).isExhausted();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testSnapshotNonAccessedState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        String stateName = "test-name";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("test-name", Integer.class, String.class);
            MapState mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            mapState.put((Object)11, (Object)"foo");
            backend.setCurrentKey((Object)"2");
            mapState.put((Object)8, (Object)"bar");
            backend.setCurrentKey((Object)"3");
            mapState.put((Object)91, (Object)"hello world");
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            Assertions.assertThat((String)((String)mapState.get((Object)11))).isEqualTo("foo");
            backend.setCurrentKey((Object)"2");
            Assertions.assertThat((String)((String)mapState.get((Object)8))).isEqualTo("bar");
            backend.setCurrentKey((Object)"3");
            Assertions.assertThat((String)((String)mapState.get((Object)91))).isEqualTo("hello world");
            snapshot.discardState();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @TestTemplate
    void testKeyGroupSnapshotRestoreScaleDown() throws Exception {
        this.testKeyGroupSnapshotRestore(4, 2, 128);
    }

    @TestTemplate
    void testKeyGroupSnapshotRestoreScaleUp() throws Exception {
        this.testKeyGroupSnapshotRestore(2, 4, 128);
    }

    @TestTemplate
    void testKeyGroupsSnapshotRestoreNoRescale() throws Exception {
        this.testKeyGroupSnapshotRestore(2, 2, 128);
    }

    @TestTemplate
    void testKeyGroupsSnapshotRestoreScaleUpUnEvenDistribute() throws Exception {
        this.testKeyGroupSnapshotRestore(15, 77, 128);
    }

    @TestTemplate
    void testKeyGroupsSnapshotRestoreScaleDownUnEvenDistribute() throws Exception {
        this.testKeyGroupSnapshotRestore(77, 15, 128);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyGroupSnapshotRestore(int sourceParallelism, int targetParallelism, int maxParallelism) throws Exception {
        int i;
        Preconditions.checkArgument((sourceParallelism > 0 ? 1 : 0) != 0, (String)"parallelism must be positive, current is %s.", (Object[])new Object[]{sourceParallelism});
        Preconditions.checkArgument((targetParallelism > 0 ? 1 : 0) != 0, (String)"parallelism must be positive, current is %s.", (Object[])new Object[]{targetParallelism});
        Preconditions.checkArgument((sourceParallelism <= maxParallelism ? 1 : 0) != 0, (Object)"Maximum parallelism must not be smaller than parallelism.");
        Preconditions.checkArgument((targetParallelism <= maxParallelism ? 1 : 0) != 0, (Object)"Maximum parallelism must not be smaller than parallelism.");
        Random random = new Random();
        ArrayList<ValueStateDescriptor> stateDescriptors = new ArrayList<ValueStateDescriptor>(maxParallelism);
        ArrayList<Integer> keyInKeyGroups = new ArrayList<Integer>(maxParallelism);
        ArrayList<CallSite> expectedValue = new ArrayList<CallSite>(maxParallelism);
        for (int i2 = 0; i2 < maxParallelism; ++i2) {
            stateDescriptors.add(new ValueStateDescriptor("state" + i2, String.class));
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ArrayList<KeyedStateHandle> snapshots = new ArrayList<KeyedStateHandle>(sourceParallelism);
        for (int i3 = 0; i3 < sourceParallelism; ++i3) {
            KeyGroupRange range = KeyGroupRange.of((int)(maxParallelism * i3 / sourceParallelism), (int)(maxParallelism * (i3 + 1) / sourceParallelism - 1));
            CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, maxParallelism, range, this.env);
            try {
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    int keyInKeyGroup = this.getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of((int)j, (int)j));
                    backend.setCurrentKey((Object)keyInKeyGroup);
                    keyInKeyGroups.add(keyInKeyGroup);
                    String updateValue = i3 + ":" + j;
                    state.update((Object)updateValue);
                    expectedValue.add((CallSite)((Object)updateValue));
                }
                snapshots.add(StateBackendTestBase.runSnapshot(backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry));
                continue;
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        ArrayList<KeyGroupRange> keyGroupRangesRestore = new ArrayList<KeyGroupRange>();
        for (int i4 = 0; i4 < targetParallelism; ++i4) {
            keyGroupRangesRestore.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)maxParallelism, (int)targetParallelism, (int)i4));
        }
        ArrayList keyGroupStatesAfterDistribute = new ArrayList(targetParallelism);
        for (i = 0; i < targetParallelism; ++i) {
            ArrayList keyedStateHandles = new ArrayList();
            StateAssignmentOperation.extractIntersectingState(snapshots, (KeyGroupRange)((KeyGroupRange)keyGroupRangesRestore.get(i)), keyedStateHandles);
            keyGroupStatesAfterDistribute.add(keyedStateHandles);
        }
        for (i = 0; i < targetParallelism; ++i) {
            CheckpointableKeyedStateBackend backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, maxParallelism, (KeyGroupRange)keyGroupRangesRestore.get(i), (List)keyGroupStatesAfterDistribute.get(i), this.env);
            try {
                KeyGroupRange range = (KeyGroupRange)keyGroupRangesRestore.get(i);
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    backend.setCurrentKey((Object)((Integer)keyInKeyGroups.get(j)));
                    Assertions.assertThat((String)((String)state.value())).isEqualTo((String)expectedValue.get(j));
                }
                continue;
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testRestoreWithWrongKeySerializer() throws Exception {
        KeyedStateHandle snapshot;
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.restoreKeyedBackend((TypeSerializer)DoubleSerializer.INSTANCE, snapshot)).withFailMessage("should recognize wrong key serializer", new Object[0])).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testValueStateRestoreWithWrongSerializers() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            CheckpointableKeyedStateBackend restoredBackend = backend;
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StateBackendTestBase.lambda$testValueStateRestoreWithWrongSerializers$16(restoredBackend, (TypeSerializer)fakeStringSerializer)).withFailMessage("should recognize wrong serializers", new Object[0])).isInstanceOf(StateMigrationException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testListStateRestoreWithWrongSerializers() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            CheckpointableKeyedStateBackend restoredBackend = backend;
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StateBackendTestBase.lambda$testListStateRestoreWithWrongSerializers$17(restoredBackend, (TypeSerializer)fakeStringSerializer)).withFailMessage("should recognize wrong serializers", new Object[0])).isInstanceOf(StateMigrationException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testReducingStateRestoreWithWrongSerializers() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)StringSerializer.INSTANCE);
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            CheckpointableKeyedStateBackend restoredBackend = backend;
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StateBackendTestBase.lambda$testReducingStateRestoreWithWrongSerializers$18(restoredBackend, (TypeSerializer)fakeStringSerializer)).withFailMessage("should recognize wrong serializers", new Object[0])).isInstanceOf(StateMigrationException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateRestoreWithWrongSerializers() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsMetaInfoVerification()).isTrue();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("id", (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE);
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.put((Object)"1", (Object)"First");
            backend.setCurrentKey((Object)2);
            state.put((Object)"2", (Object)"Second");
            KeyedStateHandle snapshot1 = StateBackendTestBase.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            CheckpointableKeyedStateBackend restoredBackend = backend;
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> StateBackendTestBase.lambda$testMapStateRestoreWithWrongSerializers$19(restoredBackend, (TypeSerializer)fakeStringSerializer)).withFailMessage("should recognize wrong serializers", new Object[0])).isInstanceOf(StateMigrationException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testCopyDefaultValue() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            IntValue default1 = (IntValue)state.value();
            backend.setCurrentKey((Object)2);
            IntValue default2 = (IntValue)state.value();
            Assertions.assertThat((Comparable)default1).isNotNull();
            Assertions.assertThat((Comparable)default2).isNotNull();
            Assertions.assertThat((Comparable)default2).isEqualTo((Object)default1);
            Assertions.assertThat((Comparable)default2).isNotSameAs((Object)default1);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @TestTemplate
    void testRequireNonNullNamespace() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            Assertions.assertThatThrownBy(() -> backend.getPartitionedState(null, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId)).isInstanceOf(NullPointerException.class);
            Assertions.assertThatThrownBy(() -> backend.getPartitionedState((Object)VoidNamespace.INSTANCE, null, (StateDescriptor)kvId)).isInstanceOf(NullPointerException.class);
            Assertions.assertThatThrownBy(() -> backend.getPartitionedState(null, null, (StateDescriptor)kvId)).isInstanceOf(NullPointerException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void testConcurrentMapIfQueryable() throws Exception {
        boolean numberOfKeyGroups = true;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment());
        try {
            ValueStateDescriptor desc = new ValueStateDescriptor("value-state", Integer.class, (Object)-1);
            desc.setQueryable("my-query");
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            InternalKvState kvState = (InternalKvState)state;
            Assertions.assertThat((Object)kvState).isInstanceOf(AbstractHeapState.class);
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.update((Object)121818273);
            ((IterableAssert)Assertions.assertThat((Iterable)((AbstractHeapState)kvState).getStateTable()).withFailMessage("State not set", new Object[0])).isNotNull();
            desc = new ListStateDescriptor("list-state", Integer.class);
            desc.setQueryable("my-query");
            state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assertions.assertThat((Object)kvState).isInstanceOf(AbstractHeapState.class);
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.add((Object)121818273);
            ((IterableAssert)Assertions.assertThat((Iterable)((AbstractHeapState)kvState).getStateTable()).withFailMessage("State not set", new Object[0])).isNotNull();
            desc = new ReducingStateDescriptor("reducing-state", (ReduceFunction)new ReduceFunction<Integer>(){

                public Integer reduce(Integer value1, Integer value2) throws Exception {
                    return value1 + value2;
                }
            }, Integer.class);
            desc.setQueryable("my-query");
            state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assertions.assertThat((Object)kvState).isInstanceOf(AbstractHeapState.class);
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.add((Object)121818273);
            ((IterableAssert)Assertions.assertThat((Iterable)((AbstractHeapState)kvState).getStateTable()).withFailMessage("State not set", new Object[0])).isNotNull();
            desc = new MapStateDescriptor("map-state", Integer.class, String.class);
            desc.setQueryable("my-query");
            state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            kvState = (InternalKvState)state;
            Assertions.assertThat((Object)kvState).isInstanceOf(AbstractHeapState.class);
            kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            backend.setCurrentKey((Object)1);
            state.put((Object)121818273, (Object)"121818273");
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)1);
            StateTable stateTable = ((AbstractHeapState)kvState).getStateTable();
            ((ObjectAssert)Assertions.assertThat((Object)stateTable.get((Object)keyGroupIndex)).withFailMessage("State not set", new Object[0])).isNotNull();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testQueryableStateRegistration() throws Exception {
        KvStateRegistry registry = this.env.getKvStateRegistry();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
            TestingKvStateRegistryListener listener = new TestingKvStateRegistryListener();
            registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, (KvStateRegistryListener)listener);
            ValueStateDescriptor desc = new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("banana");
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            Assertions.assertThat((boolean)listener.isRegistered(this.env.getJobID(), this.env.getJobVertexId(), expectedKeyGroupRange, "banana")).isTrue();
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            Assertions.assertThat((boolean)listener.isRegistered(this.env.getJobID(), this.env.getJobVertexId(), expectedKeyGroupRange, "banana")).isFalse();
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, this.env);
            if (snapshot != null) {
                snapshot.discardState();
            }
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            Assertions.assertThat((boolean)listener.isRegistered(this.env.getJobID(), this.env.getJobVertexId(), expectedKeyGroupRange, "banana")).isTrue();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testEmptyStateCheckpointing() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            CheckpointStreamFactory streamFactory = this.createStreamFactory();
            SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
            KeyedStateHandle snapshot = StateBackendTestBase.runSnapshot(backend.snapshot(682375462379L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            Assertions.assertThat((Object)snapshot).isNull();
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testNumStateEntries() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            Assertions.assertThat((int)((TestableKeyedStateBackend)backend).numKeyValueStateEntries()).isZero();
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)0);
            state.update((Object)"hello");
            state.update((Object)"ciao");
            Assertions.assertThat((int)((TestableKeyedStateBackend)backend).numKeyValueStateEntries()).isOne();
            backend.setCurrentKey((Object)42);
            state.update((Object)"foo");
            Assertions.assertThat((int)((TestableKeyedStateBackend)backend).numKeyValueStateEntries()).isEqualTo(2);
            backend.setCurrentKey((Object)0);
            state.clear();
            Assertions.assertThat((int)((TestableKeyedStateBackend)backend).numKeyValueStateEntries()).isOne();
            backend.setCurrentKey((Object)42);
            state.clear();
            Assertions.assertThat((int)((TestableKeyedStateBackend)backend).numKeyValueStateEntries()).isZero();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testParallelAsyncSnapshots() throws Exception {
        Assumptions.assumeThat((boolean)this.snapshotUsesStreamFactory()).isTrue();
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!this.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot1 = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner1 = new Thread((Runnable)snapshot1, "snapshot-1-runner");
            runner1.start();
            waiter.await();
            for (int i = 5; i < 15; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
            }
            streamFactory.setWaiterLatch(null);
            streamFactory.setBlockerLatch(null);
            RunnableFuture snapshot2 = backend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner2 = new Thread((Runnable)snapshot2, "snapshot-2-runner");
            runner2.start();
            snapshot2.get();
            blocker.trigger();
            snapshot1.get();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @TestTemplate
    void testNonConcurrentSnapshotTransformerAccess() throws Exception {
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        CheckpointableKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            new StateSnapshotTransformerTest(backend, streamFactory).testNonConcurrentSnapshotTransformerAccess();
        }
        finally {
            if (backend != null) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAsyncSnapshot() throws Exception {
        InternalValueState valueState;
        Assumptions.assumeThat((boolean)this.snapshotUsesStreamFactory()).isTrue();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        CheckpointableKeyedStateBackend backend = null;
        KeyedStateHandle stateHandle = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            valueState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
                if (10 != i) continue;
                waiter.await();
            }
            runner.join();
            SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
            stateHandle = (KeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                Assertions.assertThat((int)((Integer)valueState.value())).isEqualTo(i + 1);
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        Assertions.assertThat((Object)stateHandle).isNotNull();
        backend = null;
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, stateHandle);
            valueState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                Assertions.assertThat((int)((Integer)valueState.value())).isEqualTo(i);
            }
            backend.setCurrentKey((Object)11);
            Assertions.assertThat((Integer)((Integer)valueState.value())).isNull();
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testConcurrentModificationWithApplyToAllKeys() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assertions.assertThat((String)((String)((Iterable)state.get()).iterator().next())).isEqualTo("Hello" + key);
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.clear();
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assertions.assertThat(((Iterable)state.get()).iterator()).isExhausted();
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.add((Object)("Hello" + key));
                    state.clear();
                    state.add((Object)("Hello_" + key));
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Iterator it = ((Iterable)state.get()).iterator();
                    Assertions.assertThat((String)((String)it.next())).isEqualTo("Hello_" + key);
                    Assertions.assertThat(it).isExhausted();
                }
            });
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testApplyToAllKeysLambdaFunction() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (key, state) -> Assertions.assertThat((String)((String)((Iterable)state.get()).iterator().next())).isEqualTo("Hello" + key));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAsyncSnapshotCancellation() throws Exception {
        Assumptions.assumeThat((boolean)this.snapshotUsesStreamFactory()).isTrue();
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!this.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            waiter.await();
            IOUtils.closeQuietly(backend);
            blocker.trigger();
            runner.join();
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(snapshot::get).withFailMessage("Close was not propagated.", new Object[0])).isInstanceOf(CancellationException.class);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            MapState keyedState1 = (MapState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.put((Object)"he", (Object)(key * 2));
                keyedState1.put((Object)"ho", (Object)(key * 2));
            }
            String ns2 = "ns2";
            MapState keyedState2 = (MapState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.put((Object)"he", (Object)(key * 2));
                keyedState2.put((Object)"ho", (Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assertions.assertThat((Iterator)actualIterator).hasNext();
                    Assertions.assertThat((int)actualIterator.nextInt()).isEqualTo(expectedKey);
                }
                Assertions.assertThat((Iterator)actualIterator).isExhausted();
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assertions.assertThat((Iterator)actualIterator).hasNext();
                    Assertions.assertThat((int)actualIterator.nextInt()).isEqualTo(expectedKey);
                }
                Assertions.assertThat((Iterator)actualIterator).isExhausted();
            }
            finally {
                if (keysStream != null) {
                    keysStream.close();
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testMapStateGetKeysAndNamespaces() throws Exception {
        int elementsNum = 1000;
        String fieldName = "get-keys-test";
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            InternalMapState internalState = (InternalMapState)backend.createOrUpdateInternalState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            String[] namespaces = new String[]{"ns1", "ns2"};
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                for (String ns : namespaces) {
                    internalState.setCurrentNamespace((Object)ns);
                    internalState.put((Object)"hello", (Object)key);
                    internalState.put((Object)"world", (Object)key);
                }
            }
            try (Stream stream = backend.getKeysAndNamespaces(fieldName);){
                HashMap keysByNamespace = new HashMap();
                stream.forEach(entry -> {
                    ((AbstractStringAssert)Assertions.assertThat((String)((String)entry.f1)).withFailMessage("Unexpected namespace", new Object[0])).isIn((Object[])namespaces);
                    ((AbstractIntegerAssert)Assertions.assertThat((Integer)((Integer)entry.f0)).withFailMessage("Unexpected key", new Object[0])).isGreaterThanOrEqualTo(0).isLessThan(1000);
                    Set keys = keysByNamespace.computeIfAbsent((String)entry.f1, k -> new HashSet());
                    ((AbstractBooleanAssert)Assertions.assertThat((boolean)keys.add((Integer)entry.f0)).withFailMessage("Duplicate key for namespace", new Object[0])).isTrue();
                });
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, this.env);
        try {
            long checkpointID = 0L;
            ArrayList<Future<SnapshotResult<KeyedStateHandle>>> futureList = new ArrayList<Future<SnapshotResult<KeyedStateHandle>>>();
            for (int i = 0; i < 10; ++i) {
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id" + i, (TypeSerializer)IntSerializer.INSTANCE);
                ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)valueStateDescriptor);
                ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
                backend.setCurrentKey((Object)i);
                state.update((Object)i);
                futureList.add(this.runSnapshotAsync(executorService, backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
            }
            for (Future future : futureList) {
                future.get(20L, TimeUnit.SECONDS);
            }
        }
        catch (Exception e) {
            Assertions.fail((String)e.getMessage());
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
            executorService.shutdown();
        }
    }

    private void assertRestoreKeyedBackendFail(KeyedStateHandle keyedStateHandle, ValueStateDescriptor<TestPojo> kvId) {
        Assertions.assertThatThrownBy(() -> {
            CheckpointableKeyedStateBackend restoreBackend = null;
            try {
                restoreBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle, this.env);
                ValueState restoreState = (ValueState)restoreBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                restoreBackend.setCurrentKey((Object)1);
                restoreState.value();
                restoreBackend.dispose();
            }
            finally {
                if (restoreBackend != null) {
                    IOUtils.closeQuietly(restoreBackend);
                    restoreBackend.dispose();
                }
            }
        }).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(ExpectedKryoTestException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(ExpectedKryoTestException.class)});
    }

    protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(ExecutorService executorService, RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            CompletableFuture<SnapshotResult<KeyedStateHandle>> completableFuture = new CompletableFuture<SnapshotResult<KeyedStateHandle>>();
            executorService.submit(() -> {
                try {
                    snapshotRunnableFuture.run();
                    completableFuture.complete((SnapshotResult)snapshotRunnableFuture.get());
                }
                catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        }
        return CompletableFuture.completedFuture((SnapshotResult)snapshotRunnableFuture.get());
    }

    private int getKeyInKeyGroup(Random random, int maxParallelism, KeyGroupRange keyGroupRange) {
        int keyInKG = random.nextInt();
        int kg = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyInKG, (int)maxParallelism);
        while (!keyGroupRange.contains(kg)) {
            keyInKG = random.nextInt();
            kg = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyInKG, (int)maxParallelism);
        }
        return keyInKG;
    }

    protected static <V, K, N> V getSerializedValue(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return (V)KvStateSerializer.deserializeValue((byte[])serializedValue, valueSerializer);
    }

    private static <V, K, N> List<V> getSerializedList(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeList((byte[])serializedValue, valueSerializer);
    }

    private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(InternalKvState<K, N, Map<UK, UV>> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<UK> userKeySerializer, TypeSerializer<UV> userValueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeMap((byte[])serializedValue, userKeySerializer, userValueSerializer);
    }

    public static KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, 0L);
        }
        return jobManagerOwnedSnapshot;
    }

    private MockEnvironment buildMockEnv() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager(this.getTestTaskStateManager()).build();
        mockEnvironment.setCheckpointStorageAccess(this.getCheckpointStorageAccess());
        return mockEnvironment;
    }

    protected TestTaskStateManager getTestTaskStateManager() throws IOException {
        return TestTaskStateManager.builder().build();
    }

    protected boolean snapshotUsesStreamFactory() {
        return true;
    }

    protected boolean supportsMetaInfoVerification() {
        return true;
    }

    protected boolean isSafeToReuseKVState() {
        return false;
    }

    private static /* synthetic */ void lambda$testMapStateRestoreWithWrongSerializers$19(CheckpointableKeyedStateBackend restoredBackend, TypeSerializer fakeStringSerializer) throws Throwable {
        ((MapState)restoredBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor("id", fakeStringSerializer, (TypeSerializer)StringSerializer.INSTANCE))).entries();
    }

    private static /* synthetic */ void lambda$testReducingStateRestoreWithWrongSerializers$18(CheckpointableKeyedStateBackend restoredBackend, TypeSerializer fakeStringSerializer) throws Throwable {
        ((ReducingState)restoredBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), fakeStringSerializer))).get();
    }

    private static /* synthetic */ void lambda$testListStateRestoreWithWrongSerializers$17(CheckpointableKeyedStateBackend restoredBackend, TypeSerializer fakeStringSerializer) throws Throwable {
        ((ListState)restoredBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ListStateDescriptor("id", fakeStringSerializer))).get();
    }

    private static /* synthetic */ void lambda$testValueStateRestoreWithWrongSerializers$16(CheckpointableKeyedStateBackend restoredBackend, TypeSerializer fakeStringSerializer) throws Throwable {
        ((ValueState)restoredBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("id", fakeStringSerializer))).value();
    }

    private static class TestingKvStateRegistryListener
    implements KvStateRegistryListener {
        private final Map<String, KvStateID> registeredStates = new HashMap<String, KvStateID>();

        private TestingKvStateRegistryListener() {
        }

        private String createKey(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
            return String.format("%s-%s-%s-%s", jobId, jobVertexId, keyGroupRange.prettyPrintInterval(), registrationName);
        }

        private boolean isRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
            return this.registeredStates.containsKey(this.createKey(jobId, jobVertexId, keyGroupRange, registrationName));
        }

        public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
            this.registeredStates.put(this.createKey(jobId, jobVertexId, keyGroupRange, registrationName), kvStateId);
        }

        public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
            this.registeredStates.remove(this.createKey(jobId, jobVertexId, keyGroupRange, registrationName));
        }
    }

    public static final class MutableLong {
        long value;
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    public static class CustomKryoTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            super.write(kryo, output, object);
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    public static class ExceptionThrowingTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            throw new ExpectedKryoTestException();
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    private static class ExpectedKryoTestException
    extends RuntimeException {
        private ExpectedKryoTestException() {
        }
    }

    public static class TestNestedPojoClassB
    implements Serializable {
        private Double doubleField;
        private String strField;

        public TestNestedPojoClassB() {
        }

        public TestNestedPojoClassB(Double doubleField, String strField) {
            this.doubleField = doubleField;
            this.strField = strField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public String toString() {
            return "TestNestedPojoClassB{doubleField='" + this.doubleField + "', strField=" + this.strField + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassB testNestedPojoClassB = (TestNestedPojoClassB)o;
            if (!this.doubleField.equals(testNestedPojoClassB.doubleField)) {
                return false;
            }
            return this.strField.equals(testNestedPojoClassB.strField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.strField.hashCode();
            return result;
        }
    }

    public static class TestNestedPojoClassA
    implements Serializable {
        private Double doubleField;
        private Integer intField;

        public TestNestedPojoClassA() {
        }

        public TestNestedPojoClassA(Double doubleField, Integer intField) {
            this.doubleField = doubleField;
            this.intField = intField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public String toString() {
            return "TestNestedPojoClassA{doubleField='" + this.doubleField + "', intField=" + this.intField + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassA testNestedPojoClassA = (TestNestedPojoClassA)o;
            if (!this.doubleField.equals(testNestedPojoClassA.doubleField)) {
                return false;
            }
            return this.intField.equals(testNestedPojoClassA.intField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.intField.hashCode();
            return result;
        }
    }

    public static class TestPojo
    implements Serializable {
        private String strField;
        private Integer intField;
        private TestNestedPojoClassA kryoClassAField;
        private TestNestedPojoClassB kryoClassBField;

        public TestPojo() {
        }

        public TestPojo(String strField, Integer intField) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = null;
            this.kryoClassBField = null;
        }

        public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = classAField;
            this.kryoClassBField = classBfield;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public TestNestedPojoClassA getKryoClassAField() {
            return this.kryoClassAField;
        }

        public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) {
            this.kryoClassAField = kryoClassAField;
        }

        public TestNestedPojoClassB getKryoClassBField() {
            return this.kryoClassBField;
        }

        public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) {
            this.kryoClassBField = kryoClassBField;
        }

        public String toString() {
            return "TestPojo{strField='" + this.strField + "', intField=" + this.intField + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestPojo testPojo = (TestPojo)o;
            return this.strField.equals(testPojo.strField) && this.intField.equals(testPojo.intField) && (this.kryoClassAField == null && testPojo.kryoClassAField == null || this.kryoClassAField.equals(testPojo.kryoClassAField)) && (this.kryoClassBField == null && testPojo.kryoClassBField == null || this.kryoClassBField.equals(testPojo.kryoClassBField));
        }

        public int hashCode() {
            int result = this.strField.hashCode();
            result = 31 * result + this.intField.hashCode();
            if (this.kryoClassAField != null) {
                result = 31 * result + this.kryoClassAField.hashCode();
            }
            if (this.kryoClassBField != null) {
                result = 31 * result + this.kryoClassBField.hashCode();
            }
            return result;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

