/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.supervisor;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.daemon.supervisor.Container;
import org.apache.storm.daemon.supervisor.ContainerRecoveryException;
import org.apache.storm.daemon.supervisor.ExitCodeCallback;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicContainer
extends Container {
    /** Logger used by this class and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class);
    /** File name filter that accepts names ending in ".jar". */
    private static final FilenameFilter jarFilter = (dir, name) -> name.endsWith(".jar");
    /** Joins path elements with {@code ServerUtils.CLASS_PATH_SEPARATOR}, silently skipping null elements. */
    private static final Joiner CPJ = Joiner.on((String)ServerUtils.CLASS_PATH_SEPARATOR).skipNulls();
    /** Supervisor local state; holds the approved worker-id to port map and doubles as the lock guarding updates to it. */
    protected final LocalState _localState;
    /** Profiler command; the first element of every profiler command line built by this class. */
    protected final String _profileCmd;
    /** Value of the "storm.home" system property, or null when it is not set. */
    protected final String _stormHome = System.getProperty("storm.home");
    /** Set to true by the process exit callback and reset to false at the start of {@code launch()}. */
    protected volatile boolean _exitedEarly = false;
    /** Shared cache of per-topology dependency locations, keyed by topology id. */
    static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();

    /**
     * Creates a container using defaults for the optional collaborators.
     *
     * <p>Delegates to the full constructor, passing {@code null} for the topology
     * configuration, the file system operations helper and the profiler command.
     *
     * @param type the kind of container being created
     * @param conf the supervisor configuration
     * @param supervisorId the id of the supervisor this container belongs to
     * @param port the port the worker is assigned to
     * @param assignment the assignment for the worker
     * @param resourceIsolationManager optional resource isolation manager, may be null
     * @param localState the supervisor local state, must not be null
     * @param workerId the worker id, may be null
     * @throws IOException declared by the delegated-to constructor
     */
    public BasicContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState, String workerId) throws IOException {
        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
    }

    /**
     * Creates a container, recovering or allocating the worker id as required.
     *
     * <p>For a recovery container that is not "only killable", the worker id is looked up
     * in the approved-workers map of {@code localState} by port. Otherwise, when the
     * superclass did not set a worker id, a new one is created and registered.
     *
     * @param type the kind of container being created
     * @param conf the supervisor configuration
     * @param supervisorId the id of the supervisor this container belongs to
     * @param port the port the worker is assigned to
     * @param assignment the assignment for the worker
     * @param resourceIsolationManager optional resource isolation manager, may be null
     * @param localState the supervisor local state, must not be null
     * @param workerId the worker id, may be null
     * @param topoConf the topology configuration, passed through to the superclass
     * @param ops the file system operations helper, passed through to the superclass
     * @param profileCmd the profiler command; when null it is derived from storm.home and
     *     the "worker.profiler.command" setting
     * @throws IOException declared by the superclass constructor
     * @throws ContainerRecoveryException if recovery is requested and no approved worker
     *     is registered for {@code port}
     */
    BasicContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, workerId, topoConf, ops);
        assert (localState != null);
        this._localState = localState;
        if (type.isRecovery() && !type.isOnlyKillable()) {
            synchronized (localState) {
                String wid = null;
                Map<String, Integer> workerToPort = localState.getApprovedWorkers();
                // The map can be absent (createNewWorkerId handles the same case);
                // treat that like "no worker registered for this port".
                if (workerToPort != null) {
                    for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) {
                        Integer approvedPort = entry.getValue();
                        if (approvedPort != null && approvedPort == port) {
                            // No early exit: when several ids map to the port the last one wins.
                            wid = entry.getKey();
                        }
                    }
                }
                if (wid == null) {
                    throw new ContainerRecoveryException("Could not find worker id for " + port + " " + assignment);
                }
                LOG.info("Recovered Worker {}", wid);
                this._workerId = wid;
            }
        } else if (this._workerId == null) {
            this.createNewWorkerId();
        }
        if (profileCmd == null) {
            profileCmd = this._stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + conf.get("worker.profiler.command");
        }
        this._profileCmd = profileCmd;
    }

    /**
     * Generates a fresh worker id for this container and records it as the approved
     * worker for this container's port.
     *
     * <p>Any worker ids previously registered for the same port are removed from the
     * approved-workers map before the new id is stored. The read-modify-write of the
     * map is performed while holding the lock on the local state.
     */
    protected void createNewWorkerId() {
        this._type.assertFull();
        assert (this._workerId == null);
        synchronized (this._localState) {
            this._workerId = Utils.uuid();
            // Declared as Map, not HashMap: nothing here depends on the concrete type
            // returned by the local state.
            Map<String, Integer> workerToPort = this._localState.getApprovedWorkers();
            if (workerToPort == null) {
                workerToPort = new HashMap<String, Integer>(1);
            }
            BasicContainer.removeWorkersOn(workerToPort, this._port);
            workerToPort.put(this._workerId, this._port);
            this._localState.setApprovedWorkers(workerToPort);
            LOG.info("Created Worker ID {}", this._workerId);
        }
    }

    /**
     * Removes every entry of {@code workerToPort} whose port equals {@code _port},
     * logging a warning for each removed worker id.
     *
     * @param workerToPort mutable map of worker id to port; modified in place
     * @param _port the port whose workers should be dropped
     */
    private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) {
        workerToPort.entrySet().removeIf(candidate -> {
            boolean onPort = candidate.getValue() == _port;
            if (onPort) {
                LOG.warn("Deleting worker {} from state", candidate.getKey());
            }
            return onPort;
        });
    }

    /**
     * Cleans up after the worker and removes it from the approved-workers map.
     *
     * <p>After the superclass clean-up, the worker id this container had on entry and any
     * other worker id registered for this container's port are removed from the local
     * state. When no approved-workers map exists there is nothing to remove and the state
     * is left untouched.
     *
     * @throws IOException if the superclass clean-up fails
     */
    @Override
    public void cleanUpForRestart() throws IOException {
        // Capture the id first; it is read again after the superclass clean-up has run.
        String origWorkerId = this._workerId;
        super.cleanUpForRestart();
        synchronized (this._localState) {
            Map<String, Integer> workersToPort = this._localState.getApprovedWorkers();
            if (workersToPort == null) {
                // createNewWorkerId() treats a null map as "nothing approved yet"; do the same
                // here instead of failing with a NullPointerException.
                LOG.warn("No approved workers found while removing Worker ID {}", origWorkerId);
                return;
            }
            workersToPort.remove(origWorkerId);
            BasicContainer.removeWorkersOn(workersToPort, this._port);
            this._localState.setApprovedWorkers(workersToPort);
            LOG.info("Removed Worker ID {}", origWorkerId);
        }
    }

    /**
     * Relaunches the worker under a newly created worker id.
     *
     * <p>Switches the container type to {@code LAUNCH}, creates and registers a new
     * worker id, then runs {@code setup()} followed by {@code launch()}.
     *
     * @throws IOException if setup or launch fails
     */
    @Override
    public void relaunch() throws IOException {
        this._type.assertFull();
        this._type = Container.ContainerType.LAUNCH;
        // createNewWorkerId() asserts that the current worker id is null.
        this.createNewWorkerId();
        this.setup();
        this.launch();
    }

    /**
     * Reports whether the exit callback of the launched worker process has fired.
     *
     * @return the current value of the {@code _exitedEarly} flag
     */
    @Override
    public boolean didMainProcessExit() {
        return this._exitedEarly;
    }

    /**
     * Launches a profiler command and blocks until it terminates.
     *
     * @param command the command line to execute
     * @param env environment variables for the process
     * @param logPrefix prefix passed to the process launcher
     * @param targetDir directory passed to the process launcher
     * @return true if the process exited with code 0
     * @throws IOException if the process cannot be launched
     * @throws InterruptedException if interrupted while waiting for the process
     */
    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException {
        this._type.assertFull();
        // No exit callback is registered; the exit code is read synchronously instead.
        Process profiler = ClientSupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
        return profiler.waitFor() == 0;
    }

    /**
     * Runs the profiler action described by {@code request} against the worker process.
     *
     * <p>The worker pid is read from the worker artifacts pid file, and the command is run
     * with the topology's "topology.environment" map (or an empty map when unset).
     *
     * @param request the profiling request; its action selects the command
     * @param stop whether the request is a stop request (only consulted for JPROFILE_STOP)
     * @return true if the command exited with code 0, or if the action is unsupported and
     *     was therefore ignored
     * @throws IOException if the pid file cannot be read or the process cannot be launched
     * @throws InterruptedException if interrupted while waiting for the command
     */
    @Override
    public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
        this._type.assertFull();
        String targetDir = ConfigUtils.workerArtifactsRoot(this._conf, this._topologyId, this._port);
        // Cast to the Map interface: the configured value is not guaranteed to be a HashMap,
        // and launch() reads the same key as a plain Map.
        @SuppressWarnings("unchecked")
        Map<String, String> env = (Map<String, String>) this._topoConf.get("topology.environment");
        if (env == null) {
            env = new HashMap<String, String>();
        }
        String pidPath = ConfigUtils.workerArtifactsPidPath(this._conf, this._topologyId, this._port);
        String workerPid = this._ops.slurpString(new File(pidPath)).trim();
        ProfileAction profileAction = request.get_action();
        String logPrefix = "ProfilerAction process " + this._topologyId + ":" + this._port + " PROFILER_ACTION: " + profileAction + " ";
        List<String> command = this.mkProfileCommand(profileAction, stop, workerPid, targetDir);
        if (command.isEmpty()) {
            // An empty command means the action has no profiler mapping.
            LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
            return true;
        }
        return this.runProfilingCommand(command, env, logPrefix, new File(targetDir));
    }

    /**
     * Maps a profile action to the profiler command line that implements it.
     *
     * @param action the requested action
     * @param stop for JPROFILE_STOP, selects the "stop" command when true and the
     *     "start" command when false; ignored for other actions
     * @param workerPid pid of the worker process
     * @param targetDir directory passed to commands that take one
     * @return the command line, or an empty list when the action has no mapping
     */
    private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
        switch (action) {
            case JMAP_DUMP:
                return this.jmapDumpCmd(workerPid, targetDir);
            case JSTACK_DUMP:
                return this.jstackDumpCmd(workerPid, targetDir);
            case JPROFILE_DUMP:
                return this.jprofileDump(workerPid, targetDir);
            case JVM_RESTART:
                return this.jprofileJvmRestart(workerPid);
            case JPROFILE_STOP:
                return stop ? this.jprofileStop(workerPid, targetDir) : this.jprofileStart(workerPid);
            default:
                return Lists.newArrayList();
        }
    }

    /**
     * Builds the profiler command line for the "jmap" action.
     *
     * @param pid pid of the worker process
     * @param targetDir directory argument of the command
     * @return a mutable list: profiler command, pid, "jmap", targetDir
     */
    private List<String> jmapDumpCmd(String pid, String targetDir) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "jmap", targetDir);
    }

    /**
     * Builds the profiler command line for the "jstack" action.
     *
     * @param pid pid of the worker process
     * @param targetDir directory argument of the command
     * @return a mutable list: profiler command, pid, "jstack", targetDir
     */
    private List<String> jstackDumpCmd(String pid, String targetDir) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "jstack", targetDir);
    }

    /**
     * Builds the profiler command line for the "start" action.
     *
     * @param pid pid of the worker process
     * @return a mutable list: profiler command, pid, "start"
     */
    private List<String> jprofileStart(String pid) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "start");
    }

    /**
     * Builds the profiler command line for the "stop" action.
     *
     * @param pid pid of the worker process
     * @param targetDir directory argument of the command
     * @return a mutable list: profiler command, pid, "stop", targetDir
     */
    private List<String> jprofileStop(String pid, String targetDir) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "stop", targetDir);
    }

    /**
     * Builds the profiler command line for the "dump" action.
     *
     * @param pid pid of the worker process
     * @param targetDir directory argument of the command
     * @return a mutable list: profiler command, pid, "dump", targetDir
     */
    private List<String> jprofileDump(String pid, String targetDir) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "dump", targetDir);
    }

    /**
     * Builds the profiler command line for the JVM restart action, which is issued as "kill".
     *
     * @param pid pid of the worker process
     * @return a mutable list: profiler command, pid, "kill"
     */
    private List<String> jprofileJvmRestart(String pid) {
        // Pass the elements as String varargs so the result is inferred as a list of String.
        return Lists.newArrayList(this._profileCmd, pid, "kill");
    }

    /**
     * Computes the native library path for a worker.
     *
     * <p>The result joins, in order: the platform specific resources directory
     * ({@code resources/<os.name>-<os.arch>} with whitespace in the OS name replaced by
     * underscores), the plain {@code resources} directory, and the configured
     * "java.library.path" value. A null configured value is skipped.
     *
     * @param stormRoot root directory under which "resources" is located
     * @param conf configuration consulted for "java.library.path"
     * @return the joined library path
     */
    protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
        String sharedResources = stormRoot + Utils.FILE_PATH_SEPARATOR + "resources";
        String platform = System.getProperty("os.name").replaceAll("\\s+", "_") + "-" + System.getProperty("os.arch");
        String platformResources = sharedResources + Utils.FILE_PATH_SEPARATOR + platform;
        return CPJ.join(platformResources, sharedResources, conf.get("java.library.path"));
    }

    /**
     * Returns the class-path wildcard entry for a directory, i.e. the directory path with
     * a trailing {@code *} path element.
     *
     * @param dir the directory
     * @return the path of {@code dir} with "*" appended as a child element
     */
    protected String getWildcardDir(File dir) {
        return dir.toPath().resolve("*").toString();
    }

    /**
     * Builds the framework part of the worker class path.
     *
     * <p>Elements, in order: wildcard of {@code <storm.home>/lib-worker}, wildcard of
     * {@code <storm.home>/extlib}, the STORM_EXT_CLASSPATH environment variable (may be a
     * null element), and the configuration directory. The configuration directory is the
     * STORM_CONF_DIR environment variable when set, else {@code <storm.home>/conf}.
     *
     * @return a mutable list of class-path elements, possibly containing a null element
     */
    protected List<String> frameworkClasspath() {
        String confDirFromEnv = System.getenv("STORM_CONF_DIR");
        String confDir = confDirFromEnv;
        if (confDir == null) {
            confDir = new File(this._stormHome, "conf").getAbsolutePath();
        }
        List<String> elements = new ArrayList<String>(4);
        elements.add(this.getWildcardDir(new File(this._stormHome, "lib-worker")));
        elements.add(this.getWildcardDir(new File(this._stormHome, "extlib")));
        // May be null; the class-path joiner skips null elements.
        elements.add(System.getenv("STORM_EXT_CLASSPATH"));
        elements.add(confDir);
        return elements;
    }

    /**
     * Views a configuration value as a list of strings.
     *
     * @param o a String, a List, or anything else (including null)
     * @return the list itself for a List, a single-element list for a String, and an
     *     empty list otherwise
     */
    @SuppressWarnings("unchecked")
    private List<String> asStringList(Object o) {
        if (o instanceof List) {
            // Element types are not checked here; the caller treats them as strings.
            return (List<String>) o;
        }
        if (o instanceof String) {
            return Arrays.asList((String) o);
        }
        return Collections.emptyList();
    }

    /**
     * Assembles the complete worker class path.
     *
     * <p>Order: "topology.classpath.beginning" entries, the framework class path, the
     * topology jar, the dependency locations, then "topology.classpath" entries. Null
     * elements are skipped when joining.
     *
     * @param stormJar path of the topology jar
     * @param dependencyLocations paths of the topology's dependencies
     * @return the class path joined with the platform class-path separator
     */
    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
        List<String> entries = new ArrayList<String>();
        List<String> leading = this.asStringList(this._topoConf.get("topology.classpath.beginning"));
        entries.addAll(leading);
        List<String> framework = this.frameworkClasspath();
        entries.addAll(framework);
        entries.add(stormJar);
        entries.addAll(dependencyLocations);
        List<String> trailing = this.asStringList(this._topoConf.get("topology.classpath"));
        entries.addAll(trailing);
        return CPJ.join(entries);
    }

    /**
     * Replaces the worker placeholders in a single child-opts string.
     *
     * <p>{@code %ID%} and {@code %WORKER-PORT%} become the port, {@code %WORKER-ID%} the
     * worker id and {@code %TOPOLOGY-ID%} the topology id. {@code %HEAP-MEM%} is replaced
     * only when {@code memOnheap} is positive.
     *
     * @param string the option string; blank or null values are returned unchanged
     * @param memOnheap on-heap memory value substituted for {@code %HEAP-MEM%}
     * @return the string with placeholders substituted
     */
    private String substituteChildOptsInternal(String string, int memOnheap) {
        if (!StringUtils.isNotBlank(string)) {
            return string;
        }
        String portText = String.valueOf(this._port);
        String substituted = string
                .replace("%ID%", portText)
                .replace("%WORKER-ID%", this._workerId)
                .replace("%TOPOLOGY-ID%", this._topologyId)
                .replace("%WORKER-PORT%", portText);
        if (memOnheap > 0) {
            substituted = substituted.replace("%HEAP-MEM%", String.valueOf(memOnheap));
        }
        return substituted;
    }

    /**
     * Substitutes placeholders in child opts without a heap size.
     *
     * <p>Delegates to {@code substituteChildopts(value, -1)}; because the heap value is
     * not positive, {@code %HEAP-MEM%} is left unreplaced.
     *
     * @param value a String or a List of option strings
     * @return the list of substituted, non-blank options
     */
    protected List<String> substituteChildopts(Object value) {
        return this.substituteChildopts(value, -1);
    }

    /**
     * Substitutes placeholders in child opts and returns them as individual arguments.
     *
     * <p>A String value is substituted as a whole and then split on whitespace. A List
     * value is substituted element by element without splitting. Blank results are
     * dropped. Any other value (including null) yields an empty list.
     *
     * @param value a String or a List of option strings
     * @param memOnheap on-heap memory value used for {@code %HEAP-MEM%} when positive
     * @return a new mutable list of substituted, non-blank options
     */
    protected List<String> substituteChildopts(Object value, int memOnheap) {
        List<String> rets = new ArrayList<String>();
        if (value instanceof String) {
            String substituted = this.substituteChildOptsInternal((String) value, memOnheap);
            if (StringUtils.isNotBlank(substituted)) {
                for (String token : substituted.split("\\s+")) {
                    // A leading separator produces an empty first token; skip it.
                    if (StringUtils.isNotBlank(token)) {
                        rets.add(token);
                    }
                }
            }
        } else if (value instanceof List) {
            @SuppressWarnings("unchecked")
            List<String> options = (List<String>) value;
            for (String option : options) {
                String substituted = this.substituteChildOptsInternal(option, memOnheap);
                if (StringUtils.isNotBlank(substituted)) {
                    rets.add(substituted);
                }
            }
        }
        return rets;
    }

    /**
     * Starts the worker process.
     *
     * <p>When a resource isolation manager is configured, the command is first passed
     * through its {@code getLaunchCommand} for this worker id and the returned command is
     * launched instead.
     *
     * @param command the worker command line
     * @param env environment variables for the process
     * @param logPrefix prefix passed to the process launcher
     * @param processExitCallback callback passed to the process launcher
     * @param targetDir directory passed to the process launcher
     * @throws IOException if the process cannot be launched
     */
    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
        List<String> effectiveCommand = command;
        if (this._resourceIsolationManager != null) {
            effectiveCommand = this._resourceIsolationManager.getLaunchCommand(this._workerId, command);
        }
        ClientSupervisorUtils.launchProcess(effectiveCommand, env, logPrefix, processExitCallback, targetDir);
    }

    /**
     * Resolves the location of the worker log4j2 configuration file.
     *
     * <p>The directory comes from "storm.log4j2.conf.dir": an absolute value is used as
     * is, a relative value is resolved against storm.home, and a blank or missing value
     * falls back to {@code <storm.home>/log4j2}. On Windows a {@code file:///} prefix is
     * added unless the directory already starts with {@code file:}.
     *
     * @return the directory followed by the path separator and "worker.xml"
     */
    private String getWorkerLoggingConfigFile() {
        String configured = (String) this._conf.get("storm.log4j2.conf.dir");
        String confDir;
        if (!StringUtils.isNotBlank(configured)) {
            confDir = this._stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
        } else if (ServerUtils.isAbsolutePath(configured)) {
            confDir = configured;
        } else {
            confDir = this._stormHome + Utils.FILE_PATH_SEPARATOR + configured;
        }
        boolean needsFileScheme = ServerUtils.IS_ON_WINDOWS && !confDir.startsWith("file:");
        if (needsFileScheme) {
            confDir = "file:///" + confDir;
        }
        return confDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
    }

    /**
     * Returns the dependency locations of a topology, using the shared cache.
     *
     * <p>The cache entry is looked up (or created) by {@code topologyId}; the remaining
     * arguments are only used when a new entry has to be created.
     *
     * @param conf the supervisor configuration
     * @param topologyId the id of the topology
     * @param ops the file system operations helper
     * @param stormRoot directory against which dependency names are resolved
     * @return the absolute paths of the topology's dependencies
     * @throws IOException if loading the dependency locations fails
     */
    public static List<String> getDependencyLocationsFor(Map<String, Object> conf, String topologyId, AdvancedFSOps ops, String stormRoot) throws IOException {
        return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
    }

    /**
     * Builds the {@code -cp <classpath>} argument pair for the worker JVM.
     *
     * @param stormRoot the topology's distribution root on this supervisor
     * @return a mutable two-element list: "-cp" and the worker class path
     * @throws IOException if the dependency locations cannot be loaded
     */
    private List<String> getClassPathParams(String stormRoot) throws IOException {
        String topologyJar = ConfigUtils.supervisorStormJarPath(stormRoot);
        List<String> dependencies = BasicContainer.getDependencyLocationsFor(this._conf, this._topologyId, this._ops, stormRoot);
        String classPath = this.getWorkerClassPath(topologyJar, dependencies);
        List<String> params = new ArrayList<String>(2);
        params.add("-cp");
        params.add(classPath);
        return params;
    }

    /**
     * Builds the {@code -D} system properties shared by both JVMs of the launch command.
     *
     * <p>Covers logging sensitivity (default "S3"), log file name, storm.home (empty
     * string when unset), artifact and log directories, topology id, worker id and port,
     * the log4j2 configuration file and context selector, and storm.local.dir.
     *
     * @return a mutable list of JVM arguments
     */
    private List<String> getCommonParams() {
        String artifactsRoot = ConfigUtils.workerArtifactsRoot(this._conf);
        String logDir = ConfigUtils.getLogDir();
        String log4jConfig = this.getWorkerLoggingConfigFile();
        String sensitivity = (String) this._topoConf.get("topology.logging.sensitivity");
        return new ArrayList<String>(Arrays.asList(
                "-Dlogging.sensitivity=" + Utils.OR(sensitivity, "S3"),
                "-Dlogfile.name=worker.log",
                "-Dstorm.home=" + Utils.OR(this._stormHome, ""),
                "-Dworkers.artifacts=" + artifactsRoot,
                "-Dstorm.id=" + this._topologyId,
                "-Dworker.id=" + this._workerId,
                "-Dworker.port=" + this._port,
                "-Dstorm.log.dir=" + logDir,
                "-Dlog4j.configurationFile=" + log4jConfig,
                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
                "-Dstorm.local.dir=" + this._conf.get("storm.local.dir")));
    }

    /**
     * Determines the on-heap memory value for the worker.
     *
     * @param resources the worker's assigned resources, may be null
     * @return the assigned on-heap memory rounded up when it is set and positive,
     *     otherwise the "worker.heap.memory.mb" topology setting, defaulting to 768
     */
    private int getMemOnHeap(WorkerResources resources) {
        boolean hasAssignedHeap = resources != null
                && resources.is_set_mem_on_heap()
                && resources.get_mem_on_heap() > 0.0;
        if (hasAssignedHeap) {
            return (int) Math.ceil(resources.get_mem_on_heap());
        }
        return ObjectReader.getInt(this._topoConf.get("worker.heap.memory.mb"), Integer.valueOf(768));
    }

    /**
     * Returns the profiler JVM options for the worker.
     *
     * @param memOnheap on-heap memory value used for placeholder substitution
     * @return the substituted "worker.profiler.childopts" when "worker.profiler.enabled"
     *     is true, otherwise an empty list
     */
    private List<String> getWorkerProfilerChildOpts(int memOnheap) {
        boolean profilerEnabled = ObjectReader.getBoolean(this._conf.get("worker.profiler.enabled"), false);
        if (!profilerEnabled) {
            return new ArrayList<String>();
        }
        return this.substituteChildopts(this._conf.get("worker.profiler.childopts"), memOnheap);
    }

    /**
     * Resolves a JDK executable name against JAVA_HOME.
     *
     * @param cmd the executable name, e.g. "java"
     * @return {@code <JAVA_HOME>/bin/<cmd>} when the JAVA_HOME environment variable is
     *     not blank, otherwise {@code cmd} unchanged
     */
    protected String javaCmd(String cmd) {
        String javaHome = System.getenv().get("JAVA_HOME");
        if (!StringUtils.isNotBlank(javaHome)) {
            return cmd;
        }
        return javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
    }

    /**
     * Builds the full command line used to start the worker.
     *
     * <p>The command consists of two java invocations in sequence. The first has
     * {@code org.apache.storm.LogWriter} as its main class; everything after that class
     * name is a second, complete java command line whose main class is
     * {@code org.apache.storm.daemon.worker.Worker}, followed by the topology id,
     * supervisor id, port and worker id as program arguments.
     *
     * @param memOnheap on-heap memory value substituted into the worker child opts
     * @param stormRoot the topology's distribution root, used to build the class path
     * @param jlp the value for {@code -Djava.library.path}
     * @return the command line as a mutable list of arguments
     * @throws IOException if the class path parameters cannot be computed
     */
    private List<String> mkLaunchCommand(int memOnheap, String stormRoot, String jlp) throws IOException {
        String javaCmd = this.javaCmd("java");
        String stormOptions = ConfigUtils.concatIfNotNull((String)System.getProperty("storm.options"));
        String topoConfFile = ConfigUtils.concatIfNotNull((String)System.getProperty("storm.conf.file"));
        String workerTmpDir = ConfigUtils.workerTmpRoot((Map)this._conf, (String)this._workerId);
        List<String> classPathParams = this.getClassPathParams(stormRoot);
        List<String> commonParams = this.getCommonParams();
        ArrayList<String> commandList = new ArrayList<String>();
        // First JVM: the LogWriter. Its child opts are substituted without a heap value.
        commandList.add(javaCmd);
        commandList.addAll(classPathParams);
        commandList.addAll(this.substituteChildopts(this._topoConf.get("topology.worker.logwriter.childopts")));
        commandList.addAll(commonParams);
        commandList.add("org.apache.storm.LogWriter");
        // Second java command line (arguments of the LogWriter): the worker JVM itself.
        commandList.add(javaCmd);
        commandList.add("-server");
        commandList.addAll(commonParams);
        // Supervisor-level opts first, then topology-level opts.
        commandList.addAll(this.substituteChildopts(this._conf.get("worker.childopts"), memOnheap));
        commandList.addAll(this.substituteChildopts(this._topoConf.get("topology.worker.childopts"), memOnheap));
        // GC opts: the topology setting is given to Utils.OR ahead of the supervisor setting.
        commandList.addAll(this.substituteChildopts(Utils.OR(this._topoConf.get("topology.worker.gc.childopts"), this._conf.get("worker.gc.childopts")), memOnheap));
        commandList.addAll(this.getWorkerProfilerChildOpts(memOnheap));
        commandList.add("-Djava.library.path=" + jlp);
        commandList.add("-Dstorm.conf.file=" + topoConfFile);
        commandList.add("-Dstorm.options=" + stormOptions);
        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
        commandList.addAll(classPathParams);
        commandList.add("org.apache.storm.daemon.worker.Worker");
        // Program arguments of the Worker main class.
        commandList.add(this._topologyId);
        commandList.add(this._supervisorId);
        commandList.add(String.valueOf(this._port));
        commandList.add(this._workerId);
        return commandList;
    }

    /**
     * Launches the worker process for this container.
     *
     * <p>Builds the launch command and environment (the topology's "topology.environment"
     * plus LD_LIBRARY_PATH), reserves cpu and memory with the resource isolation manager
     * when one is configured, and then starts the process in the worker's root directory.
     * The reserved memory is off-heap + on-heap + the configured cgroup tolerance margin.
     *
     * @throws IOException if the command cannot be built or the process cannot be launched
     */
    @Override
    public void launch() throws IOException {
        this._type.assertFull();
        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", this._assignment, this._supervisorId, this._port, this._workerId);
        String logPrefix = "Worker Process " + this._workerId;
        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
        // Reset before launching; the exit callback flips this back to true.
        this._exitedEarly = false;
        WorkerResources resources = this._assignment.get_resources();
        int memOnheap = this.getMemOnHeap(resources);
        String stormRoot = ConfigUtils.supervisorStormDistRoot(this._conf, this._topologyId);
        String jlp = this.javaLibraryPath(stormRoot, this._conf);
        List<String> commandList = this.mkLaunchCommand(memOnheap, stormRoot, jlp);
        Map<String, String> topEnvironment = new HashMap<String, String>();
        @SuppressWarnings("unchecked")
        Map<String, String> environment = (Map<String, String>) this._topoConf.get("topology.environment");
        if (environment != null) {
            topEnvironment.putAll(environment);
        }
        // Set after copying, so it overrides any LD_LIBRARY_PATH from the topology environment.
        topEnvironment.put("LD_LIBRARY_PATH", jlp);
        if (this._resourceIsolationManager != null) {
            int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
            int cpu = (int) Math.ceil(resources.get_cpu());
            // Read as Number: a margin configured without a decimal point is not a Double,
            // and a direct cast to Double would throw ClassCastException.
            Number marginMb = (Number) this._conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB);
            int cGroupMem = (int) Math.ceil(marginMb.doubleValue());
            int memoryValue = memoffheap + memOnheap + cGroupMem;
            Map<String, Number> map = new HashMap<String, Number>();
            map.put("cpu", cpu);
            map.put("memory", memoryValue);
            this._resourceIsolationManager.reserveResourcesForWorker(this._workerId, map);
        }
        LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
        String workerDir = ConfigUtils.workerRoot(this._conf, this._workerId);
        this.launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
    }

    /**
     * Bounded, least-recently-used cache of {@link DependencyLocations} keyed by topology id.
     *
     * <p>All access goes through synchronized methods. When the number of entries exceeds
     * {@code _maxSize}, the least recently accessed entry is evicted.
     */
    static class DepLRUCache {
        /** Maximum number of entries kept in the cache. */
        public final int _maxSize = 100;
        // accessOrder=true makes iteration order follow recency of access, so the "eldest"
        // entry offered to removeEldestEntry is the least recently used one. The default
        // insertion order would evict frequently used topologies first-in-first-out.
        private final LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String, DependencyLocations>(16, 0.75f, true){

            @Override
            protected boolean removeEldestEntry(Map.Entry<String, DependencyLocations> eldest) {
                return this.size() > DepLRUCache.this._maxSize;
            }
        };

        DepLRUCache() {
        }

        /**
         * Returns the cached entry for a topology, creating it on first use.
         *
         * @param conf the supervisor configuration, used only when creating an entry
         * @param topologyId the id of the topology; the cache key
         * @param ops the file system operations helper, used only when creating an entry
         * @param stormRoot dependency root directory, used only when creating an entry
         * @return the entry for {@code topologyId}, never null
         */
        public synchronized DependencyLocations get(Map<String, Object> conf, String topologyId, AdvancedFSOps ops, String stormRoot) {
            DependencyLocations dl = this._cache.get(topologyId);
            if (dl == null) {
                dl = new DependencyLocations(conf, topologyId, ops, stormRoot);
                this._cache.put(topologyId, dl);
            }
            return dl;
        }

        /** Removes all entries from the cache. */
        public synchronized void clear() {
            this._cache.clear();
        }
    }

    /**
     * Lazily computed list of absolute dependency paths for one topology.
     *
     * <p>The list is computed on the first successful call to {@link #get()} and reused
     * afterwards. Reads and writes of the cached list are synchronized on this object.
     */
    private static class DependencyLocations {
        private final Map<String, Object> _conf;
        private final String _topologyId;
        private final AdvancedFSOps _ops;
        private final String _stormRoot;
        /** Cached result; null until the first successful load. */
        private List<String> _data = null;

        public DependencyLocations(Map<String, Object> conf, String topologyId, AdvancedFSOps ops, String stormRoot) {
            this._conf = conf;
            this._topologyId = topologyId;
            this._ops = ops;
            this._stormRoot = stormRoot;
        }

        /**
         * Describes this entry without triggering a load.
         *
         * @return the topology id and the currently cached list (null if not loaded)
         */
        @Override
        public String toString() {
            List<String> snapshot;
            synchronized (this) {
                snapshot = this._data;
            }
            return "DEP_LOCS for " + this._topologyId + " => " + snapshot;
        }

        /**
         * Returns the dependency locations, reading the topology on first use.
         *
         * @return absolute paths of the dependency jars followed by the dependency
         *     artifacts, each resolved against the storm root
         * @throws IOException if the topology cannot be read; nothing is cached then
         */
        public synchronized List<String> get() throws IOException {
            if (this._data == null) {
                StormTopology topology = ConfigUtils.readSupervisorTopology(this._conf, this._topologyId, this._ops);
                List<String> resolved = new ArrayList<String>();
                this.appendAbsolutePaths(resolved, topology.get_dependency_jars());
                this.appendAbsolutePaths(resolved, topology.get_dependency_artifacts());
                this._data = resolved;
            }
            return this._data;
        }

        /** Appends the absolute path of each name, resolved against the storm root; null input is ignored. */
        private void appendAbsolutePaths(List<String> target, Iterable<String> names) {
            if (names == null) {
                return;
            }
            for (String name : names) {
                target.add(new File(this._stormRoot, name).getAbsolutePath());
            }
        }
    }

    private class ProcessExitCallback
    implements ExitCodeCallback {
        private final String _logPrefix;

        public ProcessExitCallback(String logPrefix) {
            this._logPrefix = logPrefix;
        }

        public void call(int exitCode) {
            LOG.info("{} exited with code: {}", (Object)this._logPrefix, (Object)exitCode);
            BasicContainer.this._exitedEarly = true;
        }
    }
}

