/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.stats;

import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.cluster.ExecutorBeat;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltAggregateStats;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.CommonAggregateStats;
import org.apache.storm.generated.ComponentAggregateStats;
import org.apache.storm.generated.ComponentPageInfo;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorAggregateStats;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorSpecificStats;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.SpecificAggregateStats;
import org.apache.storm.generated.SpoutAggregateStats;
import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologyStats;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatsUtil {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(StatsUtil.class);

    // Component-type key and the two component-type values; aggTopoExecStats compares
    // the heartbeat's type against SPOUT and treats everything else as a bolt.
    public static final String TYPE = "type";
    public static final String SPOUT = "spout";
    public static final String BOLT = "bolt";

    // Keys of the per-executor heartbeat maps and of the aggregated result maps built
    // by the agg*/merge* methods in this class.
    private static final String UPTIME = "uptime";
    private static final String HOST = "host";
    private static final String PORT = "port";
    private static final String NUM_TASKS = "num-tasks";
    private static final String NUM_EXECUTORS = "num-executors";
    private static final String NUM_WORKERS = "num-workers";
    private static final String CAPACITY = "capacity";
    private static final String STATS = "stats";
    private static final String EXECUTOR_STATS = "executor-stats";
    private static final String EXECUTOR_ID = "executor-id";
    // NOTE(review): some call sites use the literal "last-error" instead of this
    // constant's value "lastError" -- confirm which key the readers expect.
    private static final String LAST_ERROR = "lastError";
    // HEARTBEAT, TIME_SECS and RATE: usage not shown in this section of the file.
    private static final String HEARTBEAT = "heartbeat";
    private static final String TIME_SECS = "time-secs";
    private static final String RATE = "rate";

    // Names of the individual statistics inside a STATS map.
    private static final String ACKED = "acked";
    private static final String FAILED = "failed";
    private static final String EXECUTED = "executed";
    private static final String EMITTED = "emitted";
    private static final String TRANSFERRED = "transferred";
    private static final String EXEC_LATENCIES = "execute-latencies";
    private static final String PROC_LATENCIES = "process-latencies";
    private static final String COMP_LATENCIES = "complete-latencies";

    // Final (averaged) latency keys written into result maps.
    private static final String EXEC_LATENCY = "execute-latency";
    private static final String PROC_LATENCY = "process-latency";
    private static final String COMP_LATENCY = "complete-latency";

    // Intermediate latency totals; callers later divide them by the matching
    // executed/acked count to obtain the *_LATENCY values.
    private static final String EXEC_LAT_TOTAL = "executeLatencyTotal";
    private static final String PROC_LAT_TOTAL = "processLatencyTotal";
    private static final String COMP_LAT_TOTAL = "completeLatencyTotal";

    // Keys of the window-indexed maps kept in the topology-level accumulator.
    private static final String WIN_TO_EMITTED = "window->emitted";
    private static final String WIN_TO_ACKED = "window->acked";
    private static final String WIN_TO_FAILED = "window->failed";
    private static final String WIN_TO_EXECUTED = "window->executed";
    private static final String WIN_TO_TRANSFERRED = "window->transferred";
    private static final String WIN_TO_EXEC_LAT = "window->execute-latency";
    private static final String WIN_TO_PROC_LAT = "window->process-latency";
    private static final String WIN_TO_COMP_LAT = "window->complete-latency";
    private static final String WIN_TO_COMP_LAT_WGT_AVG = "window->comp-lat-wgt-avg";
    private static final String WIN_TO_EXEC_LAT_WGT_AVG = "window->exec-lat-wgt-avg";
    private static final String WIN_TO_PROC_LAT_WGT_AVG = "window->proc-lat-wgt-avg";

    // Keys of the per-component and per-stream maps in the accumulators.
    private static final String BOLT_TO_STATS = "bolt-id->stats";
    private static final String SPOUT_TO_STATS = "spout-id->stats";
    private static final String SID_TO_OUT_STATS = "sid->output-stats";
    private static final String CID_SID_TO_IN_STATS = "cid+sid->input-stats";
    private static final String WORKERS_SET = "workers-set";

    // Ten minutes expressed in seconds, as a number and as a string.
    public static final int TEN_MIN_IN_SECONDS = 600;
    public static final String TEN_MIN_IN_SECONDS_STR = "600";

    // Shared transformer instances. TO_STRING is the one handed to windowSetConverter
    // throughout the visible methods; usage of the others is not shown in this section.
    public static final IdentityTransformer IDENTITY = new IdentityTransformer();
    private static final ToStringTransformer TO_STRING = new ToStringTransformer();
    private static final FromGlobalStreamIdTransformer FROM_GSID = new FromGlobalStreamIdTransformer();
    public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();

    /**
     * Builds the latency-total / executed-count summary for a bolt.
     *
     * @param id2execAvg map combined with {@code id2numExec} via {@code weightAvgAndSum}
     *                   to produce the {@code EXEC_LAT_TOTAL} entry
     * @param id2procAvg map combined with {@code id2numExec} via {@code weightAvgAndSum}
     *                   to produce the {@code PROC_LAT_TOTAL} entry
     * @param id2numExec per-key executed counts; also summed into the {@code EXECUTED} entry
     * @return a new map with exactly the keys {@code EXEC_LAT_TOTAL}, {@code PROC_LAT_TOTAL}
     *         and {@code EXECUTED}
     */
    public static Map<String, Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, Map<List<String>, Double> id2procAvg, Map<List<String>, Long> id2numExec) {
        Map<String, Number> totals = new HashMap<String, Number>();
        // Both latency totals are weighted by the same executed counts.
        StatsUtil.putKV(totals, EXEC_LAT_TOTAL,
                StatsUtil.weightAvgAndSum(id2execAvg, id2numExec));
        StatsUtil.putKV(totals, PROC_LAT_TOTAL,
                StatsUtil.weightAvgAndSum(id2procAvg, id2numExec));
        StatsUtil.putKV(totals, EXECUTED,
                StatsUtil.sumValues(id2numExec));
        return totals;
    }

    /**
     * Builds the complete-latency-total / acked-count summary for a spout.
     *
     * @param id2compAvg  map combined with {@code id2numAcked} via {@code weightAvgAndSum}
     *                    to produce the {@code COMP_LAT_TOTAL} entry
     * @param id2numAcked per-key acked counts; also summed into the {@code ACKED} entry
     * @return a new map with exactly the keys {@code COMP_LAT_TOTAL} and {@code ACKED}
     */
    public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked) {
        Map<String, Number> totals = new HashMap<String, Number>();
        StatsUtil.putKV(totals, COMP_LAT_TOTAL,
                StatsUtil.weightAvgAndSum(id2compAvg, id2numAcked));
        StatsUtil.putKV(totals, ACKED,
                StatsUtil.sumValues(id2numAcked));
        return totals;
    }

    /**
     * Builds, for every key of {@code id2execAvg}, a sub-map holding the execute and
     * process latency totals (via {@code weightAvg}) and the executed count for that key.
     *
     * @param id2execAvg per-key execute latency input; its key set drives the iteration
     * @param id2procAvg per-key process latency input
     * @param id2numExec per-key executed counts
     * @return key -> sub-map with {@code EXEC_LAT_TOTAL}, {@code PROC_LAT_TOTAL} and
     *         {@code EXECUTED}; an empty map when any argument is {@code null}
     */
    public static <K> Map<K, Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, Map<K, Double> id2procAvg, Map<K, Long> id2numExec) {
        Map<K, Map> perKey = new HashMap<K, Map>();
        boolean allInputsPresent = id2execAvg != null && id2procAvg != null && id2numExec != null;
        if (allInputsPresent) {
            for (K streamKey : id2execAvg.keySet()) {
                Map entry = new HashMap();
                StatsUtil.putKV(entry, EXEC_LAT_TOTAL, StatsUtil.weightAvg(id2execAvg, id2numExec, streamKey));
                StatsUtil.putKV(entry, PROC_LAT_TOTAL, StatsUtil.weightAvg(id2procAvg, id2numExec, streamKey));
                StatsUtil.putKV(entry, EXECUTED, id2numExec.get(streamKey));
                perKey.put(streamKey, entry);
            }
        }
        return perKey;
    }

    /**
     * Builds, for every key of {@code id2compAvg}, a sub-map holding the complete-latency
     * total (via {@code weightAvg}) and the acked count for that key.
     *
     * @param id2compAvg per-key complete latency input; its key set drives the iteration
     * @param id2acked   per-key acked counts
     * @return key -> sub-map with {@code COMP_LAT_TOTAL} and {@code ACKED}; an empty map
     *         when either argument is {@code null}
     */
    public static <K> Map<K, Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, Map<K, Long> id2acked) {
        Map<K, Map> perKey = new HashMap<K, Map>();
        boolean bothInputsPresent = id2compAvg != null && id2acked != null;
        if (bothInputsPresent) {
            for (K streamKey : id2compAvg.keySet()) {
                Map entry = new HashMap();
                StatsUtil.putKV(entry, COMP_LAT_TOTAL, StatsUtil.weightAvg(id2compAvg, id2acked, streamKey));
                StatsUtil.putKV(entry, ACKED, id2acked.get(streamKey));
                perKey.put(streamKey, entry);
            }
        }
        return perKey;
    }

    /**
     * Converts one bolt executor heartbeat into the per-executor summary that the
     * component-page merge step consumes.
     *
     * @param beat       heartbeat map; read for "exec-id", HOST, PORT, UPTIME, NUM_TASKS and STATS
     * @param window     window key used to select one slice from each window-indexed stat
     * @param includeSys forwarded to {@code filterSysStreams2Stat} for the emitted and
     *                   transferred slices
     * @return a new map with executor identity, NUM_EXECUTORS = 1, CAPACITY, the input
     *         stats under CID_SID_TO_IN_STATS and the output stats under SID_TO_OUT_STATS
     */
    public static Map<String, Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys) {
        Map<String, Object> summary = new HashMap<String, Object>();

        // Executor identity and sizing, copied from the heartbeat.
        StatsUtil.putKV(summary, EXECUTOR_ID, StatsUtil.getByKey(beat, "exec-id"));
        StatsUtil.putKV(summary, HOST, StatsUtil.getByKey(beat, HOST));
        StatsUtil.putKV(summary, PORT, StatsUtil.getByKey(beat, PORT));
        StatsUtil.putKV(summary, UPTIME, StatsUtil.getByKey(beat, UPTIME));
        StatsUtil.putKV(summary, NUM_EXECUTORS, 1);
        StatsUtil.putKV(summary, NUM_TASKS, StatsUtil.getByKey(beat, NUM_TASKS));

        Map rawStats = StatsUtil.getMapByKey(beat, STATS);
        int uptimeSecs = StatsUtil.getByKeyOr0(beat, UPTIME).intValue();
        StatsUtil.putKV(summary, CAPACITY, StatsUtil.computeAggCapacity(rawStats, uptimeSecs));

        // Input side: acked/failed for the window, with a missing slice replaced by an empty map.
        Map ackedInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, ACKED), TO_STRING).get(window);
        Map failedInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, FAILED), TO_STRING).get(window);
        if (ackedInWindow == null) {
            ackedInWindow = new HashMap();
        }
        if (failedInWindow == null) {
            failedInWindow = new HashMap();
        }
        Map inputStats = new HashMap();
        StatsUtil.putKV(inputStats, ACKED, ackedInWindow);
        StatsUtil.putKV(inputStats, FAILED, failedInWindow);
        inputStats = StatsUtil.swapMapOrder(inputStats);

        // Latency totals and executed counts are merged into the swapped input map.
        Map execLatInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, EXEC_LATENCIES), TO_STRING).get(window);
        Map procLatInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, PROC_LATENCIES), TO_STRING).get(window);
        Map executedInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, EXECUTED), TO_STRING).get(window);
        StatsUtil.mergeMaps(inputStats, StatsUtil.aggBoltStreamsLatAndCount(execLatInWindow, procLatInWindow, executedInWindow));
        StatsUtil.putKV(summary, CID_SID_TO_IN_STATS, inputStats);

        // Output side: emitted/transferred for the window, filtered through filterSysStreams2Stat.
        Map emittedInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, EMITTED), TO_STRING).get(window);
        Map transferredInWindow = (Map)StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, TRANSFERRED), TO_STRING).get(window);
        Map outputStats = new HashMap();
        StatsUtil.putKV(outputStats, EMITTED, emittedInWindow != null ? StatsUtil.filterSysStreams2Stat(emittedInWindow, includeSys) : new HashMap());
        StatsUtil.putKV(outputStats, TRANSFERRED, transferredInWindow != null ? StatsUtil.filterSysStreams2Stat(transferredInWindow, includeSys) : new HashMap());
        outputStats = StatsUtil.swapMapOrder(outputStats);
        StatsUtil.putKV(summary, SID_TO_OUT_STATS, outputStats);
        return summary;
    }

    /**
     * Converts one spout executor heartbeat into the per-executor summary that the
     * component-page merge step consumes.
     *
     * <p>Window slices are handled through the {@code Map} interface rather than being
     * cast to {@code HashMap}, so any map implementation returned by
     * {@code windowSetConverter} is accepted. Missing acked/failed slices are stored as
     * empty maps, matching the handling in {@code aggPreMergeCompPageBolt}.
     *
     * @param beat       heartbeat map; read for "exec-id", HOST, PORT, UPTIME, NUM_TASKS and STATS
     * @param window     window key used to select one slice from each window-indexed stat
     * @param includeSys forwarded to {@code filterSysStreams2Stat} for the emitted and
     *                   transferred slices
     * @return a new map with executor identity, NUM_EXECUTORS = 1 and the output stats
     *         under SID_TO_OUT_STATS
     */
    public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys) {
        Map<String, Object> ret = new HashMap<String, Object>();
        StatsUtil.putKV(ret, EXECUTOR_ID, StatsUtil.getByKey(beat, "exec-id"));
        StatsUtil.putKV(ret, HOST, StatsUtil.getByKey(beat, HOST));
        StatsUtil.putKV(ret, PORT, StatsUtil.getByKey(beat, PORT));
        StatsUtil.putKV(ret, UPTIME, StatsUtil.getByKey(beat, UPTIME));
        StatsUtil.putKV(ret, NUM_EXECUTORS, 1);
        StatsUtil.putKV(ret, NUM_TASKS, StatsUtil.getByKey(beat, NUM_TASKS));

        Map stat2win2sid2num = StatsUtil.getMapByKey(beat, STATS);
        Map win2sid2acked = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stat2win2sid2num, ACKED), TO_STRING);
        Map win2sid2failed = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stat2win2sid2num, FAILED), TO_STRING);
        Map win2sid2emitted = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stat2win2sid2num, EMITTED), TO_STRING);
        Map win2sid2transferred = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stat2win2sid2num, TRANSFERRED), TO_STRING);
        Map win2sid2compLat = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stat2win2sid2num, COMP_LATENCIES), TO_STRING);

        // These two may be null when the window has no data; the possibly-null sid2acked
        // is still handed to aggSpoutStreamsLatAndCount below so that its own null
        // short-circuit keeps applying.
        Map sid2acked = (Map)win2sid2acked.get(window);
        Map sid2failed = (Map)win2sid2failed.get(window);

        Map outputStats = new HashMap();
        StatsUtil.putKV(outputStats, ACKED, sid2acked != null ? sid2acked : new HashMap());
        StatsUtil.putKV(outputStats, FAILED, sid2failed != null ? sid2failed : new HashMap());

        Map sid2emitted = (Map)win2sid2emitted.get(window);
        if (sid2emitted == null) {
            sid2emitted = new HashMap();
        }
        StatsUtil.putKV(outputStats, EMITTED, StatsUtil.filterSysStreams2Stat(sid2emitted, includeSys));

        Map sid2transferred = (Map)win2sid2transferred.get(window);
        if (sid2transferred == null) {
            sid2transferred = new HashMap();
        }
        StatsUtil.putKV(outputStats, TRANSFERRED, StatsUtil.filterSysStreams2Stat(sid2transferred, includeSys));

        outputStats = StatsUtil.swapMapOrder(outputStats);
        Map sid2compLat = (Map)win2sid2compLat.get(window);
        StatsUtil.mergeMaps(outputStats, StatsUtil.aggSpoutStreamsLatAndCount(sid2compLat, sid2acked));
        StatsUtil.putKV(ret, SID_TO_OUT_STATS, outputStats);
        return ret;
    }

    /**
     * Converts one bolt executor heartbeat into a single-entry map
     * {@code comp-id -> stats} for the topology page.
     *
     * @param beat       heartbeat map; read for NUM_TASKS, STATS, UPTIME and "comp-id"
     * @param window     window key whose slice is summed for each count statistic
     * @param includeSys forwarded to {@code filterSysStreams} for emitted and transferred
     * @return a new map with one entry keyed by the heartbeat's "comp-id"; the value holds
     *         NUM_EXECUTORS = 1, NUM_TASKS, CAPACITY, the four window sums and the
     *         entries produced by {@code aggBoltLatAndCount}
     */
    public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageBolt(Map<String, Object> beat, String window, boolean includeSys) {
        Map<String, Number> compStats = new HashMap<String, Number>();
        StatsUtil.putKV(compStats, NUM_EXECUTORS, 1);
        StatsUtil.putKV(compStats, NUM_TASKS, StatsUtil.getByKey(beat, NUM_TASKS));

        Map<K, V> rawStats = StatsUtil.getMapByKey(beat, STATS);
        int uptimeSecs = StatsUtil.getByKeyOr0(beat, UPTIME).intValue();
        StatsUtil.putKV(compStats, CAPACITY, StatsUtil.computeAggCapacity(rawStats, uptimeSecs));

        // Sum every stream's count inside the requested window, one statistic at a time.
        String[] countKeys = {EMITTED, TRANSFERRED, ACKED, FAILED};
        for (String countKey : countKeys) {
            Map<String, Map<K, V>> byWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, countKey), TO_STRING);
            boolean isOutputCount = EMITTED.equals(countKey) || TRANSFERRED.equals(countKey);
            if (isOutputCount) {
                byWindow = StatsUtil.filterSysStreams(byWindow, includeSys);
            }
            long total = 0L;
            Map<K, V> inWindow = byWindow.get(window);
            if (inWindow != null) {
                for (Number count : inWindow.values()) {
                    total += count.longValue();
                }
            }
            StatsUtil.putKV(compStats, countKey, total);
        }

        Map execLatByWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, EXEC_LATENCIES), TO_STRING);
        Map procLatByWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, PROC_LATENCIES), TO_STRING);
        Map executedByWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, EXECUTED), TO_STRING);
        compStats.putAll(StatsUtil.aggBoltLatAndCount((Map)execLatByWindow.get(window), (Map)procLatByWindow.get(window), (Map)executedByWindow.get(window)));

        Map<String, Object> byComponent = new HashMap<String, Object>();
        byComponent.put((String)StatsUtil.getByKey(beat, "comp-id"), compStats);
        return byComponent;
    }

    /**
     * Converts one spout executor heartbeat into a single-entry map
     * {@code comp-id -> stats} for the topology page.
     *
     * @param m          heartbeat map; read for NUM_TASKS, STATS and "comp-id"
     * @param window     window key whose slice is summed for each count statistic
     * @param includeSys forwarded to {@code filterSysStreams} for emitted and transferred
     * @return a new map with one entry keyed by the heartbeat's "comp-id"; the value holds
     *         NUM_EXECUTORS = 1, NUM_TASKS, the emitted/transferred/failed window sums and
     *         the entries produced by {@code aggSpoutLatAndCount}
     */
    public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageSpout(Map<String, Object> m, String window, boolean includeSys) {
        Map<String, Number> compStats = new HashMap<String, Number>();
        StatsUtil.putKV(compStats, NUM_EXECUTORS, 1);
        StatsUtil.putKV(compStats, NUM_TASKS, StatsUtil.getByKey(m, NUM_TASKS));

        Map<K, V> rawStats = StatsUtil.getMapByKey(m, STATS);

        // Sum every stream's count inside the requested window, one statistic at a time.
        String[] countKeys = {EMITTED, TRANSFERRED, FAILED};
        for (String countKey : countKeys) {
            // NOTE(review): this reads the stat with a direct get() whereas the rest of
            // this method goes through getMapByKey -- confirm the difference is intended.
            Map<String, Map<K, V>> byWindow = StatsUtil.windowSetConverter((Map)rawStats.get(countKey), TO_STRING);
            boolean isOutputCount = EMITTED.equals(countKey) || TRANSFERRED.equals(countKey);
            if (isOutputCount) {
                byWindow = StatsUtil.filterSysStreams(byWindow, includeSys);
            }
            long total = 0L;
            Map<K, V> inWindow = byWindow.get(window);
            if (inWindow != null) {
                for (Number count : inWindow.values()) {
                    total += count.longValue();
                }
            }
            StatsUtil.putKV(compStats, countKey, total);
        }

        Map compLatByWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, COMP_LATENCIES), TO_STRING);
        Map ackedByWindow = StatsUtil.windowSetConverter(StatsUtil.getMapByKey(rawStats, ACKED), TO_STRING);
        compStats.putAll(StatsUtil.aggSpoutLatAndCount((Map)compLatByWindow.get(window), (Map)ackedByWindow.get(window)));

        Map<String, Object> byComponent = new HashMap<String, Object>();
        byComponent.put((String)StatsUtil.getByKey(m, "comp-id"), compStats);
        return byComponent;
    }

    /**
     * Folds one bolt executor summary into the component-page accumulator.
     *
     * <p>The executor-stats list is taken from the accumulator and appended to in place.
     * If the accumulator has no list yet, a fresh one is created, so a missing
     * {@code EXECUTOR_STATS} entry is tolerated like the other accumulator reads
     * (which go through {@code getByKeyOr0}).
     *
     * @param accBoltStats accumulator built so far; read for NUM_EXECUTORS, NUM_TASKS,
     *                     the in/out stream stats and EXECUTOR_STATS
     * @param boltStats    summary of the executor being merged
     * @return a new map with NUM_EXECUTORS incremented by one, summed NUM_TASKS, merged
     *         in/out stream stats, the EXECUTED total of {@code boltStats}' input streams,
     *         and the executor list extended by this executor's entry
     */
    public static Map<String, Object> mergeAggCompStatsCompPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) {
        Map<String, Object> ret = new HashMap<String, Object>();
        Map accIn = StatsUtil.getMapByKey(accBoltStats, CID_SID_TO_IN_STATS);
        Map accOut = StatsUtil.getMapByKey(accBoltStats, SID_TO_OUT_STATS);
        Map boltIn = StatsUtil.getMapByKey(boltStats, CID_SID_TO_IN_STATS);
        Map boltOut = StatsUtil.getMapByKey(boltStats, SID_TO_OUT_STATS);

        int numExecutors = StatsUtil.getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue();
        StatsUtil.putKV(ret, NUM_EXECUTORS, numExecutors + 1);
        StatsUtil.putKV(ret, NUM_TASKS, StatsUtil.sumOr0(StatsUtil.getByKeyOr0(accBoltStats, NUM_TASKS), StatsUtil.getByKeyOr0(boltStats, NUM_TASKS)));
        StatsUtil.putKV(ret, SID_TO_OUT_STATS, StatsUtil.fullMergeWithSum(accOut, boltOut));
        StatsUtil.putKV(ret, CID_SID_TO_IN_STATS, StatsUtil.fullMergeWithSum(accIn, boltIn));
        long executed = StatsUtil.sumStreamsLong(boltIn, EXECUTED);
        StatsUtil.putKV(ret, EXECUTED, executed);

        // Per-executor row appended to the executor list.
        Map executorStats = new HashMap();
        StatsUtil.putKV(executorStats, EXECUTOR_ID, boltStats.get(EXECUTOR_ID));
        StatsUtil.putKV(executorStats, UPTIME, boltStats.get(UPTIME));
        StatsUtil.putKV(executorStats, HOST, boltStats.get(HOST));
        StatsUtil.putKV(executorStats, PORT, boltStats.get(PORT));
        StatsUtil.putKV(executorStats, CAPACITY, boltStats.get(CAPACITY));
        StatsUtil.putKV(executorStats, EMITTED, StatsUtil.sumStreamsLong(boltOut, EMITTED));
        StatsUtil.putKV(executorStats, TRANSFERRED, StatsUtil.sumStreamsLong(boltOut, TRANSFERRED));
        StatsUtil.putKV(executorStats, ACKED, StatsUtil.sumStreamsLong(boltIn, ACKED));
        StatsUtil.putKV(executorStats, FAILED, StatsUtil.sumStreamsLong(boltIn, FAILED));
        StatsUtil.putKV(executorStats, EXECUTED, executed);
        if (executed > 0L) {
            // Averages are only defined when something was executed.
            StatsUtil.putKV(executorStats, EXEC_LATENCY, StatsUtil.sumStreamsDouble(boltIn, EXEC_LAT_TOTAL) / (double)executed);
            StatsUtil.putKV(executorStats, PROC_LATENCY, StatsUtil.sumStreamsDouble(boltIn, PROC_LAT_TOTAL) / (double)executed);
        } else {
            StatsUtil.putKV(executorStats, EXEC_LATENCY, null);
            StatsUtil.putKV(executorStats, PROC_LATENCY, null);
        }

        List executorStatsList = (List)StatsUtil.getByKey(accBoltStats, EXECUTOR_STATS);
        if (executorStatsList == null) {
            // Accumulator without a list yet: start one instead of failing with an NPE.
            executorStatsList = new ArrayList();
        }
        executorStatsList.add(executorStats);
        StatsUtil.putKV(ret, EXECUTOR_STATS, executorStatsList);
        return ret;
    }

    /**
     * Folds one spout executor summary into the component-page accumulator.
     *
     * <p>The executor-stats list is taken from the accumulator and appended to in place.
     * If the accumulator has no list yet, a fresh one is created, so a missing
     * {@code EXECUTOR_STATS} entry is tolerated like the other accumulator reads
     * (which go through {@code getByKeyOr0}).
     *
     * @param accSpoutStats accumulator built so far; read for NUM_EXECUTORS, NUM_TASKS,
     *                      the output stream stats and EXECUTOR_STATS
     * @param spoutStats    summary of the executor being merged
     * @return a new map with NUM_EXECUTORS incremented by one, summed NUM_TASKS, merged
     *         output stream stats, and the executor list extended by this executor's entry
     */
    public static Map<String, Object> mergeAggCompStatsCompPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) {
        Map<String, Object> ret = new HashMap<String, Object>();
        Map accOut = StatsUtil.getMapByKey(accSpoutStats, SID_TO_OUT_STATS);
        Map spoutOut = StatsUtil.getMapByKey(spoutStats, SID_TO_OUT_STATS);

        int numExecutors = StatsUtil.getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue();
        StatsUtil.putKV(ret, NUM_EXECUTORS, numExecutors + 1);
        StatsUtil.putKV(ret, NUM_TASKS, StatsUtil.sumOr0(StatsUtil.getByKeyOr0(accSpoutStats, NUM_TASKS), StatsUtil.getByKeyOr0(spoutStats, NUM_TASKS)));
        StatsUtil.putKV(ret, SID_TO_OUT_STATS, StatsUtil.fullMergeWithSum(accOut, spoutOut));

        // Per-executor row appended to the executor list.
        Map executorStats = new HashMap();
        StatsUtil.putKV(executorStats, EXECUTOR_ID, StatsUtil.getByKey(spoutStats, EXECUTOR_ID));
        StatsUtil.putKV(executorStats, UPTIME, StatsUtil.getByKey(spoutStats, UPTIME));
        StatsUtil.putKV(executorStats, HOST, StatsUtil.getByKey(spoutStats, HOST));
        StatsUtil.putKV(executorStats, PORT, StatsUtil.getByKey(spoutStats, PORT));
        StatsUtil.putKV(executorStats, EMITTED, StatsUtil.sumStreamsLong(spoutOut, EMITTED));
        StatsUtil.putKV(executorStats, TRANSFERRED, StatsUtil.sumStreamsLong(spoutOut, TRANSFERRED));
        StatsUtil.putKV(executorStats, FAILED, StatsUtil.sumStreamsLong(spoutOut, FAILED));
        long acked = StatsUtil.sumStreamsLong(spoutOut, ACKED);
        StatsUtil.putKV(executorStats, ACKED, acked);
        if (acked > 0L) {
            // The average is only defined when something was acked.
            StatsUtil.putKV(executorStats, COMP_LATENCY, StatsUtil.sumStreamsDouble(spoutOut, COMP_LAT_TOTAL) / (double)acked);
        } else {
            StatsUtil.putKV(executorStats, COMP_LATENCY, null);
        }

        List executorStatsList = (List)StatsUtil.getByKey(accSpoutStats, EXECUTOR_STATS);
        if (executorStatsList == null) {
            // Accumulator without a list yet: start one instead of failing with an NPE.
            executorStatsList = new ArrayList();
        }
        executorStatsList.add(executorStats);
        StatsUtil.putKV(ret, EXECUTOR_STATS, executorStatsList);
        return ret;
    }

    /**
     * Folds one bolt's topology-page stats into the accumulator for that bolt.
     *
     * @param accBoltStats accumulated stats so far
     * @param boltStats    stats of the executor being merged
     * @return a new map with NUM_EXECUTORS incremented by one, CAPACITY set to
     *         {@code maxOr0} of both sides, and every other statistic set to
     *         {@code sumOr0} of both sides
     */
    public static Map<String, Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) {
        Map<String, Object> merged = new HashMap<String, Object>();

        int priorExecutors = StatsUtil.getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue();
        StatsUtil.putKV(merged, NUM_EXECUTORS, priorExecutors + 1);

        // Every one of these statistics is combined the same way: sumOr0 of both sides.
        String[] summedKeys = {NUM_TASKS, EMITTED, TRANSFERRED, EXEC_LAT_TOTAL, PROC_LAT_TOTAL, EXECUTED, ACKED, FAILED};
        for (String key : summedKeys) {
            StatsUtil.putKV(merged, key, StatsUtil.sumOr0(StatsUtil.getByKeyOr0(accBoltStats, key), StatsUtil.getByKeyOr0(boltStats, key)));
        }

        StatsUtil.putKV(merged, CAPACITY, StatsUtil.maxOr0(StatsUtil.getByKeyOr0(accBoltStats, CAPACITY), StatsUtil.getByKeyOr0(boltStats, CAPACITY)));
        return merged;
    }

    /**
     * Folds one spout's topology-page stats into the accumulator for that spout.
     *
     * @param accSpoutStats accumulated stats so far
     * @param spoutStats    stats of the executor being merged
     * @return a new map with NUM_EXECUTORS incremented by one and every other statistic
     *         set to {@code sumOr0} of both sides
     */
    public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) {
        Map<String, Object> merged = new HashMap<String, Object>();

        int priorExecutors = StatsUtil.getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue();
        StatsUtil.putKV(merged, NUM_EXECUTORS, priorExecutors + 1);

        // Every one of these statistics is combined the same way: sumOr0 of both sides.
        String[] summedKeys = {NUM_TASKS, EMITTED, TRANSFERRED, COMP_LAT_TOTAL, ACKED, FAILED};
        for (String key : summedKeys) {
            StatsUtil.putKV(merged, key, StatsUtil.sumOr0(StatsUtil.getByKeyOr0(accSpoutStats, key), StatsUtil.getByKeyOr0(spoutStats, key)));
        }
        return merged;
    }

    /**
     * Folds one executor heartbeat into the topology-level accumulator.
     *
     * <p>The worker set held by {@code accStats} is updated in place and reused in the
     * result; all other entries of the returned map are freshly put.
     *
     * @param window     window key forwarded to the per-component pre-merge step
     * @param includeSys forwarded to {@code filterSysStreams} and the pre-merge step
     * @param accStats   accumulator so far; must contain a {@code WORKERS_SET} set
     * @param beat       heartbeat of the executor being merged; read for STATS, HOST and PORT
     * @param compType   component type; {@code SPOUT} selects the spout path, anything
     *                   else the bolt path (must not be {@code null})
     * @return the new accumulator
     */
    public static Map<String, Object> aggTopoExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType) {
        Map<String, Object> ret = new HashMap<String, Object>();
        Set workerSet = (Set)accStats.get(WORKERS_SET);
        Map bolt2stats = StatsUtil.getMapByKey(accStats, BOLT_TO_STATS);
        Map spout2stats = StatsUtil.getMapByKey(accStats, SPOUT_TO_STATS);
        Map win2emitted = StatsUtil.getMapByKey(accStats, WIN_TO_EMITTED);
        Map win2transferred = StatsUtil.getMapByKey(accStats, WIN_TO_TRANSFERRED);
        Map win2compLatWgtAvg = StatsUtil.getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG);
        Map win2acked = StatsUtil.getMapByKey(accStats, WIN_TO_ACKED);
        Map win2failed = StatsUtil.getMapByKey(accStats, WIN_TO_FAILED);

        boolean isSpout = compType.equals(SPOUT);
        Map<String, Object> cid2stats = isSpout ? StatsUtil.aggPreMergeTopoPageSpout(beat, window, includeSys) : StatsUtil.aggPreMergeTopoPageBolt(beat, window, includeSys);
        Map stats = StatsUtil.getMapByKey(beat, STATS);
        Map compLatStats = StatsUtil.getMapByKey(stats, COMP_LATENCIES);

        Map w2compLatWgtAvg;
        Map<String, Long> w2acked;
        if (isSpout) {
            // Per window: complete-latency total and acked count, then flipped so the
            // statistic name becomes the outer key.
            Map perWindow = new HashMap();
            Map acked = StatsUtil.getMapByKey(stats, ACKED);
            for (Object win : acked.keySet()) {
                perWindow.put(win, StatsUtil.aggSpoutLatAndCount((Map)compLatStats.get(win), (Map)acked.get(win)));
            }
            perWindow = StatsUtil.swapMapOrder(perWindow);
            w2compLatWgtAvg = StatsUtil.getMapByKey(perWindow, COMP_LAT_TOTAL);
            w2acked = StatsUtil.getMapByKey(perWindow, ACKED);
        } else {
            w2compLatWgtAvg = null;
            // Computed as in the original; only the spout path folds it into WIN_TO_ACKED.
            w2acked = StatsUtil.aggregateCountStreams(StatsUtil.getMapByKey(stats, ACKED));
        }

        workerSet.add(Lists.newArrayList(StatsUtil.getByKey(beat, HOST), StatsUtil.getByKey(beat, PORT)));
        StatsUtil.putKV(ret, WORKERS_SET, workerSet);
        StatsUtil.putKV(ret, BOLT_TO_STATS, bolt2stats);
        StatsUtil.putKV(ret, SPOUT_TO_STATS, spout2stats);
        StatsUtil.putKV(ret, WIN_TO_EMITTED, StatsUtil.mergeWithSumLong(win2emitted, StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(StatsUtil.getMapByKey(stats, EMITTED), includeSys))));
        StatsUtil.putKV(ret, WIN_TO_TRANSFERRED, StatsUtil.mergeWithSumLong(win2transferred, StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(StatsUtil.getMapByKey(stats, TRANSFERRED), includeSys))));
        StatsUtil.putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, StatsUtil.mergeWithSumDouble(win2compLatWgtAvg, w2compLatWgtAvg));
        // Topology-level acked/failed only count spouts; bolt heartbeats pass the totals through.
        StatsUtil.putKV(ret, WIN_TO_ACKED, isSpout ? StatsUtil.mergeWithSumLong(win2acked, w2acked) : win2acked);
        StatsUtil.putKV(ret, WIN_TO_FAILED, isSpout ? StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.getMapByKey(stats, FAILED)), win2failed) : win2failed);
        StatsUtil.putKV(ret, TYPE, StatsUtil.getByKey(stats, TYPE));

        // Merge this heartbeat's component entry into the matching per-component map.
        // The id set is typed as Set<String> so it can be iterated as String.
        Map prior = isSpout ? spout2stats : bolt2stats;
        Set<String> componentIds = new HashSet<String>();
        componentIds.addAll(prior.keySet());
        componentIds.addAll(cid2stats.keySet());
        Map<String, Map<String, Object>> merged = new HashMap<String, Map<String, Object>>();
        for (String id : componentIds) {
            Map accForId = (Map)prior.get(id);
            Map beatForId = (Map)cid2stats.get(id);
            if (isSpout) {
                merged.put(id, StatsUtil.mergeAggCompStatsTopoPageSpout(accForId, beatForId));
            } else {
                merged.put(id, StatsUtil.mergeAggCompStatsTopoPageBolt(accForId, beatForId));
            }
        }
        StatsUtil.putKV(ret, isSpout ? SPOUT_TO_STATS : BOLT_TO_STATS, merged);
        return ret;
    }

    /**
     * Produces the topology page info in three steps: extract per-executor data from
     * the heartbeats, aggregate it, then post-process the aggregate.
     *
     * @param topologyId     id passed to the post-aggregation step
     * @param exec2nodePort  passed to the extraction and post-aggregation steps
     * @param task2component passed to the extraction and post-aggregation steps
     * @param beats          raw heartbeats passed to {@code extractDataFromHb}
     * @param topology       topology passed to {@code extractDataFromHb}
     * @param window         window key passed to {@code aggregateTopoStats}
     * @param includeSys     passed to the extraction and aggregation steps
     * @param clusterState   passed to the post-aggregation step
     * @return the result of {@code postAggregateTopoStats}
     */
    public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> extracted =
                StatsUtil.extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> aggregated =
                StatsUtil.aggregateTopoStats(window, includeSys, extracted);
        return StatsUtil.postAggregateTopoStats(task2component, exec2nodePort, aggregated, topologyId, clusterState);
    }

    /**
     * Reduces a list of executor heartbeats into one topology-level accumulator by
     * applying {@code aggTopoExecStats} to each heartbeat in order.
     *
     * @param win        window key passed to {@code aggTopoExecStats}
     * @param includeSys passed to {@code aggTopoExecStats}
     * @param heartbeats heartbeats to fold; each one's {@code TYPE} entry is read as a String
     * @return the final accumulator; for an empty list this is the initial accumulator
     *         (an empty worker set plus seven empty maps)
     */
    public static Map<String, Object> aggregateTopoStats(String win, boolean includeSys, List<Map<String, Object>> heartbeats) {
        Map<String, Object> acc = new HashMap<String, Object>();
        StatsUtil.putKV(acc, WORKERS_SET, new HashSet());
        // Every other accumulator slot starts out as its own empty map.
        String[] mapSlots = {BOLT_TO_STATS, SPOUT_TO_STATS, WIN_TO_EMITTED, WIN_TO_TRANSFERRED, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED, WIN_TO_FAILED};
        for (String slot : mapSlots) {
            StatsUtil.putKV(acc, slot, new HashMap());
        }

        for (Map<String, Object> heartbeat : heartbeats) {
            String compType = (String)StatsUtil.getByKey(heartbeat, TYPE);
            acc = StatsUtil.aggTopoExecStats(win, includeSys, acc, heartbeat, compType);
        }
        return acc;
    }

    /**
     * Turns the topology-level accumulator into a {@code TopologyPageInfo}.
     *
     * <p>For every bolt and spout entry the latency totals are replaced by averages
     * (only when the executed/acked count is positive), the last error is looked up,
     * and the entry is converted with the matching {@code thriftify*AggStats} helper.
     * The per-component maps inside {@code accData} are modified in place.
     *
     * @param task2comp     task map; only its size is used ({@code null} counts as 0 tasks)
     * @param exec2nodePort executor map; only its size is used ({@code null} counts as 0)
     * @param accData       accumulator; read for WORKERS_SET, BOLT_TO_STATS,
     *                      SPOUT_TO_STATS and the WIN_TO_* maps
     * @param topologyId    topology id, used for the result and the last-error lookup
     * @param clusterState  cluster state passed to {@code getLastError}
     * @return the populated page info
     */
    public static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData, String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);
        // Both size lookups are null-tolerant.
        ret.set_num_tasks(task2comp != null ? task2comp.size() : 0);
        ret.set_num_workers(((Set)StatsUtil.getByKey(accData, WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = StatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<String, ComponentAggregateStats>();
        // bolt2stats is a raw Map, so its entries arrive as Object and are cast explicitly.
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry entry = (Map.Entry)o;
            String id = (String)entry.getKey();
            Map m = (Map)entry.getValue();
            long executed = StatsUtil.getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0L) {
                double execLatencyTotal = StatsUtil.getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                StatsUtil.putKV(m, EXEC_LATENCY, execLatencyTotal / (double)executed);
                double procLatencyTotal = StatsUtil.getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                StatsUtil.putKV(m, PROC_LATENCY, procLatencyTotal / (double)executed);
            }
            // The totals are intermediate values and are dropped before conversion.
            StatsUtil.remove(m, EXEC_LAT_TOTAL);
            StatsUtil.remove(m, PROC_LAT_TOTAL);
            // NOTE(review): this literal differs from the class constant whose value is
            // "lastError" -- confirm which key thriftifyBoltAggStats reads.
            StatsUtil.putKV(m, "last-error", StatsUtil.getLastError(clusterState, topologyId, id));
            aggBolt2stats.put(id, StatsUtil.thriftifyBoltAggStats(m));
        }

        Map spout2stats = StatsUtil.getMapByKey(accData, SPOUT_TO_STATS);
        Map<String, ComponentAggregateStats> aggSpout2stats = new HashMap<String, ComponentAggregateStats>();
        for (Object o : spout2stats.entrySet()) {
            Map.Entry entry = (Map.Entry)o;
            String id = (String)entry.getKey();
            Map m = (Map)entry.getValue();
            long acked = StatsUtil.getByKeyOr0(m, ACKED).longValue();
            if (acked > 0L) {
                double compLatencyTotal = StatsUtil.getByKeyOr0(m, COMP_LAT_TOTAL).doubleValue();
                StatsUtil.putKV(m, COMP_LATENCY, compLatencyTotal / (double)acked);
            }
            StatsUtil.remove(m, COMP_LAT_TOTAL);
            // NOTE(review): same key question as in the bolt loop -- confirm against
            // thriftifySpoutAggStats.
            StatsUtil.putKV(m, "last-error", StatsUtil.getLastError(clusterState, topologyId, id));
            aggSpout2stats.put(id, StatsUtil.thriftifySpoutAggStats(m));
        }

        TopologyStats topologyStats = new TopologyStats();
        topologyStats.set_window_to_acked(StatsUtil.mapKeyStr(StatsUtil.getMapByKey(accData, WIN_TO_ACKED)));
        topologyStats.set_window_to_emitted(StatsUtil.mapKeyStr(StatsUtil.getMapByKey(accData, WIN_TO_EMITTED)));
        topologyStats.set_window_to_failed(StatsUtil.mapKeyStr(StatsUtil.getMapByKey(accData, WIN_TO_FAILED)));
        topologyStats.set_window_to_transferred(StatsUtil.mapKeyStr(StatsUtil.getMapByKey(accData, WIN_TO_TRANSFERRED)));
        topologyStats.set_window_to_complete_latencies_ms(StatsUtil.computeWeightedAveragesPerWindow(accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
        ret.set_topology_stats(topologyStats);
        ret.set_id_to_spout_agg_stats(aggSpout2stats);
        ret.set_id_to_bolt_agg_stats(aggBolt2stats);
        return ret;
    }

    /**
     * Aggregates the bolt statistics carried by a list of executor summaries.
     *
     * <p>The common emitted/transferred stats are aggregated and passed through
     * {@code preProcessStreamSummary} first; the acked, failed and executed counts and the
     * process/execute latency averages of every executor are then collected and merged.
     *
     * @param statsSeq executor summaries whose specific stats are read as bolt stats
     * @param includeSys forwarded to {@code preProcessStreamSummary}
     * @return map from stat name to the aggregated value for that stat
     */
    public static <T> Map<String, Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys) {
        HashMap<String, Map> aggregated = new HashMap<String, Map>();
        Map<String, Map<String, Map<T, Long>>> common = StatsUtil.aggregateCommonStats(statsSeq);
        common = StatsUtil.preProcessStreamSummary(common, includeSys);

        ArrayList ackedSeq = new ArrayList();
        ArrayList failedSeq = new ArrayList();
        ArrayList executedSeq = new ArrayList();
        ArrayList procLatSeq = new ArrayList();
        ArrayList execLatSeq = new ArrayList();
        for (ExecutorSummary executor : statsSeq) {
            // Resolve the bolt-specific stats once per executor instead of once per field.
            BoltStats bolt = executor.get_stats().get_specific().get_bolt();
            ackedSeq.add(bolt.get_acked());
            failedSeq.add(bolt.get_failed());
            executedSeq.add(bolt.get_executed());
            procLatSeq.add(bolt.get_process_ms_avg());
            execLatSeq.add(bolt.get_execute_ms_avg());
        }

        StatsUtil.mergeMaps(aggregated, common);
        StatsUtil.putKV(aggregated, ACKED, StatsUtil.aggregateCounts(ackedSeq));
        StatsUtil.putKV(aggregated, FAILED, StatsUtil.aggregateCounts(failedSeq));
        StatsUtil.putKV(aggregated, EXECUTED, StatsUtil.aggregateCounts(executedSeq));
        // Process latency is weighted by acked counts, execute latency by executed counts.
        StatsUtil.putKV(aggregated, PROC_LATENCIES, StatsUtil.aggregateAverages(procLatSeq, ackedSeq));
        StatsUtil.putKV(aggregated, EXEC_LATENCIES, StatsUtil.aggregateAverages(execLatSeq, executedSeq));
        return aggregated;
    }

    /**
     * Aggregates the spout statistics carried by a list of executor summaries.
     *
     * <p>The common emitted/transferred stats are aggregated and passed through
     * {@code preProcessStreamSummary}; the acked and failed counts and the complete-latency
     * averages of every executor are then collected and merged.
     *
     * @param statsSeq executor summaries whose specific stats are read as spout stats
     * @param includeSys forwarded to {@code preProcessStreamSummary}
     * @return map from stat name to the aggregated value for that stat
     */
    public static Map<String, Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys) {
        HashMap<String, Map> aggregated = new HashMap<String, Map>();
        Map common = StatsUtil.aggregateCommonStats(statsSeq);
        common = StatsUtil.preProcessStreamSummary(common, includeSys);

        ArrayList ackedSeq = new ArrayList();
        ArrayList failedSeq = new ArrayList();
        ArrayList compLatSeq = new ArrayList();
        for (ExecutorSummary executor : statsSeq) {
            // Resolve the spout-specific stats once per executor.
            SpoutStats spout = executor.get_stats().get_specific().get_spout();
            ackedSeq.add(spout.get_acked());
            failedSeq.add(spout.get_failed());
            compLatSeq.add(spout.get_complete_ms_avg());
        }

        aggregated.putAll(common);
        StatsUtil.putKV(aggregated, ACKED, StatsUtil.aggregateCounts(ackedSeq));
        StatsUtil.putKV(aggregated, FAILED, StatsUtil.aggregateCounts(failedSeq));
        // Complete latency is weighted by the acked counts.
        StatsUtil.putKV(aggregated, COMP_LATENCIES, StatsUtil.aggregateAverages(compLatSeq, ackedSeq));
        return aggregated;
    }

    /**
     * Aggregates the emitted and transferred counts of a list of executor summaries.
     *
     * @param statsSeq executor summaries to read the emitted/transferred maps from
     * @return map holding the aggregated counts under the EMITTED and TRANSFERRED keys
     */
    public static <T> Map<String, Map<String, Map<T, Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq) {
        HashMap<String, Map<String, Map<T, Long>>> aggregated = new HashMap<String, Map<String, Map<T, Long>>>();
        ArrayList<Map<String, Map<T, Long>>> emittedSeq = new ArrayList<Map<String, Map<T, Long>>>();
        ArrayList<Map<String, Map<T, Long>>> transferredSeq = new ArrayList<Map<String, Map<T, Long>>>();
        for (ExecutorSummary executor : statsSeq) {
            ExecutorStats executorStats = executor.get_stats();
            // Raw cast: the stream key type of the stats maps is treated as the caller's T.
            emittedSeq.add((Map)executorStats.get_emitted());
            transferredSeq.add((Map)executorStats.get_transferred());
        }
        StatsUtil.putKV(aggregated, EMITTED, StatsUtil.aggregateCounts(emittedSeq));
        StatsUtil.putKV(aggregated, TRANSFERRED, StatsUtil.aggregateCounts(transferredSeq));
        return aggregated;
    }

    /**
     * Replaces the EMITTED and TRANSFERRED entries of a stream summary with the result of
     * running them through {@code filterSysStreams}.
     *
     * @param streamSummary summary to update; modified in place
     * @param includeSys forwarded to {@code filterSysStreams}
     * @return the same {@code streamSummary} instance that was passed in
     */
    public static <T> Map<String, Map<String, Map<T, Long>>> preProcessStreamSummary(Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys) {
        // Both entries are read before either one is overwritten.
        Map rawEmitted = StatsUtil.getMapByKey(streamSummary, EMITTED);
        Map rawTransferred = StatsUtil.getMapByKey(streamSummary, TRANSFERRED);
        StatsUtil.putKV(streamSummary, EMITTED, StatsUtil.filterSysStreams(rawEmitted, includeSys));
        StatsUtil.putKV(streamSummary, TRANSFERRED, StatsUtil.filterSysStreams(rawTransferred, includeSys));
        return streamSummary;
    }

    /**
     * Collapses the inner maps of {@code stats} into one total per outer key.
     *
     * @param stats map from an outer key to an inner map of numeric values
     * @return new map from each outer key to the sum of its inner values, taken as {@code long}
     */
    public static <K, V extends Number> Map<String, Long> aggregateCountStreams(Map<String, Map<K, V>> stats) {
        HashMap<String, Long> totals = new HashMap<String, Long>();
        for (Map.Entry<String, Map<K, V>> perKey : stats.entrySet()) {
            long total = 0L;
            Iterator<V> counts = perKey.getValue().values().iterator();
            while (counts.hasNext()) {
                total += counts.next().longValue();
            }
            totals.put(perKey.getKey(), total);
        }
        return totals;
    }

    /**
     * Combines per-executor averages into one average per outer key and inner key.
     *
     * <p>The inputs are first expanded by {@code expandAveragesSeq} into two-element lists;
     * element 0 is read as a {@code double} and element 1 as a {@code long}, and both are
     * handed to {@code valAvg} (presumably a weighted total and its count; confirm against
     * {@code expandAveragesSeq}).
     *
     * @param avgSeq per-executor averages
     * @param countSeq per-executor counts used alongside the averages
     * @return new map from outer key to inner key to the combined average
     */
    public static <K> Map<String, Map<K, Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq) {
        // The result is keyed by K on the inner level, matching the declared return type.
        Map<String, Map<K, Double>> ret = new HashMap<String, Map<K, Double>>();
        Map<String, Map<K, List>> expands = StatsUtil.expandAveragesSeq(avgSeq, countSeq);
        for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) {
            Map<K, Double> averaged = new HashMap<K, Double>();
            for (Map.Entry<K, List> inner : entry.getValue().entrySet()) {
                List pair = inner.getValue();
                double total = ((Number)pair.get(0)).doubleValue();
                long count = ((Number)pair.get(1)).longValue();
                averaged.put(inner.getKey(), StatsUtil.valAvg(total, count));
            }
            ret.put(entry.getKey(), averaged);
        }
        return ret;
    }

    /**
     * Collapses per-stream averages into one average per outer key.
     *
     * <p>The inputs are expanded by {@code expandAverages} into two-element lists; for every
     * outer key the first elements are summed as {@code double}, the second elements as
     * {@code long}, and the two sums are handed to {@code valAvg}.
     *
     * @param avgs averages per outer key and stream
     * @param counts counts per outer key and stream
     * @return new map from outer key to the combined average
     */
    public static <K> Map<String, Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) {
        HashMap<String, Double> combined = new HashMap<String, Double>();
        Map<String, Map<K, List>> expanded = StatsUtil.expandAverages(avgs, counts);
        for (Map.Entry<String, Map<K, List>> perWindow : expanded.entrySet()) {
            double totalSum = 0.0;
            long countSum = 0L;
            for (List pair : perWindow.getValue().values()) {
                totalSum += ((Number)pair.get(0)).doubleValue();
                countSum += ((Number)pair.get(1)).longValue();
            }
            combined.put(perWindow.getKey(), StatsUtil.valAvg(totalSum, countSum));
        }
        return combined;
    }

    /**
     * Aggregates spout stats of the given executor summaries and collapses them per stream.
     *
     * @param summs executor summaries; may be {@code null}
     * @param includeSys forwarded to {@code aggregateSpoutStats}
     * @return the collapsed stats, or an empty map when {@code summs} is {@code null}
     */
    public static Map<String, Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys) {
        if (summs != null) {
            List<ExecutorSummary> filled = StatsUtil.getFilledStats(summs);
            Map<String, Map> aggregated = StatsUtil.aggregateSpoutStats(filled, includeSys);
            return StatsUtil.aggregateSpoutStreams(aggregated);
        }
        return new HashMap<String, Map>();
    }

    /**
     * Aggregates bolt stats of the given executor summaries and collapses them per stream.
     *
     * @param summs executor summaries; may be {@code null}
     * @param includeSys forwarded to {@code aggregateBoltStats}
     * @return the collapsed stats, or an empty map when {@code summs} is {@code null}
     */
    public static Map<String, Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys) {
        if (summs != null) {
            List<ExecutorSummary> filled = StatsUtil.getFilledStats(summs);
            Map<String, Map> aggregated = StatsUtil.aggregateBoltStats(filled, includeSys);
            return StatsUtil.aggregateBoltStreams(aggregated);
        }
        return new HashMap<String, Map>();
    }

    /**
     * Collapses aggregated spout stats across their inner keys.
     *
     * <p>Counts (acked, failed, emitted, transferred) go through {@code aggregateCountStreams};
     * complete latencies go through {@code aggregateAvgStreams} together with the acked counts.
     *
     * @param stats aggregated spout stats keyed by stat name
     * @return new map keyed by stat name holding the collapsed values
     */
    public static Map<String, Map> aggregateSpoutStreams(Map<String, Map> stats) {
        HashMap<String, Map> collapsed = new HashMap<String, Map>();
        // The acked map is needed both as a count and as the weight for the latencies.
        Map acked = StatsUtil.getMapByKey(stats, ACKED);
        StatsUtil.putKV(collapsed, ACKED, StatsUtil.aggregateCountStreams(acked));
        Map failed = StatsUtil.getMapByKey(stats, FAILED);
        StatsUtil.putKV(collapsed, FAILED, StatsUtil.aggregateCountStreams(failed));
        Map emitted = StatsUtil.getMapByKey(stats, EMITTED);
        StatsUtil.putKV(collapsed, EMITTED, StatsUtil.aggregateCountStreams(emitted));
        Map transferred = StatsUtil.getMapByKey(stats, TRANSFERRED);
        StatsUtil.putKV(collapsed, TRANSFERRED, StatsUtil.aggregateCountStreams(transferred));
        Map completeLatencies = StatsUtil.getMapByKey(stats, COMP_LATENCIES);
        StatsUtil.putKV(collapsed, COMP_LATENCIES, StatsUtil.aggregateAvgStreams(completeLatencies, acked));
        return collapsed;
    }

    /**
     * Collapses aggregated bolt stats across their inner keys.
     *
     * <p>Counts (acked, failed, emitted, transferred, executed) go through
     * {@code aggregateCountStreams}. Process latencies go through {@code aggregateAvgStreams}
     * with the acked counts, execute latencies with the executed counts.
     *
     * @param stats aggregated bolt stats keyed by stat name
     * @return new map keyed by stat name holding the collapsed values
     */
    public static Map<String, Map> aggregateBoltStreams(Map<String, Map> stats) {
        HashMap<String, Map> collapsed = new HashMap<String, Map>();
        // Acked and executed are used both as counts and as latency weights.
        Map acked = StatsUtil.getMapByKey(stats, ACKED);
        StatsUtil.putKV(collapsed, ACKED, StatsUtil.aggregateCountStreams(acked));
        Map failed = StatsUtil.getMapByKey(stats, FAILED);
        StatsUtil.putKV(collapsed, FAILED, StatsUtil.aggregateCountStreams(failed));
        Map emitted = StatsUtil.getMapByKey(stats, EMITTED);
        StatsUtil.putKV(collapsed, EMITTED, StatsUtil.aggregateCountStreams(emitted));
        Map transferred = StatsUtil.getMapByKey(stats, TRANSFERRED);
        StatsUtil.putKV(collapsed, TRANSFERRED, StatsUtil.aggregateCountStreams(transferred));
        Map executed = StatsUtil.getMapByKey(stats, EXECUTED);
        StatsUtil.putKV(collapsed, EXECUTED, StatsUtil.aggregateCountStreams(executed));
        Map processLatencies = StatsUtil.getMapByKey(stats, PROC_LATENCIES);
        StatsUtil.putKV(collapsed, PROC_LATENCIES, StatsUtil.aggregateAvgStreams(processLatencies, acked));
        Map executeLatencies = StatsUtil.getMapByKey(stats, EXEC_LATENCIES);
        StatsUtil.putKV(collapsed, EXEC_LATENCIES, StatsUtil.aggregateAvgStreams(executeLatencies, executed));
        return collapsed;
    }

    /**
     * Merges the per-window stats of one bolt executor into the accumulated per-window stats.
     *
     * <p>For every window present in the executor's EXECUTED map, {@code aggBoltLatAndCount}
     * is applied to that window's execute latencies, process latencies and executed counts;
     * the result is re-keyed with {@code swapMapOrder} and summed into the accumulated values.
     * Emitted and transferred counts pass through {@code filterSysStreams} before being summed.
     *
     * @param accStats accumulated stats (read only)
     * @param newStats stats of the executor being merged in
     * @param includeSys forwarded to {@code filterSysStreams}
     * @return new map holding the merged per-window values
     */
    public static Map<String, Object> aggBoltExecWinStats(Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys) {
        HashMap<String, Object> merged = new HashMap<String, Object>();

        // Resolve the three per-window inputs once rather than on every loop iteration.
        Map executedByWin = StatsUtil.getMapByKey(newStats, EXECUTED);
        Map execLatByWin = StatsUtil.getMapByKey(newStats, EXEC_LATENCIES);
        Map procLatByWin = StatsUtil.getMapByKey(newStats, PROC_LATENCIES);
        Map<String, Map<String, Number>> latAndCount = new HashMap();
        for (Object win : executedByWin.keySet()) {
            latAndCount.put((String)win, StatsUtil.aggBoltLatAndCount((Map)execLatByWin.get(win), (Map)procLatByWin.get(win), (Map)executedByWin.get(win)));
        }
        latAndCount = StatsUtil.swapMapOrder(latAndCount);
        Map execLatTotals = StatsUtil.getMapByKey(latAndCount, EXEC_LAT_TOTAL);
        Map procLatTotals = StatsUtil.getMapByKey(latAndCount, PROC_LAT_TOTAL);
        Map executedTotals = StatsUtil.getMapByKey(latAndCount, EXECUTED);

        Map newEmitted = StatsUtil.getMapByKey(newStats, EMITTED);
        Map<String, Long> emittedSum = StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(newEmitted, includeSys)), StatsUtil.getMapByKey(accStats, WIN_TO_EMITTED));
        StatsUtil.putKV(merged, WIN_TO_EMITTED, emittedSum);

        Map newTransferred = StatsUtil.getMapByKey(newStats, TRANSFERRED);
        Map<String, Long> transferredSum = StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(newTransferred, includeSys)), StatsUtil.getMapByKey(accStats, WIN_TO_TRANSFERRED));
        StatsUtil.putKV(merged, WIN_TO_TRANSFERRED, transferredSum);

        StatsUtil.putKV(merged, WIN_TO_EXEC_LAT_WGT_AVG, StatsUtil.mergeWithSumDouble(StatsUtil.getMapByKey(accStats, WIN_TO_EXEC_LAT_WGT_AVG), execLatTotals));
        StatsUtil.putKV(merged, WIN_TO_PROC_LAT_WGT_AVG, StatsUtil.mergeWithSumDouble(StatsUtil.getMapByKey(accStats, WIN_TO_PROC_LAT_WGT_AVG), procLatTotals));
        StatsUtil.putKV(merged, WIN_TO_EXECUTED, StatsUtil.mergeWithSumLong(StatsUtil.getMapByKey(accStats, WIN_TO_EXECUTED), executedTotals));
        StatsUtil.putKV(merged, WIN_TO_ACKED, StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.getMapByKey(newStats, ACKED)), StatsUtil.getMapByKey(accStats, WIN_TO_ACKED)));
        StatsUtil.putKV(merged, WIN_TO_FAILED, StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.getMapByKey(newStats, FAILED)), StatsUtil.getMapByKey(accStats, WIN_TO_FAILED)));
        return merged;
    }

    /**
     * Merges the per-window stats of one spout executor into the accumulated per-window stats.
     *
     * <p>For every window present in the executor's ACKED map, {@code aggSpoutLatAndCount} is
     * applied to that window's complete latencies and acked counts; the result is re-keyed
     * with {@code swapMapOrder} and summed into the accumulated values. Emitted and transferred
     * counts pass through {@code filterSysStreams} before being summed.
     *
     * @param accStats accumulated stats (read only)
     * @param beat stats of the executor being merged in
     * @param includeSys forwarded to {@code filterSysStreams}
     * @return new map holding the merged per-window values
     */
    public static Map<String, Object> aggSpoutExecWinStats(Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys) {
        HashMap<String, Object> merged = new HashMap<String, Object>();

        // Resolve the per-window inputs once rather than on every loop iteration.
        Map ackedByWin = StatsUtil.getMapByKey(beat, ACKED);
        Map compLatByWin = StatsUtil.getMapByKey(beat, COMP_LATENCIES);
        Map<String, Map<String, Number>> latAndCount = new HashMap();
        for (Object win : ackedByWin.keySet()) {
            latAndCount.put((String)win, StatsUtil.aggSpoutLatAndCount((Map)compLatByWin.get(win), (Map)ackedByWin.get(win)));
        }
        latAndCount = StatsUtil.swapMapOrder(latAndCount);
        Map compLatTotals = StatsUtil.getMapByKey(latAndCount, COMP_LAT_TOTAL);
        Map ackedTotals = StatsUtil.getMapByKey(latAndCount, ACKED);

        Map newEmitted = StatsUtil.getMapByKey(beat, EMITTED);
        Map<String, Long> emittedSum = StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(newEmitted, includeSys)), StatsUtil.getMapByKey(accStats, WIN_TO_EMITTED));
        StatsUtil.putKV(merged, WIN_TO_EMITTED, emittedSum);

        Map newTransferred = StatsUtil.getMapByKey(beat, TRANSFERRED);
        Map<String, Long> transferredSum = StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.filterSysStreams(newTransferred, includeSys)), StatsUtil.getMapByKey(accStats, WIN_TO_TRANSFERRED));
        StatsUtil.putKV(merged, WIN_TO_TRANSFERRED, transferredSum);

        StatsUtil.putKV(merged, WIN_TO_COMP_LAT_WGT_AVG, StatsUtil.mergeWithSumDouble(StatsUtil.getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG), compLatTotals));
        StatsUtil.putKV(merged, WIN_TO_ACKED, StatsUtil.mergeWithSumLong(StatsUtil.getMapByKey(accStats, WIN_TO_ACKED), ackedTotals));
        StatsUtil.putKV(merged, WIN_TO_FAILED, StatsUtil.mergeWithSumLong(StatsUtil.aggregateCountStreams(StatsUtil.getMapByKey(beat, FAILED)), StatsUtil.getMapByKey(accStats, WIN_TO_FAILED)));
        return merged;
    }

    /**
     * Sums a sequence of window-to-stream-to-count maps into a single map.
     *
     * <p>The input maps are never modified: the first stream map seen for a window is copied
     * before later maps are added into it. Storing it by reference would make every later
     * merge write into the caller's own map.
     *
     * @param countsSeq per-executor counts, keyed by window and then by stream
     * @return new map from window to stream to the summed count
     */
    public static <T> Map<String, Map<T, Long>> aggregateCounts(List<Map<String, Map<T, Long>>> countsSeq) {
        HashMap<String, Map<T, Long>> ret = new HashMap<String, Map<T, Long>>();
        for (Map<String, Map<T, Long>> counts : countsSeq) {
            for (Map.Entry<String, Map<T, Long>> entry : counts.entrySet()) {
                String win = entry.getKey();
                Map<T, Long> stream2count = entry.getValue();
                if (stream2count == null) {
                    // Nothing to add for this window.
                    continue;
                }
                Map<T, Long> merged = ret.get(win);
                if (merged == null) {
                    // Defensive copy so that later merges do not write into the input.
                    ret.put(win, new HashMap<T, Long>(stream2count));
                    continue;
                }
                for (Map.Entry<T, Long> subEntry : stream2count.entrySet()) {
                    T stream = subEntry.getKey();
                    Long count = subEntry.getValue();
                    Long previous = merged.get(stream);
                    merged.put(stream, previous == null ? count : Long.valueOf(previous + count));
                }
            }
        }
        return ret;
    }

    /**
     * Folds a list of executor beats into accumulated component stats.
     *
     * <p>The accumulator starts with empty per-window maps (plus the latency/executed maps
     * appropriate to the component type) and an inner STATS map; each beat is then merged in
     * through {@code aggCompExecStats}.
     *
     * @param window forwarded to {@code aggCompExecStats}
     * @param includeSys forwarded to {@code aggCompExecStats}
     * @param beats executor beats to fold
     * @param compType component type; anything other than SPOUT is treated as a bolt
     * @return the accumulated stats (the initial accumulator when {@code beats} is empty)
     */
    public static Map<String, Object> aggregateCompStats(String window, boolean includeSys, List<Map<String, Object>> beats, String compType) {
        boolean isBolt = !SPOUT.equals(compType);

        // Inner page-level stats; bolts additionally track per-input stats.
        HashMap pageStats = new HashMap();
        StatsUtil.putKV(pageStats, EXECUTOR_STATS, new ArrayList());
        StatsUtil.putKV(pageStats, SID_TO_OUT_STATS, new HashMap());
        if (isBolt) {
            StatsUtil.putKV(pageStats, CID_SID_TO_IN_STATS, new HashMap());
        }

        Map<String, Object> acc = new HashMap<String, Object>();
        StatsUtil.putKV(acc, WIN_TO_ACKED, new HashMap());
        StatsUtil.putKV(acc, WIN_TO_FAILED, new HashMap());
        StatsUtil.putKV(acc, WIN_TO_EMITTED, new HashMap());
        StatsUtil.putKV(acc, WIN_TO_TRANSFERRED, new HashMap());
        if (isBolt) {
            StatsUtil.putKV(acc, TYPE, BOLT);
            StatsUtil.putKV(acc, WIN_TO_EXECUTED, new HashMap());
            StatsUtil.putKV(acc, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap());
            StatsUtil.putKV(acc, WIN_TO_PROC_LAT_WGT_AVG, new HashMap());
        } else {
            StatsUtil.putKV(acc, TYPE, SPOUT);
            StatsUtil.putKV(acc, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
        }
        StatsUtil.putKV(acc, STATS, pageStats);

        for (Map<String, Object> beat : beats) {
            acc = StatsUtil.aggCompExecStats(window, includeSys, acc, beat, compType);
        }
        return acc;
    }

    /**
     * Merges one executor beat into the accumulated component stats.
     *
     * <p>The per-window values come from {@code aggSpoutExecWinStats} or
     * {@code aggBoltExecWinStats}; the STATS entry comes from merging the accumulated STATS
     * with the pre-merged page data of the beat.
     *
     * @param window forwarded to the pre-merge step
     * @param includeSys forwarded to the window aggregation and the pre-merge step
     * @param accStats stats accumulated so far
     * @param beat executor beat to merge in
     * @param compType component type; anything other than SPOUT takes the bolt path
     * @return new map with the merged stats and the TYPE entry set to {@code compType}
     */
    public static Map<String, Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType) {
        HashMap<String, Object> merged = new HashMap<String, Object>();
        boolean isSpout = SPOUT.equals(compType);
        Map beatStats = StatsUtil.getMapByKey(beat, STATS);
        if (!isSpout) {
            merged.putAll(StatsUtil.aggBoltExecWinStats(accStats, beatStats, includeSys));
            Map accPage = StatsUtil.getMapByKey(accStats, STATS);
            StatsUtil.putKV(merged, STATS, StatsUtil.mergeAggCompStatsCompPageBolt(accPage, StatsUtil.aggPreMergeCompPageBolt(beat, window, includeSys)));
        } else {
            merged.putAll(StatsUtil.aggSpoutExecWinStats(accStats, beatStats, includeSys));
            Map accPage = StatsUtil.getMapByKey(accStats, STATS);
            StatsUtil.putKV(merged, STATS, StatsUtil.mergeAggCompStatsCompPageSpout(accPage, StatsUtil.aggPreMergeCompPageSpout(beat, window, includeSys)));
        }
        StatsUtil.putKV(merged, TYPE, compType);
        return merged;
    }

    /**
     * Finalizes accumulated component stats for presentation.
     *
     * <p>Copies the task/executor counts, executor stats and per-window counts into a new map
     * and turns accumulated latency totals into averages:
     * <ul>
     *   <li>bolt: for every input-stats entry, the execute and process latency totals are
     *       divided by the executed count (0.0 when nothing was executed);</li>
     *   <li>any other type: for every output-stats entry, the complete latency total is
     *       divided by the acked count (0.0 when nothing was acked).</li>
     * </ul>
     * The per-entry maps are updated in place (the total keys are removed, the average keys
     * are added) and then referenced from the result.
     *
     * @param compStats accumulated component stats, including the TYPE and STATS entries
     * @return new map with the finalized stats
     */
    public static Map<String, Object> postAggregateCompStats(Map<String, Object> compStats) {
        HashMap<String, Object> ret = new HashMap<String, Object>();
        String compType = (String)compStats.get(TYPE);
        Map stats = StatsUtil.getMapByKey(compStats, STATS);
        Integer numTasks = StatsUtil.getByKeyOr0(stats, NUM_TASKS).intValue();
        Integer numExecutors = StatsUtil.getByKeyOr0(stats, NUM_EXECUTORS).intValue();
        Map outStats = StatsUtil.getMapByKey(stats, SID_TO_OUT_STATS);
        StatsUtil.putKV(ret, TYPE, compType);
        StatsUtil.putKV(ret, NUM_TASKS, numTasks);
        StatsUtil.putKV(ret, NUM_EXECUTORS, numExecutors);
        StatsUtil.putKV(ret, EXECUTOR_STATS, StatsUtil.getByKey(stats, EXECUTOR_STATS));
        StatsUtil.putKV(ret, WIN_TO_EMITTED, StatsUtil.mapKeyStr(StatsUtil.getMapByKey(compStats, WIN_TO_EMITTED)));
        StatsUtil.putKV(ret, WIN_TO_TRANSFERRED, StatsUtil.mapKeyStr(StatsUtil.getMapByKey(compStats, WIN_TO_TRANSFERRED)));
        StatsUtil.putKV(ret, WIN_TO_ACKED, StatsUtil.mapKeyStr(StatsUtil.getMapByKey(compStats, WIN_TO_ACKED)));
        StatsUtil.putKV(ret, WIN_TO_FAILED, StatsUtil.mapKeyStr(StatsUtil.getMapByKey(compStats, WIN_TO_FAILED)));
        if (BOLT.equals(compType)) {
            Map inStats = StatsUtil.getMapByKey(stats, CID_SID_TO_IN_STATS);
            HashMap inStats2 = new HashMap();
            for (Object o : inStats.entrySet()) {
                Map.Entry e = (Map.Entry)o;
                Map v = (Map)e.getValue();
                long executed = StatsUtil.getByKeyOr0(v, EXECUTED).longValue();
                if (executed > 0L) {
                    double executeLatencyTotal = StatsUtil.getByKeyOr0(v, EXEC_LAT_TOTAL).doubleValue();
                    double processLatencyTotal = StatsUtil.getByKeyOr0(v, PROC_LAT_TOTAL).doubleValue();
                    StatsUtil.putKV(v, EXEC_LATENCY, executeLatencyTotal / (double)executed);
                    StatsUtil.putKV(v, PROC_LATENCY, processLatencyTotal / (double)executed);
                } else {
                    // Nothing executed: report zero latency rather than dividing by zero.
                    StatsUtil.putKV(v, EXEC_LATENCY, 0.0);
                    StatsUtil.putKV(v, PROC_LATENCY, 0.0);
                }
                StatsUtil.remove(v, EXEC_LAT_TOTAL);
                StatsUtil.remove(v, PROC_LAT_TOTAL);
                inStats2.put(e.getKey(), v);
            }
            StatsUtil.putKV(ret, CID_SID_TO_IN_STATS, inStats2);
            StatsUtil.putKV(ret, SID_TO_OUT_STATS, outStats);
            StatsUtil.putKV(ret, WIN_TO_EXECUTED, StatsUtil.mapKeyStr(StatsUtil.getMapByKey(compStats, WIN_TO_EXECUTED)));
            StatsUtil.putKV(ret, WIN_TO_EXEC_LAT, StatsUtil.computeWeightedAveragesPerWindow(compStats, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED));
            StatsUtil.putKV(ret, WIN_TO_PROC_LAT, StatsUtil.computeWeightedAveragesPerWindow(compStats, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED));
        } else {
            HashMap outStats2 = new HashMap();
            for (Object o : outStats.entrySet()) {
                Map.Entry e = (Map.Entry)o;
                Map v = (Map)e.getValue();
                long acked = StatsUtil.getByKeyOr0(v, ACKED).longValue();
                if (acked > 0L) {
                    double compLatencyTotal = StatsUtil.getByKeyOr0(v, COMP_LAT_TOTAL).doubleValue();
                    StatsUtil.putKV(v, COMP_LATENCY, compLatencyTotal / (double)acked);
                } else {
                    // Nothing acked: report zero latency rather than dividing by zero.
                    StatsUtil.putKV(v, COMP_LATENCY, 0.0);
                }
                StatsUtil.remove(v, COMP_LAT_TOTAL);
                outStats2.put(e.getKey(), v);
            }
            StatsUtil.putKV(ret, SID_TO_OUT_STATS, outStats2);
            StatsUtil.putKV(ret, WIN_TO_COMP_LAT, StatsUtil.computeWeightedAveragesPerWindow(compStats, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
        }
        return ret;
    }

    /**
     * Builds the component page info for one component from executor heartbeats.
     *
     * <p>Pipeline: {@code extractDataFromHb} → {@code aggregateCompStats} →
     * {@code postAggregateCompStats} → {@code thriftifyCompPageData}.
     *
     * @param exec2hostPort executor to host/port assignment
     * @param task2component task id to component id
     * @param beats executor heartbeats
     * @param window forwarded to {@code aggregateCompStats}
     * @param includeSys forwarded to the extraction and aggregation steps
     * @param topologyId id of the topology
     * @param topology topology definition
     * @param componentId component to report on
     * @return the thrift page info produced by {@code thriftifyCompPageData}
     */
    public static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId) {
        List<Map<String, Object>> componentBeats = StatsUtil.extractDataFromHb(exec2hostPort, task2component, beats, includeSys, topology, componentId);
        String compType = StatsUtil.componentType(topology, componentId);
        Map<String, Object> accumulated = StatsUtil.aggregateCompStats(window, includeSys, componentBeats, compType);
        Map<String, Object> finalized = StatsUtil.postAggregateCompStats(accumulated);
        return StatsUtil.thriftifyCompPageData(topologyId, topology, componentId, finalized);
    }

    /**
     * Builds one {@link WorkerSummary} per worker slot from the executor assignment.
     *
     * <p>Every executor increments its worker's executor count and sets the worker's uptime
     * from its own heartbeat (0 when no heartbeat or no uptime is available), so the uptime of
     * the executor processed last wins. Per-component task counts are only filled in when
     * {@code userAuthorized} is true.
     *
     * @param stormId topology id stored on every summary
     * @param stormName topology name stored on every summary
     * @param task2Component task id to component id
     * @param beats executor heartbeats; may be {@code null}
     * @param exec2NodePort executor ({@code [firstTask, lastTask]}) to {@code [node, port]};
     *     when {@code null} an empty list is returned
     * @param nodeHost node id to host name
     * @param worker2Resources resources assigned per slot; slots without an entry report 0.0
     * @param includeSys whether tasks of system components are counted
     * @param userAuthorized whether per-component task counts are populated
     * @param filterSupervisor when non-null, only workers on this node are included
     * @return the worker summaries, in no particular order
     */
    public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor) {
        HashMap<WorkerSlot, WorkerSummary> workerSummaryMap = new HashMap<WorkerSlot, WorkerSummary>();
        if (exec2NodePort == null) {
            return new ArrayList<WorkerSummary>(workerSummaryMap.values());
        }
        for (Map.Entry<List<Long>, List<Object>> execNodePort : exec2NodePort.entrySet()) {
            List<Object> nodePort = execNodePort.getValue();
            String node = (String)nodePort.get(0);
            // Filter first so that discarded entries cost no further lookups.
            if (filterSupervisor != null && !filterSupervisor.equals(node)) {
                continue;
            }
            Long port = (Long)nodePort.get(1);
            WorkerSlot slot = new WorkerSlot(node, port);
            WorkerSummary ws = workerSummaryMap.get(slot);
            if (ws == null) {
                ws = new WorkerSummary();
                ws.set_host(nodeHost.get(node));
                ws.set_port(port.intValue());
                ws.set_supervisor_id(node);
                ws.set_topology_id(stormId);
                ws.set_topology_name(stormName);
                ws.set_num_executors(0);
                WorkerResources resources = worker2Resources.get(slot);
                if (resources != null) {
                    ws.set_assigned_memonheap(resources.get_mem_on_heap());
                    ws.set_assigned_memoffheap(resources.get_mem_off_heap());
                    ws.set_assigned_cpu(resources.get_cpu());
                } else {
                    ws.set_assigned_memonheap(0.0);
                    ws.set_assigned_memoffheap(0.0);
                    ws.set_assigned_cpu(0.0);
                }
                ws.set_component_to_num_tasks(new HashMap<String, Long>());
                workerSummaryMap.put(slot, ws);
            }
            Map<String, Long> componentToNumTasks = ws.get_component_to_num_tasks();
            List<Long> exec = execNodePort.getKey();

            int hbeatSecs = 0;
            Map<String, Object> beat = beats == null ? null : beats.get(StatsUtil.convertExecutor(exec));
            if (beat != null) {
                Map hbeat = (Map)beat.get(HEARTBEAT);
                // A heartbeat without an uptime entry counts as 0 instead of failing on unboxing.
                Object uptime = hbeat == null ? null : hbeat.get(UPTIME);
                if (uptime != null) {
                    hbeatSecs = (Integer)uptime;
                }
            }
            ws.set_uptime_secs(hbeatSecs);
            ws.set_num_executors(ws.get_num_executors() + 1);
            if (!userAuthorized) {
                continue;
            }
            int firstTask = exec.get(0).intValue();
            int lastTask = exec.get(1).intValue();
            for (int task = firstTask; task <= lastTask; ++task) {
                String component = task2Component.get(task);
                if (!includeSys && Utils.isSystemId(component)) {
                    continue;
                }
                Long counter = componentToNumTasks.get(component);
                componentToNumTasks.put(component, counter == null ? 1L : counter + 1L);
            }
        }
        return new ArrayList<WorkerSummary>(workerSummaryMap.values());
    }

    /**
     * Convenience overload of the ten-argument {@code aggWorkerStats} that passes
     * {@code null} as the supervisor filter, so workers on every node are included.
     */
    public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized) {
        return StatsUtil.aggWorkerStats(stormId, stormName, task2Component, beats, exec2NodePort, nodeHost, worker2Resources, includeSys, userAuthorized, null);
    }

    /**
     * Re-keys executor beats by {@code [taskStart, taskEnd]} and converts each beat to a map.
     *
     * @param beats beats keyed by {@link ExecutorInfo}
     * @return new map from {@code [taskStart, taskEnd]} to the result of
     *     {@code convertZkExecutorHb} for that beat
     */
    public static Map<List<Integer>, Map<String, Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats) {
        HashMap<List<Integer>, Map<String, Object>> ret = new HashMap<List<Integer>, Map<String, Object>>();
        for (Map.Entry<ExecutorInfo, ExecutorBeat> beat : beats.entrySet()) {
            ExecutorInfo executorInfo = beat.getKey();
            // Varargs call keeps the element type Integer, so the key really is a List<Integer>.
            List<Integer> executor = Lists.newArrayList(executorInfo.get_task_start(), executorInfo.get_task_end());
            ret.put(executor, StatsUtil.convertZkExecutorHb(beat.getValue()));
        }
        return ret;
    }

    /**
     * Converts an executor beat into a plain map.
     *
     * @param beat beat to convert; may be {@code null}
     * @return new map with the TIME_SECS, UPTIME and STATS entries, or an empty map when
     *     {@code beat} is {@code null}
     */
    public static Map<String, Object> convertZkExecutorHb(ExecutorBeat beat) {
        HashMap<String, Object> converted = new HashMap<String, Object>();
        if (beat == null) {
            return converted;
        }
        converted.put(TIME_SECS, beat.getTimeSecs());
        converted.put(UPTIME, beat.getUptime());
        converted.put(STATS, StatsUtil.convertExecutorStats(beat.getStats()));
        return converted;
    }

    /**
     * Converts a worker heartbeat into a plain map.
     *
     * @param workerHb heartbeat to convert; may be {@code null}
     * @return new map with the storm id, converted executor stats, uptime and time entries,
     *     or an empty map when {@code workerHb} is {@code null}
     */
    public static Map<String, Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb) {
        HashMap<String, Object> converted = new HashMap<String, Object>();
        if (workerHb == null) {
            return converted;
        }
        converted.put("storm-id", workerHb.get_storm_id());
        converted.put(EXECUTOR_STATS, StatsUtil.convertExecutorsStats(workerHb.get_executor_stats()));
        converted.put(UPTIME, workerHb.get_uptime_secs());
        converted.put(TIME_SECS, workerHb.get_time_secs());
        return converted;
    }

    /**
     * Re-keys executor stats by {@code [taskStart, taskEnd]}.
     *
     * @param stats stats keyed by {@link ExecutorInfo}
     * @return new map from {@code [taskStart, taskEnd]} to the unchanged stats object
     */
    public static Map<List<Integer>, ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats) {
        HashMap<List<Integer>, ExecutorStats> ret = new HashMap<List<Integer>, ExecutorStats>();
        for (Map.Entry<ExecutorInfo, ExecutorStats> entry : stats.entrySet()) {
            ExecutorInfo executorInfo = entry.getKey();
            // Varargs call keeps the element type Integer, so the key really is a List<Integer>.
            List<Integer> executor = Lists.newArrayList(executorInfo.get_task_start(), executorInfo.get_task_end());
            ret.put(executor, entry.getValue());
        }
        return ret;
    }

    /**
     * Converts thrift executor stats into a plain map.
     *
     * <p>The emitted, transferred and rate values are copied first; the bolt- or
     * spout-specific entries are then added and TYPE is set to BOLT when the bolt stats are
     * set, otherwise to SPOUT.
     *
     * @param stats executor stats to convert
     * @return new map with the converted entries
     */
    public static Map<String, Object> convertExecutorStats(ExecutorStats stats) {
        HashMap<String, Object> converted = new HashMap<String, Object>();
        StatsUtil.putKV(converted, EMITTED, stats.get_emitted());
        StatsUtil.putKV(converted, TRANSFERRED, stats.get_transferred());
        StatsUtil.putKV(converted, RATE, stats.get_rate());

        ExecutorSpecificStats specific = stats.get_specific();
        boolean isBolt = specific.is_set_bolt();
        Map<String, Object> specificEntries = isBolt
                ? StatsUtil.convertSpecificStats(specific.get_bolt())
                : StatsUtil.convertSpecificStats(specific.get_spout());
        converted.putAll(specificEntries);
        StatsUtil.putKV(converted, TYPE, isBolt ? BOLT : SPOUT);
        return converted;
    }

    /**
     * Copies the acked, failed and complete-latency maps of spout stats into a plain map.
     *
     * @param stats spout stats to convert
     * @return new map with the ACKED, FAILED and COMP_LATENCIES entries
     */
    private static Map<String, Object> convertSpecificStats(SpoutStats stats) {
        HashMap<String, Object> converted = new HashMap<String, Object>();
        StatsUtil.putKV(converted, ACKED, stats.get_acked());
        StatsUtil.putKV(converted, FAILED, stats.get_failed());
        StatsUtil.putKV(converted, COMP_LATENCIES, stats.get_complete_ms_avg());
        return converted;
    }

    /**
     * Converts bolt stats into a plain map.
     *
     * <p>Each stat map is passed through {@code windowSetConverter} with the FROM_GSID and
     * IDENTITY transformers before being stored.
     *
     * @param stats bolt stats to convert
     * @return new map with the ACKED, FAILED, PROC_LATENCIES, EXECUTED and EXEC_LATENCIES entries
     */
    private static Map<String, Object> convertSpecificStats(BoltStats stats) {
        HashMap<String, Object> converted = new HashMap<String, Object>();

        Map ackedByWin = StatsUtil.windowSetConverter(stats.get_acked(), FROM_GSID, IDENTITY);
        Map failedByWin = StatsUtil.windowSetConverter(stats.get_failed(), FROM_GSID, IDENTITY);
        Map procLatByWin = StatsUtil.windowSetConverter(stats.get_process_ms_avg(), FROM_GSID, IDENTITY);
        Map executedByWin = StatsUtil.windowSetConverter(stats.get_executed(), FROM_GSID, IDENTITY);
        Map execLatByWin = StatsUtil.windowSetConverter(stats.get_execute_ms_avg(), FROM_GSID, IDENTITY);

        StatsUtil.putKV(converted, ACKED, ackedByWin);
        StatsUtil.putKV(converted, FAILED, failedByWin);
        StatsUtil.putKV(converted, PROC_LATENCIES, procLatByWin);
        StatsUtil.putKV(converted, EXECUTED, executedByWin);
        StatsUtil.putKV(converted, EXEC_LATENCIES, execLatByWin);
        return converted;
    }

    /**
     * Collects the distinct host/port pairs on which executors of a component run.
     *
     * <p>An executor is attributed to the component of its first task. Executors are skipped
     * when {@code compId} is non-null and differs from that component, or when
     * {@code includeSys} is false and the component is a system id.
     *
     * @param exec2hostPort executor ({@code [firstTask, ...]}) to {@code [host, port]}
     * @param task2component task id to component id
     * @param includeSys whether executors of system components are included
     * @param compId component to restrict to, or {@code null} for all components
     * @return one map with HOST and PORT entries per distinct host/port pair
     */
    public static List<Map<String, Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId) {
        HashSet<ArrayList> distinctHostPorts = new HashSet<ArrayList>();
        for (Map.Entry<List<? extends Number>, List<Object>> assignment : exec2hostPort.entrySet()) {
            List<Object> hostPort = assignment.getValue();
            int firstTask = assignment.getKey().get(0).intValue();
            String host = (String)hostPort.get(0);
            Integer port = (Integer)hostPort.get(1);
            String component = task2component.get(firstTask);

            boolean otherComponent = compId != null && !compId.equals(component);
            if (otherComponent) {
                continue;
            }
            if (!includeSys && Utils.isSystemId(component)) {
                continue;
            }
            distinctHostPorts.add(Lists.<Object>newArrayList(host, port));
        }

        ArrayList<Map<String, Object>> nodeInfos = new ArrayList<Map<String, Object>>();
        for (List hostPort : distinctHostPorts) {
            HashMap nodeInfo = new HashMap();
            StatsUtil.putKV(nodeInfo, HOST, hostPort.get(0));
            StatsUtil.putKV(nodeInfo, PORT, hostPort.get(1));
            nodeInfos.add(nodeInfo);
        }
        return nodeInfos;
    }

    /**
     * Builds a fresh heartbeat cache for the given executors.
     *
     * <p>Each executor's entry is computed by {@code updateExecutorCache} from its current
     * cache entry and its new beat; a missing {@code cache} or {@code executorBeats} is
     * treated as empty.
     *
     * @param cache current cache; may be {@code null}
     * @param executorBeats newly received beats; may be {@code null}
     * @param executors executors to produce entries for
     * @param timeout forwarded to {@code updateExecutorCache}
     * @return the new cache, or an empty map when both {@code cache} and
     *     {@code executorBeats} are {@code null}
     */
    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache, Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors, Integer timeout) {
        HashMap<List<Integer>, Map<String, Object>> updated = new HashMap<List<Integer>, Map<String, Object>>();
        if (cache == null && executorBeats == null) {
            return updated;
        }
        Map<List<Integer>, Map<String, Object>> current = cache != null
                ? cache
                : new HashMap<List<Integer>, Map<String, Object>>();
        Map<List<Integer>, Map<String, Object>> incoming = executorBeats != null
                ? executorBeats
                : new HashMap<List<Integer>, Map<String, Object>>();
        for (List<Integer> executor : executors) {
            Map<String, Object> entry = StatsUtil.updateExecutorCache(current.get(executor), incoming.get(executor), timeout);
            updated.put(executor, entry);
        }
        return updated;
    }

    /**
     * Computes the cache entry for one executor from its cached entry and its latest beat.
     *
     * <p>The reported time comes from {@code newBeat}; if that is missing, the previously
     * reported time (or 0) is used. The "nimbus-time" is reset to the current time whenever
     * there was no previous nimbus time or the reported time differs from the previous one.
     *
     * @param currBeat previously cached entry, may be {@code null}
     * @param newBeat  newly reported beat, may be {@code null}
     * @param timeout  seconds after which the entry is flagged "is-timed-out"
     * @return map with the keys "is-timed-out", "nimbus-time", "executor-reported-time" and
     *         {@code HEARTBEAT} (the unmodified {@code newBeat})
     */
    public static Map<String, Object> updateExecutorCache(Map<String, Object> currBeat, Map<String, Object> newBeat, Integer timeout) {
        Integer lastNimbusTime = currBeat == null ? null : (Integer) currBeat.get("nimbus-time");
        Integer lastReportedTime = currBeat == null ? null : (Integer) currBeat.get("executor-reported-time");

        Integer reportedTime = newBeat == null ? null : (Integer) newBeat.get(TIME_SECS);
        if (reportedTime == null) {
            reportedTime = lastReportedTime == null ? Integer.valueOf(0) : lastReportedTime;
        }

        // A changed reported time means the executor is alive: restart the timeout clock.
        boolean reportChanged = !reportedTime.equals(lastReportedTime);
        if (lastNimbusTime == null || reportChanged) {
            lastNimbusTime = Time.currentTimeSecs();
        }

        Map<String, Object> entry = new HashMap<String, Object>();
        entry.put("is-timed-out", Time.deltaSecs(lastNimbusTime) >= timeout);
        entry.put("nimbus-time", lastNimbusTime);
        entry.put("executor-reported-time", reportedTime);
        entry.put(HEARTBEAT, newBeat);
        return entry;
    }

    /**
     * Convenience overload of the six-argument {@code extractDataFromHb} that applies no
     * component filter (passes {@code null} as the component id).
     */
    public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology) {
        final String noComponentFilter = null;
        return StatsUtil.extractDataFromHb(executor2hostPort, task2component, beats, includeSys, topology, noComponentFilter);
    }

    /**
     * Flattens executor assignments and their heartbeats into one map per executor.
     *
     * <p>An executor is skipped when it has no beat, when it is filtered out by {@code compId}
     * or {@code includeSys}, or when its beat carries no {@code HEARTBEAT} map.
     *
     * @param executor2hostPort executor ([start, end]) to [host, port]; {@code null} yields an empty list
     * @param task2component    task id to component id
     * @param beats             per-executor beat maps; {@code null} yields an empty list
     * @param includeSys        when {@code false}, executors of system components are skipped
     * @param topology          topology used to resolve the component type
     * @param compId            when non-null, only executors of this component are kept
     * @return maps holding "exec-id", "comp-id", {@code NUM_TASKS}, {@code HOST}, {@code PORT},
     *         {@code UPTIME}, {@code STATS} and {@code TYPE}
     */
    public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId) {
        List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
        if (executor2hostPort == null || beats == null) {
            return ret;
        }
        for (Object o : executor2hostPort.entrySet()) {
            Map.Entry entry = (Map.Entry) o;
            List executor = (List) entry.getKey();
            List hostPort = (List) entry.getValue();
            Integer start = ((Number) executor.get(0)).intValue();
            Integer end = ((Number) executor.get(1)).intValue();
            String host = (String) hostPort.get(0);
            Integer port = ((Number) hostPort.get(1)).intValue();

            Map<String, Object> beat = beats.get(StatsUtil.convertExecutor(executor));
            if (beat == null) {
                continue;
            }
            String id = (String) task2component.get(start);
            if (compId != null && !compId.equals(id)) {
                continue;
            }
            if (!includeSys && Utils.isSystemId(id)) {
                continue;
            }
            Map hb = StatsUtil.getMapByKey(beat, HEARTBEAT);
            if (hb == null) {
                continue;
            }

            Map<String, Object> m = new HashMap<String, Object>();
            StatsUtil.putKV(m, "exec-id", entry.getKey());
            StatsUtil.putKV(m, "comp-id", id);
            StatsUtil.putKV(m, NUM_TASKS, end - start + 1);
            StatsUtil.putKV(m, HOST, host);
            StatsUtil.putKV(m, PORT, port);

            Map stats = StatsUtil.getMapByKey(hb, STATS);
            StatsUtil.putKV(m, UPTIME, hb.get(UPTIME));
            StatsUtil.putKV(m, STATS, stats);

            // The type is resolved from the compId filter (not the executor's own id); when no
            // filter is given it falls back to the type recorded in the stats map.
            String type = StatsUtil.componentType(topology, compId);
            Object typeValue = type != null ? type : stats.get(TYPE);
            StatsUtil.putKV(m, TYPE, typeValue);
            ret.add(m);
        }
        return ret;
    }

    /**
     * Divides each per-window weighted sum by the matching per-window divisor.
     *
     * @param accData    accumulated data holding both maps
     * @param wgtAvgKey  key of the window-to-weighted-sum map inside {@code accData}
     * @param divisorKey key of the window-to-divisor map inside {@code accData}
     * @return window (as string) to average; windows whose divisor is not positive are omitted
     */
    private static Map<String, Double> computeWeightedAveragesPerWindow(Map<String, Object> accData, String wgtAvgKey, String divisorKey) {
        Map<String, Double> averages = new HashMap<String, Double>();
        Map<Object, Object> weightedSums = StatsUtil.getMapByKey(accData, wgtAvgKey);
        for (Map.Entry<Object, Object> weighted : weightedSums.entrySet()) {
            Object window = weighted.getKey();
            double weightedSum = ((Number) weighted.getValue()).doubleValue();
            Map<Object, Object> divisors = StatsUtil.getMapByKey(accData, divisorKey);
            long divisor = ((Number) divisors.get(window)).longValue();
            if (divisor > 0L) {
                averages.put(window.toString(), weightedSum / (double) divisor);
            }
        }
        return averages;
    }

    /**
     * Converts an executor id ([start-task, end-task]) into a two-element list of Integers.
     *
     * <p>The parameter accepts any {@link Number} element type. Callers in this class pass raw
     * lists whose elements are treated as {@code Number} elsewhere, and the previous
     * {@code List<Long>} signature failed with a {@link ClassCastException} for Integer elements.
     *
     * @param executor list whose first two elements are the start and end task ids
     * @return a new mutable list {@code [start, end]} of Integers
     */
    public static List<Integer> convertExecutor(List<? extends Number> executor) {
        List<Integer> ret = new ArrayList<Integer>(2);
        ret.add(executor.get(0).intValue());
        ret.add(executor.get(1).intValue());
        return ret;
    }

    /**
     * Returns the largest executor capacity among the given summaries.
     *
     * @param executorSumms executor summaries of a bolt
     * @return the maximum of {@code computeExecutorCapacity} over all summaries, or 0.0 when
     *         none exceeds zero
     */
    public static double computeBoltCapacity(List<ExecutorSummary> executorSumms) {
        double highest = 0.0;
        for (ExecutorSummary executorSummary : executorSumms) {
            double current = StatsUtil.computeExecutorCapacity(executorSummary);
            if (current > highest) {
                highest = current;
            }
        }
        return highest;
    }

    /**
     * Computes the capacity of a single executor over the ten-minute window.
     *
     * <p>Capacity is {@code executed * executeLatency / (1000 * window)}, where the window is the
     * executor uptime capped at 600 seconds.
     *
     * @param summary executor summary
     * @return the capacity, or 0.0 when the summary has no stats or the window is not positive
     */
    public static double computeExecutorCapacity(ExecutorSummary summary) {
        if (summary.get_stats() == null) {
            return 0.0;
        }
        Map aggregated = StatsUtil.aggregateBoltStats(Lists.newArrayList(summary), true);
        Map win2stats = StatsUtil.swapMapOrder(StatsUtil.aggregateBoltStreams(aggregated));
        Map tenMinData = StatsUtil.getMapByKey(win2stats, TEN_MIN_IN_SECONDS_STR);

        int win = Math.min(summary.get_uptime_secs(), 600);
        long executed = StatsUtil.getByKeyOr0(tenMinData, EXECUTED).longValue();
        double latency = StatsUtil.getByKeyOr0(tenMinData, EXEC_LATENCIES).doubleValue();
        if (win <= 0) {
            return 0.0;
        }
        return (double) executed * latency / (double) (1000 * win);
    }

    /**
     * Keeps only the executor summaries that carry stats.
     *
     * @param summs executor summaries
     * @return a new list with the summaries whose {@code get_stats()} is non-null, in input order
     */
    public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs) {
        List<ExecutorSummary> filled = new ArrayList<ExecutorSummary>();
        for (ExecutorSummary candidate : summs) {
            if (candidate.get_stats() != null) {
                filled.add(candidate);
            }
        }
        return filled;
    }

    /**
     * Copies a map, replacing every key by its {@code toString()} form.
     *
     * @param m source map (left untouched)
     * @return a new map with string keys and the original values
     */
    private static <K, V> Map<String, V> mapKeyStr(Map<K, V> m) {
        Map<String, V> stringKeyed = new HashMap<String, V>();
        for (Map.Entry<K, V> e : m.entrySet()) {
            K originalKey = e.getKey();
            stringKeyed.put(originalKey.toString(), e.getValue());
        }
        return stringKeyed;
    }

    /**
     * Sums, as longs, the values stored under {@code key} in every inner map.
     *
     * @param m   outer map whose values are the inner maps; {@code null} yields 0
     * @param key inner key to sum over
     * @return the sum of all matching inner values
     */
    private static <K1, K2> long sumStreamsLong(Map<K1, Map<K2, ?>> m, String key) {
        if (m == null) {
            return 0L;
        }
        long total = 0L;
        for (Map<K2, ?> inner : m.values()) {
            for (Map.Entry<K2, ?> e : inner.entrySet()) {
                if (e.getKey().equals(key)) {
                    total += ((Number) e.getValue()).longValue();
                }
            }
        }
        return total;
    }

    /**
     * Sums, as doubles, the values stored under {@code key} in every inner map.
     *
     * @param m   outer map whose values are the inner maps; {@code null} yields 0.0
     * @param key inner key to sum over
     * @return the sum of all matching inner values
     */
    private static <K1, K2> double sumStreamsDouble(Map<K1, Map<K2, ?>> m, String key) {
        if (m == null) {
            return 0.0;
        }
        double total = 0.0;
        for (Map<K2, ?> inner : m.values()) {
            for (Map.Entry<K2, ?> e : inner.entrySet()) {
                if (e.getKey().equals(key)) {
                    total += ((Number) e.getValue()).doubleValue();
                }
            }
        }
        return total;
    }

    /**
     * Merges {@code m2} into {@code m1} one level deep, mutating and returning {@code m1}.
     *
     * <p>For a key already present in {@code m1}, the entries of {@code m2}'s inner map are added
     * to {@code m1}'s inner map. Otherwise {@code m2}'s inner map is stored in {@code m1} as is
     * (the same instance, not a copy).
     *
     * @param m1 target map of maps
     * @param m2 source map of maps; {@code null} leaves {@code m1} unchanged
     * @return {@code m1}
     */
    private static Map mergeMaps(Map m1, Map m2) {
        if (m2 != null) {
            for (Object o : m2.entrySet()) {
                Map.Entry incoming = (Map.Entry) o;
                Object key = incoming.getKey();
                Map target = (Map) m1.get(key);
                if (target != null) {
                    target.putAll((Map) m2.get(key));
                } else {
                    m1.put(key, incoming.getValue());
                }
            }
        }
        return m1;
    }

    /**
     * Removes entries keyed by a system id from the given map, in place.
     *
     * @param stream2stat map to filter (mutated)
     * @param includeSys  when {@code true} nothing is removed
     * @return the same {@code stream2stat} instance
     */
    private static <K, V> Map<K, V> filterSysStreams2Stat(Map<K, V> stream2stat, boolean includeSys) {
        LOG.trace("Filter Sys Streams2Stat {}", stream2stat);
        if (includeSys) {
            return stream2stat;
        }
        // Removal goes through the iterator so the key set can be modified while iterating.
        for (Iterator<K> keys = stream2stat.keySet().iterator(); keys.hasNext();) {
            K stream = keys.next();
            if (stream instanceof String && Utils.isSystemId((String) stream)) {
                keys.remove();
            }
        }
        return stream2stat;
    }

    /**
     * Removes, in place, inner entries keyed by a system id from every inner map.
     *
     * @param stats      outer map whose inner maps are filtered (mutated)
     * @param includeSys when {@code true} nothing is removed
     * @return the same {@code stats} instance
     */
    private static <K, V> Map<String, Map<K, V>> filterSysStreams(Map<String, Map<K, V>> stats, boolean includeSys) {
        LOG.trace("Filter Sys Streams {}", stats);
        if (includeSys) {
            return stats;
        }
        for (Map<K, V> stream2stat : stats.values()) {
            // Removal goes through the iterator so the key set can be modified while iterating.
            for (Iterator<K> keys = stream2stat.keySet().iterator(); keys.hasNext();) {
                K stream = keys.next();
                if (stream instanceof String && Utils.isSystemId((String) stream)) {
                    keys.remove();
                }
            }
        }
        return stats;
    }

    /**
     * Merges two maps of maps, summing the inner values key by key via {@code mergeWithSum}.
     *
     * @param m1 first map, may be {@code null}
     * @param m2 second map, may be {@code null}
     * @return a new map containing every outer key of either input
     */
    private static <K1, K2> Map<K1, Map<K2, Number>> fullMergeWithSum(Map<K1, Map<K2, ?>> m1, Map<K1, Map<K2, ?>> m2) {
        Set<K1> outerKeys = new HashSet<K1>();
        if (m1 != null) {
            outerKeys.addAll(m1.keySet());
        }
        if (m2 != null) {
            outerKeys.addAll(m2.keySet());
        }
        Map<K1, Map<K2, Number>> merged = new HashMap<K1, Map<K2, Number>>();
        for (K1 outerKey : outerKeys) {
            Map<K2, ?> left = m1 == null ? null : m1.get(outerKey);
            Map<K2, ?> right = m2 == null ? null : m2.get(outerKey);
            merged.put(outerKey, StatsUtil.mergeWithSum(left, right));
        }
        return merged;
    }

    /**
     * Sums two maps key by key; a key missing from one side contributes zero.
     *
     * <p>The sum is a {@code Long} when the value taken from {@code m1} is a {@code Long},
     * otherwise a {@code Double}.
     *
     * <p>NOTE(review): {@code getOr0} substitutes an Integer zero for a missing value, so a key
     * present only in {@code m2} is summed as a Double even if its value is a Long. Confirm
     * whether callers depend on this before changing it.
     *
     * @param m1 first map, may be {@code null}
     * @param m2 second map, may be {@code null}
     * @return a new map containing every key of either input
     */
    private static <K> Map<K, Number> mergeWithSum(Map<K, ?> m1, Map<K, ?> m2) {
        Set<K> keys = new HashSet<K>();
        if (m1 != null) {
            keys.addAll(m1.keySet());
        }
        if (m2 != null) {
            keys.addAll(m2.keySet());
        }
        Map<K, Number> sums = new HashMap<K, Number>();
        for (K key : keys) {
            Number left = StatsUtil.getOr0(m1, key);
            Number right = StatsUtil.getOr0(m2, key);
            Number sum;
            if (left instanceof Long) {
                sum = Long.valueOf(left.longValue() + right.longValue());
            } else {
                sum = Double.valueOf(left.doubleValue() + right.doubleValue());
            }
            sums.put(key, sum);
        }
        return sums;
    }

    /**
     * Sums two maps of longs key by key; a key missing from one side contributes zero.
     *
     * @param m1 first map, may be {@code null}
     * @param m2 second map, may be {@code null}
     * @return a new map containing every key of either input
     */
    private static <K> Map<K, Long> mergeWithSumLong(Map<K, Long> m1, Map<K, Long> m2) {
        Set<K> keys = new HashSet<K>();
        if (m1 != null) {
            keys.addAll(m1.keySet());
        }
        if (m2 != null) {
            keys.addAll(m2.keySet());
        }
        Map<K, Long> sums = new HashMap<K, Long>();
        for (K key : keys) {
            long left = StatsUtil.getOr0(m1, key).longValue();
            long right = StatsUtil.getOr0(m2, key).longValue();
            sums.put(key, left + right);
        }
        return sums;
    }

    /**
     * Sums two maps of doubles key by key; a key missing from one side contributes zero.
     *
     * @param m1 first map, may be {@code null}
     * @param m2 second map, may be {@code null}
     * @return a new map containing every key of either input
     */
    private static <K> Map<K, Double> mergeWithSumDouble(Map<K, Double> m1, Map<K, Double> m2) {
        Set<K> keys = new HashSet<K>();
        if (m1 != null) {
            keys.addAll(m1.keySet());
        }
        if (m2 != null) {
            keys.addAll(m2.keySet());
        }
        Map<K, Double> sums = new HashMap<K, Double>();
        for (K key : keys) {
            double left = StatsUtil.getOr0(m1, key).doubleValue();
            double right = StatsUtil.getOr0(m2, key).doubleValue();
            sums.put(key, left + right);
        }
        return sums;
    }

    /**
     * Merges two window -> stream -> number-list maps, adding the lists element by element.
     *
     * <p>Keys are merged as a union on both levels. An outer key present on one side only keeps
     * that side's inner map. An inner key present on one side only keeps that side's list
     * (previously such keys were dropped when they came from {@code m2}, or caused a
     * {@link NullPointerException} when they came from {@code m1}).
     *
     * <p>Each summed element is a {@code Long} when the element from the first list is a
     * {@code Long}, otherwise a {@code Double}.
     *
     * @param m1 first map, may be {@code null}
     * @param m2 second map, may be {@code null}
     * @return a new outer map; one-sided inner maps and lists are shared with the inputs, not copied
     */
    private static <K> Map<String, Map<K, List>> mergeWithAddPair(Map<String, Map<K, List>> m1, Map<String, Map<K, List>> m2) {
        Map<String, Map<K, List>> ret = new HashMap<String, Map<K, List>>();
        Set<String> allKeys = new HashSet<String>();
        if (m1 != null) {
            allKeys.addAll(m1.keySet());
        }
        if (m2 != null) {
            allKeys.addAll(m2.keySet());
        }
        for (String k : allKeys) {
            Map<K, List> mm1 = m1 != null ? m1.get(k) : null;
            Map<K, List> mm2 = m2 != null ? m2.get(k) : null;
            if (mm1 == null && mm2 == null) {
                continue;
            }
            if (mm1 == null) {
                ret.put(k, mm2);
                continue;
            }
            if (mm2 == null) {
                ret.put(k, mm1);
                continue;
            }

            Set<K> innerKeys = new HashSet<K>(mm1.keySet());
            innerKeys.addAll(mm2.keySet());
            Map<K, List> merged = new HashMap<K, List>();
            for (K kk : innerKeys) {
                List seq1 = mm1.get(kk);
                List seq2 = mm2.get(kk);
                if (seq1 == null || seq2 == null) {
                    // Only one side knows this key: keep its list as is.
                    List only = seq1 != null ? seq1 : seq2;
                    if (only != null) {
                        merged.put(kk, only);
                    }
                    continue;
                }
                List<Number> sums = new ArrayList<Number>(seq1.size());
                for (int i = 0; i < seq1.size(); ++i) {
                    Number a = (Number) seq1.get(i);
                    Number b = (Number) seq2.get(i);
                    if (a instanceof Long) {
                        sums.add(Long.valueOf(a.longValue() + b.longValue()));
                    } else {
                        sums.add(Double.valueOf(a.doubleValue() + b.doubleValue()));
                    }
                }
                merged.put(kk, sums);
            }
            ret.put(k, merged);
        }
        return ret;
    }

    /**
     * Converts a worker heartbeat map into its thrift {@code ClusterWorkerHeartbeat} form.
     *
     * <p>Uptime and time default to 0 when missing. Executors whose stats are {@code null} are
     * left out of the executor-stats map.
     *
     * @param heartbeat map holding {@code UPTIME}, "storm-id", {@code TIME_SECS} and
     *                  {@code EXECUTOR_STATS}
     * @return the populated thrift heartbeat
     */
    public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
        ClusterWorkerHeartbeat thriftHb = new ClusterWorkerHeartbeat();
        thriftHb.set_uptime_secs(StatsUtil.getByKeyOr0(heartbeat, UPTIME).intValue());
        thriftHb.set_storm_id((String) StatsUtil.getByKey(heartbeat, "storm-id"));
        thriftHb.set_time_secs(StatsUtil.getByKeyOr0(heartbeat, TIME_SECS).intValue());

        Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<ExecutorInfo, ExecutorStats>();
        Map executorStats = StatsUtil.getMapByKey(heartbeat, EXECUTOR_STATS);
        if (executorStats != null) {
            for (Object o : executorStats.entrySet()) {
                Map.Entry entry = (Map.Entry) o;
                List executor = (List) entry.getKey();
                ExecutorStats stats = (ExecutorStats) entry.getValue();
                if (stats != null) {
                    ExecutorInfo info = new ExecutorInfo((Integer) executor.get(0), (Integer) executor.get(1));
                    convertedStats.put(info, stats);
                }
            }
        }
        thriftHb.set_executor_stats(convertedStats);
        return thriftHb;
    }

    /**
     * Builds thrift aggregate stats of type SPOUT from an aggregated stats map.
     *
     * @param m aggregated stats; a missing {@code COMP_LATENCY} is reported as 0
     * @return stats carrying the last error, the common stats and the spout complete latency
     */
    private static ComponentAggregateStats thriftifySpoutAggStats(Map m) {
        SpoutAggregateStats spoutPart = new SpoutAggregateStats();
        double completeLatency = StatsUtil.getByKeyOr0(m, COMP_LATENCY).doubleValue();
        spoutPart.set_complete_latency_ms(completeLatency);

        ComponentAggregateStats result = new ComponentAggregateStats();
        result.set_type(ComponentType.SPOUT);
        result.set_last_error((ErrorInfo) StatsUtil.getByKey(m, LAST_ERROR));
        StatsUtil.thriftifyCommonAggStats(result, m);
        result.set_specific_stats(SpecificAggregateStats.spout(spoutPart));
        return result;
    }

    /**
     * Builds thrift aggregate stats of type BOLT from an aggregated stats map.
     *
     * @param m aggregated stats; missing numeric entries are reported as 0
     * @return stats carrying the last error, the common stats and the bolt-specific execute
     *         latency, process latency, executed count and capacity
     */
    private static ComponentAggregateStats thriftifyBoltAggStats(Map m) {
        BoltAggregateStats boltPart = new BoltAggregateStats();
        boltPart.set_execute_latency_ms(StatsUtil.getByKeyOr0(m, EXEC_LATENCY).doubleValue());
        boltPart.set_process_latency_ms(StatsUtil.getByKeyOr0(m, PROC_LATENCY).doubleValue());
        boltPart.set_executed(StatsUtil.getByKeyOr0(m, EXECUTED).longValue());
        boltPart.set_capacity(StatsUtil.getByKeyOr0(m, CAPACITY).doubleValue());

        ComponentAggregateStats result = new ComponentAggregateStats();
        result.set_type(ComponentType.BOLT);
        result.set_last_error((ErrorInfo) StatsUtil.getByKey(m, LAST_ERROR));
        StatsUtil.thriftifyCommonAggStats(result, m);
        result.set_specific_stats(SpecificAggregateStats.bolt(boltPart));
        return result;
    }

    /**
     * Builds the thrift per-executor aggregate stats from an executor stats map.
     *
     * @param compId   component id stored in the executor summary
     * @param compType {@code SPOUT} selects spout stats; any other value selects bolt stats
     * @param m        executor stats holding {@code EXECUTOR_ID}, {@code HOST}, {@code PORT},
     *                 {@code UPTIME} and the aggregate values
     * @return the executor summary combined with its component aggregate stats
     */
    private static ExecutorAggregateStats thriftifyExecAggStats(String compId, String compType, Map m) {
        List executor = (List) StatsUtil.getByKey(m, EXECUTOR_ID);
        int startTask = ((Number) executor.get(0)).intValue();
        int endTask = ((Number) executor.get(1)).intValue();

        ExecutorSummary summary = new ExecutorSummary();
        summary.set_executor_info(new ExecutorInfo(startTask, endTask));
        summary.set_component_id(compId);
        summary.set_host((String) StatsUtil.getByKey(m, HOST));
        summary.set_port(StatsUtil.getByKeyOr0(m, PORT).intValue());
        summary.set_uptime_secs(StatsUtil.getByKeyOr0(m, UPTIME).intValue());

        ExecutorAggregateStats result = new ExecutorAggregateStats();
        result.set_exec_summary(summary);
        ComponentAggregateStats aggregate = compType.equals(SPOUT)
                ? StatsUtil.thriftifySpoutAggStats(m)
                : StatsUtil.thriftifyBoltAggStats(m);
        result.set_stats(aggregate);
        return result;
    }

    /**
     * Converts every value of a stream-id keyed map with {@code thriftifyBoltAggStats}.
     *
     * @param id2outStats key to aggregated output stats map
     * @return a new map with the same keys and thrift bolt stats as values
     */
    private static Map thriftifyBoltOutputStats(Map id2outStats) {
        Map<Object, Object> converted = new HashMap<Object, Object>();
        for (Object o : id2outStats.entrySet()) {
            Map.Entry outStats = (Map.Entry) o;
            converted.put(outStats.getKey(), StatsUtil.thriftifyBoltAggStats((Map) outStats.getValue()));
        }
        return converted;
    }

    /**
     * Converts every value of a stream-id keyed map with {@code thriftifySpoutAggStats}.
     *
     * @param id2outStats key to aggregated output stats map
     * @return a new map with the same keys and thrift spout stats as values
     */
    private static Map thriftifySpoutOutputStats(Map id2outStats) {
        Map<Object, Object> converted = new HashMap<Object, Object>();
        for (Object o : id2outStats.entrySet()) {
            Map.Entry outStats = (Map.Entry) o;
            converted.put(outStats.getKey(), StatsUtil.thriftifySpoutAggStats((Map) outStats.getValue()));
        }
        return converted;
    }

    /**
     * Converts bolt input stats keyed by a two-element list into a map keyed by
     * {@code GlobalStreamId}.
     *
     * @param cidSid2inputStats list key (converted by {@code toGlobalStreamId}) to aggregated
     *                          input stats map
     * @return a new map from global stream id to thrift bolt stats
     */
    private static Map thriftifyBoltInputStats(Map cidSid2inputStats) {
        Map<GlobalStreamId, ComponentAggregateStats> converted = new HashMap<GlobalStreamId, ComponentAggregateStats>();
        for (Object o : cidSid2inputStats.entrySet()) {
            Map.Entry inputStats = (Map.Entry) o;
            GlobalStreamId streamId = StatsUtil.toGlobalStreamId((List) inputStats.getKey());
            converted.put(streamId, StatsUtil.thriftifyBoltAggStats((Map) inputStats.getValue()));
        }
        return converted;
    }

    /**
     * Fills the common aggregate stats of {@code stats} from an aggregated stats map.
     *
     * @param stats target whose common stats are replaced
     * @param m     aggregated stats; missing numeric entries are reported as 0
     * @return the same {@code stats} instance
     */
    private static ComponentAggregateStats thriftifyCommonAggStats(ComponentAggregateStats stats, Map m) {
        int numTasks = StatsUtil.getByKeyOr0(m, NUM_TASKS).intValue();
        int numExecutors = StatsUtil.getByKeyOr0(m, NUM_EXECUTORS).intValue();
        long emitted = StatsUtil.getByKeyOr0(m, EMITTED).longValue();
        long transferred = StatsUtil.getByKeyOr0(m, TRANSFERRED).longValue();
        long acked = StatsUtil.getByKeyOr0(m, ACKED).longValue();
        long failed = StatsUtil.getByKeyOr0(m, FAILED).longValue();

        CommonAggregateStats common = new CommonAggregateStats();
        common.set_num_tasks(numTasks);
        common.set_num_executors(numExecutors);
        common.set_emitted(emitted);
        common.set_transferred(transferred);
        common.set_acked(acked);
        common.set_failed(failed);
        stats.set_common_stats(common);
        return stats;
    }

    /**
     * Converts aggregated component page data into its thrift {@code ComponentPageInfo} form.
     *
     * <p>The per-stat window maps are regrouped per window with {@code swapMapOrder} and each
     * window is converted to spout or bolt aggregate stats according to the component type
     * stored under {@code TYPE}. Input stats are only produced for bolts. The topology name is
     * set to {@code null}, and the {@code topology} argument is not used.
     *
     * @param topologyId topology id stored in the result
     * @param topology   unused
     * @param compId     component id stored in the result
     * @param data       aggregated component data
     * @return the populated page info
     */
    private static ComponentPageInfo thriftifyCompPageData(String topologyId, StormTopology topology, String compId, Map<String, Object> data) {
        ComponentPageInfo ret = new ComponentPageInfo();
        ret.set_component_id(compId);

        // First collect stat -> window -> value, then flip to window -> stat -> value.
        Map<String, Object> stat2win = new HashMap<String, Object>();
        StatsUtil.putKV(stat2win, EMITTED, StatsUtil.getMapByKey(data, WIN_TO_EMITTED));
        StatsUtil.putKV(stat2win, TRANSFERRED, StatsUtil.getMapByKey(data, WIN_TO_TRANSFERRED));
        StatsUtil.putKV(stat2win, ACKED, StatsUtil.getMapByKey(data, WIN_TO_ACKED));
        StatsUtil.putKV(stat2win, FAILED, StatsUtil.getMapByKey(data, WIN_TO_FAILED));

        String compType = (String) data.get(TYPE);
        boolean isSpout = compType.equals(SPOUT);
        if (isSpout) {
            ret.set_component_type(ComponentType.SPOUT);
            StatsUtil.putKV(stat2win, COMP_LATENCY, StatsUtil.getMapByKey(data, WIN_TO_COMP_LAT));
        } else {
            ret.set_component_type(ComponentType.BOLT);
            StatsUtil.putKV(stat2win, EXEC_LATENCY, StatsUtil.getMapByKey(data, WIN_TO_EXEC_LAT));
            StatsUtil.putKV(stat2win, PROC_LATENCY, StatsUtil.getMapByKey(data, WIN_TO_PROC_LAT));
            StatsUtil.putKV(stat2win, EXECUTED, StatsUtil.getMapByKey(data, WIN_TO_EXECUTED));
        }
        Map win2raw = StatsUtil.swapMapOrder(stat2win);

        List<ExecutorAggregateStats> execStats = new ArrayList<ExecutorAggregateStats>();
        List executorStats = (List) StatsUtil.getByKey(data, EXECUTOR_STATS);
        if (executorStats != null) {
            for (Object executorStat : executorStats) {
                execStats.add(StatsUtil.thriftifyExecAggStats(compId, compType, (Map) executorStat));
            }
        }

        Map win2stats = new HashMap();
        for (Object o : win2raw.entrySet()) {
            Map.Entry perWindow = (Map.Entry) o;
            Map windowStats = (Map) perWindow.getValue();
            ComponentAggregateStats converted = isSpout
                    ? StatsUtil.thriftifySpoutAggStats(windowStats)
                    : StatsUtil.thriftifyBoltAggStats(windowStats);
            win2stats.put(perWindow.getKey(), converted);
        }

        Map gsid2inputStats;
        Map sid2outputStats;
        if (isSpout) {
            gsid2inputStats = null;
            sid2outputStats = StatsUtil.thriftifySpoutOutputStats(StatsUtil.getMapByKey(data, SID_TO_OUT_STATS));
        } else {
            gsid2inputStats = StatsUtil.thriftifyBoltInputStats(StatsUtil.getMapByKey(data, CID_SID_TO_IN_STATS));
            sid2outputStats = StatsUtil.thriftifyBoltOutputStats(StatsUtil.getMapByKey(data, SID_TO_OUT_STATS));
        }

        ret.set_num_executors(StatsUtil.getByKeyOr0(data, NUM_EXECUTORS).intValue());
        ret.set_num_tasks(StatsUtil.getByKeyOr0(data, NUM_TASKS).intValue());
        ret.set_topology_id(topologyId);
        ret.set_topology_name(null);
        ret.set_window_to_stats(win2stats);
        ret.set_sid_to_output_stats(sid2outputStats);
        ret.set_exec_stats(execStats);
        ret.set_gsid_to_input_stats(gsid2inputStats);
        return ret;
    }

    /**
     * Converts a list of [executor, stats-map] pairs into a map from {@code ExecutorInfo} to
     * thrift {@code ExecutorStats}.
     *
     * @param stats list of two-element lists: the executor ([start, end]) and its stats map
     * @return a new map with one entry per distinct executor
     */
    public static Map thriftifyStats(List stats) {
        Map<ExecutorInfo, ExecutorStats> converted = new HashMap<ExecutorInfo, ExecutorStats>();
        for (Object element : stats) {
            List pair = (List) element;
            List executor = (List) pair.get(0);
            int startTask = ((Number) executor.get(0)).intValue();
            int endTask = ((Number) executor.get(1)).intValue();
            Map executorStat = (Map) pair.get(1);
            ExecutorInfo info = new ExecutorInfo(startTask, endTask);
            converted.put(info, StatsUtil.thriftifyExecutorStats(executorStat));
        }
        return converted;
    }

    /**
     * Converts an executor stats map into its thrift {@code ExecutorStats} form.
     *
     * @param stats map holding {@code EMITTED}, {@code TRANSFERRED}, {@code RATE} and the
     *              type-specific entries read by {@code thriftifySpecificStats}
     * @return the populated thrift stats
     */
    public static ExecutorStats thriftifyExecutorStats(Map stats) {
        ExecutorStats thriftStats = new ExecutorStats();
        thriftStats.set_specific(StatsUtil.thriftifySpecificStats(stats));

        Map emitted = StatsUtil.getMapByKey(stats, EMITTED);
        thriftStats.set_emitted(StatsUtil.windowSetConverter(emitted, TO_STRING, TO_STRING));

        Map transferred = StatsUtil.getMapByKey(stats, TRANSFERRED);
        thriftStats.set_transferred(StatsUtil.windowSetConverter(transferred, TO_STRING, TO_STRING));

        Number rate = (Number) StatsUtil.getByKey(stats, RATE);
        thriftStats.set_rate(rate.doubleValue());
        return thriftStats;
    }

    /**
     * Builds the bolt- or spout-specific part of the thrift executor stats.
     *
     * @param stats executor stats map; a {@code TYPE} equal to {@code BOLT} selects bolt stats,
     *              anything else (including a missing type) selects spout stats
     * @return the specific stats with either the bolt or the spout member set
     */
    private static ExecutorSpecificStats thriftifySpecificStats(Map stats) {
        ExecutorSpecificStats specific = new ExecutorSpecificStats();
        String compType = (String) StatsUtil.getByKey(stats, TYPE);
        if (!BOLT.equals(compType)) {
            SpoutStats spout = new SpoutStats();
            spout.set_acked(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, ACKED), TO_STRING, TO_STRING));
            spout.set_failed(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, FAILED), TO_STRING, TO_STRING));
            spout.set_complete_ms_avg(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, COMP_LATENCIES), TO_STRING, TO_STRING));
            specific.set_spout(spout);
            return specific;
        }
        // Bolt stats are keyed by global stream id rather than by plain string.
        BoltStats bolt = new BoltStats();
        bolt.set_acked(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, ACKED), TO_GSID, TO_STRING));
        bolt.set_executed(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, EXECUTED), TO_GSID, TO_STRING));
        bolt.set_execute_ms_avg(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, EXEC_LATENCIES), TO_GSID, TO_STRING));
        bolt.set_failed(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, FAILED), TO_GSID, TO_STRING));
        bolt.set_process_ms_avg(StatsUtil.windowSetConverter(StatsUtil.getMapByKey(stats, PROC_LATENCIES), TO_GSID, TO_STRING));
        specific.set_bolt(bolt);
        return specific;
    }

    /**
     * Creates a heartbeat map that has an entry with a {@code null} value for every executor.
     *
     * @param executors set of executor lists, each converted with {@code convertExecutor}
     * @return a new map from converted executor to {@code null}
     */
    public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set executors) {
        Map<List<Integer>, ExecutorStats> emptyBeats = new HashMap<List<Integer>, ExecutorStats>();
        for (Object element : executors) {
            List<Integer> executorKey = StatsUtil.convertExecutor((List) element);
            emptyBeats.put(executorKey, null);
        }
        return emptyBeats;
    }

    /**
     * Re-keys executor beats from {@code List<Long>} executors to {@code List<Integer>} executors.
     *
     * @param executorBeats beats keyed by executor
     * @return a new map with keys converted by {@code convertExecutor} and the same values
     */
    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
        Map<List<Integer>, ExecutorStats> rekeyed = new HashMap<List<Integer>, ExecutorStats>();
        for (Map.Entry<List<Long>, ExecutorStats> beat : executorBeats.entrySet()) {
            List<Integer> executorKey = StatsUtil.convertExecutor(beat.getKey());
            rekeyed.put(executorKey, beat.getValue());
        }
        return rekeyed;
    }

    /**
     * Assembles a worker heartbeat map stamped with the current time in seconds.
     *
     * @param stormId       stored under "storm-id"
     * @param executorStats stored under {@code EXECUTOR_STATS}
     * @param uptime        stored under {@code UPTIME}
     * @return a new map with the three arguments plus {@code TIME_SECS}
     */
    public static Map<String, Object> mkZkWorkerHb(String stormId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
        Map<String, Object> heartbeat = new HashMap<String, Object>();
        heartbeat.put("storm-id", stormId);
        heartbeat.put(EXECUTOR_STATS, executorStats);
        heartbeat.put(UPTIME, uptime);
        int now = Time.currentTimeSecs();
        heartbeat.put(TIME_SECS, now);
        return heartbeat;
    }

    /**
     * Builds a {@code GlobalStreamId} from the first two elements of a list, both cast to String.
     *
     * @param list list whose elements 0 and 1 are passed, in that order, to the constructor
     * @return the new global stream id
     */
    private static GlobalStreamId toGlobalStreamId(List list) {
        String first = (String) list.get(0);
        String second = (String) list.get(1);
        return new GlobalStreamId(first, second);
    }

    /**
     * Tells whether the argument is a {@link Number} whose double value is finite.
     *
     * @param x any object, may be {@code null}
     * @return {@code false} for {@code null}, non-numbers, NaN and infinities
     */
    private static boolean isValidNumber(Object x) {
        if (!(x instanceof Number)) {
            return false;
        }
        double value = ((Number) x).doubleValue();
        return !Double.isNaN(value) && !Double.isInfinite(value);
    }

    /**
     * Computes capacity over the ten-minute window from per-stream execute latencies and counts.
     *
     * <p>The result is the sum of {@code latency * count} over all streams, divided by the
     * uptime (capped at 600 seconds) expressed in milliseconds.
     *
     * @param m      stats map holding {@code EXEC_LATENCIES} and {@code EXECUTED}, each a map
     *               from window to a per-stream map
     * @param uptime uptime in seconds
     * @return the capacity, or 0.0 when {@code uptime} is {@code null} or zero
     */
    private static double computeAggCapacity(Map m, Integer uptime) {
        if (uptime == null || uptime == 0) {
            return 0.0;
        }
        Map execAvg = (Map) ((Map) StatsUtil.getByKey(m, EXEC_LATENCIES)).get(TEN_MIN_IN_SECONDS_STR);
        Map exec = (Map) ((Map) StatsUtil.getByKey(m, EXECUTED)).get(TEN_MIN_IN_SECONDS_STR);

        Set<Object> streams = new HashSet<Object>();
        if (execAvg != null) {
            streams.addAll(execAvg.keySet());
        }
        if (exec != null) {
            streams.addAll(exec.keySet());
        }

        double totalLatency = 0.0;
        for (Object stream : streams) {
            double avg = StatsUtil.getOr0(execAvg, stream).doubleValue();
            long cnt = StatsUtil.getOr0(exec, stream).longValue();
            totalLatency += avg * (double) cnt;
        }
        int windowMillis = Math.min(uptime, 600) * 1000;
        return totalLatency / (double) windowMillis;
    }

    /**
     * Looks up a number in a map, substituting zero when it is unavailable.
     *
     * @param m map to read, may be {@code null}
     * @param k key to look up
     * @return the mapped number, or Integer 0 when the map is {@code null} or has no value for the key
     */
    private static Number getOr0(Map m, Object k) {
        Number found = (m == null) ? null : (Number) m.get(k);
        if (found == null) {
            return 0;
        }
        return found;
    }

    /**
     * Looks up a number under a string key, substituting zero when it is unavailable.
     *
     * @param m map to read, may be {@code null}
     * @param k key to look up
     * @return the mapped number, or Integer 0 when the map is {@code null} or has no value for the key
     */
    private static Number getByKeyOr0(Map m, String k) {
        Number found = (m == null) ? null : (Number) m.get(k);
        if (found == null) {
            return 0;
        }
        return found;
    }

    /**
     * Sums {@code average * count} over every key of {@code id2Avg}.
     *
     * @param id2Avg key to average, may be {@code null}
     * @param id2num key to count, may be {@code null}
     * @return the weighted sum, or 0.0 when either map is {@code null}; terms are computed by
     *         {@code productOr0}
     */
    private static <T, V1 extends Number, V2 extends Number> Double weightAvgAndSum(Map<T, V1> id2Avg, Map<T, V2> id2num) {
        if (id2Avg == null || id2num == null) {
            return 0.0;
        }
        double weightedSum = 0.0;
        for (Map.Entry<T, V1> avgEntry : id2Avg.entrySet()) {
            V2 count = id2num.get(avgEntry.getKey());
            weightedSum += StatsUtil.productOr0(avgEntry.getValue(), count);
        }
        return weightedSum;
    }

    /**
     * Returns {@code average * count} for a single key.
     *
     * @param id2Avg key to average, may be {@code null}
     * @param id2num key to count, may be {@code null}
     * @param key    key to evaluate
     * @return the product computed by {@code productOr0}, or 0.0 when either map is {@code null}
     */
    private static <K, V1 extends Number, V2 extends Number> double weightAvg(Map<K, V1> id2Avg, Map<K, V2> id2num, K key) {
        boolean bothPresent = id2Avg != null && id2num != null;
        if (!bothPresent) {
            return 0.0;
        }
        V1 average = id2Avg.get(key);
        V2 count = id2num.get(key);
        return StatsUtil.productOr0(average, count);
    }

    /**
     * Classifies a component as bolt or spout.
     *
     * @param topology topology whose bolts are consulted
     * @param compId   component id, may be {@code null}
     * @return {@code null} for a {@code null} id; {@code BOLT} when the id is a system id or
     *         names one of the topology's bolts; {@code SPOUT} otherwise
     */
    public static String componentType(StormTopology topology, String compId) {
        if (compId == null) {
            return null;
        }
        Map<String, Bolt> bolts = topology.get_bolts();
        boolean isBolt = Utils.isSystemId(compId) || bolts.containsKey(compId);
        return isBolt ? BOLT : SPOUT;
    }

    /**
     * Stores {@code v} under the string key {@code k}, replacing any previous value.
     *
     * @param map target map (mutated)
     * @param k   key
     * @param v   value
     */
    public static void putKV(Map map, String k, Object v) {
        final Map target = map;
        target.put(k, v);
    }

    /**
     * Removes the entry stored under the string key {@code k}, if any.
     *
     * @param map target map (mutated)
     * @param k   key to remove
     */
    private static void remove(Map map, String k) {
        final Map target = map;
        target.remove(k);
    }

    /**
     * Returns the value stored under the string key, or {@code null} when there is none.
     *
     * @param map map to read; must not be {@code null}
     * @param key key to look up
     * @return the mapped value
     */
    public static Object getByKey(Map map, String key) {
        Object value = map.get(key);
        return value;
    }

    /**
     * Returns the value stored under the string key, cast to a map.
     *
     * @param map map to read, may be {@code null}
     * @param key key to look up
     * @return the mapped value as a map, or {@code null} when {@code map} is {@code null} or has
     *         no value for the key
     */
    public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
        return map == null ? null : (Map) map.get(key);
    }

    /**
     * Adds up the long values of all entries of a map.
     *
     * @param m map of numbers, may be {@code null}
     * @return the sum of {@code longValue()} over all values, or 0 for a {@code null} map
     */
    private static <K, V extends Number> long sumValues(Map<K, V> m) {
        if (m == null) {
            return 0L;
        }
        long total = 0L;
        for (V value : m.values()) {
            total += value.longValue();
        }
        return total;
    }

    /**
     * Adds two values when both pass {@code isValidNumber}.
     *
     * @param a first operand; when it is a Long or Integer the sum is computed as a long
     * @param b second operand
     * @return the long or double sum, or Integer 0 when either operand is not a valid number
     */
    private static Number sumOr0(Object a, Object b) {
        if (!StatsUtil.isValidNumber(a) || !StatsUtil.isValidNumber(b)) {
            return 0;
        }
        Number left = (Number) a;
        Number right = (Number) b;
        boolean integral = a instanceof Long || a instanceof Integer;
        if (integral) {
            return left.longValue() + right.longValue();
        }
        return left.doubleValue() + right.doubleValue();
    }

    /**
     * Multiplies two values when both pass {@code isValidNumber}.
     *
     * @param a first operand
     * @param b second operand
     * @return the product as a double, or 0.0 when either operand is not a valid number
     */
    private static double productOr0(Object a, Object b) {
        if (!StatsUtil.isValidNumber(a) || !StatsUtil.isValidNumber(b)) {
            return 0.0;
        }
        double left = ((Number) a).doubleValue();
        double right = ((Number) b).doubleValue();
        return left * right;
    }

    /**
     * Returns the larger of two values when both pass {@code isValidNumber}.
     *
     * @param a first operand
     * @param b second operand
     * @return the maximum as a double, or 0.0 when either operand is not a valid number
     */
    private static double maxOr0(Object a, Object b) {
        if (!StatsUtil.isValidNumber(a) || !StatsUtil.isValidNumber(b)) {
            return 0.0;
        }
        double left = ((Number) a).doubleValue();
        double right = ((Number) b).doubleValue();
        return Math.max(left, right);
    }

    /**
     * Swaps the two key levels of a map of maps: {@code k1 -> k2 -> v} becomes {@code k2 -> k1 -> v}.
     *
     * @param m map of maps; outer entries whose inner map is {@code null} are ignored
     * @return a new swapped map, or {@code m} itself when it is empty
     */
    private static Map swapMapOrder(Map m) {
        if (m.isEmpty()) {
            return m;
        }
        Map<Object, Map<Object, Object>> swapped = new HashMap<Object, Map<Object, Object>>();
        for (Object outer : m.entrySet()) {
            Map.Entry outerEntry = (Map.Entry) outer;
            Map inner = (Map) outerEntry.getValue();
            if (inner == null) {
                continue;
            }
            for (Object in : inner.entrySet()) {
                Map.Entry innerEntry = (Map.Entry) in;
                Object newOuterKey = innerEntry.getKey();
                Map<Object, Object> row = swapped.get(newOuterKey);
                if (row == null) {
                    row = new HashMap<Object, Object>();
                    swapped.put(newOuterKey, row);
                }
                row.put(outerEntry.getKey(), innerEntry.getValue());
            }
        }
        return swapped;
    }

    /**
     * Expands per-window, per-stream averages into {@code [average * count, count]} pairs.
     *
     * <p>The windows and streams of the result are those of {@code counts}. A stream without an
     * average is treated as average 0.0; the same now applies when the whole window (or
     * {@code avgs} itself) is missing, which previously caused a {@link NullPointerException}.
     *
     * @param avgs   window to stream to average
     * @param counts window to stream to count
     * @return window to stream to a two-element list {@code [Double total, Long count]}
     */
    private static <K> Map<String, Map<K, List>> expandAverages(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) {
        Map<String, Map<K, List>> ret = new HashMap<String, Map<K, List>>();
        for (Map.Entry<String, Map<K, Long>> winEntry : counts.entrySet()) {
            String win = winEntry.getKey();
            Map<K, Long> stream2cnt = winEntry.getValue();
            // A window may be present in counts only; fall back to "no averages" for it.
            Map<K, Double> stream2avg = avgs != null ? avgs.get(win) : null;

            Map<K, List> inner = new HashMap<K, List>();
            for (Map.Entry<K, Long> cntEntry : stream2cnt.entrySet()) {
                K stream = cntEntry.getKey();
                Long cnt = cntEntry.getValue();
                Double avg = stream2avg != null ? stream2avg.get(stream) : null;
                if (avg == null) {
                    avg = 0.0;
                }
                List<Number> pair = new ArrayList<Number>(2);
                pair.add(Double.valueOf((double) cnt.longValue() * avg));
                pair.add(cnt);
                inner.put(stream, pair);
            }
            ret.put(win, inner);
        }
        return ret;
    }

    /**
     * Expands each average/count pair with {@code expandAverages} and folds the results together
     * with {@code mergeWithAddPair}.
     *
     * @param avgSeq   list of window -> stream -> average maps
     * @param countSeq list of window -> stream -> count maps, index-aligned with {@code avgSeq}
     * @return the merged expansion, or {@code null} when {@code avgSeq} is empty
     */
    private static <K> Map<String, Map<K, List>> expandAveragesSeq(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq) {
        Map<String, Map<K, List>> accumulated = null;
        int size = avgSeq.size();
        for (int index = 0; index < size; index++) {
            Map<String, Map<K, List>> expanded = StatsUtil.expandAverages(avgSeq.get(index), countSeq.get(index));
            if (accumulated == null) {
                accumulated = expanded;
            } else {
                accumulated = StatsUtil.mergeWithAddPair(accumulated, expanded);
            }
        }
        return accumulated;
    }

    /**
     * Divides a total by a count, yielding {@code 0.0} for a zero count
     * instead of dividing by zero.
     *
     * @param t the total
     * @param c the count
     * @return {@code t / c}, or {@code 0.0} when {@code c} is zero
     */
    private static double valAvg(double t, long c) {
        return c == 0L ? 0.0 : t / (double) c;
    }

    /**
     * Formats a number with three digits after the decimal separator.
     *
     * @param n the value to format; may be {@code null}
     * @return {@code "0"} for {@code null}, otherwise the result of
     *         {@code String.format("%.3f", n)} (default-locale formatting)
     */
    public static String floatStr(Double n) {
        return n == null ? "0" : String.format("%.3f", n);
    }

    /**
     * Truncates an error string to at most 200 characters.
     *
     * <p>Strings that are already 200 characters or shorter are returned
     * unchanged; the end index is clamped to the string length so that short
     * inputs do not raise {@link StringIndexOutOfBoundsException}.
     *
     * @param errorStr the error text; may be {@code null}
     * @return the first 200 characters of {@code errorStr}, the whole string
     *         when it is shorter, or {@code null} when {@code errorStr} is
     *         {@code null}
     */
    public static String errorSubset(String errorStr) {
        if (errorStr == null) {
            return null;
        }
        return errorStr.substring(0, Math.min(errorStr.length(), 200));
    }

    /**
     * Looks up the last error for a component by delegating to
     * {@code stormClusterState.lastError(stormId, compId)}.
     *
     * @param stormClusterState cluster state to query
     * @param stormId           topology id passed through to the lookup
     * @param compId            component id passed through to the lookup
     * @return whatever {@code lastError} returns for the given ids
     */
    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        ErrorInfo lastError = stormClusterState.lastError(stormId, compId);
        return lastError;
    }

    /**
     * Re-keys the first level of a two-level stats map.
     *
     * <p>Delegates to the three-argument overload, passing {@code IDENTITY}
     * as the second-level key transformer (presumably a pass-through that
     * leaves inner keys unchanged; confirm against its definition).
     *
     * @param stats        two-level map to convert
     * @param firstKeyFunc transformer applied to the first-level keys
     * @param <K>          resulting first-level key type
     * @return the converted map produced by the three-argument overload
     */
    public static <K> Map windowSetConverter(Map stats, KeyTransformer<K> firstKeyFunc) {
        Map converted = StatsUtil.windowSetConverter(stats, IDENTITY, firstKeyFunc);
        return converted;
    }

    /**
     * Builds a copy of a two-level map with both key levels transformed.
     *
     * <p>First-level keys go through {@code firstKeyFunc}, second-level keys
     * through {@code secKeyFunc}; values are carried over untouched. When
     * several first-level keys transform to the same key, their inner
     * entries land in the same sub-map and later entries overwrite earlier
     * ones with an equal transformed inner key.
     *
     * <p>Note the parameter order: the second-level transformer comes first.
     *
     * @param stats        map whose values are themselves maps
     * @param secKeyFunc   transformer for second-level (inner) keys
     * @param firstKeyFunc transformer for first-level (outer) keys
     * @param <K1>         resulting first-level key type
     * @param <K2>         resulting second-level key type
     * @return a new map of transformed outer key to map of transformed inner key to value
     */
    public static <K1, K2> Map windowSetConverter(Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
        Map converted = new HashMap();
        for (Object outerObj : stats.entrySet()) {
            Map.Entry outer = (Map.Entry) outerObj;
            K1 outerKey = firstKeyFunc.transform(outer.getKey());
            Map target = (Map) converted.get(outerKey);
            if (target == null) {
                target = new HashMap();
                converted.put(outerKey, target);
            }
            Map innerMap = (Map) outer.getValue();
            for (Object innerObj : innerMap.entrySet()) {
                Map.Entry inner = (Map.Entry) innerObj;
                K2 innerKey = secKeyFunc.transform(inner.getKey());
                target.put(innerKey, inner.getValue());
            }
        }
        return converted;
    }

    /**
     * {@link KeyTransformer} that maps a key to its {@code toString()}
     * representation. A {@code null} key results in a
     * {@link NullPointerException}.
     */
    static class ToStringTransformer
    implements KeyTransformer<String> {
        @Override
        public String transform(Object key) {
            String text = key.toString();
            return text;
        }
    }

    /**
     * {@link KeyTransformer} that returns every key unchanged.
     */
    static class IdentityTransformer
    implements KeyTransformer<Object> {
        /** Returns {@code key} itself, including {@code null}. */
        @Override
        public Object transform(Object key) {
            return key;
        }
    }

    /**
     * {@link KeyTransformer} that turns a {@link GlobalStreamId} into a
     * two-element list {@code [componentId, streamId]}.
     */
    static class FromGlobalStreamIdTransformer
    implements KeyTransformer<List<String>> {
        /**
         * @param key must be a {@link GlobalStreamId}; anything else fails the cast
         * @return a new list holding the component id followed by the stream id
         */
        @Override
        public List<String> transform(Object key) {
            GlobalStreamId streamRef = (GlobalStreamId) key;
            String componentId = streamRef.get_componentId();
            String streamId = streamRef.get_streamId();
            return Lists.newArrayList(componentId, streamId);
        }
    }

    /**
     * {@link KeyTransformer} that builds a {@link GlobalStreamId} from a key.
     *
     * <p>A {@link List} with more than one element supplies the component id
     * (element 0) and stream id (element 1). Any other key produces a
     * {@code GlobalStreamId} with an empty component id and
     * {@code key.toString()} as the stream id.
     */
    static class ToGlobalStreamIdTransformer
    implements KeyTransformer<GlobalStreamId> {
        @Override
        public GlobalStreamId transform(Object key) {
            if (key instanceof List) {
                List parts = (List) key;
                if (parts.size() > 1) {
                    String componentId = (String) parts.get(0);
                    String streamId = (String) parts.get(1);
                    return new GlobalStreamId(componentId, streamId);
                }
            }
            // Fallback: treat the key's string form as a bare stream id.
            return new GlobalStreamId("", key.toString());
        }
    }

    /**
     * Strategy for converting a map key into another representation.
     *
     * @param <T> type of the transformed key
     */
    static interface KeyTransformer<T> {
        /**
         * Converts the given key.
         *
         * @param key the original key
         * @return the transformed key
         */
        T transform(Object key);
    }
}

