/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.restore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransferHelper;
import org.apache.flink.contrib.streaming.state.RocksDBStateDownloader;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.StateHandleDownloadSpec;
import org.apache.flink.contrib.streaming.state.restore.RocksDBHandle;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.RunnableWithException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOperation {
    private static final Logger logger = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
    private static final Class<? extends IncrementalKeyedStateHandle>[] EXPECTED_STATE_HANDLE_CLASSES = new Class[]{IncrementalRemoteKeyedStateHandle.class, IncrementalLocalKeyedStateHandle.class};
    private final String operatorIdentifier;
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> restoredSstFiles;
    private final RocksDBHandle rocksHandle;
    private final Collection<IncrementalKeyedStateHandle> restoreStateHandles;
    private final CloseableRegistry cancelStreamRegistryForRestore;
    private final CloseableRegistry cancelRegistryForBackend;
    private final KeyGroupRange keyGroupRange;
    private final File instanceBasePath;
    private final int numberOfTransferringThreads;
    private final int keyGroupPrefixBytes;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    private final ResourceGuard dbResourceGuard;
    private long lastCompletedCheckpointId;
    private UUID backendUID;
    private final long writeBatchSize;
    private final double overlapFractionThreshold;
    private boolean isKeySerializerCompatibilityChecked;
    private final boolean useIngestDbRestoreMode;
    private final boolean asyncCompactAfterRescale;
    private final boolean useDeleteFilesInRange;
    private final ExecutorService ioExecutor;
    private final AsyncExceptionHandler asyncExceptionHandler;

    public RocksDBIncrementalRestoreOperation(String operatorIdentifier, KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, int numberOfTransferringThreads, ResourceGuard dbResourceGuard, CloseableRegistry cancelStreamRegistryForRestore, CloseableRegistry cancelRegistryForBackend, ClassLoader userCodeClassLoader, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, StateSerializerProvider<K> keySerializerProvider, File instanceBasePath, File instanceRocksDBPath, DBOptions dbOptions, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<IncrementalKeyedStateHandle> restoreStateHandles, @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, double overlapFractionThreshold, boolean useIngestDbRestoreMode, boolean asyncCompactAfterRescale, boolean useDeleteFilesInRange, ExecutorService ioExecutor, AsyncExceptionHandler asyncExceptionHandler) {
        this.rocksHandle = new RocksDBHandle(kvStateInformation, instanceRocksDBPath, dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, ttlCompactFiltersManager, writeBufferManagerCapacity);
        this.operatorIdentifier = operatorIdentifier;
        this.restoredSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>();
        this.lastCompletedCheckpointId = -1L;
        this.backendUID = UUID.randomUUID();
        this.writeBatchSize = writeBatchSize;
        this.overlapFractionThreshold = overlapFractionThreshold;
        this.customInitializationMetrics = customInitializationMetrics;
        this.restoreStateHandles = restoreStateHandles;
        this.dbResourceGuard = dbResourceGuard;
        this.cancelStreamRegistryForRestore = cancelStreamRegistryForRestore;
        this.cancelRegistryForBackend = cancelRegistryForBackend;
        this.keyGroupRange = keyGroupRange;
        this.instanceBasePath = instanceBasePath;
        this.numberOfTransferringThreads = numberOfTransferringThreads;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
        this.useDeleteFilesInRange = useDeleteFilesInRange;
        this.ioExecutor = ioExecutor;
        this.asyncExceptionHandler = asyncExceptionHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RocksDBRestoreResult restore() throws Exception {
        if (this.restoreStateHandles == null || this.restoreStateHandles.isEmpty()) {
            return null;
        }
        logger.info("Starting RocksDB incremental recovery in operator {} target key-group range {}. Use IngestDB={}, Use AsyncCompaction={}, State Handles={}", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), this.useIngestDbRestoreMode, this.asyncCompactAfterRescale, this.restoreStateHandles});
        ArrayList allDownloadSpecs = new ArrayList(this.restoreStateHandles.size());
        ArrayList localKeyedStateHandles = new ArrayList(this.restoreStateHandles.size());
        Path absolutInstanceBasePath = this.instanceBasePath.getAbsoluteFile().toPath();
        try {
            this.runAndReportDuration(() -> this.makeAllStateHandlesLocal(absolutInstanceBasePath, localKeyedStateHandles, allDownloadSpecs), "DownloadStateDurationMs");
            this.runAndReportDuration(() -> this.restoreFromLocalState(localKeyedStateHandles), "RestoreStateDurationMs");
            logger.info("Finished RocksDB incremental recovery in operator {} with target key-group range range {}.", (Object)this.operatorIdentifier, (Object)this.keyGroupRange.prettyPrintInterval());
            RocksDBRestoreResult rocksDBRestoreResult = new RocksDBRestoreResult(this.rocksHandle.getDb(), this.rocksHandle.getDefaultColumnFamilyHandle(), this.rocksHandle.getNativeMetricMonitor(), this.lastCompletedCheckpointId, this.backendUID, this.restoredSstFiles, this.createAsyncCompactionTask());
            return rocksDBRestoreResult;
        }
        finally {
            allDownloadSpecs.stream().map(StateHandleDownloadSpec::getDownloadDestination).forEach(this::cleanUpPathQuietly);
        }
    }

    @Nullable
    private Runnable createAsyncCompactionTask() {
        if (!this.asyncCompactAfterRescale) {
            return null;
        }
        return () -> {
            long t = System.currentTimeMillis();
            logger.info("Starting async compaction after restore for backend {} in operator {}", (Object)this.backendUID, (Object)this.operatorIdentifier);
            try {
                RunnableWithException asyncRangeCompactionTask = RocksDBIncrementalCheckpointUtils.createAsyncRangeCompactionTask(this.rocksHandle.getDb(), this.rocksHandle.getColumnFamilyHandles(), this.keyGroupPrefixBytes, this.keyGroupRange, this.dbResourceGuard, this.cancelRegistryForBackend);
                this.runAndReportDuration(asyncRangeCompactionTask, "RestoreAsyncCompactionDurationMs");
                logger.info("Completed async compaction after restore for backend {} in operator {} after {} ms.", new Object[]{this.backendUID, this.operatorIdentifier, System.currentTimeMillis() - t});
            }
            catch (Throwable throwable) {
                this.asyncExceptionHandler.handleAsyncException(String.format("Failed async compaction after restore for backend {} in operator {} after {} ms.", this.backendUID, this.operatorIdentifier, System.currentTimeMillis() - t), throwable);
            }
        };
    }

    private void restoreFromLocalState(List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {
        if (localKeyedStateHandles.size() == 1) {
            this.initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0));
        } else {
            this.restoreFromMultipleStateHandles(localKeyedStateHandles);
        }
    }

    private void makeAllStateHandlesLocal(Path absolutInstanceBasePath, List<IncrementalLocalKeyedStateHandle> localKeyedStateHandlesOut, List<StateHandleDownloadSpec> allDownloadSpecsOut) throws Exception {
        for (IncrementalKeyedStateHandle stateHandle : this.restoreStateHandles) {
            if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
                StateHandleDownloadSpec downloadRequest = new StateHandleDownloadSpec((IncrementalRemoteKeyedStateHandle)stateHandle, absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
                allDownloadSpecsOut.add(downloadRequest);
                continue;
            }
            if (stateHandle instanceof IncrementalLocalKeyedStateHandle) {
                localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle)stateHandle);
                continue;
            }
            throw StateUtil.unexpectedStateHandleException((Class[])EXPECTED_STATE_HANDLE_CLASSES, stateHandle.getClass());
        }
        allDownloadSpecsOut.stream().map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState).forEach(localKeyedStateHandlesOut::add);
        this.transferRemoteStateToLocalDirectory(allDownloadSpecsOut);
    }

    private void initBaseDBFromSingleStateHandle(IncrementalLocalKeyedStateHandle stateHandle) throws Exception {
        logger.info("Starting opening base RocksDB instance in operator {} with target key-group range {} from state handle {}.", new Object[]{this.operatorIdentifier, this.keyGroupRange.prettyPrintInterval(), stateHandle});
        this.restoreBaseDBFromLocalState(stateHandle);
        KeyGroupRange stateHandleKeyGroupRange = stateHandle.getKeyGroupRange();
        if (Objects.equals(stateHandleKeyGroupRange, this.keyGroupRange)) {
            this.restorePreviousIncrementalFilesStatus((IncrementalKeyedStateHandle)stateHandle);
        } else {
            try {
                RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.rocksHandle.getDb(), this.rocksHandle.getColumnFamilyHandles(), this.keyGroupRange, stateHandleKeyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
            }
            catch (RocksDBException e) {
                String errMsg = "Failed to clip DB after initialization.";
                logger.error(errMsg, (Throwable)e);
                throw new BackendBuildingException(errMsg, (Throwable)e);
            }
        }
        logger.info("Finished opening base RocksDB instance in operator {} with target key-group range {}.", (Object)this.operatorIdentifier, (Object)this.keyGroupRange.prettyPrintInterval());
    }

    private void restoreFromMultipleStateHandles(List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception {
        logger.info("Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, localKeyedStateHandles, this.useIngestDbRestoreMode});
        byte[] startKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)this.keyGroupRange.getStartKeyGroup(), (byte[])startKeyGroupPrefixBytes);
        byte[] stopKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)(this.keyGroupRange.getEndKeyGroup() + 1), (byte[])stopKeyGroupPrefixBytes);
        if (this.useIngestDbRestoreMode) {
            this.mergeStateHandlesWithClipAndIngest(localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        } else {
            this.mergeStateHandlesWithCopyFromTemporaryInstance(localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        }
        logger.info("Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, this.useIngestDbRestoreMode});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mergeStateHandlesWithClipAndIngest(List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        Path absolutInstanceBasePath = this.instanceBasePath.getAbsoluteFile().toPath();
        Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
        Files.createDirectories(exportCfBasePath, new FileAttribute[0]);
        HashMap<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamilyMetaData = new HashMap<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>>(localKeyedStateHandles.size());
        ArrayList<IncrementalLocalKeyedStateHandle> notImportableHandles = new ArrayList<IncrementalLocalKeyedStateHandle>(localKeyedStateHandles.size());
        try {
            KeyGroupRange exportedSstKeyGroupsRange = this.exportColumnFamiliesWithSstDataInKeyGroupsRange(exportCfBasePath, localKeyedStateHandles, exportedColumnFamilyMetaData, notImportableHandles);
            if (exportedColumnFamilyMetaData.isEmpty()) {
                this.mergeStateHandlesWithCopyFromTemporaryInstance(notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
            } else {
                this.initBaseDBFromColumnFamilyImports(exportedColumnFamilyMetaData, exportedSstKeyGroupsRange);
                this.copyToBaseDBUsingTempDBs(notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
            }
        }
        finally {
            exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly);
            this.cleanUpPathQuietly(exportCfBasePath);
        }
    }

    private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(Path exportCfBasePath, List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamiliesOut, List<IncrementalLocalKeyedStateHandle> skipped) throws Exception {
        logger.info("Starting restore export for backend with range {} in operator {}.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        int minExportKeyGroup = Integer.MAX_VALUE;
        int maxExportKeyGroup = Integer.MIN_VALUE;
        int index = 0;
        for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
            String logLineSuffix = " for state handle at index " + index + " with proclaimed key-group range " + stateHandle.getKeyGroupRange().prettyPrintInterval() + " for backend with range " + this.keyGroupRange.prettyPrintInterval() + " in operator " + this.operatorIdentifier + ".";
            logger.debug("Opening temporary database" + logLineSuffix);
            try (RestoredDBInstance tmpRestoreDBInfo = this.restoreTempDBInstanceFromLocalState(stateHandle);){
                List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
                logger.debug("Checking actual keys of sst files" + logLineSuffix);
                RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(tmpRestoreDBInfo.db, this.keyGroupPrefixBytes, stateHandle.getKeyGroupRange());
                logger.info("{}" + logLineSuffix, (Object)rangeCheckResult);
                if (rangeCheckResult.allInRange()) {
                    logger.debug("Start exporting" + logLineSuffix);
                    List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = tmpRestoreDBInfo.stateMetaInfoSnapshots.stream().map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot).collect(Collectors.toList());
                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(tmpRestoreDBInfo.db, tmpColumnFamilyHandles, registeredStateMetaInfoBases, exportCfBasePath, exportedColumnFamiliesOut);
                    minExportKeyGroup = Math.min(minExportKeyGroup, stateHandle.getKeyGroupRange().getStartKeyGroup());
                    maxExportKeyGroup = Math.max(maxExportKeyGroup, stateHandle.getKeyGroupRange().getEndKeyGroup());
                    logger.debug("Done exporting" + logLineSuffix);
                } else {
                    skipped.add(stateHandle);
                    logger.debug("Skipped export" + logLineSuffix);
                }
            }
            ++index;
        }
        KeyGroupRange exportedKeyGroupsRange = minExportKeyGroup <= maxExportKeyGroup ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        logger.info("Completed restore export for backend with range {} in operator {}. {} exported handles with overall exported range {}. {} Skipped handles: {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, localKeyedStateHandles.size() - skipped.size(), exportedKeyGroupsRange.prettyPrintInterval(), skipped.size(), skipped});
        return exportedKeyGroupsRange;
    }

    private void mergeStateHandlesWithCopyFromTemporaryInstance(List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        logger.info("Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        IncrementalLocalKeyedStateHandle selectedInitialHandle = localKeyedStateHandles.remove(RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial(localKeyedStateHandles, this.keyGroupRange, this.overlapFractionThreshold));
        Preconditions.checkNotNull((Object)selectedInitialHandle);
        this.initBaseDBFromSingleStateHandle(selectedInitialHandle);
        this.copyToBaseDBUsingTempDBs(localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
        logger.info("Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    private void initBaseDBFromColumnFamilyImports(Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> exportedColumnFamilyMetaData, KeyGroupRange exportKeyGroupRange) throws Exception {
        logger.info("Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB with exported range {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, exportKeyGroupRange.prettyPrintInterval()});
        this.rocksHandle.openDB();
        for (Map.Entry<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> entry : exportedColumnFamilyMetaData.entrySet()) {
            this.rocksHandle.registerStateColumnFamilyHandleWithImport(entry.getKey(), entry.getValue(), (ICloseableRegistry)this.cancelStreamRegistryForRestore);
        }
        RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.rocksHandle.getDb(), this.rocksHandle.getColumnFamilyHandles(), this.keyGroupRange, exportKeyGroupRange, this.keyGroupPrefixBytes, this.useDeleteFilesInRange);
        logger.info("Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle localKeyedStateHandle) {
        this.backendUID = localKeyedStateHandle.getBackendIdentifier();
        this.restoredSstFiles.put(localKeyedStateHandle.getCheckpointId(), localKeyedStateHandle.getSharedStateHandles());
        this.lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
        logger.info("Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", new Object[]{this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier, this.backendUID, this.lastCompletedCheckpointId});
    }

    private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(localKeyedStateHandle.getMetaDataStateHandle());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
        this.rocksHandle.openDB(this.createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, restoreSourcePath, (ICloseableRegistry)this.cancelStreamRegistryForRestore);
    }

    private void transferRemoteStateToLocalDirectory(Collection<StateHandleDownloadSpec> downloadSpecs) throws Exception {
        logger.info("Start downloading remote state to local directory in operator {} for target key-group range {}.", (Object)this.operatorIdentifier, (Object)this.keyGroupRange.prettyPrintInterval());
        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(RocksDBStateDataTransferHelper.forThreadNumIfSpecified(this.numberOfTransferringThreads, this.ioExecutor));){
            rocksDBStateDownloader.transferAllStateDataToDirectory(downloadSpecs, (ICloseableRegistry)this.cancelStreamRegistryForRestore);
            logger.info("Finished downloading remote state to local directory in operator {} for target key-group range {}.", (Object)this.operatorIdentifier, (Object)this.keyGroupRange.prettyPrintInterval());
        }
    }

    private void copyToBaseDBUsingTempDBs(List<IncrementalLocalKeyedStateHandle> toImport, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        if (toImport.isEmpty()) {
            return;
        }
        logger.info("Starting to copy state handles for backend with range {} in operator {} using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), this.writeBatchSize);
             Closeable ignored = this.cancelStreamRegistryForRestore.registerCloseableTemporarily(writeBatchWrapper.getCancelCloseable());){
            for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) {
                RestoredDBInstance restoredDBInstance = this.restoreTempDBInstanceFromLocalState(handleToCopy);
                try {
                    this.copyTempDbIntoBaseDb(restoredDBInstance, writeBatchWrapper, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
                }
                finally {
                    if (restoredDBInstance == null) continue;
                    restoredDBInstance.close();
                }
            }
        }
        logger.info("Competed copying state handles for backend with range {} in operator {} using temporary instances.", (Object)this.keyGroupRange.prettyPrintInterval(), (Object)this.operatorIdentifier);
    }

    private void copyTempDbIntoBaseDb(RestoredDBInstance tmpRestoreDBInfo, RocksDBWriteBatchWrapper writeBatchWrapper, byte[] startKeyGroupPrefixBytes, byte[] stopKeyGroupPrefixBytes) throws Exception {
        logger.debug("Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", new Object[]{tmpRestoreDBInfo.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
        List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
        List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
        for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
            ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
            ColumnFamilyHandle targetColumnFamilyHandle = this.rocksHandle.getOrRegisterStateColumnFamilyHandle(null, (StateMetaInfoSnapshot)tmpRestoreDBInfo.stateMetaInfoSnapshots.get((int)descIdx), (ICloseableRegistry)this.cancelStreamRegistryForRestore).columnFamilyHandle;
            try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions);){
                iterator.seek(startKeyGroupPrefixBytes);
                while (iterator.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                    writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                    iterator.next();
                }
                continue;
            }
        }
        logger.debug("Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", new Object[]{tmpRestoreDBInfo.srcStateHandle, this.keyGroupRange.prettyPrintInterval(), this.operatorIdentifier});
    }

    private void cleanUpPathQuietly(@Nonnull Path path) {
        try {
            FileUtils.deleteDirectory((File)path.toFile());
        }
        catch (IOException ex) {
            logger.warn("Failed to clean up path " + path, (Throwable)ex);
        }
    }

    private RestoredDBInstance restoreTempDBInstanceFromLocalState(IncrementalLocalKeyedStateHandle stateHandle) throws Exception {
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(stateHandle.getMetaDataStateHandle());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createColumnFamilyDescriptors(stateMetaInfoSnapshots, false);
        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(stateMetaInfoSnapshots.size() + 1);
        RocksDB restoreDb = RocksDBOperationUtils.openDB(stateHandle.getDirectoryStateHandle().getDirectory().toString(), columnFamilyDescriptors, columnFamilyHandles, RocksDBOperationUtils.createColumnFamilyOptions(this.rocksHandle.getColumnFamilyOptionsFactory(), "default"), this.rocksHandle.getDbOptions());
        return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots, stateHandle);
    }

    private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(stateMetaInfoSnapshots.size());
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
            RegisteredStateMetaInfoBase metaInfoBase = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
            ColumnFamilyDescriptor columnFamilyDescriptor = RocksDBOperationUtils.createColumnFamilyDescriptor(metaInfoBase, this.rocksHandle.getColumnFamilyOptionsFactory(), registerTtlCompactFilter ? this.rocksHandle.getTtlCompactFiltersManager() : null, this.rocksHandle.getWriteBufferManagerCapacity());
            columnFamilyDescriptors.add(columnFamilyDescriptor);
        }
        return columnFamilyDescriptors;
    }

    private void runAndReportDuration(RunnableWithException runnable, String metricName) throws Exception {
        SystemClock clock = SystemClock.getInstance();
        long startTime = clock.relativeTimeMillis();
        runnable.run();
        this.customInitializationMetrics.addMetric(metricName, clock.relativeTimeMillis() - startTime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
        FSDataInputStream inputStream = null;
        try {
            inputStream = metaStateHandle.openInputStream();
            this.cancelStreamRegistryForRestore.registerCloseable((AutoCloseable)inputStream);
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)inputStream);
            KeyedBackendSerializationProxy<K> keyedBackendSerializationProxy = this.readMetaData((DataInputView)in);
            return keyedBackendSerializationProxy;
        }
        finally {
            if (this.cancelStreamRegistryForRestore.unregisterCloseable((AutoCloseable)inputStream)) {
                inputStream.close();
            }
        }
    }

    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
        serializationProxy.read(dataInputView);
        if (!this.isKeySerializerCompatibilityChecked) {
            TypeSerializer currentSerializer = this.keySerializerProvider.currentSchemaSerializer();
            TypeSerializerSchemaCompatibility keySerializerSchemaCompat = this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
            if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
                throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + this.keySerializerProvider.previousSchemaSerializer() + ").");
            }
            this.isKeySerializerCompatibilityChecked = true;
        }
        return serializationProxy;
    }

    @Override
    public void close() throws Exception {
        this.rocksHandle.close();
    }

    private static class RestoredDBInstance
    implements AutoCloseable {
        @Nonnull
        private final RocksDB db;
        @Nonnull
        private final ColumnFamilyHandle defaultColumnFamilyHandle;
        @Nonnull
        private final List<ColumnFamilyHandle> columnFamilyHandles;
        @Nonnull
        private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
        @Nonnull
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        private final ReadOptions readOptions;
        private final IncrementalLocalKeyedStateHandle srcStateHandle;

        private RestoredDBInstance(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) {
            this.db = db;
            this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
            this.columnFamilyHandles = columnFamilyHandles;
            this.columnFamilyDescriptors = columnFamilyDescriptors;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            this.readOptions = new ReadOptions();
            this.srcStateHandle = srcStateHandle;
        }

        @Override
        public void close() {
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(this.columnFamilyDescriptors.size() + 1);
            this.columnFamilyDescriptors.forEach(cfd -> columnFamilyOptions.add(cfd.getOptions()));
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, this.defaultColumnFamilyHandle);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
            IOUtils.closeAllQuietly(this.columnFamilyHandles);
            IOUtils.closeQuietly((AutoCloseable)this.db);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly((AutoCloseable)this.readOptions);
        }
    }
}

