/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.supervisor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.supervisor.ContainerLauncher;
import org.apache.storm.daemon.supervisor.EventManagerPushCallback;
import org.apache.storm.daemon.supervisor.Slot;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.daemon.supervisor.UniFunc;
import org.apache.storm.event.EventManager;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadClusterState
implements Runnable,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
    private final Map<String, Object> superConf;
    private final IStormClusterState stormClusterState;
    private final EventManager syncSupEventManager;
    private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
    private final Map<Integer, Slot> slots = new HashMap<Integer, Slot>();
    private final AtomicInteger readRetry = new AtomicInteger(0);
    private final String assignmentId;
    private final ISupervisor iSuper;
    private final ILocalizer localizer;
    private final ContainerLauncher launcher;
    private final String host;
    private final LocalState localState;
    private final IStormClusterState clusterState;
    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
    private static final long WARN_MILLIS = 1000L;
    private static final long ERROR_MILLIS = 60000L;
    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = slot -> {
        throw new IllegalStateException("It took over 60000ms to shut down slot " + slot);
    };
    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = slot -> LOG.warn("It has taken {}ms so far and {} is still not shut down.", (Object)1000L, slot);
    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = slot -> {
        LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, (Object)Utils.threadDump());
        DEFAULT_ON_ERROR_TIMEOUT.call((Slot)slot);
    };

    public ReadClusterState(Supervisor supervisor) throws Exception {
        this.superConf = supervisor.getConf();
        this.stormClusterState = supervisor.getStormClusterState();
        this.syncSupEventManager = supervisor.getEventManger();
        this.assignmentVersions = new AtomicReference(new HashMap());
        this.assignmentId = supervisor.getAssignmentId();
        this.iSuper = supervisor.getiSupervisor();
        this.localizer = supervisor.getAsyncLocalizer();
        this.host = supervisor.getHostName();
        this.localState = supervisor.getLocalState();
        this.clusterState = supervisor.getStormClusterState();
        this.cachedAssignments = supervisor.getCurrAssignment();
        this.launcher = ContainerLauncher.make(this.superConf, this.assignmentId, supervisor.getSharedContext());
        List ports = (List)this.superConf.get("supervisor.slots.ports");
        for (Object port : ports) {
            this.slots.put(((Number)port).intValue(), this.mkSlot(((Number)port).intValue()));
        }
        try {
            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(this.superConf);
            for (Slot slot : this.slots.values()) {
                String workerId = slot.getWorkerId();
                if (workerId == null) continue;
                workers.remove(workerId);
            }
            if (!workers.isEmpty()) {
                supervisor.killWorkers(workers, this.launcher);
            }
        }
        catch (Exception e) {
            LOG.warn("Error trying to clean up old workers", (Throwable)e);
        }
        try {
            this.localizer.cleanupUnusedTopologies();
        }
        catch (Exception e) {
            LOG.warn("Error trying to clean up old topologies", (Throwable)e);
        }
        for (Slot slot : this.slots.values()) {
            slot.start();
        }
    }

    private Slot mkSlot(int port) throws Exception {
        return new Slot(this.localizer, this.superConf, this.launcher, this.host, port, this.localState, this.clusterState, this.iSuper, this.cachedAssignments);
    }

    @Override
    public synchronized void run() {
        try {
            EventManagerPushCallback syncCallback = new EventManagerPushCallback(this, this.syncSupEventManager);
            List stormIds = this.stormClusterState.assignments((Runnable)syncCallback);
            Map<String, VersionedData<Assignment>> assignmentsSnapshot = this.getAssignmentsSnapshot(this.stormClusterState, stormIds, this.assignmentVersions.get(), syncCallback);
            Map<Integer, LocalAssignment> allAssignments = this.readAssignments(assignmentsSnapshot);
            if (allAssignments == null) {
                return;
            }
            Map<String, List<ProfileRequest>> topoIdToProfilerActions = this.getProfileActions(this.stormClusterState, stormIds);
            HashSet<Integer> assignedPorts = new HashSet<Integer>();
            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
            for (Integer port : allAssignments.keySet()) {
                if (!this.iSuper.confirmAssigned(port.intValue())) continue;
                assignedPorts.add(port);
            }
            HashSet<Integer> allPorts = new HashSet<Integer>(assignedPorts);
            allPorts.addAll(this.slots.keySet());
            HashMap<Integer, HashSet<Slot.TopoProfileAction>> filtered = new HashMap<Integer, HashSet<Slot.TopoProfileAction>>();
            for (Map.Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
                String topoId = entry.getKey();
                if (entry.getValue() == null) continue;
                for (ProfileRequest req : entry.getValue()) {
                    NodeInfo ni = req.get_nodeInfo();
                    if (!this.host.equals(ni.get_node())) continue;
                    Long port = (Long)ni.get_port().iterator().next();
                    HashSet<Slot.TopoProfileAction> actions = (HashSet<Slot.TopoProfileAction>)filtered.get(port.intValue());
                    if (actions == null) {
                        actions = new HashSet<Slot.TopoProfileAction>();
                        filtered.put(port.intValue(), actions);
                    }
                    actions.add(new Slot.TopoProfileAction(topoId, req));
                }
            }
            for (Integer port : allPorts) {
                Slot slot = this.slots.get(port);
                if (slot == null) {
                    slot = this.mkSlot(port);
                    this.slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions((Set)filtered.get(port));
            }
        }
        catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    protected Map<String, VersionedData<Assignment>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> topoIds, Map<String, VersionedData<Assignment>> localAssignmentVersion, Runnable callback) throws Exception {
        HashMap<String, VersionedData<Assignment>> updateAssignmentVersion = new HashMap<String, VersionedData<Assignment>>();
        for (String topoId : topoIds) {
            Integer recordedVersion = -1;
            Integer version = stormClusterState.assignmentVersion(topoId, callback);
            VersionedData<Assignment> locAssignment = localAssignmentVersion.get(topoId);
            if (locAssignment != null) {
                recordedVersion = locAssignment.getVersion();
            }
            if (version == null) continue;
            if (version == recordedVersion) {
                updateAssignmentVersion.put(topoId, locAssignment);
                continue;
            }
            VersionedData assignmentVersion = stormClusterState.assignmentInfoWithVersion(topoId, callback);
            updateAssignmentVersion.put(topoId, (VersionedData<Assignment>)assignmentVersion);
        }
        return updateAssignmentVersion;
    }

    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
        HashMap<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>();
        for (String stormId : stormIds) {
            List profileRequests = stormClusterState.getTopologyProfileRequests(stormId);
            ret.put(stormId, profileRequests);
        }
        return ret;
    }

    protected Map<Integer, LocalAssignment> readAssignments(Map<String, VersionedData<Assignment>> assignmentsSnapshot) {
        try {
            HashMap<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>();
            for (Map.Entry<String, VersionedData<Assignment>> assignEntry : assignmentsSnapshot.entrySet()) {
                String topoId = assignEntry.getKey();
                Assignment assignment = (Assignment)assignEntry.getValue().getData();
                Map<Integer, LocalAssignment> portTasks = this.readMyExecutors(topoId, this.assignmentId, assignment);
                for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
                    Integer port = entry.getKey();
                    LocalAssignment la = entry.getValue();
                    if (!portLA.containsKey(port)) {
                        portLA.put(port, la);
                        continue;
                    }
                    throw new RuntimeException("Should not have multiple topologies assigned to one port " + port + " " + la + " " + portLA);
                }
            }
            this.readRetry.set(0);
            return portLA;
        }
        catch (RuntimeException e) {
            if (this.readRetry.get() > 2) {
                throw e;
            }
            this.readRetry.addAndGet(1);
            LOG.warn("{} : retrying {} of 3", (Object)e.getMessage(), (Object)this.readRetry.get());
            return null;
        }
    }

    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
        Map executorNodePort;
        HashMap<Integer, LocalAssignment> portTasks = new HashMap<Integer, LocalAssignment>();
        HashMap slotsResources = new HashMap();
        Map nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
        if (nodeInfoWorkerResourcesMap != null) {
            for (Map.Entry entry : nodeInfoWorkerResourcesMap.entrySet()) {
                if (!((NodeInfo)entry.getKey()).get_node().equals(assignmentId)) continue;
                Set ports = ((NodeInfo)entry.getKey()).get_port();
                for (Long port : ports) {
                    slotsResources.put(port, entry.getValue());
                }
            }
        }
        if ((executorNodePort = assignment.get_executor_node_port()) != null) {
            for (Map.Entry entry : executorNodePort.entrySet()) {
                if (!((NodeInfo)entry.getValue()).get_node().equals(assignmentId)) continue;
                for (Long port : ((NodeInfo)entry.getValue()).get_port()) {
                    LocalAssignment localAssignment = (LocalAssignment)portTasks.get(port.intValue());
                    if (localAssignment == null) {
                        ArrayList executors = new ArrayList();
                        localAssignment = new LocalAssignment(stormId, executors);
                        if (slotsResources.containsKey(port)) {
                            localAssignment.set_resources((WorkerResources)slotsResources.get(port));
                        }
                        portTasks.put(port.intValue(), localAssignment);
                    }
                    List executorInfoList = localAssignment.get_executors();
                    executorInfoList.add(new ExecutorInfo(((Long)((List)entry.getKey()).get(0)).intValue(), ((Long)((List)entry.getKey()).get(((List)entry.getKey()).size() - 1)).intValue()));
                }
            }
        }
        return portTasks;
    }

    public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
        for (Slot slot : this.slots.values()) {
            LOG.info("Setting {} assignment to null", (Object)slot);
            slot.setNewAssignment(null);
        }
        if (onWarnTimeout == null) {
            onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT;
        }
        if (onErrorTimeout == null) {
            onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT;
        }
        long startTime = Time.currentTimeMillis();
        Exception exp = null;
        for (Slot slot : this.slots.values()) {
            LOG.info("Waiting for {} to be EMPTY, currently {}", (Object)slot, (Object)slot.getMachineState());
            try {
                while (slot.getMachineState() != Slot.MachineState.EMPTY) {
                    long timeSpentMillis = Time.currentTimeMillis() - startTime;
                    if (timeSpentMillis > 60000L) {
                        onErrorTimeout.call(slot);
                    }
                    if (timeSpentMillis > 1000L) {
                        onWarnTimeout.call(slot);
                    }
                    if (Time.isSimulating()) {
                        Time.advanceTime((long)100L);
                    }
                    Thread.sleep(100L);
                }
            }
            catch (Exception e) {
                LOG.error("Error trying to shutdown workers in {}", (Object)slot, (Object)e);
                exp = e;
            }
        }
        if (exp != null) {
            if (exp instanceof RuntimeException) {
                throw (RuntimeException)exp;
            }
            throw new RuntimeException(exp);
        }
    }

    @Override
    public void close() {
        for (Slot slot : this.slots.values()) {
            try {
                slot.close();
            }
            catch (Exception e) {
                LOG.error("Error trying to shutdown {}", (Object)slot, (Object)e);
            }
        }
    }
}

