/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class BatchExecutionStateBackendTest {
    BatchExecutionStateBackendTest() {
    }

    private <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) {
        return new BatchExecutionKeyedStateBackend(keySerializer, new KeyGroupRange(0, 9), new ExecutionConfig());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testListStateAddNull() throws Exception {
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.add(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testListStateAddAllNullEntries() throws Exception {
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            Assertions.assertThatThrownBy(() -> state.addAll(adding)).isInstanceOf(NullPointerException.class);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testListStateAddAllNull() throws Exception {
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.addAll(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testListStateUpdateNullEntries() throws Exception {
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            Assertions.assertThatThrownBy(() -> state.update(adding)).isInstanceOf(NullPointerException.class);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testListStateUpdateNull() throws Exception {
        CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThatThrownBy(() -> state.update(null)).isInstanceOf(NullPointerException.class);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    @Test
    void testListStateAPIs() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.addAll(Collections.emptyList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            state.addAll(Arrays.asList(3L, 4L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            state.addAll(new ArrayList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L});
            state.addAll(Arrays.asList(5L, 6L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            state.addAll(new ArrayList());
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L});
            state.update(Arrays.asList(1L, 2L));
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{1L, 2L});
        }
    }

    @Test
    void testListStateMergingOverThreeNamespaces() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
    }

    @Test
    void testListStateMergingWithEmptyNamespace() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
    }

    @Test
    void testListStateMergingEmpty() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
    }

    @Test
    void testListStateMergingAllInTargetNamespace() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
    }

    @Test
    void testListStateMergingInASingleNamespace() throws Exception {
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L});
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        }
    }

    @Test
    void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            ReducingState state = (ReducingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(9L);
        }
    }

    @Test
    void testReducingStateMergingOverThreeNamespaces() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testReducingStateMergingWithEmpty() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testReducingStateMergingEmpty() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testReducingStateMergingInTargetNamespace() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testReducingStateMergingInASingleNamespace() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", Long::sum, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(9L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithMutableAccumulatorOverThreeNamespaces() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithMutableAccumulatorWithEmpty() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithMutableAccumulatorEmpty() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithMutableAccumulatorInTargetNamespace() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithMutableAccumulatorInASingleNamespace() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)17L);
            state.add((Object)11L);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            keyedBackend.setCurrentKey((Object)"def");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(28L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isNull();
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"g");
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo(9L);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithImmutableAccumulatorOverThreeNamespaces() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithImmutableAccumulatorWithEmpty() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithImmutableAccumulatorEmpty() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithImmutableAccumulatorInTargetNamespace() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    @Test
    void testAggregatingStateMergingWithImmutableAccumulatorInASingleNamespace() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        try (CheckpointableKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);){
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assertions.assertThat((Long)((Long)state.get())).isEqualTo((Object)expectedResult);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assertions.assertThat((Long)((Long)state.get())).isNull();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testMapStateIsEmpty() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int i;
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((boolean)state.isEmpty()).isTrue();
            int stateSize = 1024;
            for (i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
                Assertions.assertThat((boolean)state.isEmpty()).isFalse();
            }
            for (i = 0; i < stateSize; ++i) {
                Assertions.assertThat((boolean)state.isEmpty()).isFalse();
                state.remove((Object)i);
            }
            Assertions.assertThat((boolean)state.isEmpty()).isTrue();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testMapStateIteratorArbitraryAccess() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            int stateSize = 4096;
            for (int i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
            }
            Iterator iterator = state.iterator();
            int iteratorCount = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Assertions.assertThat((Integer)((Integer)entry.getKey())).isEqualTo(iteratorCount);
                switch (ThreadLocalRandom.current().nextInt() % 3) {
                    case 0: {
                        iterator.remove();
                        Assertions.assertThatThrownBy(iterator::remove).isInstanceOf(IllegalStateException.class);
                        break;
                    }
                    case 1: {
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    }
                }
                ++iteratorCount;
            }
            Assertions.assertThat((int)iteratorCount).isEqualTo(stateSize);
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    void testValueStateNullAsDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assertions.assertThat((String)((String)state.value())).isNull();
        state.update((Object)"Ciao");
        Assertions.assertThat((String)((String)state.value())).isEqualTo("Ciao");
        state.clear();
        Assertions.assertThat((String)((String)state.value())).isNull();
        backend.dispose();
    }

    @Test
    void testValueStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assertions.assertThat((String)((String)state.value())).isEqualTo("Hello");
        state.update((Object)"Ciao");
        Assertions.assertThat((String)((String)state.value())).isEqualTo("Ciao");
        state.clear();
        Assertions.assertThat((String)((String)state.value())).isEqualTo("Hello");
        backend.dispose();
    }

    @Test
    void testReducingStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assertions.assertThat((String)((String)state.get())).isNull();
        state.add((Object)"Ciao");
        Assertions.assertThat((String)((String)state.get())).isEqualTo("Ciao");
        state.clear();
        Assertions.assertThat((String)((String)state.get())).isNull();
        backend.dispose();
    }

    @Test
    void testListStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        state.update(Arrays.asList("Ciao", "Bello"));
        Assertions.assertThat((Iterable)((Iterable)state.get())).containsExactlyInAnyOrder((Object[])new String[]{"Ciao", "Bello"});
        state.clear();
        Assertions.assertThat((Iterable)((Iterable)state.get())).isNull();
        backend.dispose();
    }

    @Test
    void testMapStateDefaultValue() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
        MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assertions.assertThat((Iterable)state.entries()).isNotNull();
        Assertions.assertThat(state.entries().iterator()).isExhausted();
        state.put((Object)"Ciao", (Object)"Hello");
        state.put((Object)"Bello", (Object)"Nice");
        Assertions.assertThat((Iterable)state.entries()).isNotNull();
        Assertions.assertThat((String)((String)state.get((Object)"Ciao"))).isEqualTo("Hello");
        Assertions.assertThat((String)((String)state.get((Object)"Bello"))).isEqualTo("Nice");
        state.clear();
        Assertions.assertThat((Iterable)state.entries()).isNotNull();
        Assertions.assertThat(state.entries().iterator()).isExhausted();
        backend.dispose();
    }

    private static final class MutableLong {
        long value;

        private MutableLong() {
        }
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    private static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        private MutableAggregatingAddingFunction() {
        }

        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

