/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.fs.hdfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.EnumSet;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.BaseHadoopFsRecoverableFsDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.util.VersionInfo;

@Internal
class HadoopRecoverableFsDataOutputStream
extends BaseHadoopFsRecoverableFsDataOutputStream {
    private static final long LEASE_TIMEOUT = 100000L;
    private static Method truncateHandle;

    HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile, boolean noLocalWrite) throws IOException {
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
        this.targetFile = (Path)Preconditions.checkNotNull((Object)targetFile);
        this.tempFile = (Path)Preconditions.checkNotNull((Object)tempFile);
        this.out = noLocalWrite ? fs.create(tempFile, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.NO_LOCAL_WRITE), fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), null) : fs.create(tempFile);
    }

    @VisibleForTesting
    HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile, FSDataOutputStream out) {
        this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
        this.targetFile = (Path)Preconditions.checkNotNull((Object)targetFile);
        this.tempFile = (Path)Preconditions.checkNotNull((Object)tempFile);
        this.out = out;
    }

    HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
        this.targetFile = (Path)Preconditions.checkNotNull((Object)recoverable.targetFile());
        this.tempFile = (Path)Preconditions.checkNotNull((Object)recoverable.tempFile());
        HadoopRecoverableFsDataOutputStream.safelyTruncateFile(fs, this.tempFile, recoverable);
        this.out = fs.append(this.tempFile);
        long pos = this.out.getPos();
        if (pos != recoverable.offset()) {
            IOUtils.closeQuietly((AutoCloseable)this.out);
            throw new IOException("Truncate failed: " + this.tempFile + " (requested=" + recoverable.offset() + " ,size=" + pos + ")");
        }
    }

    protected RecoverableFsDataOutputStream.Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable recoverable) {
        return new HadoopFsCommitter(this.fs, recoverable);
    }

    private static void safelyTruncateFile(FileSystem fileSystem, Path path, HadoopFsRecoverable recoverable) throws IOException {
        boolean truncated;
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        HadoopRecoverableFsDataOutputStream.revokeLeaseByFileSystem(fileSystem, path);
        try {
            truncated = HadoopRecoverableFsDataOutputStream.truncate(fileSystem, path, recoverable.offset());
        }
        catch (Exception e) {
            throw new IOException("Problem while truncating file: " + path, e);
        }
        if (!truncated) {
            HadoopRecoverableFsDataOutputStream.revokeLeaseByFileSystem(fileSystem, path);
        }
    }

    private static void ensureTruncateInitialized() throws FlinkRuntimeException {
        if (HadoopUtils.isMinHadoopVersion(2, 7) && truncateHandle == null) {
            Method truncateMethod;
            try {
                truncateMethod = FileSystem.class.getMethod("truncate", Path.class, Long.TYPE);
            }
            catch (NoSuchMethodException e) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
            }
            if (!Modifier.isPublic(truncateMethod.getModifiers())) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
            }
            truncateHandle = truncateMethod;
        }
    }

    private static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
        block5: {
            if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
                throw new IllegalStateException("Truncation is not available in hadoop version < 2.7 , You are on Hadoop " + VersionInfo.getVersion());
            }
            if (truncateHandle != null) {
                try {
                    return (Boolean)truncateHandle.invoke((Object)hadoopFs, file, length);
                }
                catch (InvocationTargetException e) {
                    ExceptionUtils.rethrowIOException((Throwable)e.getTargetException());
                    break block5;
                }
                catch (Throwable t) {
                    throw new IOException("Truncation of file failed because of access/linking problems with Hadoop's truncate call. This is most likely a dependency conflict or class loading problem.");
                }
            }
            throw new IllegalStateException("Truncation handle has not been initialized");
        }
        return false;
    }

    private static boolean revokeLeaseByFileSystem(FileSystem fs, Path path) throws IOException {
        if (fs instanceof ViewFileSystem) {
            ViewFileSystem vfs = (ViewFileSystem)fs;
            Path resolvePath = vfs.resolvePath(path);
            FileSystem resolveFs = resolvePath.getFileSystem(fs.getConf());
            return HadoopRecoverableFsDataOutputStream.waitUntilLeaseIsRevoked(resolveFs, resolvePath);
        }
        return HadoopRecoverableFsDataOutputStream.waitUntilLeaseIsRevoked(fs, path);
    }

    private static boolean waitUntilLeaseIsRevoked(FileSystem fs, Path path) throws IOException {
        Preconditions.checkState((boolean)(fs instanceof DistributedFileSystem));
        DistributedFileSystem dfs = (DistributedFileSystem)fs;
        dfs.recoverLease(path);
        Deadline deadline = Deadline.now().plus(Duration.ofMillis(100000L));
        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && deadline.hasTimeLeft()) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e1) {
                throw new IOException("Recovering the lease failed: ", e1);
            }
            isClosed = dfs.isFileClosed(path);
        }
        return isClosed;
    }

    static class HadoopFsCommitter
    implements RecoverableFsDataOutputStream.Committer {
        private final FileSystem fs;
        private final HadoopFsRecoverable recoverable;

        HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
            this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
            this.recoverable = (HadoopFsRecoverable)Preconditions.checkNotNull((Object)recoverable);
        }

        public void commit() throws IOException {
            FileStatus srcStatus;
            Path src = this.recoverable.tempFile();
            Path dest = this.recoverable.targetFile();
            long expectedLength = this.recoverable.offset();
            try {
                srcStatus = this.fs.getFileStatus(src);
            }
            catch (IOException e) {
                throw new IOException("Cannot clean commit: Staging file does not exist.");
            }
            if (srcStatus.getLen() != expectedLength) {
                throw new IOException("Cannot clean commit: File has trailing junk data.");
            }
            try {
                this.fs.rename(src, dest);
            }
            catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
        }

        public void commitAfterRecovery() throws IOException {
            Path src = this.recoverable.tempFile();
            Path dest = this.recoverable.targetFile();
            long expectedLength = this.recoverable.offset();
            FileStatus srcStatus = null;
            try {
                srcStatus = this.fs.getFileStatus(src);
            }
            catch (FileNotFoundException fileNotFoundException) {
            }
            catch (IOException e) {
                throw new IOException("Committing during recovery failed: Could not access status of source file.");
            }
            if (srcStatus != null) {
                if (srcStatus.getLen() > expectedLength) {
                    HadoopRecoverableFsDataOutputStream.safelyTruncateFile(this.fs, src, this.recoverable);
                }
                try {
                    this.fs.rename(src, dest);
                }
                catch (IOException e) {
                    throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
                }
            } else if (!this.fs.exists(dest)) {
                // empty if block
            }
        }

        public RecoverableWriter.CommitRecoverable getRecoverable() {
            return this.recoverable;
        }
    }
}

