/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransferHelper;
import org.apache.flink.contrib.streaming.state.StateHandleDownloadSpec;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBStateDownloader
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateDownloader.class);
    private final RocksDBStateDataTransferHelper transfer;

    @VisibleForTesting
    public RocksDBStateDownloader(int restoringThreadNum) {
        this(RocksDBStateDataTransferHelper.forThreadNum(restoringThreadNum));
    }

    public RocksDBStateDownloader(RocksDBStateDataTransferHelper transfer) {
        this.transfer = transfer;
    }

    public void transferAllStateDataToDirectory(Collection<StateHandleDownloadSpec> downloadRequests, ICloseableRegistry closeableRegistry) throws Exception {
        CloseableRegistry internalCloser = new CloseableRegistry();
        closeableRegistry.registerCloseable((Closeable)internalCloser);
        try {
            FutureUtils.ConjunctFuture downloadFuture = FutureUtils.completeAll((Collection)this.createDownloadRunnables(downloadRequests, internalCloser).stream().map(runnable -> CompletableFuture.runAsync(runnable, this.transfer.getExecutorService())).collect(Collectors.toList()));
            InterruptedException interruptedException = null;
            while (!downloadFuture.isDone() || downloadFuture.isCompletedExceptionally()) {
                try {
                    downloadFuture.get();
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting for state download, continue waiting");
                    interruptedException = interruptedException == null ? e : interruptedException;
                }
            }
            if (interruptedException != null) {
                Thread.currentThread().interrupt();
                throw interruptedException;
            }
        }
        catch (Exception e) {
            downloadRequests.stream().map(StateHandleDownloadSpec::getDownloadDestination).map(java.nio.file.Path::toFile).forEach(FileUtils::deleteDirectoryQuietly);
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            throwable = ExceptionUtils.stripException((Throwable)throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException)throwable;
            }
            throw new FlinkRuntimeException("Failed to download data for state handles.", (Throwable)e);
        }
        finally {
            if (closeableRegistry.unregisterCloseable((Closeable)internalCloser)) {
                IOUtils.closeQuietly((AutoCloseable)internalCloser);
            }
        }
    }

    private Collection<Runnable> createDownloadRunnables(Collection<StateHandleDownloadSpec> downloadRequests, CloseableRegistry closeableRegistry) throws IOException {
        HashMap<FileSystem.FSKey, List> filesSystemsFilesToDownload = new HashMap<FileSystem.FSKey, List>();
        ArrayList<Runnable> runnables = new ArrayList<Runnable>();
        for (StateHandleDownloadSpec downloadSpec : downloadRequests) {
            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : this.getAllHandles(downloadSpec)) {
                java.nio.file.Path downloadDestination = downloadSpec.getDownloadDestination().resolve(handleAndLocalPath.getLocalPath());
                if (this.canCopyPaths(handleAndLocalPath)) {
                    Path remotePath = (Path)handleAndLocalPath.getHandle().maybeGetPath().get();
                    long size = handleAndLocalPath.getHandle().getStateSize();
                    FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
                    filesSystemsFilesToDownload.computeIfAbsent(newFSKey, fsKey -> new ArrayList()).add(PathsCopyingFileSystem.CopyRequest.of((Path)remotePath, (Path)new Path(downloadDestination.toUri()), (long)size));
                    continue;
                }
                runnables.add(this.createDownloadRunnableUsingStreams(handleAndLocalPath.getHandle(), downloadDestination, closeableRegistry));
            }
        }
        for (List filesToDownload : filesSystemsFilesToDownload.values()) {
            Preconditions.checkState((!filesToDownload.isEmpty() ? 1 : 0) != 0);
            FileSystem srcFileSystem = FileSystem.get((URI)((PathsCopyingFileSystem.CopyRequest)filesToDownload.get(0)).getSource().toUri());
            runnables.add(this.createDownloadRunnableUsingCopyFiles((PathsCopyingFileSystem)srcFileSystem, filesToDownload, closeableRegistry));
        }
        return runnables;
    }

    private boolean canCopyPaths(IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath) throws IOException {
        Optional remotePath = handleAndLocalPath.getHandle().maybeGetPath();
        if (!remotePath.isPresent()) {
            return false;
        }
        return FileSystem.get((URI)((Path)remotePath.get()).toUri()).canCopyPaths((Path)remotePath.get(), new Path(handleAndLocalPath.getLocalPath()));
    }

    private Iterable<? extends IncrementalKeyedStateHandle.HandleAndLocalPath> getAllHandles(StateHandleDownloadSpec downloadSpec) {
        return Stream.concat(downloadSpec.getStateHandle().getSharedState().stream(), downloadSpec.getStateHandle().getPrivateState().stream()).collect(Collectors.toList());
    }

    private Runnable createDownloadRunnableUsingCopyFiles(PathsCopyingFileSystem fileSystem, List<PathsCopyingFileSystem.CopyRequest> copyRequests, CloseableRegistry closeableRegistry) {
        LOG.debug("Using copy paths for {} of file system [{}]", copyRequests, (Object)fileSystem);
        return ThrowingRunnable.unchecked(() -> fileSystem.copyFiles(copyRequests, (ICloseableRegistry)closeableRegistry));
    }

    private Runnable createDownloadRunnableUsingStreams(StreamStateHandle remoteFileHandle, java.nio.file.Path destination, CloseableRegistry closeableRegistry) {
        return ThrowingRunnable.unchecked(() -> this.downloadDataForStateHandle(remoteFileHandle, destination, closeableRegistry));
    }

    private void downloadDataForStateHandle(StreamStateHandle remoteFileHandle, java.nio.file.Path restoreFilePath, CloseableRegistry closeableRegistry) throws IOException {
        if (closeableRegistry.isClosed()) {
            return;
        }
        try {
            int numBytes;
            FSDataInputStream inputStream = remoteFileHandle.openInputStream();
            closeableRegistry.registerCloseable((AutoCloseable)inputStream);
            Files.createDirectories(restoreFilePath.getParent(), new FileAttribute[0]);
            OutputStream outputStream = Files.newOutputStream(restoreFilePath, StandardOpenOption.CREATE_NEW);
            closeableRegistry.registerCloseable((AutoCloseable)outputStream);
            byte[] buffer = new byte[8192];
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
            closeableRegistry.unregisterAndCloseAll(new Closeable[]{outputStream, inputStream});
        }
        catch (Exception ex) {
            IOUtils.closeQuietly((AutoCloseable)closeableRegistry);
            throw new IOException(ex);
        }
    }

    @Override
    public void close() throws IOException {
        this.transfer.close();
    }
}

