/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Thrift;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.Acker;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.metric.EventLoggerBolt;
import org.apache.storm.metric.MetricsConsumerBolt;
import org.apache.storm.metric.SystemBolt;
import org.apache.storm.metric.filter.FilterByMetricName;
import org.apache.storm.metric.util.DataPointExpander;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StormCommon {
    /**
     * Instance that the static facade methods of this class ({@code makeAckerBolt},
     * {@code systemTopology}, {@code stormTaskInfo}, {@code mkAuthorizationHandler}) delegate to.
     * Not final because {@code setInstance} replaces it.
     */
    private static StormCommon _instance = new StormCommon();
    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
    /**
     * Id of the stream that {@code addSystemStreams} declares on every component. This class also
     * uses the same value as the id under which {@code addSystemComponents} registers the system bolt.
     */
    public static final String SYSTEM_STREAM_ID = "__system";
    /** Component id under which {@code addEventLogger} registers the event logger bolt. */
    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
    /** Stream declared on every component by {@code addEventLogger}; the event logger bolt subscribes to it. */
    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
    // The constants below are the keys that metricsConsumerBoltSpecs reads from each entry of the
    // "topology.metrics.consumer.register" list.
    /** Key holding the metrics consumer class name. */
    public static final String TOPOLOGY_METRICS_CONSUMER_CLASS = "class";
    /** Key holding the opaque argument handed to the metrics consumer bolt. */
    public static final String TOPOLOGY_METRICS_CONSUMER_ARGUMENT = "argument";
    /** Key for the maximum number of retained metric tuples; read with a default of 100. */
    public static final String TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES = "max.retain.metric.tuples";
    /** Key for the consumer bolt's parallelism hint / task count; read with a default of 1. */
    public static final String TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT = "parallelism.hint";
    /** Key for the metric-name whitelist passed to {@code FilterByMetricName}. */
    public static final String TOPOLOGY_METRICS_CONSUMER_WHITELIST = "whitelist";
    /** Key for the metric-name blacklist passed to {@code FilterByMetricName}. */
    public static final String TOPOLOGY_METRICS_CONSUMER_BLACKLIST = "blacklist";
    /** Key for the flag passed to {@code DataPointExpander}; read with a default of {@code false}. */
    public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
    /** Key for the separator passed to {@code DataPointExpander}; read with a default of {@code "."}. */
    public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";

    /**
     * Installs the instance that the static facade methods of this class delegate to.
     *
     * @param common the instance to install
     * @return the instance that was installed before this call
     */
    public static StormCommon setInstance(StormCommon common) {
        final StormCommon previous = _instance;
        _instance = common;
        return previous;
    }

    /**
     * Looks up the id of the topology with the given name in the cluster state.
     *
     * <p>The lookup result is unwrapped with {@code get()} without a presence check.
     * NOTE(review): presumably {@code getTopoId} returns an Optional, so an unknown topology name
     * would throw here rather than return {@code null} - confirm against IStormClusterState.
     *
     * @param stormClusterState cluster state to query
     * @param topologyName name of the topology
     * @return the topology id
     */
    @Deprecated
    public static String getStormId(IStormClusterState stormClusterState, String topologyName) {
        return stormClusterState.getTopoId(topologyName).get();
    }

    /**
     * Rejects configurations that select local mode.
     *
     * @param conf the configuration to inspect
     * @throws IllegalArgumentException if {@code ConfigUtils.isLocalMode(conf)} is true
     */
    public static void validateDistributedMode(Map<String, Object> conf) {
        final boolean localMode = ConfigUtils.isLocalMode(conf);
        if (!localMode) {
            return;
        }
        throw new IllegalArgumentException("Cannot start server in local mode!");
    }

    /**
     * Checks the component ids and stream ids declared by a topology.
     *
     * <p>For every topology field other than worker hooks and dependencies, no component id and
     * no declared stream id may be a system id (per {@code Utils.isSystemId}). After all fields
     * have been scanned, component ids must be unique across those fields.
     *
     * @param topology the topology to check
     * @throws InvalidTopologyException on a system-style component or stream id, or on duplicate
     *         component ids
     */
    @SuppressWarnings("unchecked")
    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
        List<String> seenComponentIds = new ArrayList<>();
        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
            boolean holdsComponents = !ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field);
            if (holdsComponents) {
                Map<String, Object> components = (Map<String, Object>) topology.getFieldValue(field);
                seenComponentIds.addAll(components.keySet());
                // Component ids of this field are checked before any of its stream ids.
                for (String componentId : components.keySet()) {
                    if (Utils.isSystemId(componentId)) {
                        throw new InvalidTopologyException(componentId + " is not a valid component id.");
                    }
                }
                for (Object component : components.values()) {
                    for (String streamId : StormCommon.getComponentCommon(component).get_streams().keySet()) {
                        if (Utils.isSystemId(streamId)) {
                            throw new InvalidTopologyException(streamId + " is not a valid stream id.");
                        }
                    }
                }
            }
        }
        List<String> duplicates = Utils.getRepeat(seenComponentIds);
        if (duplicates.isEmpty()) {
            return;
        }
        throw new InvalidTopologyException("Duplicate component ids: " + duplicates);
    }

    /**
     * Tells whether a component declares no inputs; a {@code null} input map counts as empty.
     *
     * @param common the component's common section
     * @return {@code true} if the input map is {@code null} or has no entries
     */
    private static boolean isEmptyInputs(ComponentCommon common) {
        Map<GlobalStreamId, Grouping> declaredInputs = common.get_inputs();
        return declaredInputs == null || declaredInputs.isEmpty();
    }

    /**
     * Collects every component of a topology into a single map keyed by component id.
     *
     * <p>All topology fields except worker hooks and dependencies are merged; the returned map is
     * a fresh {@code HashMap}, but its values are the topology's own component objects.
     *
     * @param topology the topology to read
     * @return component id to component object
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> allComponents(StormTopology topology) {
        Map<String, Object> merged = new HashMap<>();
        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
            boolean holdsComponents = !ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field);
            if (holdsComponents) {
                Map<String, Object> fieldComponents = (Map<String, Object>) topology.getFieldValue(field);
                merged.putAll(fieldComponents);
            }
        }
        return merged;
    }

    /**
     * Parses a component's JSON configuration into a mutable map.
     *
     * @param component a spout, state spout or bolt spec
     * @return a new map with the parsed configuration; empty when the component has no JSON conf
     * @throws RuntimeException wrapping any exception raised while reading or parsing the conf
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> componentConf(Object component) {
        try {
            Map<String, Object> parsed = new HashMap<>();
            String json = StormCommon.getComponentCommon(component).get_json_conf();
            if (json == null) {
                return parsed;
            }
            parsed.putAll((Map<String, Object>) JSONValue.parseWithException(json));
            return parsed;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the basic checks on a user topology: id validation, "spouts have no inputs", and
     * "a component with tasks has at least one executor".
     *
     * @param topology the topology to check
     * @throws InvalidTopologyException if any of the checks fails
     */
    @SuppressWarnings("unchecked")
    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
        StormCommon.validateIds(topology);

        for (StormTopology._Fields spoutField : Thrift.getSpoutFields()) {
            Map<String, Object> spouts = (Map<String, Object>) topology.getFieldValue(spoutField);
            if (spouts != null) {
                for (Object spout : spouts.values()) {
                    if (!StormCommon.isEmptyInputs(StormCommon.getComponentCommon(spout))) {
                        throw new InvalidTopologyException("May not declare inputs for a spout");
                    }
                }
            }
        }

        for (Object component : StormCommon.allComponents(topology).values()) {
            Map<String, Object> componentConf = StormCommon.componentConf(component);
            int executors = Thrift.getParallelismHint(StormCommon.getComponentCommon(component));
            // A missing "topology.tasks" entry is read as 0 tasks.
            Integer tasks = ObjectReader.getInt(componentConf.get("topology.tasks"), 0);
            if (tasks > 0 && executors <= 0) {
                throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
            }
        }
    }

    /**
     * Returns the union of the output field names of all given streams.
     *
     * @param streams stream id to stream declaration
     * @return a new set with every output field name declared by any of the streams
     */
    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
        Set<String> union = new HashSet<>();
        for (Map.Entry<String, StreamInfo> stream : streams.entrySet()) {
            union.addAll(stream.getValue().get_output_fields());
        }
        return union;
    }

    /**
     * Checks that every input subscription in the topology points at something that exists.
     *
     * <p>For each component and each of its inputs this verifies that the source component
     * exists, that the source component declares the subscribed stream, and - for fields
     * groupings - that every grouping field is an output field of the source component. The field
     * check uses the union of the output fields of all streams of the source component, not only
     * those of the subscribed stream.
     *
     * <p>A component whose input map is {@code null} is treated as having no inputs, matching
     * {@code isEmptyInputs}.
     *
     * @param topology the topology to check
     * @throws InvalidTopologyException if a subscription references a missing component, a missing
     *         stream, or non-existent fields
     */
    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
        Map<String, Object> componentMap = StormCommon.allComponents(topology);
        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
            String componentId = entry.getKey();
            ComponentCommon common = StormCommon.getComponentCommon(entry.getValue());
            Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
            if (inputs == null) {
                // Nothing subscribed; previously this dereferenced null and failed with an NPE.
                continue;
            }
            for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
                String sourceStreamId = input.getKey().get_streamId();
                String sourceComponentId = input.getKey().get_componentId();
                if (!componentMap.containsKey(sourceComponentId)) {
                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
                }
                ComponentCommon sourceComponent = StormCommon.getComponentCommon(componentMap.get(sourceComponentId));
                Map<String, StreamInfo> sourceStreams = sourceComponent.get_streams();
                // A source without any declared streams cannot provide the subscribed stream.
                if (sourceStreams == null || !sourceStreams.containsKey(sourceStreamId)) {
                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: [" + sourceStreamId + "] of component [" + sourceComponentId + "]");
                }
                Grouping grouping = input.getValue();
                if (Thrift.groupingType(grouping) != Grouping._Fields.FIELDS) {
                    continue;
                }
                // Whatever is left after removing the source's output fields does not exist.
                List<String> missingFields = new ArrayList<>(grouping.get_fields());
                missingFields.removeAll(StormCommon.getStreamOutputFields(sourceStreams));
                if (!missingFields.isEmpty()) {
                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId + "] of component [" + sourceComponentId + "] with non-existent fields: " + missingFields);
                }
            }
        }
    }

    /**
     * Builds the input subscriptions of the acker bolt.
     *
     * <p>The acker subscribes to {@code "__ack_init"} of every spout and to {@code "__ack_ack"},
     * {@code "__ack_fail"} and {@code "__ack_reset_timeout"} of every bolt, each with a fields
     * grouping on {@code "id"}.
     *
     * @param topology the topology whose spouts and bolts are subscribed to
     * @return a new map of source stream to grouping
     */
    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> subscriptions = new HashMap<>();
        Set<String> boltIds = topology.get_bolts().keySet();
        Set<String> spoutIds = topology.get_spouts().keySet();
        for (String spoutId : spoutIds) {
            GlobalStreamId initStream = Utils.getGlobalStreamId(spoutId, "__ack_init");
            subscriptions.put(initStream, Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }
        String[] boltAckStreams = {"__ack_ack", "__ack_fail", "__ack_reset_timeout"};
        for (String boltId : boltIds) {
            for (String streamId : boltAckStreams) {
                subscriptions.put(Utils.getGlobalStreamId(boltId, streamId), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
            }
        }
        return subscriptions;
    }

    /**
     * Creates the acker bolt via the currently installed {@code StormCommon} instance.
     *
     * @return the bolt produced by {@code makeAckerBoltImpl} of the installed instance
     */
    public static IBolt makeAckerBolt() {
        final StormCommon delegate = _instance;
        return delegate.makeAckerBoltImpl();
    }

    /**
     * Default acker factory; subclasses installed via {@code setInstance} may override it.
     *
     * @return a new {@code Acker}
     */
    public IBolt makeAckerBoltImpl() {
        final IBolt acker = new Acker();
        return acker;
    }

    /**
     * Adds the acker bolt to the topology and wires every spout and bolt to it.
     *
     * <p>The acker's executor/task count is {@code "topology.acker.executors"}, falling back to
     * {@code "topology.workers"}. Every bolt gets the ack/fail/reset-timeout output streams; every
     * spout gets the {@code "__ack_init"} output stream, direct-grouped inputs from the acker, and
     * a tick frequency equal to {@code "topology.message.timeout.secs"} written into its JSON conf.
     * The topology is modified in place.
     *
     * @param conf topology configuration
     * @param topology topology to modify
     */
    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        Integer workerCount = ObjectReader.getInt(conf.get("topology.workers"));
        int ackerCount = ObjectReader.getInt(conf.get("topology.acker.executors"), workerCount);
        Map<GlobalStreamId, Grouping> ackerSubscriptions = StormCommon.ackerInputs(topology);

        // Streams the acker itself emits back to spouts.
        Map<String, StreamInfo> ackerOutputs = new HashMap<>();
        for (String streamId : new String[]{"__ack_ack", "__ack_fail", "__ack_reset_timeout"}) {
            ackerOutputs.put(streamId, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        }

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put("topology.tasks", ackerCount);
        ackerConf.put("topology.tick.tuple.freq.secs", ObjectReader.getInt(conf.get("topology.message.timeout.secs")));
        Bolt ackerBolt = Thrift.prepareSerializedBoltDetails(ackerSubscriptions, StormCommon.makeAckerBolt(), ackerOutputs, ackerCount, ackerConf);

        for (Bolt userBolt : topology.get_bolts().values()) {
            ComponentCommon boltCommon = userBolt.get_common();
            boltCommon.put_to_streams("__ack_ack", Thrift.outputFields(Arrays.asList("id", "ack-val")));
            boltCommon.put_to_streams("__ack_fail", Thrift.outputFields(Arrays.asList("id")));
            boltCommon.put_to_streams("__ack_reset_timeout", Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec userSpout : topology.get_spouts().values()) {
            ComponentCommon spoutCommon = userSpout.get_common();
            Map<String, Object> spoutConf = StormCommon.componentConf(userSpout);
            spoutConf.put("topology.tick.tuple.freq.secs", ObjectReader.getInt(conf.get("topology.message.timeout.secs")));
            spoutCommon.set_json_conf(JSONValue.toJSONString(spoutConf));
            spoutCommon.put_to_streams("__ack_init", Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            for (String streamId : new String[]{"__ack_ack", "__ack_fail", "__ack_reset_timeout"}) {
                spoutCommon.put_to_inputs(Utils.getGlobalStreamId("__acker", streamId), Thrift.prepareDirectGrouping());
            }
        }

        // Registered last, so the loops above only touch the user's own bolts.
        topology.put_to_bolts("__acker", ackerBolt);
    }

    /**
     * Extracts the {@code ComponentCommon} section from a component spec.
     *
     * @param component a {@code StateSpoutSpec}, {@code SpoutSpec} or {@code Bolt}
     * @return the component's common section, or {@code null} if the object is none of those types
     */
    public static ComponentCommon getComponentCommon(Object component) {
        if (component instanceof StateSpoutSpec) {
            return ((StateSpoutSpec) component).get_common();
        }
        if (component instanceof SpoutSpec) {
            return ((SpoutSpec) component).get_common();
        }
        if (component instanceof Bolt) {
            return ((Bolt) component).get_common();
        }
        return null;
    }

    /**
     * Declares the {@code "__metrics"} output stream (fields {@code task-info},
     * {@code data-points}) on every component of the topology, in place.
     *
     * @param topology topology to modify
     */
    public static void addMetricStreams(StormTopology topology) {
        Map<String, Object> components = StormCommon.allComponents(topology);
        for (Object component : components.values()) {
            StormCommon.getComponentCommon(component)
                    .put_to_streams("__metrics", Thrift.outputFields(Arrays.asList("task-info", "data-points")));
        }
    }

    /**
     * Declares the system output stream ({@link #SYSTEM_STREAM_ID}, single field {@code event})
     * on every component of the topology, in place.
     *
     * @param topology topology to modify
     */
    public static void addSystemStreams(StormTopology topology) {
        Map<String, Object> components = StormCommon.allComponents(topology);
        for (Object component : components.values()) {
            StormCommon.getComponentCommon(component)
                    .put_to_streams(SYSTEM_STREAM_ID, Thrift.outputFields(Arrays.asList("event")));
        }
    }

    /**
     * Returns the field names of the event logger stream.
     *
     * @return a fixed-size list: {@code component-id}, {@code message-id}, {@code ts}, {@code values}
     */
    public static List<String> eventLoggerBoltFields() {
        final String[] fieldNames = {"component-id", "message-id", "ts", "values"};
        return Arrays.asList(fieldNames);
    }

    /**
     * Builds the input subscriptions of the event logger bolt: the event logger stream of every
     * bolt and spout, fields-grouped on {@code "component-id"}.
     *
     * @param topology the topology whose bolts and spouts are subscribed to
     * @return a new map of source stream to grouping
     */
    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
        Set<String> sourceIds = new HashSet<>();
        sourceIds.addAll(topology.get_bolts().keySet());
        sourceIds.addAll(topology.get_spouts().keySet());

        Map<GlobalStreamId, Grouping> subscriptions = new HashMap<>();
        for (String sourceId : sourceIds) {
            GlobalStreamId eventStream = Utils.getGlobalStreamId(sourceId, EVENTLOGGER_STREAM_ID);
            subscriptions.put(eventStream, Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
        }
        return subscriptions;
    }

    /**
     * Adds the event logger bolt to the topology and declares the event logger stream on every
     * existing component. The topology is modified in place.
     *
     * <p>The executor/task count is {@code "topology.eventlogger.executors"}, falling back to
     * {@code "topology.workers"}; the tick frequency is {@code "topology.message.timeout.secs"}.
     *
     * @param conf topology configuration
     * @param topology topology to modify
     */
    public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
        Integer workerCount = ObjectReader.getInt(conf.get("topology.workers"));
        Integer executorCount = ObjectReader.getInt(conf.get("topology.eventlogger.executors"), workerCount);

        Map<String, Object> loggerConf = new HashMap<>();
        loggerConf.put("topology.tasks", executorCount);
        loggerConf.put("topology.tick.tuple.freq.secs", ObjectReader.getInt(conf.get("topology.message.timeout.secs")));

        Map<GlobalStreamId, Grouping> loggerInputs = StormCommon.eventLoggerInputs(topology);
        Bolt loggerBolt = Thrift.prepareSerializedBoltDetails(loggerInputs, new EventLoggerBolt(), null, executorCount, loggerConf);

        // The logger bolt is registered after this loop, so it does not get the stream itself.
        for (Object component : StormCommon.allComponents(topology).values()) {
            StormCommon.getComponentCommon(component)
                    .put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(StormCommon.eventLoggerBoltFields()));
        }
        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, loggerBolt);
    }

    /**
     * Builds one metrics consumer bolt per entry of {@code "topology.metrics.consumer.register"}.
     *
     * <p>Every consumer bolt shuffle-subscribes to the {@code "__metrics"} stream of all
     * components of the topology plus the system component. Each bolt id is
     * {@code "__metrics" + className}; when the same class is registered more than once, the
     * second and later registrations get a {@code "#<n>"} suffix (n = 2, 3, ...). The first
     * registration used to be keyed by the bare class name, without the {@code "__metrics"}
     * prefix, which was inconsistent with the later ones and could clash with a user component id.
     *
     * <p>Per-entry settings are read with these defaults: max retained tuples 100, parallelism
     * hint 1, expand-map-type {@code false}, metric name separator {@code "."}.
     *
     * @param conf topology configuration
     * @param topology topology whose components emit metrics
     * @return a new map of consumer bolt id to bolt spec; empty when nothing is registered
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

        Set<String> componentIdsEmitMetrics = new HashSet<>(StormCommon.allComponents(topology).keySet());
        // Same literal as the system bolt's component id (see addSystemComponents).
        componentIdsEmitMetrics.add(SYSTEM_STREAM_ID);

        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        for (String componentId : componentIdsEmitMetrics) {
            inputs.put(Utils.getGlobalStreamId(componentId, "__metrics"), Thrift.prepareShuffleGrouping());
        }

        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get("topology.metrics.consumer.register");
        if (registerInfo == null) {
            return metricsConsumerBolts;
        }

        Map<String, Integer> classOccurrencesMap = new HashMap<>();
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
            Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
            Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
            Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);

            Map<String, Object> metricsConsumerConf = new HashMap<>();
            metricsConsumerConf.put("topology.tasks", phintNum);

            List<String> whitelist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_WHITELIST);
            List<String> blacklist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
            FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);

            Boolean expandMapType = ObjectReader.getBoolean(info.get(TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
            String metricNameSeparator = ObjectReader.getString(info.get(TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
            DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);

            MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument, maxRetainMetricTuples, filterPredicate, expander);
            Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, boltInstance, null, phintNum, metricsConsumerConf);

            // e.g. classes ["a", "b", "a"] => ids ["__metricsa", "__metricsb", "__metricsa#2"]
            int occurrenceNum = classOccurrencesMap.containsKey(className) ? classOccurrencesMap.get(className) + 1 : 1;
            classOccurrencesMap.put(className, occurrenceNum);
            String id = "__metrics" + className;
            if (occurrenceNum > 1) {
                id = id + "#" + occurrenceNum;
            }
            metricsConsumerBolts.put(id, metricsConsumerBolt);
        }
        return metricsConsumerBolts;
    }

    /**
     * Registers every metrics consumer bolt produced by {@code metricsConsumerBoltSpecs} on the
     * topology, in place.
     *
     * @param conf topology configuration
     * @param topology topology to modify
     */
    public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> consumerBolts = StormCommon.metricsConsumerBoltSpecs(conf, topology);
        for (Map.Entry<String, Bolt> consumer : consumerBolts.entrySet()) {
            String boltId = consumer.getKey();
            Bolt boltSpec = consumer.getValue();
            topology.put_to_bolts(boltId, boltSpec);
        }
    }

    /**
     * Adds the system bolt to the topology, in place.
     *
     * <p>The bolt has no inputs, zero tasks and zero executors, and declares the
     * {@code "__tick"}, {@code "__metrics_tick"} and {@code "__credentials"} output streams.
     *
     * @param conf topology configuration (not read by this method)
     * @param topology topology to modify
     */
    public static void addSystemComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Object> systemBoltConf = new HashMap<>();
        systemBoltConf.put("topology.tasks", 0);

        Map<String, StreamInfo> systemOutputs = new HashMap<>();
        systemOutputs.put("__tick", Thrift.outputFields(Arrays.asList("rate_secs")));
        systemOutputs.put("__metrics_tick", Thrift.outputFields(Arrays.asList("interval")));
        systemOutputs.put("__credentials", Thrift.outputFields(Arrays.asList("creds")));

        Bolt systemBolt = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), systemOutputs, 0, systemBoltConf);
        topology.put_to_bolts(SYSTEM_STREAM_ID, systemBolt);
    }

    /**
     * Builds the system topology for a user topology via the installed {@code StormCommon}
     * instance.
     *
     * @param topoConf topology configuration
     * @param topology the user topology
     * @return the result of {@code systemTopologyImpl} on the installed instance
     * @throws InvalidTopologyException if the topology fails validation
     */
    public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        final StormCommon delegate = _instance;
        return delegate.systemTopologyImpl(topoConf, topology);
    }

    /**
     * Builds the system topology: a deep copy of the user topology with the acker, the optional
     * event logger, the metrics consumers, the system bolt and the metric/system streams added.
     *
     * <p>The order of the steps matters: components are added before
     * {@code addMetricStreams}/{@code addSystemStreams} so that those streams are also declared
     * on the added components, and structure validation runs last on the completed topology.
     *
     * @param topoConf topology configuration
     * @param topology the user topology; it is validated but all additions go to a copy
     * @return the augmented copy
     * @throws InvalidTopologyException if basic or structural validation fails
     */
    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        // Validate the user's topology before any system components are mixed in.
        StormCommon.validateBasic(topology);
        // All additions below are applied to this copy, not to the argument.
        StormTopology ret = topology.deepCopy();
        // The acker is added unconditionally here; only the event logger is gated on configuration.
        StormCommon.addAcker(topoConf, ret);
        if (StormCommon.hasEventLoggers(topoConf)) {
            StormCommon.addEventLogger(topoConf, ret);
        }
        StormCommon.addMetricComponents(topoConf, ret);
        StormCommon.addSystemComponents(topoConf, ret);
        // Stream declarations run after all components exist so every component receives them.
        StormCommon.addMetricStreams(ret);
        StormCommon.addSystemStreams(ret);
        StormCommon.validateStructure(ret);
        return ret;
    }

    /**
     * Tells whether ackers are enabled for a topology.
     *
     * @param topoConf topology configuration
     * @return {@code true} if {@code "topology.acker.executors"} is absent or greater than zero
     */
    public static boolean hasAckers(Map<String, Object> topoConf) {
        Object configured = topoConf.get("topology.acker.executors");
        if (configured == null) {
            return true;
        }
        return ObjectReader.getInt(configured) > 0;
    }

    /**
     * Tells whether event loggers are enabled for a topology.
     *
     * @param topoConf topology configuration
     * @return {@code true} if {@code "topology.eventlogger.executors"} is absent or greater than zero
     */
    public static boolean hasEventLoggers(Map<String, Object> topoConf) {
        Object configured = topoConf.get("topology.eventlogger.executors");
        if (configured == null) {
            return true;
        }
        return ObjectReader.getInt(configured) > 0;
    }

    /**
     * Returns the parallelism hint of a component.
     *
     * @param component a spout, state spout or bolt spec
     * @return the value of {@code Thrift.getParallelismHint} for the component's common section
     * @throws InvalidTopologyException declared for callers; not thrown by the visible code
     */
    public static int numStartExecutors(Object component) throws InvalidTopologyException {
        return Thrift.getParallelismHint(StormCommon.getComponentCommon(component));
    }

    /**
     * Computes the task id to component id assignment via the installed {@code StormCommon}
     * instance.
     *
     * @param userTopology the user topology
     * @param topoConf topology configuration
     * @return the result of {@code stormTaskInfoImpl} on the installed instance
     * @throws InvalidTopologyException if the topology fails validation
     */
    public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map<String, Object> topoConf) throws InvalidTopologyException {
        final StormCommon delegate = _instance;
        return delegate.stormTaskInfoImpl(userTopology, topoConf);
    }

    /**
     * Assigns consecutive task ids, starting at 1, to the components of the system topology.
     *
     * <p>Components are processed in sorted component-id order; each receives as many task ids
     * as its {@code "topology.tasks"} setting says.
     *
     * @param userTopology the user topology
     * @param topoConf topology configuration
     * @return a new map of task id to component id
     * @throws InvalidTopologyException if building the system topology fails validation
     */
    protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map<String, Object> topoConf) throws InvalidTopologyException {
        StormTopology systemTopology = StormCommon.systemTopology(topoConf, userTopology);

        // TreeMap: iteration below must follow sorted component ids.
        Map<String, Integer> taskCountByComponent = new TreeMap<>();
        for (Map.Entry<String, Object> component : StormCommon.allComponents(systemTopology).entrySet()) {
            Object declaredTasks = StormCommon.componentConf(component.getValue()).get("topology.tasks");
            taskCountByComponent.put(component.getKey(), ObjectReader.getInt(declaredTasks));
        }

        Map<Integer, String> componentByTask = new HashMap<>();
        int nextTaskId = 1;
        for (Map.Entry<String, Integer> component : taskCountByComponent.entrySet()) {
            String componentId = component.getKey();
            for (int remaining = component.getValue(); remaining > 0; remaining--) {
                componentByTask.put(nextTaskId, componentId);
                nextTaskId++;
            }
        }
        return componentByTask;
    }

    /**
     * Expands an executor id into the task ids it covers.
     *
     * @param executorId a list whose element 0 is the first task id and element 1 the last task
     *        id (both inclusive)
     * @return a new list with every task id from first to last in ascending order; empty when
     *         first is greater than last
     */
    public static List<Integer> executorIdToTasks(List<Long> executorId) {
        final int firstTask = executorId.get(0).intValue();
        final int lastTask = executorId.get(1).intValue();
        List<Integer> tasks = new ArrayList<>();
        int current = firstTask;
        while (current <= lastTask) {
            tasks.add(current);
            current++;
        }
        return tasks;
    }

    /**
     * Flattens an executor-to-node assignment into a task-to-node assignment.
     *
     * @param executorToNodePort executor id (first/last task id pair) to node
     * @return a new map in which every task id covered by an executor maps to that executor's node
     */
    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
        Map<Integer, NodeInfo> nodeByTask = new HashMap<>();
        for (Map.Entry<List<Long>, NodeInfo> assignment : executorToNodePort.entrySet()) {
            NodeInfo node = assignment.getValue();
            for (Integer taskId : StormCommon.executorIdToTasks(assignment.getKey())) {
                nodeByTask.put(taskId, node);
            }
        }
        return nodeByTask;
    }

    /**
     * Creates and prepares an authorizer via the installed {@code StormCommon} instance.
     *
     * @param klassName fully qualified authorizer class name; may be blank
     * @param conf configuration handed to the authorizer
     * @return the result of {@code mkAuthorizationHandlerImpl} on the installed instance
     * @throws IllegalAccessException if the class or its constructor is not accessible
     * @throws InstantiationException if the class cannot be instantiated
     * @throws ClassNotFoundException if the class cannot be found
     */
    public static IAuthorizer mkAuthorizationHandler(String klassName, Map<String, Object> conf) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
        final StormCommon delegate = _instance;
        return delegate.mkAuthorizationHandlerImpl(klassName, conf);
    }

    /**
     * Instantiates the named authorizer class reflectively and calls {@code prepare(conf)} on it.
     *
     * @param klassName fully qualified authorizer class name
     * @param conf configuration handed to the authorizer
     * @return the prepared authorizer, or {@code null} when {@code klassName} is blank
     * @throws ClassNotFoundException if the class cannot be found
     * @throws IllegalAccessException if the class or its no-arg constructor is not accessible
     * @throws InstantiationException if the class cannot be instantiated
     */
    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map<String, Object> conf) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        if (!StringUtils.isNotBlank(klassName)) {
            return null;
        }
        Class<?> authorizerClass = Class.forName(klassName);
        if (authorizerClass == null) {
            return null;
        }
        IAuthorizer authorizer = (IAuthorizer) authorizerClass.newInstance();
        if (authorizer != null) {
            authorizer.prepare(conf);
        }
        LOG.debug("authorization class name:{}, class:{}, handler:{}", new Object[]{klassName, authorizerClass, authorizer});
        return authorizer;
    }

    /**
     * Builds a {@code WorkerTopologyContext} from the entries of a worker data map.
     *
     * <p>Reads the keys {@code system-topology}, {@code storm-conf}, {@code task->component},
     * {@code component->sorted-tasks}, {@code component->stream->fields}, {@code storm-id},
     * {@code conf}, {@code port}, {@code task-ids}, {@code default-shared-resources} and
     * {@code user-shared-resources}; the code and pid directories are derived from {@code conf}
     * and the storm id through {@code ConfigUtils}.
     *
     * @param workerData worker state keyed by the names listed above
     * @return the new context
     * @throws RuntimeException wrapping an {@code IOException} raised while building the context
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
        // Plain lookups and casts; none of these can raise IOException.
        StormTopology systemTopology = (StormTopology) workerData.get("system-topology");
        Map topoConf = (Map) workerData.get("storm-conf");
        Map taskToComponent = (Map) workerData.get("task->component");
        Map componentToSortedTasks = (Map) workerData.get("component->sorted-tasks");
        Map componentToStreamToFields = (Map) workerData.get("component->stream->fields");
        String stormId = (String) workerData.get("storm-id");
        Map conf = (Map) workerData.get("conf");
        Integer port = (Integer) workerData.get("port");
        try {
            String distRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
            String codeDir = ConfigUtils.supervisorStormResourcesPath(distRoot);
            String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
            List workerTasks = (List) workerData.get("task-ids");
            Map defaultResources = (Map) workerData.get("default-shared-resources");
            Map userResources = (Map) workerData.get("user-shared-resources");
            return new WorkerTopologyContext(systemTopology, topoConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
        } catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}

