/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog.restore;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogStateFactory;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.StateChangeLogger;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * A {@link ChangelogRestoreTarget} that applies restored state directly onto a wrapped
 * {@link AbstractKeyedStateBackend}.
 *
 * <p>States created through this target are registered with the supplied
 * {@link ChangelogStateFactory} and are given a {@link VoidStateChangeLogger}, i.e. a logger whose
 * callbacks do nothing. Every state descriptor seen by this target is also recorded in a
 * {@link FunctionDelegationHelper}.
 *
 * @param <K> the key type of the wrapped keyed state backend
 */
public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {
    /** The backend that actually holds the restored state. */
    private final AbstractKeyedStateBackend<K> keyedStateBackend;

    /** Registry/factory of the changelog state wrappers created during restore. */
    private final ChangelogStateFactory changelogStateFactory;

    /** Records the state descriptors registered through this target and its wrapper backend. */
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();

    /**
     * Creates a restore target on top of the given backend.
     *
     * @param keyedStateBackend the backend that receives the restored state
     * @param changelogStateFactory the factory used to create and look up changelog states
     */
    public ChangelogMigrationRestoreTarget(AbstractKeyedStateBackend<K> keyedStateBackend, ChangelogStateFactory changelogStateFactory) {
        this.keyedStateBackend = keyedStateBackend;
        this.changelogStateFactory = changelogStateFactory;
    }

    /** Returns the key-group range reported by the wrapped backend. */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyedStateBackend.getKeyGroupRange();
    }

    /**
     * Creates (or updates) the key/value state in the wrapped backend and returns the changelog
     * state that fronts it.
     *
     * <p>If the factory already knows a {@code KEY_VALUE} state with the descriptor's name, that
     * instance is reused and merely re-pointed at the freshly obtained internal state; otherwise a
     * new changelog state with a no-op logger is created.
     *
     * @param namespaceSerializer serializer for the state's namespace
     * @param stateDescriptor descriptor of the state to create
     * @return the changelog state, cast to the descriptor's state type
     * @throws Exception if the wrapped backend fails to create or update the state
     */
    @Override
    @SuppressWarnings("unchecked") // the backend/factory hand back states matching the descriptor
    public <N, S extends State, V> S createKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
        InternalKvState<K, N, V> kvState =
                (InternalKvState<K, N, V>) this.keyedStateBackend.createOrUpdateInternalState(
                        namespaceSerializer,
                        stateDescriptor,
                        StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform(),
                        true);
        ChangelogState changelogState =
                this.changelogStateFactory.getExistingState(
                        stateDescriptor.getName(), StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
        if (changelogState == null) {
            // The backend itself is passed as the last argument of the factory call.
            changelogState =
                    this.changelogStateFactory.create(
                            stateDescriptor,
                            kvState,
                            VoidStateChangeLogger.getInstance(),
                            this.keyedStateBackend);
        } else {
            changelogState.setDelegatedState(kvState);
        }
        this.functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S) changelogState;
    }

    /**
     * Creates the priority-queue state in the wrapped backend and returns the changelog queue that
     * fronts it, reusing an already registered {@code PRIORITY_QUEUE} state of the same name when
     * the factory has one.
     *
     * @param stateName name of the priority-queue state
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the changelog priority queue delegating to the backend's queue
     */
    @Override
    @Nonnull
    @SuppressWarnings("unchecked") // the factory stores queues untyped; the name/type lookup selects a PQ state
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> createPqState(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
                this.keyedStateBackend.create(stateName, byteOrderedElementSerializer, true);
        ChangelogKeyGroupedPriorityQueue<T> queue =
                (ChangelogKeyGroupedPriorityQueue<T>) this.changelogStateFactory.getExistingState(
                        stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        if (queue == null) {
            queue =
                    this.changelogStateFactory.create(
                            stateName,
                            internalPriorityQueue,
                            VoidStateChangeLogger.getInstance(),
                            byteOrderedElementSerializer);
        } else {
            queue.setDelegatedState(internalPriorityQueue);
        }
        return queue;
    }

    /** Looks up an already registered changelog state by name and type in the factory. */
    @Override
    public ChangelogState getExistingState(String name, StateMetaInfoSnapshot.BackendStateType type) {
        return this.changelogStateFactory.getExistingState(name, type);
    }

    /**
     * Returns a backend wrapper around the restored backend; see
     * {@link #wrapKeyedStateBackend(AbstractKeyedStateBackend, ChangelogStateFactory, FunctionDelegationHelper)}.
     */
    @Override
    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
        return wrapKeyedStateBackend(this.keyedStateBackend, this.changelogStateFactory, this.functionDelegationHelper);
    }

    /**
     * Builds an anonymous {@link AbstractKeyedStateBackend} that forwards its operations to
     * {@code keyedStateBackend}, additionally records state descriptors in
     * {@code functionDelegationHelper}, and disposes {@code changelogStateFactory} together with
     * the backend.
     *
     * @param keyedStateBackend the backend to delegate to
     * @param changelogStateFactory the factory disposed alongside the backend
     * @param functionDelegationHelper receives every descriptor passed to the state accessors
     * @return the delegating backend
     */
    private static <K> AbstractKeyedStateBackend<K> wrapKeyedStateBackend(AbstractKeyedStateBackend<K> keyedStateBackend, ChangelogStateFactory changelogStateFactory, FunctionDelegationHelper functionDelegationHelper) {
        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {

            @Override
            public void setCurrentKey(K newKey) {
                keyedStateBackend.setCurrentKey(newKey);
            }

            @Override
            public void notifyCheckpointComplete(long checkpointId) throws Exception {
                keyedStateBackend.notifyCheckpointComplete(checkpointId);
            }

            @Nonnull
            @Override
            public SavepointResources<K> savepoint() throws Exception {
                return keyedStateBackend.savepoint();
            }

            @Override
            public int numKeyValueStateEntries() {
                return keyedStateBackend.numKeyValueStateEntries();
            }

            @Override
            public <N> Stream<K> getKeys(String state, N namespace) {
                return keyedStateBackend.getKeys(state, namespace);
            }

            @Override
            public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
                return keyedStateBackend.getKeysAndNamespaces(state);
            }

            @Nonnull
            @Override
            public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
                return keyedStateBackend.createOrUpdateInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
            }

            @Override
            public <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
                // Goes through the inherited implementation of this wrapper, not straight to the
                // delegate; the descriptor is recorded only after the lookup succeeded.
                S partitionedState = super.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
                functionDelegationHelper.addOrUpdate(stateDescriptor);
                return partitionedState;
            }

            @Override
            public <N, S extends State, V> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
                // Same pattern as getPartitionedState: inherited lookup first, then record.
                S keyedState = super.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
                functionDelegationHelper.addOrUpdate(stateDescriptor);
                return keyedState;
            }

            @Nonnull
            @Override
            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
                return keyedStateBackend.create(stateName, byteOrderedElementSerializer);
            }

            @Nonnull
            @Override
            public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                return keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
            }

            @Override
            public void dispose() {
                // Backend first, then the factory that holds the changelog state wrappers.
                keyedStateBackend.dispose();
                changelogStateFactory.dispose();
            }
        };
    }

    /**
     * A change logger whose callbacks are all empty.
     *
     * <p>The class has no instance fields, so one shared instance is handed out for every
     * combination of type arguments.
     *
     * @param <Value> the state value type
     * @param <Namespace> the state namespace type
     */
    private static class VoidStateChangeLogger<Value, Namespace>
            implements KvStateChangeLogger<Value, Namespace>,
                    StateChangeLogger<Value, Namespace> {
        private static final VoidStateChangeLogger<Object, Object> INSTANCE = new VoidStateChangeLogger<>();

        /**
         * Returns the shared no-op logger, viewed under the caller's type arguments.
         *
         * @return the singleton instance
         */
        @SuppressWarnings("unchecked") // safe: no method reads or stores a Value or Namespace
        public static <Value, Namespace> VoidStateChangeLogger<Value, Namespace> getInstance() {
            return (VoidStateChangeLogger<Value, Namespace>) INSTANCE;
        }

        private VoidStateChangeLogger() {
        }

        @Override
        public void namespacesMerged(Namespace target, Collection<Namespace> sources) throws IOException {
        }

        @Override
        public void valueUpdated(Value newValue, Namespace ns) throws IOException {
        }

        @Override
        public void valueUpdatedInternal(Value newValue, Namespace ns) throws IOException {
        }

        @Override
        public void valueAdded(Value addedValue, Namespace ns) throws IOException {
        }

        @Override
        public void valueCleared(Namespace ns) throws IOException {
        }

        @Override
        public void valueElementAdded(ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {
        }

        @Override
        public void valueElementAddedOrUpdated(ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {
        }

        @Override
        public void valueElementRemoved(ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {
        }

        @Override
        public void resetWritingMetaFlag() {
        }

        @Override
        public void close() throws IOException {
        }
    }
}

