/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.filecache;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;

@InterfaceAudience.Private
public class TrackerDistributedCacheManager {
    private LinkedHashMap<String, CacheStatus> cachedArchives = new LinkedHashMap();
    private static final long DEFAULT_CACHE_SIZE = 0x280000000L;
    private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000L;
    private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.75f;
    private long allowedCacheSize;
    private long allowedCacheSubdirs;
    private long allowedCacheSizeCleanupGoal;
    private long allowedCacheSubdirsCleanupGoal;
    private static final Log LOG = LogFactory.getLog(TrackerDistributedCacheManager.class);
    private final LocalFileSystem localFs;
    private LocalDirAllocator lDirAllocator;
    private TaskController taskController;
    private Configuration trackerConf;
    private Random random = new Random();
    private MRAsyncDiskService asyncDiskService;
    protected BaseDirManager baseDirManager = new BaseDirManager();
    protected CleanupThread cleanupThread;

    public TrackerDistributedCacheManager(Configuration conf, TaskController taskController) throws IOException {
        this.localFs = FileSystem.getLocal((Configuration)conf);
        this.trackerConf = conf;
        this.lDirAllocator = new LocalDirAllocator("mapreduce.cluster.local.dir");
        this.taskController = taskController;
        this.allowedCacheSize = conf.getLong("mapreduce.tasktracker.cache.local.size", 0x280000000L);
        this.allowedCacheSubdirs = conf.getLong("mapreduce.tasktracker.cache.local.numberdirectories", 10000L);
        double cleanupPct = conf.getFloat("mapreduce.tasktracker.cache.local.keep.pct", 0.75f);
        this.allowedCacheSizeCleanupGoal = (long)((double)this.allowedCacheSize * cleanupPct);
        this.allowedCacheSubdirsCleanupGoal = (long)((double)this.allowedCacheSubdirs * cleanupPct);
        this.cleanupThread = new CleanupThread(conf);
    }

