/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.resource.strategies.scheduling;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.collections.ListUtils;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.Component;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultResourceAwareStrategy
implements IStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
    private Cluster _cluster;
    private Topologies _topologies;
    private Map<String, List<String>> _clusterInfo;
    private RAS_Nodes _nodes;
    private TreeSet<ObjectResources> _sortedRacks = null;
    private Map<String, TreeSet<ObjectResources>> _rackIdToSortedNodes = new HashMap<String, TreeSet<ObjectResources>>();

    @Override
    public void prepare(SchedulingState schedulingState) {
        this._cluster = schedulingState.cluster;
        this._topologies = schedulingState.topologies;
        this._nodes = schedulingState.nodes;
        this._clusterInfo = schedulingState.cluster.getNetworkTopography();
        LOG.debug(this.getClusterInfo());
    }

    @Override
    public SchedulingResult schedule(TopologyDetails td) {
        SchedulingResult result2;
        if (this._nodes.getNodes().size() <= 0) {
            LOG.warn("No available nodes to schedule tasks on!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
        }
        HashSet<ExecutorDetails> unassignedExecutors = new HashSet<ExecutorDetails>(this._cluster.getUnassignedExecutors(td));
        HashMap<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
        ArrayList<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
        List<Component> spouts = this.getSpouts(td);
        if (spouts.size() == 0) {
            LOG.error("Cannot find a Spout!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
        }
        List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
        HashSet<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
        for (ExecutorDetails exec : orderedExecutors) {
            LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} ]", new Object[]{exec, td.getExecutorToComponent().get(exec), td.getTaskResourceReqList(exec)});
            this.scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
        }
        executorsNotScheduled.removeAll(scheduledTasks);
        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
        for (ExecutorDetails exec : executorsNotScheduled) {
            this.scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
        }
        executorsNotScheduled.removeAll(scheduledTasks);
        if (executorsNotScheduled.size() > 0) {
            LOG.error("Not all executors successfully scheduled: {}", executorsNotScheduled);
            schedulerAssignmentMap = null;
            result2 = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, td.getExecutors().size() - unassignedExecutors.size() + "/" + td.getExecutors().size() + " executors scheduled");
        } else {
            LOG.debug("All resources successfully scheduled!");
            result2 = SchedulingResult.successWithMsg(schedulerAssignmentMap, "Fully Scheduled by DefaultResourceAwareStrategy");
        }
        if (schedulerAssignmentMap == null) {
            LOG.error("Topology {} not successfully scheduled!", (Object)td.getId());
        }
        return result2;
    }

    private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
        WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
        if (targetSlot != null) {
            RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
            if (!schedulerAssignmentMap.containsKey(targetSlot)) {
                schedulerAssignmentMap.put(targetSlot, new LinkedList());
            }
            schedulerAssignmentMap.get(targetSlot).add(exec);
            targetNode.consumeResourcesforTask(exec, td);
            scheduledTasks.add(exec);
            LOG.debug("TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on slot: {} on Rack: {}", new Object[]{exec, targetNode.getHostname(), targetNode.getAvailableMemoryResources(), targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(), targetNode.getTotalCpuResources(), targetSlot, this.nodeToRack(targetNode)});
        } else {
            LOG.error("Not Enough Resources to schedule Task {}", (Object)exec);
        }
    }

    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        WorkerSlot ws = null;
        if (this._sortedRacks == null) {
            this._sortedRacks = this.sortRacks(td.getId(), scheduleAssignmentMap);
        }
        for (ObjectResources rack : this._sortedRacks) {
            ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap);
            if (ws == null) continue;
            LOG.debug("best rack: {}", (Object)rack.id);
            break;
        }
        return ws;
    }

    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        if (!this._rackIdToSortedNodes.containsKey(rackId)) {
            this._rackIdToSortedNodes.put(rackId, this.sortNodes(this.getAvailableNodesFromRack(rackId), rackId, td.getId(), scheduleAssignmentMap));
        }
        TreeSet<ObjectResources> sortedNodes = this._rackIdToSortedNodes.get(rackId);
        double taskMem = td.getTotalMemReqTask(exec);
        double taskCPU = td.getTotalCpuReqTask(exec);
        for (ObjectResources nodeResources : sortedNodes) {
            RAS_Node n = this._nodes.getNodeById(nodeResources.id);
            if (!(n.getAvailableCpuResources() >= taskCPU) || !(n.getAvailableMemoryResources() >= taskMem) || n.getFreeSlots().size() <= 0) continue;
            for (WorkerSlot ws : n.getFreeSlots()) {
                if (!this.checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) continue;
                return ws;
            }
        }
        return null;
    }

    private TreeSet<ObjectResources> sortNodes(List<RAS_Node> availNodes, String rackId, final String topoId, final Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        AllResources allResources = new AllResources("RACK");
        List<ObjectResources> nodes = allResources.objectResources;
        HashMap nodeIdToRackId = new HashMap();
        for (RAS_Node ras_node : availNodes) {
            String nodeId = ras_node.getId();
            ObjectResources node = new ObjectResources(nodeId);
            double availMem = ras_node.getAvailableMemoryResources();
            double availCpu = ras_node.getAvailableCpuResources();
            int freeSlots = ras_node.totalSlotsFree();
            double totalMem = ras_node.getTotalMemoryResources();
            double totalCpu = ras_node.getTotalCpuResources();
            int totalSlots = ras_node.totalSlots();
            node.availMem = availMem;
            node.totalMem = totalMem;
            node.availCpu = availCpu;
            node.totalCpu = totalCpu;
            nodes.add(node);
            allResources.availMemResourcesOverall += availMem;
            allResources.availCpuResourcesOverall += availCpu;
            allResources.totalMemResourcesOverall += totalMem;
            allResources.totalCpuResourcesOverall += totalCpu;
        }
        LOG.debug("Rack {}: Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]", new Object[]{rackId, allResources.availCpuResourcesOverall, allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, allResources.totalMemResourcesOverall});
        return this.sortObjectResources(allResources, new ExistingScheduleFunc(){

            @Override
            public int getNumExistingSchedule(String objectId) {
                WorkerSlot workerSlot;
                LinkedList<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
                if (DefaultResourceAwareStrategy.this._cluster.getAssignmentById(topoId) != null) {
                    for (Map.Entry<Object, Object> entry : DefaultResourceAwareStrategy.this._cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
                        workerSlot = (WorkerSlot)entry.getValue();
                        ExecutorDetails exec = (ExecutorDetails)entry.getKey();
                        if (!workerSlot.getNodeId().equals(objectId)) continue;
                        execs.add(exec);
                    }
                }
                for (Map.Entry entry : scheduleAssignmentMap.entrySet()) {
                    workerSlot = (WorkerSlot)entry.getKey();
                    if (!workerSlot.getNodeId().equals(objectId)) continue;
                    execs.addAll((Collection)entry.getValue());
                }
                return execs.size();
            }
        });
    }

    TreeSet<ObjectResources> sortRacks(final String topoId, final Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        AllResources allResources = new AllResources("Cluster");
        List<ObjectResources> racks = allResources.objectResources;
        final HashMap<String, String> nodeIdToRackId = new HashMap<String, String>();
        for (Map.Entry<String, List<String>> entry : this._clusterInfo.entrySet()) {
            String rackId = entry.getKey();
            List<String> nodeIds = entry.getValue();
            ObjectResources rack = new ObjectResources(rackId);
            racks.add(rack);
            for (String nodeId : nodeIds) {
                RAS_Node node = this._nodes.getNodeById(this.NodeHostnameToId(nodeId));
                double availMem = node.getAvailableMemoryResources();
                double availCpu = node.getAvailableCpuResources();
                double totalMem = node.getTotalMemoryResources();
                double totalCpu = node.getTotalCpuResources();
                rack.availMem += availMem;
                rack.totalMem += totalMem;
                rack.availCpu += availCpu;
                rack.totalCpu += totalCpu;
                nodeIdToRackId.put(nodeId, rack.id);
                allResources.availMemResourcesOverall += availMem;
                allResources.availCpuResourcesOverall += availCpu;
                allResources.totalMemResourcesOverall += totalMem;
                allResources.totalCpuResourcesOverall += totalCpu;
            }
        }
        LOG.debug("Cluster Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]", new Object[]{allResources.availCpuResourcesOverall, allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, allResources.totalMemResourcesOverall});
        return this.sortObjectResources(allResources, new ExistingScheduleFunc(){

            @Override
            public int getNumExistingSchedule(String objectId) {
                String rackId = objectId;
                LinkedList<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
                if (DefaultResourceAwareStrategy.this._cluster.getAssignmentById(topoId) != null) {
                    for (Map.Entry<Object, Object> entry : DefaultResourceAwareStrategy.this._cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
                        String nodeId = ((WorkerSlot)entry.getValue()).getNodeId();
                        String hostname = DefaultResourceAwareStrategy.this.idToNode(nodeId).getHostname();
                        ExecutorDetails exec = (ExecutorDetails)entry.getKey();
                        if (nodeIdToRackId.get(hostname) == null || !((String)nodeIdToRackId.get(hostname)).equals(rackId)) continue;
                        execs.add(exec);
                    }
                }
                for (Map.Entry entry : scheduleAssignmentMap.entrySet()) {
                    WorkerSlot workerSlot = (WorkerSlot)entry.getKey();
                    String nodeId = workerSlot.getNodeId();
                    String hostname = DefaultResourceAwareStrategy.this.idToNode(nodeId).getHostname();
                    if (!((String)nodeIdToRackId.get(hostname)).equals(rackId)) continue;
                    execs.addAll((Collection)entry.getValue());
                }
                return execs.size();
            }
        });
    }

    private TreeSet<ObjectResources> sortObjectResources(final AllResources allResources, final ExistingScheduleFunc existingScheduleFunc) {
        for (ObjectResources objectResources : allResources.objectResources) {
            StringBuilder sb = new StringBuilder();
            if (allResources.availCpuResourcesOverall <= 0.0 || allResources.availMemResourcesOverall <= 0.0) {
                objectResources.effectiveResources = 0.0;
            } else {
                LinkedList<Double> values = new LinkedList<Double>();
                double cpuPercent = objectResources.availCpu / allResources.availCpuResourcesOverall * 100.0;
                values.add(cpuPercent);
                sb.append(String.format("CPU %f(%f%%) ", objectResources.availCpu, cpuPercent));
                double memoryPercent = objectResources.availMem / allResources.availMemResourcesOverall * 100.0;
                values.add(memoryPercent);
                sb.append(String.format("MEM %f(%f%%) ", objectResources.availMem, memoryPercent));
                objectResources.effectiveResources = (Double)Collections.min(values);
            }
            LOG.debug("{}: Avail [ {} ] Total [ CPU {} MEM {}] effective resources: {}", new Object[]{objectResources.id, sb.toString(), objectResources.totalCpu, objectResources.totalMem, objectResources.effectiveResources});
        }
        TreeSet<ObjectResources> sortedObjectResources = new TreeSet<ObjectResources>(new Comparator<ObjectResources>(){

            @Override
            public int compare(ObjectResources o1, ObjectResources o2) {
                int execsScheduled2;
                int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
                if (execsScheduled1 > (execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id))) {
                    return -1;
                }
                if (execsScheduled1 < execsScheduled2) {
                    return 1;
                }
                if (o1.effectiveResources > o2.effectiveResources) {
                    return -1;
                }
                if (o1.effectiveResources < o2.effectiveResources) {
                    return 1;
                }
                LinkedList<Double> o1_values = new LinkedList<Double>();
                LinkedList<Double> o2_values = new LinkedList<Double>();
                o1_values.add(o1.availCpu / allResources.availCpuResourcesOverall * 100.0);
                o2_values.add(o2.availCpu / allResources.availCpuResourcesOverall * 100.0);
                o1_values.add(o1.availMem / allResources.availMemResourcesOverall * 100.0);
                o2_values.add(o2.availMem / allResources.availMemResourcesOverall * 100.0);
                double o1_avg = ResourceUtils.avg(o1_values);
                double o2_avg = ResourceUtils.avg(o2_values);
                if (o1_avg > o2_avg) {
                    return -1;
                }
                if (o1_avg < o2_avg) {
                    return 1;
                }
                return o1.id.compareTo(o2.id);
            }
        });
        sortedObjectResources.addAll(allResources.objectResources);
        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
        return sortedObjectResources;
    }

    private String nodeToRack(RAS_Node node) {
        for (Map.Entry<String, List<String>> entry : this._clusterInfo.entrySet()) {
            if (!entry.getValue().contains(node.getHostname())) continue;
            return entry.getKey();
        }
        LOG.error("Node: {} not found in any racks", (Object)node.getHostname());
        return null;
    }

    private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
        ArrayList<RAS_Node> retList = new ArrayList<RAS_Node>();
        for (String node_id : this._clusterInfo.get(rackId)) {
            retList.add(this._nodes.getNodeById(this.NodeHostnameToId(node_id)));
        }
        return retList;
    }

    private Set<Component> sortComponents(final Map<String, Component> componentMap) {
        TreeSet<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>(){

            @Override
            public int compare(Component o1, Component o2) {
                int connections1 = 0;
                int connections2 = 0;
                for (String childId : ListUtils.union(o1.children, o1.parents)) {
                    connections1 += ((Component)componentMap.get((Object)childId)).execs.size() * o1.execs.size();
                }
                for (String childId : ListUtils.union(o2.children, o2.parents)) {
                    connections2 += ((Component)componentMap.get((Object)childId)).execs.size() * o2.execs.size();
                }
                if (connections1 > connections1) {
                    return -1;
                }
                if (connections1 < connections2) {
                    return 1;
                }
                return o1.id.compareTo(o2.id);
            }
        });
        sortedComponents.addAll(componentMap.values());
        return sortedComponents;
    }

    private Set<Component> sortNeighbors(final Component thisComp, Map<String, Component> componentMap) {
        TreeSet<Component> sortedComponents = new TreeSet<Component>(new Comparator<Component>(){

            @Override
            public int compare(Component o1, Component o2) {
                int connections2;
                int connections1 = o1.execs.size() * thisComp.execs.size();
                if (connections1 > (connections2 = o2.execs.size() * thisComp.execs.size())) {
                    return -1;
                }
                if (connections1 < connections2) {
                    return 1;
                }
                return o1.id.compareTo(o2.id);
            }
        });
        sortedComponents.addAll(componentMap.values());
        return sortedComponents;
    }

    private List<ExecutorDetails> orderExecutors(TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
        Map<String, Component> componentMap = td.getComponents();
        LinkedList<ExecutorDetails> execsScheduled = new LinkedList<ExecutorDetails>();
        HashMap compToExecsToSchedule = new HashMap();
        for (Component component : componentMap.values()) {
            compToExecsToSchedule.put(component.id, new LinkedList());
            for (ExecutorDetails exec : component.execs) {
                if (!unassignedExecutors.contains(exec)) continue;
                ((Queue)compToExecsToSchedule.get(component.id)).add(exec);
            }
        }
        Set<Component> sortedComponents = this.sortComponents(componentMap);
        sortedComponents.addAll(componentMap.values());
        for (Component currComp : sortedComponents) {
            HashMap<String, Component> neighbors = new HashMap<String, Component>();
            for (String compId : ListUtils.union(currComp.children, currComp.parents)) {
                neighbors.put(compId, componentMap.get(compId));
            }
            Set<Component> sortedNeighbors = this.sortNeighbors(currComp, neighbors);
            Queue currCompExesToSched = (Queue)compToExecsToSchedule.get(currComp.id);
            boolean flag = false;
            do {
                flag = false;
                if (!currCompExesToSched.isEmpty()) {
                    execsScheduled.add((ExecutorDetails)currCompExesToSched.poll());
                    flag = true;
                }
                for (Component neighborComp : sortedNeighbors) {
                    Queue neighborCompExesToSched = (Queue)compToExecsToSchedule.get(neighborComp.id);
                    if (neighborCompExesToSched.isEmpty()) continue;
                    execsScheduled.add((ExecutorDetails)neighborCompExesToSched.poll());
                    flag = true;
                }
            } while (flag);
        }
        return execsScheduled;
    }

    private List<Component> getSpouts(TopologyDetails td) {
        ArrayList<Component> spouts = new ArrayList<Component>();
        for (Component c : td.getComponents().values()) {
            if (c.type != Component.ComponentType.SPOUT) continue;
            spouts.add(c);
        }
        return spouts;
    }

    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
    }

    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        Double totalMem = 0.0;
        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
        if (execs != null) {
            for (ExecutorDetails exec : execs) {
                totalMem = totalMem + td.getTotalMemReqTask(exec);
            }
        }
        return totalMem;
    }

    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        boolean retVal = false;
        if (this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
            retVal = true;
        }
        return retVal;
    }

    private String getClusterInfo() {
        String retVal = "Cluster info:\n";
        for (Map.Entry<String, List<String>> clusterEntry : this._clusterInfo.entrySet()) {
            String clusterId = clusterEntry.getKey();
            retVal = retVal + "Rack: " + clusterId + "\n";
            for (String nodeHostname : clusterEntry.getValue()) {
                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
                retVal = retVal + "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
                retVal = retVal + "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + " Slots: " + node.totalSlotsFree() + "}\n";
                retVal = retVal + "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + " Slots: " + node.totalSlots() + "}\n";
            }
        }
        return retVal;
    }

    public String NodeHostnameToId(String hostname) {
        for (RAS_Node n : this._nodes.getNodes()) {
            if (n.getHostname() == null || !n.getHostname().equals(hostname)) continue;
            return n.getId();
        }
        LOG.error("Cannot find Node with hostname {}", (Object)hostname);
        return null;
    }

    public RAS_Node idToNode(String id) {
        RAS_Node ret = this._nodes.getNodeById(id);
        if (ret == null) {
            LOG.error("Cannot find Node with Id: {}", (Object)id);
        }
        return ret;
    }

    static class ObjectResources {
        String id;
        double availMem = 0.0;
        double totalMem = 0.0;
        double availCpu = 0.0;
        double totalCpu = 0.0;
        double effectiveResources = 0.0;

        public ObjectResources(String id) {
            this.id = id;
        }

        public String toString() {
            return this.id;
        }
    }

    static class AllResources {
        List<ObjectResources> objectResources = new LinkedList<ObjectResources>();
        double availMemResourcesOverall = 0.0;
        double totalMemResourcesOverall = 0.0;
        double availCpuResourcesOverall = 0.0;
        double totalCpuResourcesOverall = 0.0;
        String identifier;

        public AllResources(String identifier) {
            this.identifier = identifier;
        }
    }

    private static interface ExistingScheduleFunc {
        public int getNumExistingSchedule(String var1);
    }
}

