/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.supervisor;

import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.ContainerLauncher;
import org.apache.storm.daemon.supervisor.DefaultUncaughtExceptionHandler;
import org.apache.storm.daemon.supervisor.EventManagerPushCallback;
import org.apache.storm.daemon.supervisor.Killable;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.Slot;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.daemon.supervisor.UniFunc;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supervisor daemon: owns the timers, event manager, localizers and cluster-state
 * connection for one supervisor, and exposes them to the collaborating classes in
 * this package.
 *
 * <p>Lifecycle: construct, then {@link #launch()} to start heartbeats and
 * synchronization, and finally {@link #close()} to release everything. The
 * {@code eventManager} and {@code readState} members are only created by
 * {@link #launch()} and are therefore {@code null} until it has been called.
 */
public class Supervisor
implements DaemonCommon,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
    private final Map<String, Object> conf;
    /** Messaging context handed in by the creator; {@code null} when built through {@link #main}. */
    private final IContext sharedContext;
    /** Set to {@code false} by {@link #close()}; read by {@link #isWaiting()}. */
    private volatile boolean active;
    private final ISupervisor iSupervisor;
    private final Utils.UptimeComputer upTime;
    private final String stormVersion;
    private final IStormClusterState stormClusterState;
    private final LocalState localState;
    private final String supervisorId;
    private final String assignmentId;
    private final String hostName;
    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
    private final StormTimer heartbeatTimer;
    private final StormTimer eventTimer;
    private final StormTimer blobUpdateTimer;
    private final Localizer localizer;
    private final AsyncLocalizer asyncLocalizer;
    /** Created in {@link #launch()}; {@code null} before that. */
    private EventManager eventManager;
    /** Created in {@link #launch()}; {@code null} before that. */
    private ReadClusterState readState;

    /** One step of {@link #close()}; allowed to throw so that each step can be isolated. */
    @FunctionalInterface
    private interface ShutdownStep {
        void run() throws Exception;
    }

    /**
     * Builds a supervisor from the configuration returned by {@code Utils.readStormConfig()}
     * with no shared messaging context. Used by {@link #main}.
     *
     * @param iSupervisor the supervisor implementation to delegate to
     * @throws IOException declared by the delegated constructor
     */
    private Supervisor(ISupervisor iSupervisor) throws IOException {
        this(Utils.readStormConfig(), null, iSupervisor);
    }

    /**
     * Creates the supervisor and all of its long-lived collaborators (cluster state,
     * local state, localizers, timers). Nothing is scheduled until {@link #launch()}.
     *
     * @param conf the storm configuration
     * @param sharedContext messaging context to expose via {@link #getSharedContext()}; may be {@code null}
     * @param iSupervisor the supervisor implementation; it is prepared here and supplies
     *     the supervisor id and assignment id
     * @throws IOException declared for callers; failures while creating the cluster state,
     *     local state, localizers or while resolving the host name are rethrown via
     *     {@code Utils.wrapInRuntime}
     */
    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
        this.conf = conf;
        this.iSupervisor = iSupervisor;
        this.active = true;
        this.upTime = Utils.makeUptimeComputer();
        this.stormVersion = VersionInfo.getVersion();
        this.sharedContext = sharedContext;
        iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
        // ACLs are only supplied when ZooKeeper authentication is configured; otherwise null is passed on.
        List<ACL> acls = null;
        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
            acls = SupervisorUtils.supervisorZkAcls();
        }
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
        }
        catch (Exception e) {
            LOG.error("supervisor can't create stormClusterState");
            throw Utils.wrapInRuntime(e);
        }
        try {
            this.localState = ServerConfigUtils.supervisorState(conf);
            this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
        }
        catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
        this.supervisorId = iSupervisor.getSupervisorId();
        this.assignmentId = iSupervisor.getAssignmentId();
        try {
            this.hostName = Utils.hostname();
        }
        catch (UnknownHostException e) {
            throw Utils.wrapInRuntime(e);
        }
        // Explicit type arguments instead of the raw types the original used.
        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long, LocalAssignment>());
        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
    }

    /** @return the supervisor id obtained from the {@link ISupervisor} at construction time */
    public String getId() {
        return this.supervisorId;
    }

    /** @return the messaging context passed to the constructor; may be {@code null} */
    IContext getSharedContext() {
        return this.sharedContext;
    }

    /** @return the configuration map passed to the constructor (not copied) */
    public Map<String, Object> getConf() {
        return this.conf;
    }

    /** @return the {@link ISupervisor} implementation this daemon delegates to */
    public ISupervisor getiSupervisor() {
        return this.iSupervisor;
    }

    /** @return the uptime computer created when this supervisor was constructed */
    public Utils.UptimeComputer getUpTime() {
        return this.upTime;
    }

    /** @return the value of {@code VersionInfo.getVersion()} captured at construction time */
    public String getStormVersion() {
        return this.stormVersion;
    }

    /** @return the cluster state handle created in the constructor */
    public IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    /** @return the supervisor's local state */
    LocalState getLocalState() {
        return this.localState;
    }

    /** @return the assignment id obtained from the {@link ISupervisor} at construction time */
    public String getAssignmentId() {
        return this.assignmentId;
    }

    /** @return the host name resolved via {@code Utils.hostname()} at construction time */
    public String getHostName() {
        return this.hostName;
    }

    /** @return the shared, mutable reference holding the current assignment map */
    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
        return this.currAssignment;
    }

    /** @return the localizer created in the constructor */
    public Localizer getLocalizer() {
        return this.localizer;
    }

    /** @return the asynchronous localizer that wraps {@link #getLocalizer()} */
    ILocalizer getAsyncLocalizer() {
        return this.asyncLocalizer;
    }

    /** @return the event manager, or {@code null} if {@link #launch()} has not been called yet */
    EventManager getEventManger() {
        return this.eventManager;
    }

    /**
     * Starts the supervisor: empties the supervisor tmp directory, sends one heartbeat
     * immediately and schedules recurring ones, creates the event manager and the
     * cluster-state reader, re-registers blob references for every topology that is
     * already downloaded, and starts the localizer cleaner. When
     * {@code supervisor.enable} is true it additionally schedules the recurring
     * cluster-state sync, blob update and health-check tasks.
     *
     * @throws Exception if any of the start-up steps fails
     */
    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", this.conf);
        String path = ServerConfigUtils.supervisorTmpDir(this.conf);
        FileUtils.cleanDirectory(new File(path));
        Localizer localizer = this.getLocalizer();
        SupervisorHeartbeat hb = new SupervisorHeartbeat(this.conf, this);
        // Run once synchronously before handing the heartbeat to the timer.
        hb.run();
        Integer heartbeatFrequency = ObjectReader.getInt(this.conf.get("supervisor.heartbeat.frequency.secs"));
        this.heartbeatTimer.scheduleRecurring(0, heartbeatFrequency.intValue(), hb);
        this.eventManager = new EventManagerImp(false);
        this.readState = new ReadClusterState(this);
        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(this.conf);
        for (String topoId : downloadedTopoIds) {
            SupervisorUtils.addBlobReferences(localizer, topoId, this.conf);
        }
        localizer.startCleaner();
        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
        if (((Boolean)this.conf.get("supervisor.enable")).booleanValue()) {
            // NOTE(review): delay/period arguments are presumably seconds - confirm against StormTimer.
            this.eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(this.readState, this.eventManager));
            this.blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, this.eventManager));
            this.eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
        }
        LOG.info("Starting supervisor with id {} at host {}.", this.getId(), this.getHostName());
    }

    /**
     * Stand-alone daemon entry: refuses local mode, launches the supervisor, registers
     * a shutdown hook that closes it, registers the slots-used gauge and starts the
     * metrics reporters. Any failure is logged and terminates the JVM with exit code 1.
     */
    private void launchDaemon() {
        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
        try {
            Map<String, Object> conf = this.getConf();
            if (ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in local mode!");
            }
            this.launch();
            Utils.addShutdownHookWithForceKillIn1Sec(() -> this.close());
            this.registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
            StormMetricsRegistry.startMetricsReporters(conf);
        }
        catch (Exception e) {
            LOG.error("Failed to start supervisor\n", e);
            System.exit(1);
        }
    }

    /**
     * Registers a gauge whose value is the number of worker ids currently reported by
     * {@code SupervisorUtils.supervisorWorkerIds(conf)}.
     *
     * @param name the gauge name
     * @param conf the configuration used to look up the worker ids on every read
     */
    private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
        StormMetricsRegistry.registerGauge(name, new Callable<Integer>(){

            @Override
            public Integer call() throws Exception {
                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
                return pids.size();
            }
        });
    }

    /**
     * Shuts the supervisor down: marks it inactive, then closes the timers, the event
     * manager and cluster-state reader (when {@link #launch()} created them), both
     * localizers and finally the cluster-state connection.
     *
     * <p>Each step runs in isolation, so a failure in one is logged and the remaining
     * resources are still released. This method never throws.
     */
    @Override
    public void close() {
        LOG.info("Shutting down supervisor {}", this.getId());
        this.active = false;
        runShutdownStep("heartbeat timer", () -> this.heartbeatTimer.close());
        runShutdownStep("event timer", () -> this.eventTimer.close());
        runShutdownStep("blob update timer", () -> this.blobUpdateTimer.close());
        if (this.eventManager != null) {
            runShutdownStep("event manager", () -> this.eventManager.close());
        }
        if (this.readState != null) {
            runShutdownStep("cluster state reader", () -> this.readState.close());
        }
        runShutdownStep("async localizer", () -> this.asyncLocalizer.shutdown());
        runShutdownStep("localizer", () -> this.localizer.shutdown());
        runShutdownStep("cluster state connection", () -> this.getStormClusterState().disconnect());
    }

    /**
     * Runs one shutdown step, logging (not propagating) any exception it throws so
     * that later steps still execute.
     *
     * @param what short description of the resource, used in the log message
     * @param step the shutdown action to run
     */
    private static void runShutdownStep(String what, ShutdownStep step) {
        try {
            step.run();
        }
        catch (Exception e) {
            LOG.error("Error Shutting down {}", what, e);
        }
    }

    /**
     * Kills the given workers. Each worker is recovered as a {@link Killable}; those
     * with live processes are sent {@code kill()}, the others are cleaned up at once.
     * After sleeping {@code supervisor.worker.shutdown.sleep.secs} (default 1) the
     * still-tracked containers are force-killed repeatedly until all their processes
     * are dead, giving up on a container after more than 10000 ms. Failures for an
     * individual worker are logged and do not stop processing of the others.
     *
     * @param workerIds ids of the workers to kill
     * @param launcher used to recover a container for each worker id
     * @throws InterruptedException if interrupted during the grace-period sleep
     * @throws IOException declared for callers
     */
    void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
        Set<Killable> containers = new HashSet<Killable>();
        for (String workerId : workerIds) {
            try {
                Killable k = launcher.recoverContainer(workerId, this.localState);
                if (!k.areAllProcessesDead()) {
                    k.kill();
                    containers.add(k);
                    continue;
                }
                k.cleanUp();
            }
            catch (Exception e) {
                LOG.error("Error trying to kill {}", workerId, e);
            }
        }
        int shutdownSleepSecs = ObjectReader.getInt(this.conf.get("supervisor.worker.shutdown.sleep.secs"), (Integer)1);
        // Only wait for the grace period if something was actually asked to stop.
        if (!containers.isEmpty()) {
            Time.sleepSecs(shutdownSleepSecs);
        }
        for (Killable k : containers) {
            try {
                k.forceKill();
                long start = Time.currentTimeMillis();
                while (!k.areAllProcessesDead()) {
                    if (Time.currentTimeMillis() - start > 10000L) {
                        throw new RuntimeException("Giving up on killing " + k + " after " + (Time.currentTimeMillis() - start) + " ms");
                    }
                    Time.sleep(100L);
                    k.forceKill();
                }
                k.cleanUp();
            }
            catch (Exception e) {
                // NOTE(review): this also catches an InterruptedException from Time.sleep; behaviour kept as-is.
                LOG.error("Error trying to clean up {}", k, e);
            }
        }
    }

    /**
     * Shuts down every worker of this supervisor. When the cluster-state reader exists
     * the request is delegated to it; otherwise (before {@link #launch()}) a container
     * launcher is created and the workers are killed directly via {@link #killWorkers}.
     *
     * @param onWarnTimeout passed through to {@code ReadClusterState.shutdownAllWorkers}
     * @param onErrorTimeout passed through to {@code ReadClusterState.shutdownAllWorkers}
     * @throws RuntimeException wrapping any failure of the direct-kill path
     */
    public void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
        if (this.readState != null) {
            this.readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
        } else {
            try {
                ContainerLauncher launcher = ContainerLauncher.make(this.getConf(), this.getId(), this.getSharedContext());
                this.killWorkers(SupervisorUtils.supervisorWorkerIds(this.conf), launcher);
            }
            catch (Exception e) {
                throw Utils.wrapInRuntime(e);
            }
        }
    }

    /**
     * Reports whether the supervisor is idle. An inactive (closed) supervisor is always
     * considered waiting; otherwise the heartbeat timer, the event timer and the event
     * manager must all be waiting.
     *
     * @return {@code true} if the supervisor is inactive or all of the above are waiting
     */
    public boolean isWaiting() {
        if (!this.active) {
            return true;
        }
        // The event manager only exists after launch(); with none there are no events
        // in flight, so treat it as idle instead of failing with a NullPointerException.
        EventManager manager = this.eventManager;
        return this.heartbeatTimer.isTimerWaiting()
            && this.eventTimer.isTimerWaiting()
            && (manager == null || manager.waiting());
    }

    /**
     * Command-line entry point: installs the default uncaught-exception handler and
     * starts a stand-alone supervisor daemon.
     *
     * @param args ignored
     * @throws IOException if the supervisor cannot be constructed
     */
    public static void main(String[] args) throws IOException {
        Utils.setupDefaultUncaughtExceptionHandler();
        Supervisor instance = new Supervisor(new StandaloneSupervisor());
        instance.launchDaemon();
    }
}