    public TrackerDistributedCacheManager(Configuration conf, TaskController taskController, MRAsyncDiskService asyncDiskService) throws IOException {
        this(conf, taskController);
        this.asyncDiskService = asyncDiskService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Path getLocalCache(URI cache, Configuration conf, String subDir, FileStatus fileStatus, boolean isArchive, long confFileStamp, Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic) throws IOException {
        CacheStatus lcacheStatus;
        String key = this.getKey(cache, conf, confFileStamp, TrackerDistributedCacheManager.getLocalizedCacheOwner(isPublic), isArchive);
        LinkedHashMap<String, CacheStatus> linkedHashMap = this.cachedArchives;
        synchronized (linkedHashMap) {
            lcacheStatus = this.cachedArchives.get(key);
            if (lcacheStatus == null) {
                String uniqueString = String.valueOf(this.random.nextLong());
                String cachePath = new Path(subDir, new Path(uniqueString, this.makeRelative(cache, conf))).toString();
                Path localPath = this.lDirAllocator.getLocalPathForWrite(cachePath, fileStatus.getLen(), this.trackerConf);
                lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(cachePath, "")), localPath, new Path(subDir), uniqueString, key);
                this.cachedArchives.put(key, lcacheStatus);
            }
            lcacheStatus.incRefCount();
        }
        boolean initSuccessful = false;
        try {
            CacheStatus cacheStatus = lcacheStatus;
            synchronized (cacheStatus) {
                if (!lcacheStatus.isInited()) {
                    FileSystem fs = FileSystem.get((URI)cache, (Configuration)conf);
                    this.checkStampSinceJobStarted(conf, fs, cache, confFileStamp, lcacheStatus, fileStatus);
                    this.localizeCache(conf, cache, confFileStamp, lcacheStatus, isArchive, isPublic);
                    lcacheStatus.initComplete();
                } else {
                    this.checkCacheStatusValidity(conf, cache, confFileStamp, lcacheStatus, fileStatus, isArchive);
                }
                this.createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir, honorSymLinkConf);
            }
            initSuccessful = true;
            cacheStatus = lcacheStatus.localizedLoadPath;
            return cacheStatus;
        }
        finally {
            if (!initSuccessful) {
                lcacheStatus.decRefCount();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void releaseCache(URI cache, Configuration conf, long timeStamp, String owner, boolean isArchive) throws IOException {
        String key = this.getKey(cache, conf, timeStamp, owner, isArchive);
        LinkedHashMap<String, CacheStatus> linkedHashMap = this.cachedArchives;
        synchronized (linkedHashMap) {
            CacheStatus lcacheStatus = this.cachedArchives.get(key);
            if (lcacheStatus == null) {
                LOG.warn((Object)("Cannot find localized cache: " + cache + " (key: " + key + ") in releaseCache!"));
                return;
            }
            lcacheStatus.decRefCount();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getReferenceCount(URI cache, Configuration conf, long timeStamp, String owner, boolean isArchive) throws IOException {
        String key = this.getKey(cache, conf, timeStamp, owner, isArchive);
        LinkedHashMap<String, CacheStatus> linkedHashMap = this.cachedArchives;
        synchronized (linkedHashMap) {
            CacheStatus lcacheStatus = this.cachedArchives.get(key);
            if (lcacheStatus == null) {
                throw new IOException("Cannot find localized cache: " + cache);
            }
            return lcacheStatus.getRefCount();
        }
    }

    static String getLocalizedCacheOwner(boolean isPublic) throws IOException {
        String user = isPublic ? UserGroupInformation.getLoginUser().getShortUserName() : UserGroupInformation.getCurrentUser().getShortUserName();
        return user;
    }

    private static void deleteLocalPath(MRAsyncDiskService asyncDiskService, LocalFileSystem fs, Path path) throws IOException {
        String localPathToDelete;
        boolean deleted = false;
        if (asyncDiskService != null && !(deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete = path.toUri().getPath()))) {
            LOG.warn((Object)("Cannot find DistributedCache path " + localPathToDelete + " on any of the asyncDiskService volumes!"));
        }
        if (!deleted) {
            fs.delete(path, true);
        }
        LOG.info((Object)("Deleted path " + path));
    }

    String makeRelative(URI cache, Configuration conf) throws IOException {
        URI defaultUri;
        String host = cache.getHost();
        if (host == null) {
            host = cache.getScheme();
        }
        if (host == null && (host = (defaultUri = FileSystem.get((Configuration)conf).getUri()).getHost()) == null) {
            host = defaultUri.getScheme();
        }
        String path = host + cache.getPath();
        path = path.replace(":/", "/");
        return path;
    }

    String getKey(URI cache, Configuration conf, long timeStamp, String user, boolean isArchive) throws IOException {
        return (isArchive ? "a" : "f") + "^" + this.makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
    }

    static FileStatus getFileStatus(Configuration conf, URI cache) throws IOException {
        return ClientDistributedCacheManager.getFileStatus(conf, cache);
    }

    long getTimestamp(Configuration conf, URI cache) throws IOException {
        return TrackerDistributedCacheManager.getFileStatus(conf, cache).getModificationTime();
    }

    void checkCacheStatusValidity(Configuration conf, URI cache, long confFileStamp, CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive) throws IOException {
        FileSystem fs = FileSystem.get((URI)cache, (Configuration)conf);
        if (!this.ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus, fileStatus)) {
            throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath + " for cache-file: " + cache);
        }
        LOG.info((Object)String.format("Using existing cache of %s->%s", cache.toString(), cacheStatus.localizedLoadPath));
    }

    private void createSymlink(Configuration conf, URI cache, CacheStatus cacheStatus, boolean isArchive, Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
        boolean doSymlink;
        boolean bl = doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
        if (cache.getFragment() == null) {
            doSymlink = false;
        }
        String link = currentWorkDir.toString() + "/" + cache.getFragment();
        File flink = new File(link);
        if (doSymlink && !flink.exists()) {
            FileUtil.symLink((String)cacheStatus.localizedLoadPath.toString(), (String)link);
        }
    }

    Path localizeCache(Configuration conf, URI cache, long confFileStamp, CacheStatus cacheStatus, boolean isArchive, boolean isPublic) throws IOException {
        long cacheSize;
        FileSystem fs = FileSystem.get((URI)cache, (Configuration)conf);
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        Path parchive = null;
        parchive = isArchive ? new Path(cacheStatus.localizedLoadPath, new Path(cacheStatus.localizedLoadPath.getName())) : cacheStatus.localizedLoadPath;
        if (!localFs.mkdirs(parchive.getParent())) {
            throw new IOException("Mkdirs failed to create directory " + cacheStatus.localizedLoadPath.toString());
        }
        String cacheId = cache.getPath();
        fs.copyToLocalFile(new Path(cacheId), parchive);
        if (isArchive) {
            String tmpArchive = parchive.toString().toLowerCase();
            File srcFile = new File(parchive.toString());
            File destDir = new File(parchive.getParent().toString());
            LOG.info((Object)String.format("Extracting %s to %s", srcFile.toString(), destDir.toString()));
            if (tmpArchive.endsWith(".jar")) {
                RunJar.unJar((File)srcFile, (File)destDir);
            } else if (tmpArchive.endsWith(".zip")) {
                FileUtil.unZip((File)srcFile, (File)destDir);
            } else if (TrackerDistributedCacheManager.isTarFile(tmpArchive)) {
                FileUtil.unTar((File)srcFile, (File)destDir);
            } else {
                LOG.warn((Object)String.format("Cache file %s specified as archive, but not valid extension.", srcFile.toString()));
            }
        }
        cacheStatus.size = cacheSize = FileUtil.getDU((File)new File(parchive.getParent().toString()));
        this.baseDirManager.addCacheUpdate(cacheStatus);
        this.setPermissions(conf, cacheStatus, isPublic);
        cacheStatus.mtime = this.getTimestamp(conf, cache);
        LOG.info((Object)String.format("Cached %s as %s", cache.toString(), cacheStatus.localizedLoadPath));
        return cacheStatus.localizedLoadPath;
    }

    private void setPermissions(Configuration conf, CacheStatus cacheStatus, boolean isPublic) throws IOException {
        if (isPublic) {
            Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
            LOG.info((Object)("Doing chmod on localdir :" + localizedUniqueDir));
            try {
                FileUtil.chmod((String)localizedUniqueDir.toString(), (String)"ugo+rx", (boolean)true);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)("Exception in chmod" + e.toString()));
                throw new IOException(e);
            }
        } else {
            TaskController.DistributedCacheFileContext context = new TaskController.DistributedCacheFileContext(conf.get("mapreduce.job.user.name"), new File(cacheStatus.localizedBaseDir.toString()), cacheStatus.localizedBaseDir, cacheStatus.uniqueString);
            this.taskController.initializeDistributedCacheFile(context);
        }
    }

    private static boolean isTarFile(String filename) {
        return filename.endsWith(".tgz") || filename.endsWith(".tar.gz") || filename.endsWith(".tar");
    }

    long checkStampSinceJobStarted(Configuration conf, FileSystem fs, URI cache, long confFileStamp, CacheStatus lcacheStatus, FileStatus fileStatus) throws IOException {
        long dfsFileStamp = fileStatus != null ? fileStatus.getModificationTime() : this.getTimestamp(conf, cache);
        if (dfsFileStamp != confFileStamp) {
            LOG.fatal((Object)("File: " + cache + " has changed on HDFS since job started"));
            throw new IOException("File: " + cache + " has changed on HDFS since job started");
        }
        return dfsFileStamp;
    }

    private boolean ifExistsAndFresh(Configuration conf, FileSystem fs, URI cache, long confFileStamp, CacheStatus lcacheStatus, FileStatus fileStatus) throws IOException {
        long dfsFileStamp = this.checkStampSinceJobStarted(conf, fs, cache, confFileStamp, lcacheStatus, fileStatus);
        return dfsFileStamp == lcacheStatus.mtime;
    }

    public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir) throws IOException {
        if (jobCacheDir == null || !jobCacheDir.isDirectory() || workDir == null || !workDir.isDirectory()) {
            return;
        }
        boolean createSymlink = DistributedCache.getSymlink(conf);
        if (createSymlink) {
            File[] list = jobCacheDir.listFiles();
            for (int i = 0; i < list.length; ++i) {
                String target = list[i].getAbsolutePath();
                String link = new File(workDir, list[i].getName()).toString();
                LOG.info((Object)String.format("Creating symlink: %s <- %s", target, link));
                int ret = FileUtil.symLink((String)target, (String)link);
                if (ret == 0) continue;
                LOG.warn((Object)String.format("Failed to create symlink: %s <- %s", target, link));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void purgeCache() {
        LinkedHashMap<String, CacheStatus> linkedHashMap = this.cachedArchives;
        synchronized (linkedHashMap) {
            for (Map.Entry<String, CacheStatus> f : this.cachedArchives.entrySet()) {
                try {
                    TrackerDistributedCacheManager.deleteLocalPath(this.asyncDiskService, this.localFs, f.getValue().localizedLoadPath);
                }
                catch (IOException ie) {
                    LOG.debug((Object)"Error cleaning up cache", (Throwable)ie);
                }
            }
            this.cachedArchives.clear();
        }
    }

    public TaskDistributedCacheManager newTaskDistributedCacheManager(Configuration taskConf) throws IOException {
        return new TaskDistributedCacheManager(this, taskConf);
    }

    static String[] getFileVisibilities(Configuration conf) {
        return conf.getStrings("mapreduce.job.cache.files.visibilities");
    }

    static String[] getArchiveVisibilities(Configuration conf) {
        return conf.getStrings("mapreduce.job.cache.archives.visibilities");
    }

    static void setLocalArchives(Configuration conf, String str) {
        conf.set("mapreduce.job.cache.local.archives", str);
    }

    public static void setLocalFiles(Configuration conf, String str) {
        conf.set("mapreduce.job.cache.local.files", str);
    }

    public void startCleanupThread() {
        this.cleanupThread.start();
    }

    public void stopCleanupThread() {
        this.cleanupThread.stopRunning();
        this.cleanupThread.interrupt();
    }

    protected class BaseDirManager {
        private TreeMap<Path, CacheDir> properties = new TreeMap();

        protected BaseDirManager() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void checkAndCleanup() throws IOException {
            LinkedList<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
            HashMap<Path, CacheDir> toBeCleanedBaseDir = new HashMap<Path, CacheDir>();
            AbstractMap abstractMap = this.properties;
            synchronized (abstractMap) {
                for (Map.Entry<Path, CacheDir> baseDir : this.properties.entrySet()) {
                    CacheDir baseDirCounts = baseDir.getValue();
                    if (TrackerDistributedCacheManager.this.allowedCacheSize >= baseDirCounts.size && TrackerDistributedCacheManager.this.allowedCacheSubdirs >= baseDirCounts.subdirs) continue;
                    CacheDir tcc = new CacheDir();
                    tcc.size = baseDirCounts.size - TrackerDistributedCacheManager.this.allowedCacheSizeCleanupGoal;
                    tcc.subdirs = baseDirCounts.subdirs - TrackerDistributedCacheManager.this.allowedCacheSubdirsCleanupGoal;
                    toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
                }
            }
            abstractMap = TrackerDistributedCacheManager.this.cachedArchives;
            synchronized (abstractMap) {
                Iterator it = TrackerDistributedCacheManager.this.cachedArchives.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = it.next();
                    String cacheId = (String)entry.getKey();
                    CacheStatus cacheStatus = (CacheStatus)TrackerDistributedCacheManager.this.cachedArchives.get(cacheId);
                    CacheDir leftToClean = (CacheDir)toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
                    if (leftToClean == null || leftToClean.size <= 0L && leftToClean.subdirs <= 0L || cacheStatus.isUsed()) continue;
                    leftToClean.size -= cacheStatus.size;
                    --leftToClean.subdirs;
                    toBeDeletedCache.add(cacheStatus);
                    it.remove();
                }
            }
            Iterator i$ = toBeDeletedCache.iterator();
            while (i$.hasNext()) {
                CacheStatus cacheStatus;
                CacheStatus cacheStatus2 = cacheStatus = (CacheStatus)i$.next();
                synchronized (cacheStatus2) {
                    TrackerDistributedCacheManager.deleteLocalPath(TrackerDistributedCacheManager.this.asyncDiskService, FileSystem.getLocal((Configuration)TrackerDistributedCacheManager.this.trackerConf), cacheStatus.getLocalizedUniqueDir());
                    this.deleteCacheUpdate(cacheStatus);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void deleteCacheUpdate(CacheStatus cacheStatus) {
            if (!cacheStatus.inited) {
                return;
            }
            TreeMap<Path, CacheDir> treeMap = this.properties;
            synchronized (treeMap) {
                CacheDir cacheDir = this.properties.get(cacheStatus.getBaseDir());
                if (cacheDir != null) {
                    cacheDir.size -= cacheStatus.size;
                    --cacheDir.subdirs;
                } else {
                    LOG.warn((Object)("Cannot find size and number of subdirectories of baseDir: " + cacheStatus.getBaseDir()));
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addCacheUpdate(CacheStatus cacheStatus) {
            long cacheSize = cacheStatus.size;
            LOG.info((Object)("Adding in cache " + cacheStatus.localizedLoadPath + " at " + cacheStatus.localizedBaseDir + " size:" + cacheStatus.size));
            TreeMap<Path, CacheDir> treeMap = this.properties;
            synchronized (treeMap) {
                CacheDir cacheDir = this.properties.get(cacheStatus.getBaseDir());
                if (cacheDir != null) {
                    cacheDir.size += cacheSize;
                    ++cacheDir.subdirs;
                } else {
                    cacheDir = new CacheDir();
                    cacheDir.size = cacheSize;
                    cacheDir.subdirs = 1L;
                    this.properties.put(cacheStatus.getBaseDir(), cacheDir);
                }
            }
        }
    }

    private static class CacheDir {
        long size;
        long subdirs;

        private CacheDir() {
        }
    }

    protected class CleanupThread
    extends Thread {
        private long cleanUpCheckPeriod = 60000L;
        private volatile boolean running = true;

        public CleanupThread(Configuration conf) {
            this.cleanUpCheckPeriod = conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod", this.cleanUpCheckPeriod);
        }

        public void stopRunning() {
            this.running = false;
        }

        @Override
        public void run() {
            while (this.running) {
                try {
                    Thread.sleep(this.cleanUpCheckPeriod);
                    TrackerDistributedCacheManager.this.baseDirManager.checkAndCleanup();
                }
                catch (IOException e) {
                    LOG.error((Object)"Exception in DistributedCache CleanupThread.", (Throwable)e);
                }
                catch (InterruptedException e) {
                    LOG.info((Object)"Cleanup...", (Throwable)e);
                    this.running = false;
                }
                catch (Throwable t) {
                    this.exitTaskTracker(t);
                }
            }
        }

        protected void exitTaskTracker(Throwable t) {
            LOG.fatal((Object)"Distributed Cache cleanup thread received runtime exception. Exiting the TaskTracker", t);
            Runtime.getRuntime().exit(-1);
        }
    }

    class CacheStatus {
        private int refcount;
        long size;
        long mtime;
        boolean inited = false;
        final Path subDir;
        final String uniqueString;
        final Path localizedLoadPath;
        final Path localizedBaseDir;
        private final String key;

        public CacheStatus(Path baseDir, Path localLoadPath, Path subDir, String uniqueString, String key) {
            this.localizedLoadPath = localLoadPath;
            this.refcount = 0;
            this.mtime = -1L;
            this.localizedBaseDir = baseDir;
            this.size = 0L;
            this.subDir = subDir;
            this.uniqueString = uniqueString;
            this.key = key;
        }

        public synchronized void incRefCount() {
            ++this.refcount;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void decRefCount() {
            LinkedHashMap linkedHashMap = TrackerDistributedCacheManager.this.cachedArchives;
            synchronized (linkedHashMap) {
                CacheStatus cacheStatus = this;
                synchronized (cacheStatus) {
                    --this.refcount;
                    if (this.refcount <= 0) {
                        String key = this.key;
                        TrackerDistributedCacheManager.this.cachedArchives.remove(key);
                        TrackerDistributedCacheManager.this.cachedArchives.put(key, this);
                    }
                }
            }
        }

        public int getRefCount() {
            return this.refcount;
        }

        public synchronized boolean isUsed() {
            return this.refcount > 0;
        }

        Path getBaseDir() {
            return this.localizedBaseDir;
        }

        void initComplete() {
            this.inited = true;
        }

        boolean isInited() {
            return this.inited;
        }

        Path getLocalizedUniqueDir() {
            return new Path(this.localizedBaseDir, new Path(this.subDir, this.uniqueString));
        }
    }
}

