/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog.restore;

import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.state.changelog.restore.ChangelogBackendLogApplier;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.StateID;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;

@Internal
public class ChangelogBackendRestoreOperation {
    public static <K> CheckpointableKeyedStateBackend<K> restore(Configuration configuration, ClassLoader classLoader, TaskStateManager taskStateManager, Collection<ChangelogStateBackendHandle> stateHandles, BaseBackendBuilder<K> baseBackendBuilder, ChangelogRestoreTargetBuilder<K> changelogRestoreTargetBuilder) throws Exception {
        Collection<KeyedStateHandle> baseState = ChangelogBackendRestoreOperation.extractBaseState(stateHandles);
        AbstractKeyedStateBackend baseBackend = (AbstractKeyedStateBackend)baseBackendBuilder.apply(baseState);
        ChangelogRestoreTarget changelogRestoreTarget = (ChangelogRestoreTarget)changelogRestoreTargetBuilder.apply(baseBackend, stateHandles);
        for (ChangelogStateBackendHandle handle : stateHandles) {
            if (handle == null) continue;
            ChangelogBackendRestoreOperation.readBackendHandle(configuration, taskStateManager, changelogRestoreTarget, handle, classLoader);
        }
        return changelogRestoreTarget.getRestoredKeyedStateBackend();
    }

    private static <T extends ChangelogStateHandle> void readBackendHandle(Configuration configuration, TaskStateManager taskStateManager, ChangelogRestoreTarget<?> changelogRestoreTarget, ChangelogStateBackendHandle backendHandle, ClassLoader classLoader) throws Exception {
        HashMap<Short, StateID> stateIds = new HashMap<Short, StateID>();
        for (ChangelogStateHandle changelogHandle : backendHandle.getNonMaterializedStateHandles()) {
            StateChangelogHandleReader changelogHandleReader = taskStateManager.getStateChangelogStorageView(configuration, changelogHandle).createReader();
            CloseableIterator changes = changelogHandleReader.getChanges(changelogHandle);
            try {
                while (changes.hasNext()) {
                    ChangelogBackendLogApplier.apply((StateChange)changes.next(), changelogRestoreTarget, classLoader, stateIds);
                }
            }
            finally {
                if (changes == null) continue;
                changes.close();
            }
        }
    }

    private static Collection<KeyedStateHandle> extractBaseState(Collection<ChangelogStateBackendHandle> stateHandles) {
        Preconditions.checkNotNull(stateHandles);
        return stateHandles.stream().filter(Objects::nonNull).map(ChangelogStateBackendHandle::getMaterializedStateHandles).flatMap(Collection::stream).collect(Collectors.toList());
    }

    private ChangelogBackendRestoreOperation() {
    }

    @FunctionalInterface
    public static interface ChangelogRestoreTargetBuilder<K>
    extends BiFunctionWithException<AbstractKeyedStateBackend<K>, Collection<ChangelogStateBackendHandle>, ChangelogRestoreTarget<K>, Exception> {
    }

    @FunctionalInterface
    public static interface BaseBackendBuilder<K>
    extends FunctionWithException<Collection<KeyedStateHandle>, AbstractKeyedStateBackend<K>, Exception> {
    }
}

