/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.resource;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.User;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceAwareScheduler
implements IScheduler {
    /**
     * Working scheduling state. Rebuilt by {@code initialize} at the start of every
     * {@code schedule} call and replaced whenever a checkpoint is restored.
     */
    private SchedulingState schedulingState;
    /** Scheduler configuration handed in through {@code prepare}. */
    private Map<String, Object> conf;
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);

    /**
     * Stores the configuration for later use by this scheduler.
     *
     * @param conf configuration map; it is read for the priority strategy, the eviction
     *             strategy and the user resource pools
     */
    public void prepare(Map<String, Object> conf) {
        this.conf = conf;
    }

    /**
     * Runs one scheduling round.
     *
     * <p>The scheduling state is rebuilt from {@code topologies} and {@code cluster}, the
     * configured priority strategy is instantiated, and topologies are scheduled one at a
     * time in the order the strategy returns them until it returns {@code null}. Whatever
     * happened during the round, the resulting state is copied back into {@code cluster}.
     *
     * @param topologies all topologies known to the scheduler
     * @param cluster    cluster to read current state from and to write the results to
     */
    public void schedule(Topologies topologies, Cluster cluster) {
        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
        this.initialize(topologies, cluster);
        LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
        LOG.info("Nodes:\n{}", this.schedulingState.nodes);
        for (User user : this.getUserMap().values()) {
            LOG.info(user.getDetailedInfo());
        }

        // The strategy is created once per round; if that fails nothing is scheduled,
        // but the (unchanged) state is still written back below.
        ISchedulingPriorityStrategy priorityStrategy = null;
        try {
            priorityStrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance(
                    (String) this.conf.get("resource.aware.scheduler.priority.strategy"));
        } catch (RuntimeException ex) {
            LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.", this.conf.get("resource.aware.scheduler.priority.strategy"), ex.getMessage()), ex);
        }

        while (priorityStrategy != null) {
            TopologyDetails td;
            try {
                // Re-prepare on every iteration because scheduling a topology may have
                // replaced this.schedulingState.
                priorityStrategy.prepare(this.schedulingState);
                td = priorityStrategy.getNextTopologyToSchedule();
            } catch (Exception ex) {
                // Pass the exception itself so the logger records the stack trace; the
                // message has no placeholders, so extra format arguments would be dropped.
                LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s", priorityStrategy.getClass().getName(), ex.getMessage()), ex);
                break;
            }
            if (td == null) {
                break;
            }
            this.scheduleTopology(td);
            LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
        }
        this.updateChanges(cluster, topologies);
    }

    /**
     * Copies the results held in the working scheduling state back into {@code cluster},
     * then recomputes the per-supervisor resource map.
     *
     * @param cluster    cluster that receives the scheduling results
     * @param topologies topologies used when recomputing supervisor resources
     */
    private void updateChanges(Cluster cluster, Topologies topologies) {
        Cluster scheduled = this.schedulingState.cluster;
        cluster.setAssignments(scheduled.getAssignments());
        cluster.setBlacklistedHosts(scheduled.getBlacklistedHosts());
        cluster.setStatusMap(scheduled.getStatusMap());
        cluster.setSupervisorsResourcesMap(scheduled.getSupervisorsResourcesMap());
        cluster.setTopologyResourcesMap(scheduled.getTopologyResourcesMap());
        cluster.setWorkerResourcesMap(scheduled.getWorkerResourcesMap());
        // Runs last, so the supervisor map set above is replaced by the recomputed one.
        this.updateSupervisorsResources(cluster, topologies);
    }

    /**
     * Attempts to schedule a single topology.
     *
     * <p>If the topology has no unassigned executors it is simply moved to the running
     * queue. Otherwise the scheduling state is checkpointed, the topology's scheduling
     * strategy is instantiated and run, and the outcome decides what happens next:
     * <ul>
     *   <li>success: the assignment is applied and the topology moves to running;</li>
     *   <li>not enough resources: the eviction strategy is asked to make space and, if it
     *       does, the scheduling strategy is run again;</li>
     *   <li>any other failure: the checkpoint is restored and the topology is moved to the
     *       attempted or invalid queue.</li>
     * </ul>
     *
     * @param td topology to schedule
     */
    public void scheduleTopology(TopologyDetails td) {
        User topologySubmitter = (User) this.schedulingState.userMap.get(td.getTopologySubmitter());

        if (this.schedulingState.cluster.getUnassignedExecutors(td).size() <= 0) {
            LOG.warn("Topology {} is already fully scheduled!", td.getName());
            topologySubmitter.moveTopoFromPendingToRunning(td);
            String currentStatus = (String) this.schedulingState.cluster.getStatusMap().get(td.getId());
            if (currentStatus == null || currentStatus.equals("")) {
                this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled");
            }
            return;
        }

        LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
        SchedulingState checkpoint = this.checkpointSchedulingState();

        IStrategy rasStrategy;
        try {
            rasStrategy = (IStrategy) ReflectionUtils.newInstance((String) td.getConf().get("topology.scheduler.strategy"));
        } catch (RuntimeException e) {
            // Argument order follows the placeholders: strategy, error, topology name.
            LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.", td.getConf().get("topology.scheduler.strategy"), e.getMessage(), td.getName());
            topologySubmitter = this.cleanup(checkpoint, td);
            topologySubmitter.moveTopoFromPendingToInvalid(td);
            this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " + td.getConf().get("topology.scheduler.strategy") + ". Please check logs for details");
            return;
        }

        IEvictionStrategy evictionStrategy = null;
        while (true) {
            SchedulingResult result;
            try {
                // The strategy works on a copy of the current state.
                rasStrategy.prepare(new SchedulingState(this.schedulingState));
                result = rasStrategy.schedule(td);
            } catch (Exception ex) {
                LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName()), ex);
                topologySubmitter = this.cleanup(checkpoint, td);
                topologySubmitter.moveTopoFromPendingToInvalid(td);
                this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy " + rasStrategy.getClass().getName() + ". Please check logs for details");
                // Stop here: the failure is fully handled, so the invalid-result branch
                // below must not run a second cleanup/move for the same topology.
                return;
            }
            LOG.debug("scheduling result: {}", result);

            if (result == null || !result.isValid()) {
                LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
                topologySubmitter = this.cleanup(checkpoint, td);
                topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
                return;
            }

            if (result.isSuccess()) {
                try {
                    if (this.mkAssignment(td, result.getSchedulingResultMap())) {
                        topologySubmitter.moveTopoFromPendingToRunning(td);
                        this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                    } else {
                        topologySubmitter = this.cleanup(checkpoint, td);
                        topologySubmitter.moveTopoFromPendingToAttempted(td);
                        this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
                    }
                } catch (IllegalStateException ex) {
                    LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
                    topologySubmitter = this.cleanup(checkpoint, td);
                    topologySubmitter.moveTopoFromPendingToAttempted(td);
                    this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                }
                return;
            }

            if (result.getStatus() != SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
                    topologySubmitter = this.cleanup(checkpoint, td);
                    topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
                } else {
                    topologySubmitter = this.cleanup(checkpoint, td);
                    topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster);
                }
                return;
            }

            // Not enough resources: try to evict something and, if that frees space, retry.
            if (evictionStrategy == null) {
                try {
                    evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance((String) this.conf.get("resource.aware.scheduler.eviction.strategy"));
                } catch (RuntimeException e) {
                    LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.", this.conf.get("resource.aware.scheduler.eviction.strategy"), e.getMessage());
                    topologySubmitter.moveTopoFromPendingToAttempted(td);
                    return;
                }
            }

            boolean madeSpace;
            try {
                evictionStrategy.prepare(this.schedulingState);
                madeSpace = evictionStrategy.makeSpaceForTopo(td);
            } catch (Exception ex) {
                LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s", evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
                topologySubmitter = this.cleanup(checkpoint, td);
                topologySubmitter.moveTopoFromPendingToAttempted(td);
                return;
            }

            if (!madeSpace) {
                LOG.debug("Could not make space for topo {} will move to attempted", td);
                topologySubmitter = this.cleanup(checkpoint, td);
                topologySubmitter.moveTopoFromPendingToAttempted(td);
                this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
                return;
            }
        }
    }

    /**
     * Restores the given checkpoint and looks up the topology's submitter again in the
     * restored state.
     *
     * @param schedulingState checkpoint to restore
     * @param td              topology whose submitter is looked up
     * @return the submitting user as found in the restored state's user map
     */
    private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
        this.restoreCheckpointSchedulingState(schedulingState);
        String submitterName = td.getTopologySubmitter();
        // Must read from this.schedulingState *after* the restore above.
        return (User) this.schedulingState.userMap.get(submitterName);
    }

    /**
     * Applies a scheduling result to the working scheduling state.
     *
     * <p>For every slot in the map a new {@link WorkerSlot} carrying the summed resource
     * requirements of its executors is created, the executors are assigned to it on the
     * slot's node, and the node's resources are consumed for each executor. Topology-wide
     * and per-worker resource figures are then recorded on the cluster.
     *
     * @param td                     topology being assigned
     * @param schedulerAssignmentMap executors to place, keyed by target slot; may be {@code null}
     * @return {@code true} when the assignment was applied, {@code false} when the map was {@code null}
     */
    private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
        if (schedulerAssignmentMap == null) {
            LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
            return false;
        }

        double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
        double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
        double requestedCpu = td.getTotalRequestedCpu();
        double assignedMemOnHeap = 0.0;
        double assignedMemOffHeap = 0.0;
        double assignedCpu = 0.0;
        Map<WorkerSlot, Double[]> workerResources = new HashMap<WorkerSlot, Double[]>();

        for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
            Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
            WorkerSlot requestedSlot = workerToTasksEntry.getKey();
            RAS_Node targetNode = this.schedulingState.nodes.getNodeById(requestedSlot.getNodeId());

            // Replace the slot with one that carries the resources allocated to it.
            WorkerSlot targetSlot = this.allocateResourceToSlot(td, execsNeedScheduling, requestedSlot);
            targetNode.assign(targetSlot, td, execsNeedScheduling);
            LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}", td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
            for (ExecutorDetails exec : execsNeedScheduling) {
                targetNode.consumeResourcesforTask(exec, td);
            }

            assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
            assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
            assignedCpu += targetSlot.getAllocatedCpu();

            // Per-worker record: topology-wide requested totals followed by this slot's allocation.
            Double[] perWorker = new Double[]{requestedMemOnHeap, requestedMemOffHeap, requestedCpu, targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()};
            workerResources.put(targetSlot, perWorker);
        }

        Double[] resources = new Double[]{requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
        LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} assigned on-heap mem, off-heap mem, cpu: {} {} {}", td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
        this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
        this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources);
        return true;
    }

    /**
     * Builds a slot on the same node and port as {@code slot} whose allocated on-heap
     * memory, off-heap memory and CPU are the sums of the corresponding requirements of
     * {@code executors}. Requirements reported as {@code null} contribute nothing.
     *
     * @param td        topology the executors belong to
     * @param executors executors that will run in the slot
     * @param slot      slot supplying node id and port
     * @return a new slot carrying the summed resource figures
     */
    private WorkerSlot allocateResourceToSlot(TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
        double totalOnHeap = 0.0;
        double totalOffHeap = 0.0;
        double totalCpu = 0.0;
        for (ExecutorDetails executor : executors) {
            Double onHeap = td.getOnHeapMemoryRequirement(executor);
            if (onHeap != null) {
                totalOnHeap += onHeap.doubleValue();
            }
            Double offHeap = td.getOffHeapMemoryRequirement(executor);
            if (offHeap != null) {
                totalOffHeap += offHeap.doubleValue();
            }
            Double cpuReq = td.getTotalCpuReqTask(executor);
            if (cpuReq != null) {
                totalCpu += cpuReq.doubleValue();
            }
        }
        return new WorkerSlot(slot.getNodeId(), (Number)slot.getPort(), totalOnHeap, totalOffHeap, totalCpu);
    }

    /**
     * Recomputes total and used memory/CPU for every node derived from {@code cluster}
     * and {@code topologies}, and stores the result on {@code cluster}.
     *
     * <p>Each value is an array of {@code {totalMem, totalCpu, usedMem, usedCpu}}, where
     * "used" is total minus available.
     *
     * @param cluster    cluster to inspect and to update
     * @param topologies topologies used to derive the nodes
     */
    private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
        Map<String, Double[]> supervisorsResources = new HashMap<String, Double[]>();
        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
        for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
            RAS_Node node = entry.getValue();
            double totalMem = node.getTotalMemoryResources();
            double totalCpu = node.getTotalCpuResources();
            double usedMem = totalMem - node.getAvailableMemoryResources();
            double usedCpu = totalCpu - node.getAvailableCpuResources();
            supervisorsResources.put(entry.getKey(), new Double[]{totalMem, totalCpu, usedMem, usedCpu});
        }
        cluster.setSupervisorsResourcesMap(supervisorsResources);
    }

    /**
     * Looks up a user in the current scheduling state.
     *
     * @param user key to look up in the user map
     * @return the matching {@link User}, or {@code null} if the map has no such key
     */
    public User getUser(String user) {
        Map<String, User> users = this.schedulingState.userMap;
        return users.get(user);
    }

    /**
     * Returns the user map held by the current scheduling state.
     *
     * @return the state's user map itself (not a copy)
     */
    public Map<String, User> getUserMap() {
        return this.schedulingState.userMap;
    }

    /**
     * Builds the user map for a scheduling round.
     *
     * <p>One {@link User} is created per distinct topology submitter, using that
     * submitter's entry from the user resource pools. Topologies with unassigned executors
     * go to their submitter's pending queue; the rest go to the running queue and get the
     * status "Fully Scheduled" if they have no status yet. Topologies without a submitter
     * are logged and skipped.
     *
     * @param topologies topologies to distribute among users
     * @param cluster    cluster queried for unassigned executors and statuses
     * @return users keyed by submitter name
     */
    private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
        HashMap<String, User> usersByName = new HashMap<String, User>();
        Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
        LOG.debug("userResourcePools: {}", userResourcePools);
        for (TopologyDetails td : topologies.getTopologies()) {
            String submitterName = td.getTopologySubmitter();
            boolean submitterKnown = submitterName != null && !submitterName.equals("");
            if (!submitterKnown) {
                LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", td.getName());
                continue;
            }

            User submitter = usersByName.get(submitterName);
            if (submitter == null) {
                submitter = new User(submitterName, userResourcePools.get(submitterName));
                usersByName.put(submitterName, submitter);
            }

            if (cluster.getUnassignedExecutors(td).size() > 0) {
                LOG.debug("adding td: {} to pending queue", td.getName());
                submitter.addTopologyToPendingQueue(td);
            } else {
                LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
                submitter.addTopologyToRunningQueue(td);
                String status = (String) cluster.getStatusMap().get(td.getId());
                if (status == null || status.equals("")) {
                    cluster.setStatus(td.getId(), "Fully Scheduled");
                }
            }
        }
        return usersByName;
    }

    /**
     * Creates a fresh scheduling state for this round from the given topologies and
     * cluster, the users derived from them, and the stored configuration.
     *
     * @param topologies topologies to schedule
     * @param cluster    current cluster
     */
    private void initialize(Topologies topologies, Cluster cluster) {
        this.schedulingState = new SchedulingState(this.getUsers(topologies, cluster), cluster, topologies, this.conf);
    }

    /**
     * Collects per-user resource pools.
     *
     * <p>Pools are read first from the {@code resource.aware.scheduler.user.pools} entry of
     * the scheduler configuration and then from the same key in
     * {@code user-resource-pools.yaml}. A user defined in the file replaces the entry for
     * the same user taken from the configuration.
     *
     * @return resource limits keyed by user name, then by resource name; empty if neither
     *         source defines any pools
     */
    private Map<String, Map<String, Double>> getUserResourcePools() {
        Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
        ResourceAwareScheduler.addUserPools(this.conf.get("resource.aware.scheduler.user.pools"), ret);
        Map<?, ?> fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
        ResourceAwareScheduler.addUserPools(fromFile.get("resource.aware.scheduler.user.pools"), ret);
        return ret;
    }

    /**
     * Converts one raw pools object (user name to resource name to number) into
     * {@code Double} values and stores it in {@code target}, replacing any existing entry
     * for the same user. Does nothing when {@code rawPools} is {@code null}.
     */
    @SuppressWarnings("unchecked") // the raw configuration value is untyped; element casts fail at use if the shape is wrong
    private static void addUserPools(Object rawPools, Map<String, Map<String, Double>> target) {
        if (rawPools == null) {
            return;
        }
        Map<String, Map<String, Number>> pools = (Map<String, Map<String, Number>>) rawPools;
        for (Map.Entry<String, Map<String, Number>> userPool : pools.entrySet()) {
            String user = userPool.getKey();
            Map<String, Double> limits = new HashMap<String, Double>();
            target.put(user, limits);
            for (Map.Entry<String, Number> resource : userPool.getValue().entrySet()) {
                limits.put(resource.getKey(), resource.getValue().doubleValue());
            }
        }
    }

    /**
     * Logs the current scheduling state at debug level and returns a copy of it that can
     * later be handed to {@code restoreCheckpointSchedulingState}.
     *
     * @return a new {@link SchedulingState} constructed from the current one
     */
    private SchedulingState checkpointSchedulingState() {
        SchedulingState current = this.schedulingState;
        LOG.debug("/*********Checkpoint scheduling state************/");
        for (User checkpointedUser : current.userMap.values()) {
            LOG.debug(checkpointedUser.getDetailedInfo());
        }
        String schedulingDump = ResourceUtils.printScheduling(current.cluster, current.topologies);
        LOG.debug(schedulingDump);
        LOG.debug("nodes:\n{}", current.nodes);
        LOG.debug("/*********End************/");
        return new SchedulingState(current);
    }

    /**
     * Makes {@code schedulingState} the working scheduling state and logs it at debug
     * level.
     *
     * @param schedulingState previously checkpointed state to switch back to
     */
    private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
        LOG.debug("/*********restoring scheduling state************/");
        this.schedulingState = schedulingState;
        SchedulingState restored = this.schedulingState;
        for (User restoredUser : restored.userMap.values()) {
            LOG.debug(restoredUser.getDetailedInfo());
        }
        String schedulingDump = ResourceUtils.printScheduling(restored.cluster, restored.topologies);
        LOG.debug(schedulingDump);
        LOG.debug("nodes:\n{}", restored.nodes);
        LOG.debug("/*********End************/");
    }
}

