/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.ttl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.Preconditions;

/**
 * Base test fixture that owns a {@link StateBackend} under test together with the keyed state
 * backend, mock environment, checkpoint stream factory and shared state registry needed to create
 * state, take snapshots and restore from them.
 *
 * <p>Subclasses supply the concrete backend through {@link #createStateBackend()} and, if that
 * backend does not itself implement {@link CheckpointStorage}, override {@link
 * #createCheckpointStorage()}.
 *
 * <p>NOTE: both hooks are invoked from the constructor, so implementations must not rely on
 * subclass fields that are initialised after {@code super(...)} returns.
 */
public abstract class StateBackendTestContext {
    /** Number of key groups used when the caller does not specify one explicitly. */
    public static final int NUMBER_OF_KEY_GROUPS = 10;
    private final StateBackend stateBackend;
    private final CheckpointOptions checkpointOptions;
    private final CheckpointStreamFactory checkpointStreamFactory;
    private final TtlTimeProvider timeProvider;
    private final SharedStateRegistry sharedStateRegistry;
    // Handles discarded by dispose(); nothing in this class adds to the list.
    private final List<KeyedStateHandle> snapshots;
    private MockEnvironment env;
    private CheckpointableKeyedStateBackend<String> keyedStateBackend;

    /**
     * Creates the context, instantiating the state backend and the checkpoint stream factory.
     *
     * @param timeProvider time provider handed to every keyed state backend created by this
     *     context; must not be {@code null}
     * @throws NullPointerException if {@code timeProvider} or the backend returned by {@link
     *     #createStateBackend()} is {@code null}
     */
    protected StateBackendTestContext(TtlTimeProvider timeProvider) {
        this.timeProvider = Preconditions.checkNotNull(timeProvider);
        // Must be assigned before the stream factory is built: the default
        // createCheckpointStorage() reads this field.
        this.stateBackend = Preconditions.checkNotNull(createStateBackend());
        this.checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
        this.checkpointStreamFactory = createCheckpointStreamFactory();
        this.sharedStateRegistry = new SharedStateRegistryImpl();
        this.snapshots = new ArrayList<>();
    }

    /**
     * Creates the state backend under test. Called once from the constructor.
     *
     * @return the backend to test; must not be {@code null}
     */
    protected abstract StateBackend createStateBackend();

    /**
     * Returns the checkpoint storage used to build the checkpoint stream factory. By default the
     * state backend itself is used when it implements {@link CheckpointStorage}.
     *
     * @return the checkpoint storage to use
     * @throws IllegalStateException if the state backend does not implement {@link
     *     CheckpointStorage} and this method has not been overridden
     */
    protected CheckpointStorage createCheckpointStorage() {
        if (stateBackend instanceof CheckpointStorage) {
            return (CheckpointStorage) stateBackend;
        }
        throw new IllegalStateException(
                "The state backend under test does not implement CheckpointStorage. "
                        + "Please override 'createCheckpointStorage' and provide an appropriate "
                        + "checkpoint storage instance");
    }

    /**
     * Resolves the stream factory that all snapshots taken through this context write to.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the storage location cannot be
     *     created or resolved
     */
    private CheckpointStreamFactory createCheckpointStreamFactory() {
        try {
            return createCheckpointStorage()
                    .createCheckpointStorage(new JobID())
                    .resolveCheckpointStorageLocation(2L, checkpointOptions.getTargetLocation());
        } catch (IOException e) {
            // Keep the cause so that a failing test shows why the storage could not be set up.
            throw new RuntimeException("unexpected", e);
        }
    }

