/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.localizer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.LocalDownloadedResource;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.LocalizedResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncLocalizer
implements ILocalizer,
Shutdownable {
    // Logger shared by this class and its nested download tasks.
    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
    // Used by the download tasks to fetch blobs (getBlobs) and by releaseSlotFor to drop blob references.
    private final Localizer _localizer;
    // Fixed pool of one thread (named "Async Localizer") on which all download Callables are submitted.
    private final ExecutorService _execService;
    // When true, base blobs are fetched with DownloadBaseBlobsLocal instead of DownloadBaseBlobsDistributed.
    private final boolean _isLocalMode;
    // Configuration map handed to the constructor; passed on to the ConfigUtils/ServerUtils helpers.
    private final Map<String, Object> _conf;
    // Topology id -> tracked base download (code/conf/jar). Only touched from synchronized methods.
    private final Map<String, LocalDownloadedResource> _basicPending;
    // Topology id -> tracked blob download. Only touched from synchronized methods.
    private final Map<String, LocalDownloadedResource> _blobPending;
    // File-system operations used for mkdir, delete, move, symlink and permission setup.
    private final AdvancedFSOps _fsOps;
    // Value of "storm.disable.symlinks"; when true, DownloadBlobs does not create symlinks in the topology dir.
    private final boolean _symlinksDisabled;

    AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
        this._conf = conf;
        this._symlinksDisabled = (Boolean)conf.getOrDefault("storm.disable.symlinks", false);
        this._isLocalMode = ConfigUtils.isLocalMode(conf);
        this._localizer = localizer;
        this._execService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("Async Localizer").build());
        this._basicPending = new HashMap<String, LocalDownloadedResource>();
        this._blobPending = new HashMap<String, LocalDownloadedResource>();
        this._fsOps = ops;
    }

    /**
     * Creates a localizer whose file-system operations object is obtained from
     * {@code AdvancedFSOps.make(conf)}.
     *
     * @param conf      configuration map
     * @param localizer the localizer used to fetch blobs and drop blob references
     */
    public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
        this(conf, localizer, AdvancedFSOps.make(conf));
    }

    /**
     * Requests the base blobs of the assignment's topology and reserves them for a port.
     *
     * <p>If no entry exists yet for the topology id, a download task is submitted to the
     * executor (the local-mode variant when running in local mode) and tracked in the
     * basic-pending map. The tracked resource is then reserved for {@code port}.
     *
     * @param assignment the assignment whose topology id identifies the download
     * @param port       the port to reserve the resource for
     * @return the future returned by {@code LocalDownloadedResource.reserve}
     * @throws IOException if constructing the download task fails
     */
    @Override
    public synchronized Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException {
        final String topoId = assignment.get_topology_id();
        LocalDownloadedResource pending = _basicPending.get(topoId);
        if (pending == null) {
            final DownloadBaseBlobsDistributed task;
            if (_isLocalMode) {
                task = new DownloadBaseBlobsLocal(topoId);
            } else {
                task = new DownloadBaseBlobsDistributed(topoId);
            }
            pending = new LocalDownloadedResource(_execService.submit(task));
            _basicPending.put(topoId, pending);
        }
        final Future<Void> reservation = pending.reserve(port, assignment);
        LOG.debug("Reserved basic {} {}", topoId, pending);
        return reservation;
    }

    /**
     * Finds the first classpath entry that is a {@code .jar} file containing a
     * {@code resources} directory.
     *
     * @return the matching classpath entry, or {@code null} when the classpath is
     *         unavailable or no entry matches
     * @throws IOException declared for the jar inspection performed by {@code ServerUtils}
     */
    private static String resourcesJar() throws IOException {
        final String classpath = ServerUtils.currentClasspath();
        if (classpath != null) {
            for (String entry : classpath.split(File.pathSeparator)) {
                if (entry.endsWith(".jar") && ServerUtils.zipDoesContainDir(entry, "resources")) {
                    return entry;
                }
            }
        }
        return null;
    }

    /**
     * Registers an already-running topology without downloading anything.
     *
     * <p>For both the basic and the blob maps: if the topology id has no entry, one backed
     * by an already-completed future is inserted; the entry is then reserved for
     * {@code port}.
     *
     * @param assignment the assignment whose topology id is being recovered
     * @param port       the port to reserve the resources for
     */
    @Override
    public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
        final String topoId = assignment.get_topology_id();

        LocalDownloadedResource basic = _basicPending.get(topoId);
        if (basic == null) {
            basic = new LocalDownloadedResource(new AllDoneFuture());
            _basicPending.put(topoId, basic);
        }
        basic.reserve(port, assignment);
        LOG.debug("Recovered basic {} {}", topoId, basic);

        LocalDownloadedResource blobs = _blobPending.get(topoId);
        if (blobs == null) {
            blobs = new LocalDownloadedResource(new AllDoneFuture());
            _blobPending.put(topoId, blobs);
        }
        blobs.reserve(port, assignment);
        LOG.debug("Recovered blobs {} {}", topoId, blobs);
    }

    /**
     * Requests the blobs of the assignment's topology and reserves them for a port.
     *
     * <p>If no entry exists yet for the topology id, a {@code DownloadBlobs} task is
     * submitted to the executor and tracked in the blob-pending map. The tracked
     * resource is then reserved for {@code port}.
     *
     * @param assignment the assignment whose topology id identifies the download
     * @param port       the port to reserve the resource for
     * @return the future returned by {@code LocalDownloadedResource.reserve}
     */
    @Override
    public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
        final String topoId = assignment.get_topology_id();
        LocalDownloadedResource pending = _blobPending.get(topoId);
        if (pending == null) {
            final DownloadBlobs task = new DownloadBlobs(topoId);
            pending = new LocalDownloadedResource(_execService.submit(task));
            _blobPending.put(topoId, pending);
        }
        final Future<Void> reservation = pending.reserve(port, assignment);
        LOG.debug("Reserved blobs {} {}", topoId, pending);
        return reservation;
    }

    /**
     * Releases the reservations held for {@code port} on the assignment's topology and
     * cleans up whatever is no longer needed.
     *
     * <p>Blob side: when the release succeeds and the tracked resource reports
     * {@code isDone()}, the entry is dropped from the blob-pending map and every key in the
     * topology's "topology.blobstore.map" has its blob reference removed through the
     * localizer. Basic side: under the same condition the entry is dropped from the
     * basic-pending map and the topology's dist directory is deleted.
     *
     * @param assignment the assignment whose topology id identifies the resources
     * @param port       the port whose reservation is released
     * @throws IOException if reading the topology conf fails, if removing a blob reference
     *                     fails (the cause is wrapped), or if deleting the dist directory fails
     */
    @Override
    public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
        String topologyId = assignment.get_topology_id();
        LOG.debug("Releasing slot for {} {}", topologyId, port);

        LocalDownloadedResource localResource = this._blobPending.get(topologyId);
        if (localResource == null || !localResource.release(port, assignment)) {
            LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
        } else if (localResource.isDone()) {
            LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
            this._blobPending.remove(topologyId);
            Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(this._conf, topologyId);
            @SuppressWarnings("unchecked")
            Map<String, Map<String, Object>> blobstoreMap =
                    (Map<String, Map<String, Object>>) topoConf.get("topology.blobstore.map");
            if (blobstoreMap != null) {
                String user = (String) topoConf.get("topology.submitter.user");
                String topoName = (String) topoConf.get("topology.name");
                for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
                    String key = entry.getKey();
                    Map<String, Object> blobInfo = entry.getValue();
                    try {
                        this._localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
                    } catch (Exception e) {
                        // NOTE(review): a failure here aborts the loop and skips the basic release below.
                        throw new IOException(e);
                    }
                }
            }
        } else {
            LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
        }

        localResource = this._basicPending.get(topologyId);
        if (localResource == null || !localResource.release(port, assignment)) {
            LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
        } else if (localResource.isDone()) {
            // Message previously said "blob reference" here; this branch handles the basic files.
            LOG.info("Released basic reference {} {} Cleaning up basic files...", topologyId, port);
            this._basicPending.remove(topologyId);
            String path = ConfigUtils.supervisorStormDistRoot(this._conf, topologyId);
            this._fsOps.deleteIfExists(new File(path), null, "rmr " + topologyId);
        } else {
            LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
        }
    }

    /**
     * Deletes every entry under the supervisor's dist root whose URL-decoded name is not
     * a topology id tracked in either the basic-pending or the blob-pending map.
     *
     * @throws IOException if decoding a name or deleting an entry fails
     */
    @Override
    public synchronized void cleanupUnusedTopologies() throws IOException {
        final File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
        LOG.info("Cleaning up unused topologies in {}", distRoot);
        final File[] entries = distRoot.listFiles();
        if (entries == null) {
            // listFiles() yields null when the root cannot be listed; nothing to clean.
            return;
        }
        for (File entry : entries) {
            final String topoId = URLDecoder.decode(entry.getName(), "UTF-8");
            if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
                _fsOps.deleteIfExists(entry, null, "rmr " + topoId);
            }
        }
    }

    /**
     * Shuts down the download executor via {@link ExecutorService#shutdown()}: no new
     * tasks are accepted, previously submitted ones are still executed.
     */
    @Override
    public void shutdown() {
        _execService.shutdown();
    }

    /**
     * Task that localizes the blobs and dependencies of one topology.
     *
     * <p>The resources come from the topology conf's "topology.blobstore.map" plus the
     * dependency jars and artifacts recorded in the topology. Unless symlinks are
     * disabled, each localized resource is symlinked into the topology's dist directory.
     */
    private class DownloadBlobs
    implements Callable<Void> {
        private final String _topologyId;

        public DownloadBlobs(String topologyId) {
            this._topologyId = topologyId;
        }

        /**
         * Runs the download.
         *
         * @return always {@code null}
         * @throws Exception any failure is logged at WARN and rethrown unchanged
         */
        @Override
        public Void call() throws Exception {
            try {
                String stormroot = ConfigUtils.supervisorStormDistRoot(AsyncLocalizer.this._conf, this._topologyId);
                Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(AsyncLocalizer.this._conf, this._topologyId);
                @SuppressWarnings("unchecked")
                Map<String, Map<String, Object>> blobstoreMap =
                        (Map<String, Map<String, Object>>) topoConf.get("topology.blobstore.map");
                String user = (String) topoConf.get("topology.submitter.user");
                String topoName = (String) topoConf.get("topology.name");

                List<LocalResource> localResourceList = new ArrayList<>();
                if (blobstoreMap != null) {
                    List<LocalResource> fromBlobstoreMap = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
                    if (fromBlobstoreMap != null) {
                        localResourceList.addAll(fromBlobstoreMap);
                    }
                }

                StormTopology stormCode = ConfigUtils.readSupervisorTopology(AsyncLocalizer.this._conf, this._topologyId, AsyncLocalizer.this._fsOps);
                // Typed list: iterating a raw list as String does not compile.
                List<String> dependencies = new ArrayList<>();
                if (stormCode.is_set_dependency_jars()) {
                    dependencies.addAll(stormCode.get_dependency_jars());
                }
                if (stormCode.is_set_dependency_artifacts()) {
                    dependencies.addAll(stormCode.get_dependency_artifacts());
                }
                for (String dependency : dependencies) {
                    localResourceList.add(new LocalResource(dependency, false));
                }

                if (!localResourceList.isEmpty()) {
                    File userDir = AsyncLocalizer.this._localizer.getLocalUserFileCacheDir(user);
                    if (!AsyncLocalizer.this._fsOps.fileExists(userDir)) {
                        AsyncLocalizer.this._fsOps.forceMkdir(userDir);
                    }
                    List<LocalizedResource> localizedResources = AsyncLocalizer.this._localizer.getBlobs(localResourceList, user, topoName, userDir);
                    AsyncLocalizer.this._fsOps.setupBlobPermissions(userDir, user);
                    if (!AsyncLocalizer.this._symlinksDisabled) {
                        for (LocalizedResource localizedResource : localizedResources) {
                            String keyName = localizedResource.getKey();
                            File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath());
                            // Link name is the blob's "localname" when configured, else its key.
                            String symlinkName = keyName;
                            if (blobstoreMap != null) {
                                Map<String, Object> blobInfo = blobstoreMap.get(keyName);
                                if (blobInfo != null && blobInfo.containsKey("localname")) {
                                    symlinkName = (String) blobInfo.get("localname");
                                }
                            }
                            AsyncLocalizer.this._fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
                        }
                    }
                }
                return null;
            } catch (Exception e) {
                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw e;
            }
        }
    }

    /**
     * Local-mode variant of the base download: reads the topology code and conf from the
     * blob store obtained via {@code ServerUtils.getNimbusBlobStore} and takes the
     * {@code resources} directory from the current classpath.
     */
    private class DownloadBaseBlobsLocal
    extends DownloadBaseBlobsDistributed {
        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
            super(topologyId);
        }

        /**
         * Writes the topology code and conf into {@code tmproot} and copies or extracts
         * the {@code resources} directory next to them.
         *
         * @param tmproot temporary directory to populate; created if missing
         * @throws Exception if any blob read, stream, or file operation fails
         */
        @Override
        protected void downloadBaseBlobs(File tmproot) throws Exception {
            AsyncLocalizer.this._fsOps.forceMkdir(tmproot);
            String stormCodeKey = ConfigUtils.masterStormCodeKey(this._topologyId);
            String topoConfKey = ConfigUtils.masterStormConfKey(this._topologyId);
            File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
            File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
            BlobStore blobStore = ServerUtils.getNimbusBlobStore(AsyncLocalizer.this._conf, null);
            try {
                try (OutputStream codeOutStream = AsyncLocalizer.this._fsOps.getOutputStream(codePath)) {
                    blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
                }
                try (OutputStream confOutStream = AsyncLocalizer.this._fsOps.getOutputStream(confPath)) {
                    blobStore.readBlobTo(topoConfKey, confOutStream, null);
                }
            } finally {
                // Always release the blob store, whether or not the reads succeeded.
                blobStore.shutdown();
            }

            ClassLoader classloader = Thread.currentThread().getContextClassLoader();
            String resourcesJar = AsyncLocalizer.resourcesJar();
            URL url = classloader.getResource("resources");
            String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR;
            if (resourcesJar != null) {
                // A jar on the classpath that contains "resources" takes precedence.
                LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
                ServerUtils.extractDirFromJar(resourcesJar, "resources", new File(targetDir));
            } else if (url != null) {
                LOG.info("Copying resources at {} to {}", url, targetDir);
                if ("jar".equals(url.getProtocol())) {
                    JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
                    ServerUtils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), "resources", new File(targetDir));
                } else {
                    AsyncLocalizer.this._fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, "resources"));
                }
            }
        }
    }

    /**
     * Task that downloads the base files of one topology (jar, code and conf) into a
     * temporary directory and then moves that directory to the topology's dist root.
     */
    private class DownloadBaseBlobsDistributed
    implements Callable<Void> {
        protected final String _topologyId;
        protected final File _stormRoot;

        /**
         * @param topologyId id of the topology whose base files are downloaded
         * @throws IOException declared for resolving the topology's dist root
         */
        public DownloadBaseBlobsDistributed(String topologyId) throws IOException {
            _topologyId = topologyId;
            _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(AsyncLocalizer.this._conf, _topologyId));
        }

        /**
         * Downloads jar, code and conf into {@code tmproot} using the supervisor's client
         * blob store, then extracts the {@code resources} directory from the jar.
         *
         * @param tmproot temporary directory to populate; created with restricted permissions
         * @throws Exception if any download or file operation fails
         */
        protected void downloadBaseBlobs(File tmproot) throws Exception {
            final String tmpPath = tmproot.getAbsolutePath();
            final String jarKey = ConfigUtils.masterStormJarKey(_topologyId);
            final String codeKey = ConfigUtils.masterStormCodeKey(_topologyId);
            final String confKey = ConfigUtils.masterStormConfKey(_topologyId);
            final String jarFile = ConfigUtils.supervisorStormJarPath(tmpPath);
            final String codeFile = ConfigUtils.supervisorStormCodePath(tmpPath);
            final String confFile = ConfigUtils.supervisorStormConfPath(tmpPath);

            final AdvancedFSOps fsOps = AsyncLocalizer.this._fsOps;
            fsOps.forceMkdir(tmproot);
            fsOps.restrictDirectoryPermissions(tmproot);

            final ClientBlobStore store = ServerUtils.getClientBlobStoreForSupervisor(AsyncLocalizer.this._conf);
            try {
                ServerUtils.downloadResourcesAsSupervisor(jarKey, jarFile, store);
                ServerUtils.downloadResourcesAsSupervisor(codeKey, codeFile, store);
                ServerUtils.downloadResourcesAsSupervisor(confKey, confFile, store);
            } finally {
                store.shutdown();
            }
            ServerUtils.extractDirFromJar(jarFile, "resources", tmproot);
        }

        /**
         * Runs the download unless the dist root already exists on a file system that
         * supports atomic directory moves, in which case the earlier download is kept.
         * On any failure the temporary directory and the dist root are both deleted.
         *
         * @return always {@code null}
         * @throws Exception any failure is logged at WARN and rethrown unchanged
         */
        @Override
        public Void call() throws Exception {
            final AdvancedFSOps fsOps = AsyncLocalizer.this._fsOps;
            try {
                if (fsOps.fileExists(_stormRoot)) {
                    if (fsOps.supportsAtomicDirectoryMove()) {
                        LOG.warn("{} already downloaded blobs, skipping", _topologyId);
                        return null;
                    }
                    // Without atomic moves an existing root may be incomplete; start over.
                    LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
                    fsOps.deleteIfExists(_stormRoot);
                }

                boolean succeeded = false;
                final String tmpRootPath = ServerConfigUtils.supervisorTmpDir(AsyncLocalizer.this._conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
                final File tmpRoot = new File(tmpRootPath);
                try {
                    downloadBaseBlobs(tmpRoot);
                    fsOps.moveDirectoryPreferAtomic(tmpRoot, _stormRoot);
                    fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(AsyncLocalizer.this._conf, _topologyId), _stormRoot);
                    succeeded = true;
                } finally {
                    if (!succeeded) {
                        LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
                        fsOps.deleteIfExists(tmpRoot);
                        fsOps.deleteIfExists(_stormRoot);
                    }
                }
                return null;
            } catch (Exception e) {
                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw e;
            }
        }
    }

    /**
     * A {@link Future} that is already complete: it is never cancelled, always reports
     * done, and both {@code get} variants return {@code null} immediately.
     */
    private static class AllDoneFuture
    implements Future<Void> {
        private AllDoneFuture() {
            // Instantiated only by the enclosing class.
        }

        /** @return always {@code true} */
        @Override
        public boolean isDone() {
            return true;
        }

        /** @return always {@code false} */
        @Override
        public boolean isCancelled() {
            return false;
        }

        /** @return always {@code false}; nothing is cancelled */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        /** @return always {@code null}, without blocking */
        @Override
        public Void get() {
            return null;
        }

        /** @return always {@code null}; the timeout is ignored */
        @Override
        public Void get(long timeout, TimeUnit unit) {
            return null;
        }
    }
}

