/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.RunnableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.util.function.FunctionWithException;

public class StateBackendTestUtils {
    /**
     * Wraps a state backend so that every keyed-state snapshot future produced by its keyed
     * backend is passed through {@code snapshotResultFunction} before being handed to the caller.
     *
     * @param delegatedStataBackend backend that all calls are forwarded to
     * @param snapshotResultFunction function applied to the snapshot future of the keyed backend
     * @return a new backend wrapping {@code delegatedStataBackend}
     */
    public static AbstractStateBackend wrapStateBackendWithSnapshotFunction(
            AbstractStateBackend delegatedStataBackend,
            SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> snapshotResultFunction) {
        final AbstractStateBackend wrappingBackend =
                new ApplyingSnapshotStateBackend(delegatedStataBackend, snapshotResultFunction);
        return wrappingBackend;
    }

    /**
     * Builds a test backend that always reports async keyed-state support and forwards the
     * synchronous backend creation to {@code delegatedSyncStateBackend}.
     *
     * <p>The returned backend is configured with a fresh {@link MockStateExecutor} and with no
     * inner state supplier ({@code null}).
     *
     * @param delegatedSyncStateBackend backend that the returned backend delegates to
     * @return the configured test backend
     */
    public static StateBackend buildAsyncStateBackend(StateBackend delegatedSyncStateBackend) {
        final TestAsyncStateBackend asyncBackend = new TestAsyncStateBackend(delegatedSyncStateBackend);
        asyncBackend.setInnerState(null);
        asyncBackend.setStateExecutor(new MockStateExecutor());
        return asyncBackend;
    }

    /**
     * Builds a test backend on top of a new {@link HashMapStateBackend}, configured with the given
     * inner state supplier and state executor.
     *
     * @param innerStateSupplier supplier the test async keyed backend uses to produce state objects
     * @param stateExecutor executor the test async keyed backend hands out
     * @return the configured test backend
     */
    public static StateBackend buildAsyncStateBackend(
            Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier,
            StateExecutor stateExecutor) {
        final StateBackend hashMapBackend = new HashMapStateBackend();
        final TestAsyncStateBackend asyncBackend = new TestAsyncStateBackend(hashMapBackend);
        asyncBackend.setInnerState(innerStateSupplier);
        asyncBackend.setStateExecutor(stateExecutor);
        return asyncBackend;
    }

    /** Private constructor: this class only exposes static helpers and is not instantiated. */
    private StateBackendTestUtils() {}

    /**
     * A {@link FunctionWithException} whose input and output share the type {@code T}, which may
     * throw {@link Exception}, and which is additionally {@link Serializable}.
     *
     * @param <T> type of both the argument and the result
     */
    @FunctionalInterface
    public interface SerializableFunctionWithException<T>
            extends FunctionWithException<T, T, Exception>, Serializable {}

    private static class ApplyingSnapshotStateBackend
    extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;
        private final AbstractStateBackend delegatedStataBackend;
        private final SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> snapshotResultFunction;

        public ApplyingSnapshotStateBackend(AbstractStateBackend delegatedStataBackend, SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> snapshotResultFunction) {
            this.delegatedStataBackend = delegatedStataBackend;
            this.snapshotResultFunction = snapshotResultFunction;
        }

        public boolean useManagedMemory() {
            return this.delegatedStataBackend.useManagedMemory();
        }

        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException {
            final AbstractKeyedStateBackend delegatedKeyedStateBackend = this.delegatedStataBackend.createKeyedStateBackend(parameters);
            return new AbstractKeyedStateBackend<K>(parameters.getKvStateRegistry(), parameters.getKeySerializer(), parameters.getEnv().getUserCodeClassLoader().asClassLoader(), parameters.getEnv().getExecutionConfig(), parameters.getTtlTimeProvider(), delegatedKeyedStateBackend.getLatencyTrackingStateConfig(), parameters.getCancelStreamRegistry(), delegatedKeyedStateBackend.getKeyContext()){

                public void setCurrentKey(K newKey) {
                    delegatedKeyedStateBackend.setCurrentKey(newKey);
                }

                public void notifyCheckpointComplete(long checkpointId) throws Exception {
                    delegatedKeyedStateBackend.notifyCheckpointComplete(checkpointId);
                }

                @Nonnull
                public SavepointResources<K> savepoint() throws Exception {
                    return delegatedKeyedStateBackend.savepoint();
                }

                public int numKeyValueStateEntries() {
                    return delegatedKeyedStateBackend.numKeyValueStateEntries();
                }

                public <N> Stream<K> getKeys(String state, N namespace) {
                    return delegatedKeyedStateBackend.getKeys(state, namespace);
                }

                public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
                    return delegatedKeyedStateBackend.getKeysAndNamespaces(state);
                }

                @Nonnull
                public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull org.apache.flink.api.common.state.StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
                    return (IS)delegatedKeyedStateBackend.createOrUpdateInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
                }

                @Nonnull
                public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
                    return delegatedKeyedStateBackend.create(stateName, byteOrderedElementSerializer);
                }

                @Nonnull
                public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    RunnableFuture snapshotResultRunnableFuture = delegatedKeyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
                    return (RunnableFuture)snapshotResultFunction.apply(snapshotResultRunnableFuture);
                }

                public void dispose() {
                    super.dispose();
                    delegatedKeyedStateBackend.dispose();
                }

                public void close() throws IOException {
                    super.close();
                    delegatedKeyedStateBackend.close();
                }
            };
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            return this.delegatedStataBackend.createOperatorStateBackend(parameters);
        }
    }

    private static class TestAsyncKeyedStateBackend
    implements AsyncKeyedStateBackend {
        private final Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier;
        private final StateExecutor stateExecutor;

        public TestAsyncKeyedStateBackend(Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier, StateExecutor stateExecutor) {
            this.innerStateSupplier = innerStateSupplier;
            this.stateExecutor = stateExecutor;
        }

        public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
        }

        @Nonnull
        public <SV, S extends org.apache.flink.api.common.state.v2.State> S createState(@Nonnull StateDescriptor<SV> stateDesc) {
            return (S)this.innerStateSupplier.get();
        }

        @Nonnull
        public StateExecutor createStateExecutor() {
            return this.stateExecutor;
        }

        public void dispose() {
        }

        public void close() {
        }
    }

    private static class TestAsyncStateBackend
    implements StateBackend {
        private final StateBackend delegatedStateBackend;
        private Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier;
        private StateExecutor stateExecutor;

        public TestAsyncStateBackend(StateBackend delegatedStateBackend) {
            this.delegatedStateBackend = delegatedStateBackend;
        }

        public TestAsyncStateBackend setInnerState(Supplier<org.apache.flink.api.common.state.v2.State> innerStateSupplier) {
            this.innerStateSupplier = innerStateSupplier;
            return this;
        }

        public TestAsyncStateBackend setStateExecutor(StateExecutor stateExecutor) {
            this.stateExecutor = stateExecutor;
            return this;
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            return this.delegatedStateBackend.supportsAsyncKeyedStateBackend() ? this.delegatedStateBackend.createAsyncKeyedStateBackend(parameters) : new TestAsyncKeyedStateBackend(this.innerStateSupplier, this.stateExecutor);
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            return this.delegatedStateBackend.createKeyedStateBackend(parameters);
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            return this.delegatedStateBackend.createOperatorStateBackend(parameters);
        }
    }
}