    /**
     * Same as {@link #createAndRestoreKeyedStateBackend(int, KeyedStateHandle)} using {@link
     * #NUMBER_OF_KEY_GROUPS}.
     *
     * @param snapshot handle to restore from, or {@code null} to start with empty state
     */
    void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
        createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
    }

    /**
     * Disposes the current keyed state backend (if any), closes the previous mock environment (if
     * any) and creates a fresh backend over a fresh environment covering key groups {@code 0} to
     * {@code numberOfKeyGroups - 1}.
     *
     * @param numberOfKeyGroups total number of key groups for the new backend
     * @param snapshot handle to restore from, or {@code null} to start with empty state
     * @throws RuntimeException wrapping any exception raised while disposing, closing or creating
     */
    void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot) {
        List<KeyedStateHandle> stateHandles;
        if (snapshot == null) {
            stateHandles = Collections.emptyList();
        } else {
            stateHandles = new ArrayList<>(1);
            stateHandles.add(snapshot);
        }
        try {
            disposeKeyedStateBackend();
            // Release the environment of the previous backend; otherwise every restore leaks one
            // MockEnvironment because only the latest instance is closed in dispose().
            if (env != null) {
                env.close();
            }
            env = MockEnvironment.builder().build();
            keyedStateBackend =
                    stateBackend.createKeyedStateBackend(
                            new KeyedStateBackendParametersImpl<>(
                                    env,
                                    new JobID(),
                                    "test",
                                    StringSerializer.INSTANCE,
                                    numberOfKeyGroups,
                                    new KeyGroupRange(0, numberOfKeyGroups - 1),
                                    env.getTaskKvStateRegistry(),
                                    timeProvider,
                                    new UnregisteredMetricsGroup(),
                                    (name, value) -> {},
                                    stateHandles,
                                    new CloseableRegistry(),
                                    1.0));
        } catch (Exception e) {
            throw new RuntimeException("unexpected", e);
        }
    }

    /**
     * Releases everything owned by this context: the keyed state backend, the tracked snapshot
     * handles, the shared state registry and the mock environment.
     *
     * @throws Exception if discarding a snapshot or closing a resource fails
     */
    void dispose() throws Exception {
        disposeKeyedStateBackend();
        for (KeyedStateHandle snapshot : snapshots) {
            snapshot.discardState();
        }
        snapshots.clear();
        sharedStateRegistry.close();
        if (env != null) {
            env.close();
            env = null;
        }
    }

    /** Disposes the keyed state backend, if one exists, and clears the reference to it. */
    private void disposeKeyedStateBackend() {
        if (keyedStateBackend != null) {
            keyedStateBackend.dispose();
            keyedStateBackend = null;
        }
    }

    /**
     * Takes a snapshot synchronously and registers its shared state with this context's registry.
     *
     * @return the job-manager-owned handle of the snapshot; may be {@code null}
     * @throws Exception if triggering or completing the snapshot fails
     */
    KeyedStateHandle takeSnapshot() throws Exception {
        SnapshotResult<KeyedStateHandle> snapshotResult = triggerSnapshot().get();
        KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
        if (jobManagerOwnedSnapshot != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, 0L);
        }
        return jobManagerOwnedSnapshot;
    }

    /**
     * Triggers a snapshot of the keyed state backend and runs the returned future on the calling
     * thread unless it is already done.
     *
     * @return the snapshot future, already run
     * @throws NullPointerException if no keyed state backend has been created yet
     * @throws Exception if the backend fails to start the snapshot
     */
    @Nonnull
    RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot() throws Exception {
        Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
        // The first two arguments are fixed, arbitrary values (presumably checkpoint id and
        // timestamp -- confirm against the snapshot(...) signature).
        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
                keyedStateBackend.snapshot(
                        682375462392L, 10L, checkpointStreamFactory, checkpointOptions);
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        return snapshotRunnableFuture;
    }

    /**
     * Sets the current key of the keyed state backend.
     *
     * @param key key to make current
     * @throws NullPointerException if no keyed state backend has been created yet
     */
    public void setCurrentKey(String key) {
        Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
        keyedStateBackend.setCurrentKey(key);
    }

    /**
     * Gets or creates the state for the given descriptor and sets its current namespace.
     *
     * @param stateDescriptor descriptor of the state to create
     * @param defaultNamespace namespace to set as current on the returned state
     * @return the state object, which is expected to also be an {@link InternalKvState}
     * @throws NullPointerException if no keyed state backend has been created yet
     * @throws Exception if the backend fails to create the state
     */
    @SuppressWarnings("unchecked")
    <N, S extends State, V> S createState(StateDescriptor<S, V> stateDescriptor, N defaultNamespace)
            throws Exception {
        Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
        S state = keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
        // The namespace type cannot be checked at runtime; callers must pass a matching namespace.
        ((InternalKvState<?, N, ?>) state).setCurrentNamespace(defaultNamespace);
        return state;
    }

    /**
     * Returns the current keyed state backend cast to the type expected by the caller.
     *
     * @return the keyed state backend, or {@code null} if none has been created or it was disposed
     */
    @SuppressWarnings("unchecked")
    public <B extends CheckpointableKeyedStateBackend<String>> B getKeyedStateBackend() {
        // Unchecked by design: the caller picks B to match the backend produced by the subclass.
        return (B) keyedStateBackend;
    }
}

