/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.trident;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.storm.ILocalDRPC;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.trident.JoinOutFieldsMode;
import org.apache.storm.trident.JoinType;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.drpc.ReturnResultsReducer;
import org.apache.storm.trident.fluent.GroupedStream;
import org.apache.storm.trident.fluent.IAggregatableStream;
import org.apache.storm.trident.fluent.UniqueIdGen;
import org.apache.storm.trident.graph.GraphGrouper;
import org.apache.storm.trident.graph.Group;
import org.apache.storm.trident.operation.DefaultResourceDeclarer;
import org.apache.storm.trident.operation.GroupedMultiReducer;
import org.apache.storm.trident.operation.MultiReducer;
import org.apache.storm.trident.operation.impl.FilterExecutor;
import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
import org.apache.storm.trident.operation.impl.IdentityMultiReducer;
import org.apache.storm.trident.operation.impl.JoinerMultiReducer;
import org.apache.storm.trident.operation.impl.PreservingFieldsOrderJoinerMultiReducer;
import org.apache.storm.trident.operation.impl.TrueFilter;
import org.apache.storm.trident.partition.IdentityGrouping;
import org.apache.storm.trident.planner.Node;
import org.apache.storm.trident.planner.NodeStateInfo;
import org.apache.storm.trident.planner.PartitionNode;
import org.apache.storm.trident.planner.ProcessorNode;
import org.apache.storm.trident.planner.SpoutNode;
import org.apache.storm.trident.planner.SubtopologyBolt;
import org.apache.storm.trident.planner.processor.EachProcessor;
import org.apache.storm.trident.planner.processor.MultiReducerProcessor;
import org.apache.storm.trident.spout.BatchSpoutExecutor;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
import org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor;
import org.apache.storm.trident.spout.RichSpoutBatchExecutor;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateSpec;
import org.apache.storm.trident.topology.TridentTopologyBuilder;
import org.apache.storm.trident.util.ErrorEdgeFactory;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.jgrapht.DirectedGraph;
import org.jgrapht.EdgeFactory;
import org.jgrapht.UndirectedGraph;
import org.jgrapht.alg.ConnectivityInspector;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.Pseudograph;

public class TridentTopology {
    /** Operation graph; assigned by the constructor and cloned at the start of {@code build()}. */
    final DefaultDirectedGraph<Node, IndexedEdge> _graph;
    /** Lists of nodes keyed by string; {@code build()} creates one initial {@code Group} per list. */
    final Map<String, List<Node>> _colocate;
    /** Id generator supplied to the constructor and passed on to the helper topology in {@code completeDRPC}. */
    final UniqueIdGen _gen;
    /** Baseline resource map; {@code build()} uses it as the starting point for spout, bolt-group and master-coordinator resources. */
    Map<String, Number> _resourceDefaults = new HashMap<String, Number>();
    /** Resource entries layered on top of {@code _resourceDefaults} and handed to {@code buildTopology} in {@code build()}. */
    Map<String, Number> _masterCoordResources = new HashMap<String, Number>();

    /**
     * Creates an empty topology: a new directed graph built with an {@link ErrorEdgeFactory},
     * an empty insertion-ordered colocation map and a fresh {@link UniqueIdGen}.
     */
    public TridentTopology() {
        this((DefaultDirectedGraph<Node, IndexedEdge>)new DefaultDirectedGraph((EdgeFactory)new ErrorEdgeFactory()), new LinkedHashMap<String, List<Node>>(), new UniqueIdGen());
    }

    /**
     * Creates a topology around existing state. The arguments are stored as-is (no copies are made).
     *
     * @param graph    operation graph to use
     * @param colocate colocation map to use
     * @param gen      id generator to use
     */
    private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
        _gen = gen;
        _colocate = colocate;
        _graph = graph;
    }

    /**
     * Creates a stream from a rich spout by wrapping it in a {@link RichSpoutBatchExecutor}
     * and delegating to the matching {@code newStream} overload.
     *
     * @param txId  transaction id forwarded to the delegated overload
     * @param spout rich spout to wrap
     * @return the stream returned by the delegated overload
     */
    public Stream newStream(String txId, IRichSpout spout) {
        RichSpoutBatchExecutor batchingSpout = new RichSpoutBatchExecutor(spout);
        return newStream(txId, batchingSpout);
    }

    /**
     * Creates a stream backed by a batch spout. A {@link SpoutNode} of type {@code BATCH} is created
     * with a fresh stream id and the spout's output fields, then added to the topology.
     *
     * @param txId  transaction id recorded on the spout node
     * @param spout batch spout providing the data and output fields
     * @return the stream returned by {@code addNode}
     */
    public Stream newStream(String txId, IBatchSpout spout) {
        String streamId = getUniqueStreamId();
        Fields declaredFields = spout.getOutputFields();
        SpoutNode spoutNode = new SpoutNode(streamId, declaredFields, txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(spoutNode);
    }

    /**
     * Creates a stream backed by a Trident spout. A {@link SpoutNode} of type {@code BATCH} is created
     * with a fresh stream id and the spout's output fields, then added to the topology.
     *
     * @param txId  transaction id recorded on the spout node
     * @param spout Trident spout providing the data and output fields
     * @return the stream returned by {@code addNode}
     */
    public Stream newStream(String txId, ITridentSpout spout) {
        String streamId = getUniqueStreamId();
        Fields declaredFields = spout.getOutputFields();
        SpoutNode spoutNode = new SpoutNode(streamId, declaredFields, txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(spoutNode);
    }

    /**
     * Creates a stream from a partitioned Trident spout by wrapping it in a
     * {@link PartitionedTridentSpoutExecutor} and delegating to the matching {@code newStream} overload.
     *
     * @param txId  transaction id forwarded to the delegated overload
     * @param spout partitioned spout to wrap
     * @return the stream returned by the delegated overload
     */
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        PartitionedTridentSpoutExecutor executor = new PartitionedTridentSpoutExecutor(spout);
        return newStream(txId, executor);
    }

    /**
     * Creates a stream from an opaque partitioned Trident spout by wrapping it in an
     * {@link OpaquePartitionedTridentSpoutExecutor} and delegating to the matching {@code newStream} overload.
     *
     * @param txId  transaction id forwarded to the delegated overload
     * @param spout opaque partitioned spout to wrap
     * @return the stream returned by the delegated overload
     */
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        OpaquePartitionedTridentSpoutExecutor executor = new OpaquePartitionedTridentSpoutExecutor(spout);
        return newStream(txId, executor);
    }

    /**
     * Creates a stream from a generic data source by dispatching on its runtime type.
     * The checks run in this order: {@link IBatchSpout}, {@link ITridentSpout},
     * {@link IPartitionedTridentSpout}, {@link IOpaquePartitionedTridentSpout}; the first match wins.
     *
     * @param txId       transaction id forwarded to the selected overload
     * @param dataSource data source to turn into a stream
     * @return the stream returned by the selected overload
     * @throws UnsupportedOperationException if the data source is none of the supported types
     */
    public Stream newStream(String txId, ITridentDataSource dataSource) {
        Stream created;
        if (dataSource instanceof IBatchSpout) {
            created = newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            created = newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            created = newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            created = newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
        return created;
    }

    /**
     * Creates a DRPC stream for the named function using a {@link DRPCSpout} built from the name alone.
     *
     * @param function DRPC function name
     * @return the DRPC stream
     */
    public Stream newDRPCStream(String function) {
        DRPCSpout drpcSpout = new DRPCSpout(function);
        return newDRPCStream(drpcSpout);
    }

    /**
     * Creates a DRPC stream for the named function, optionally bound to a local DRPC server.
     *
     * @param function DRPC function name
     * @param server   local DRPC server handed to the spout; when {@code null} the spout is built
     *                 from the function name only
     * @return the DRPC stream
     */
    public Stream newDRPCStream(String function, ILocalDRPC server) {
        DRPCSpout drpcSpout;
        if (server != null) {
            drpcSpout = new DRPCSpout(function, server);
        } else {
            drpcSpout = new DRPCSpout(function);
        }
        return newDRPCStream(drpcSpout);
    }

    /**
     * Registers the DRPC spout as a {@link SpoutNode} of type {@code DRPC} (with a {@code null}
     * transaction id) and returns the resulting stream projected down to the {@code "args"} field.
     *
     * @param spout DRPC spout to register
     * @return stream containing only the {@code "args"} field
     */
    private Stream newDRPCStream(DRPCSpout spout) {
        String streamId = getUniqueStreamId();
        Fields spoutFields = TridentUtils.getSingleOutputStreamFields(spout);
        SpoutNode drpcNode = new SpoutNode(streamId, spoutFields, null, spout, SpoutNode.SpoutType.DRPC);
        Stream rawStream = addNode(drpcNode);
        Fields argsOnly = new Fields("args");
        return rawStream.project(argsOnly);
    }

    /**
     * Creates a static state from a factory by wrapping it in a {@link StateSpec}.
     *
     * @param factory state factory to wrap
     * @return the state returned by {@link #newStaticState(StateSpec)}
     */
    public TridentState newStaticState(StateFactory factory) {
        StateSpec spec = new StateSpec(factory);
        return newStaticState(spec);
    }

    /**
     * Creates a static state: a plain {@link Node} with no name and no output fields is given a
     * {@link NodeStateInfo} (fresh state id plus the spec) and registered with the topology.
     *
     * @param spec state specification to attach to the node
     * @return a {@link TridentState} bound to this topology and the new node
     */
    public TridentState newStaticState(StateSpec spec) {
        String stateId = getUniqueStateId();
        String streamId = getUniqueStreamId();
        Node stateNode = new Node(streamId, null, new Fields());
        stateNode.stateInfo = new NodeStateInfo(stateId, spec);
        registerNode(stateNode);
        return new TridentState(this, stateNode);
    }

    /**
     * Two-stream convenience form of {@code multiReduce(List<Stream>, MultiReducer, Fields)}.
     *
     * @param s1           first input stream
     * @param s2           second input stream
     * @param function     reducer applied across both streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
        List<Stream> pair = Arrays.asList(s1, s2);
        return multiReduce(pair, function, outputFields);
    }

    /**
     * Two-stream convenience form of
     * {@code multiReduce(List<Fields>, List<Stream>, MultiReducer, Fields)}.
     *
     * @param inputFields1 input fields for {@code s1}
     * @param s1           first input stream
     * @param inputFields2 input fields for {@code s2}
     * @param s2           second input stream
     * @param function     reducer applied across both streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
        List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
        List<Stream> pair = Arrays.asList(s1, s2);
        return multiReduce(inputs, pair, function, outputFields);
    }

    /**
     * Two-stream convenience form of
     * {@code multiReduce(List<GroupedStream>, GroupedMultiReducer, Fields)}.
     *
     * @param s1           first grouped stream
     * @param s2           second grouped stream
     * @param function     grouped reducer applied across both streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        List<GroupedStream> pair = Arrays.asList(s1, s2);
        return multiReduce(pair, function, outputFields);
    }

    /**
     * Two-stream convenience form of
     * {@code multiReduce(List<Fields>, List<GroupedStream>, GroupedMultiReducer, Fields)}.
     *
     * @param inputFields1 input fields for {@code s1}
     * @param s1           first grouped stream
     * @param inputFields2 input fields for {@code s2}
     * @param s2           second grouped stream
     * @param function     grouped reducer applied across both streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
        List<GroupedStream> pair = Arrays.asList(s1, s2);
        return multiReduce(inputs, pair, function, outputFields);
    }

    /**
     * Reduces several streams, using the result of {@code getAllOutputFields(streams)} as the
     * per-stream input fields.
     *
     * @param streams      input streams
     * @param function     reducer applied across the streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<Fields> everyField = getAllOutputFields(streams);
        return multiReduce(everyField, streams, function, outputFields);
    }

    /**
     * Reduces several grouped streams, using the result of {@code getAllOutputFields(streams)} as the
     * per-stream input fields.
     *
     * @param streams      grouped input streams
     * @param function     grouped reducer applied across the streams
     * @param outputFields fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> everyField = getAllOutputFields(streams);
        return multiReduce(everyField, streams, function, outputFields);
    }

    /**
     * Adds a {@link ProcessorNode} running a {@link MultiReducerProcessor} over the given streams.
     * The node's name is the non-null stream names joined with {@code "-"}; {@code outputFields}
     * is passed for both field arguments of the node.
     *
     * @param inputFields  input fields, one entry per stream
     * @param streams      source streams of the new node
     * @param function     reducer to run
     * @param outputFields fields of the resulting stream
     * @return the stream returned by {@code addSourcedNode}
     */
    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<String> streamNames = new ArrayList<String>();
        for (Stream source : streams) {
            if (source._name != null) {
                streamNames.add(source._name);
            }
        }
        String streamId = getUniqueStreamId();
        String nodeName = Utils.join(streamNames, "-");
        MultiReducerProcessor processor = new MultiReducerProcessor(inputFields, function);
        Node reducerNode = new ProcessorNode(streamId, nodeName, outputFields, outputFields, processor);
        return addSourcedNode(streams, reducerNode);
    }

    /**
     * Reduces grouped streams. Each grouped stream is converted to a plain stream partitioned by its
     * group fields, its input fields are unioned with the group fields, and the grouped reducer is
     * wrapped in a {@link GroupedMultiReducerExecutor} before delegating to the ungrouped overload.
     *
     * @param inputFields    input fields, one entry per grouped stream (same order)
     * @param groupedStreams grouped input streams
     * @param function       grouped reducer to run
     * @param outputFields   fields of the resulting stream
     * @return the reduced stream
     */
    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> groupKeys = new ArrayList<Fields>();
        List<Fields> unionedInputs = new ArrayList<Fields>();
        List<Stream> partitioned = new ArrayList<Stream>();
        int position = 0;
        for (GroupedStream grouped : groupedStreams) {
            Fields key = grouped.getGroupFields();
            groupKeys.add(key);
            partitioned.add(grouped.toStream().partitionBy(key));
            unionedInputs.add(TridentUtils.fieldsUnion(key, inputFields.get(position)));
            position++;
        }
        MultiReducer executor = new GroupedMultiReducerExecutor(function, groupKeys, inputFields);
        return multiReduce(unionedInputs, partitioned, executor, outputFields);
    }

    /**
     * Varargs form of {@code merge(Fields, List<Stream>)}.
     *
     * @param outputFields fields of the merged stream
     * @param streams      streams to merge
     * @return the merged stream
     */
    public Stream merge(Fields outputFields, Stream ... streams) {
        List<Stream> asList = Arrays.asList(streams);
        return merge(outputFields, asList);
    }

    /**
     * Merges streams by running an {@link IdentityMultiReducer} over them via {@code multiReduce}.
     *
     * @param outputFields fields of the merged stream
     * @param streams      streams to merge
     * @return the merged stream
     */
    public Stream merge(Fields outputFields, List<Stream> streams) {
        IdentityMultiReducer passThrough = new IdentityMultiReducer();
        return multiReduce(streams, passThrough, outputFields);
    }

    /**
     * Varargs form of {@code merge(List<Stream>)}.
     *
     * @param streams streams to merge
     * @return the merged stream
     */
    public Stream merge(Stream ... streams) {
        List<Stream> asList = Arrays.asList(streams);
        return merge(asList);
    }

    /**
     * Merges streams, taking the output fields of the first stream as the merged output fields.
     *
     * @param streams streams to merge; element 0 is read, so the list must not be empty
     * @return the merged stream
     */
    public Stream merge(List<Stream> streams) {
        Stream first = streams.get(0);
        Fields mergedFields = first.getOutputFields();
        return merge(mergedFields, streams);
    }

    /**
     * Two-stream convenience form of {@code join(List<Stream>, List<Fields>, Fields)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields);
    }

    /**
     * Joins the streams using {@link JoinType#INNER}.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @return the joined stream
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
        JoinType defaultType = JoinType.INNER;
        return join(streams, joinFields, outFields, defaultType);
    }

    /**
     * Two-stream convenience form of {@code join(List<Stream>, List<Fields>, Fields, JoinType)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @param type        join type forwarded to the list-based overload
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields, type);
    }

    /**
     * Joins the streams with a single join type; the per-stream type list is produced by
     * {@code repeat(streams.size(), type)}.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @param type       join type passed to {@code repeat}
     * @return the joined stream
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
        List<JoinType> perStreamTypes = repeat(streams.size(), type);
        return join(streams, joinFields, outFields, perStreamTypes);
    }

    /**
     * Two-stream convenience form of
     * {@code join(List<Stream>, List<Fields>, Fields, List<JoinType>)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @param mixed       join types forwarded to the list-based overload
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields, mixed);
    }

    /**
     * Joins the streams with the given join types using {@link JoinOutFieldsMode#COMPACT}.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @param mixed      join types forwarded to the mode-aware overload
     * @return the joined stream
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
        JoinOutFieldsMode defaultMode = JoinOutFieldsMode.COMPACT;
        return join(streams, joinFields, outFields, mixed, defaultMode);
    }

    /**
     * Two-stream convenience form of
     * {@code join(List<Stream>, List<Fields>, Fields, JoinOutFieldsMode)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @param mode        output-fields mode forwarded to the list-based overload
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields, mode);
    }

    /**
     * Joins the streams using {@link JoinType#INNER} and the given output-fields mode.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @param mode       output-fields mode
     * @return the joined stream
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
        JoinType defaultType = JoinType.INNER;
        return join(streams, joinFields, outFields, defaultType, mode);
    }

    /**
     * Two-stream convenience form of
     * {@code join(List<Stream>, List<Fields>, Fields, JoinType, JoinOutFieldsMode)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @param type        join type forwarded to the list-based overload
     * @param mode        output-fields mode forwarded to the list-based overload
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields, type, mode);
    }

    /**
     * Joins the streams with a single join type and the given output-fields mode; the per-stream
     * type list is produced by {@code repeat(streams.size(), type)}.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @param type       join type passed to {@code repeat}
     * @param mode       output-fields mode
     * @return the joined stream
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        List<JoinType> perStreamTypes = repeat(streams.size(), type);
        return join(streams, joinFields, outFields, perStreamTypes, mode);
    }

    /**
     * Two-stream convenience form of
     * {@code join(List<Stream>, List<Fields>, Fields, List<JoinType>, JoinOutFieldsMode)}.
     *
     * @param s1          first stream
     * @param joinFields1 join fields of {@code s1}
     * @param s2          second stream
     * @param joinFields2 join fields of {@code s2}
     * @param outFields   fields of the joined stream
     * @param mixed       join types forwarded to the list-based overload
     * @param mode        output-fields mode forwarded to the list-based overload
     * @return the joined stream
     */
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        List<Stream> pair = Arrays.asList(s1, s2);
        List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
        return join(pair, keys, outFields, mixed, mode);
    }

    /**
     * Core join. The streams are grouped via {@code groupedStreams(streams, joinFields)} and reduced
     * with a joiner chosen by {@code mode}: {@link JoinerMultiReducer} for {@code COMPACT},
     * {@link PreservingFieldsOrderJoinerMultiReducer} for {@code PRESERVE}. The number of join
     * fields is read from the first entry of {@code joinFields}.
     *
     * @param streams    streams to join
     * @param joinFields join fields, one entry per stream
     * @param outFields  fields of the joined stream
     * @param mixed      join types handed to the joiner
     * @param mode       output-fields mode selecting the joiner implementation
     * @return the joined stream
     * @throws IllegalArgumentException if {@code mode} is neither {@code COMPACT} nor {@code PRESERVE}
     */
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        switch (mode) {
            case COMPACT: {
                List<Fields> inputs = strippedInputFields(streams, joinFields);
                List<GroupedStream> grouped = groupedStreams(streams, joinFields);
                int numJoinFields = joinFields.get(0).size();
                GroupedMultiReducer joiner =
                        new JoinerMultiReducer(mixed, numJoinFields, strippedInputFields(streams, joinFields));
                return multiReduce(inputs, grouped, joiner, outFields);
            }
            case PRESERVE: {
                List<Fields> inputs = strippedInputFields(streams, joinFields);
                List<GroupedStream> grouped = groupedStreams(streams, joinFields);
                int numJoinFields = joinFields.get(0).size();
                GroupedMultiReducer joiner = new PreservingFieldsOrderJoinerMultiReducer(
                        mixed,
                        numJoinFields,
                        getAllOutputFields(streams),
                        joinFields,
                        strippedInputFields(streams, joinFields));
                return multiReduce(inputs, grouped, joiner, outFields);
            }
            default:
                break;
        }
        throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
    }

    /**
     * Replaces the default resource map with the one exposed by the given declarer
     * (the returned map is stored directly, not copied).
     *
     * @param defaults declarer whose resources become the defaults
     * @return this topology, for chaining
     */
    public TridentTopology setResourceDefaults(DefaultResourceDeclarer defaults) {
        Map<String, Number> declared = defaults.getResources();
        _resourceDefaults = declared;
        return this;
    }

    /**
     * Replaces the master-coordinator resource map with the one exposed by the given declarer
     * (the returned map is stored directly, not copied).
     *
     * @param resources declarer whose resources are used for the master coordinator
     * @return this topology, for chaining
     */
    public TridentTopology setMasterCoordResources(DefaultResourceDeclarer resources) {
        Map<String, Number> declared = resources.getResources();
        _masterCoordResources = declared;
        return this;
    }

    /**
     * Plans this Trident topology and compiles it into a {@link StormTopology}.
     *
     * <p>The work is done on a clone of the operation graph, in these phases:
     * <ol>
     *   <li>complete DRPC sub-graphs via {@code completeDRPC};</li>
     *   <li>split vertices into spout nodes and bolt nodes (partition nodes belong to neither);</li>
     *   <li>seed groups from the colocation lists and the remaining bolt nodes, then let the
     *       {@link GraphGrouper} merge them;</li>
     *   <li>insert an identity partition on every direct edge that crosses a group boundary;</li>
     *   <li>re-route each node reported by {@code extraPartitionInputs} through a new identity node
     *       and a new partition node;</li>
     *   <li>assign a batch group id ({@code "bg" + index}) to every connected component;</li>
     *   <li>declare spouts and bolts, their resources and their subscriptions on a
     *       {@link TridentTopologyBuilder}.</li>
     * </ol>
     *
     * @return the compiled Storm topology
     * @throws RuntimeException if an edge's source has no group and is not a spout node, or if a
     *                          spout is neither an {@link IBatchSpout} nor an {@link ITridentSpout}
     */
    public StormTopology build() {
        final String onHeapKey = "topology.component.resources.onheap.memory.mb";
        final String offHeapKey = "topology.component.resources.offheap.memory.mb";
        final String cpuKey = "topology.component.cpu.pcore.percent";

        // Plan on a copy so that building does not modify the user-visible graph object.
        @SuppressWarnings("unchecked")
        DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph<Node, IndexedEdge>) _graph.clone();

        completeDRPC(graph, _colocate, _gen);

        List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
        // Everything that is neither a spout nor a partition is a candidate for a bolt group.
        Set<Node> boltNodes = new LinkedHashSet<Node>();
        for (Node n : graph.vertexSet()) {
            if (n instanceof SpoutNode) {
                spoutNodes.add((SpoutNode) n);
            } else if (!(n instanceof PartitionNode)) {
                boltNodes.add(n);
            }
        }

        Set<Group> initialGroups = new LinkedHashSet<Group>();
        for (List<Node> colocated : _colocate.values()) {
            Group g = new Group(graph, colocated);
            boltNodes.removeAll(colocated);
            initialGroups.add(g);
        }
        for (Node n : boltNodes) {
            initialGroups.add(new Group(graph, n));
        }

        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
        grouper.mergeFully();
        Collection<Group> mergedGroups = grouper.getAllGroups();

        // Add identity partitions between groups. Iterate over a snapshot because the loop
        // removes and adds edges.
        for (IndexedEdge e : new HashSet<IndexedEdge>(graph.edgeSet())) {
            Node source = (Node) e.source;
            Node target = (Node) e.target;
            if (source instanceof PartitionNode || target instanceof PartitionNode) {
                continue;
            }
            Group sourceGroup = grouper.nodeGroup(source);
            Group targetGroup = grouper.nodeGroup(target);
            // A null source group is only legal when the source is a spout node.
            if (sourceGroup == null && !(source instanceof SpoutNode)) {
                throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
            }
            if (sourceGroup != null && sourceGroup.equals(targetGroup)) {
                continue;
            }
            graph.removeEdge(e);
            PartitionNode pNode = makeIdentityPartition(source);
            graph.addVertex(pNode);
            // The endpoints are known not to be PartitionNodes here, so they are handled as plain Nodes.
            graph.addEdge(source, pNode, new IndexedEdge<Node>(source, pNode, 0));
            graph.addEdge(pNode, target, new IndexedEdge<Node>(pNode, target, e.index));
        }

        // Replace each "extra" partition input with parent -> identity node -> new partition node.
        List<Node> forNewGroups = new ArrayList<Node>();
        for (Group g : mergedGroups) {
            for (PartitionNode n : extraPartitionInputs(g)) {
                Node idNode = makeIdentityNode(n.allOutputFields);
                Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping);
                Node parentNode = TridentUtils.getParent(graph, n);
                // Snapshot before removing the vertex: the set returned by the graph may be a live
                // view that is emptied once the vertex and its edges are removed.
                List<IndexedEdge> outgoing = new ArrayList<IndexedEdge>(graph.outgoingEdgesOf(n));
                graph.removeVertex(n);

                graph.addVertex(idNode);
                graph.addVertex(newPartitionNode);
                addEdge(graph, parentNode, idNode, 0);
                addEdge(graph, idNode, newPartitionNode, 0);
                for (IndexedEdge e : outgoing) {
                    addEdge(graph, newPartitionNode, (Node) e.target, e.index);
                }

                Group parentGroup = grouper.nodeGroup(parentNode);
                if (parentGroup == null) {
                    forNewGroups.add(idNode);
                } else {
                    parentGroup.nodes.add(idNode);
                }
            }
        }

        for (Node n : forNewGroups) {
            grouper.addGroup(new Group(graph, n));
        }
        // Spouts are added as groups too so that parallelism can be computed for them.
        for (Node n : spoutNodes) {
            grouper.addGroup(new Group(graph, n));
        }
        grouper.reindex();
        mergedGroups = grouper.getAllGroups();

        // Every connected component of the final graph forms one batch group.
        Map<Node, String> batchGroupMap = new HashMap<Node, String>();
        List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
        for (int i = 0; i < connectedComponents.size(); ++i) {
            String groupId = "bg" + i;
            for (Node n : connectedComponents.get(i)) {
                batchGroupMap.put(n, groupId);
            }
        }

        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

        TridentTopologyBuilder builder = new TridentTopologyBuilder();
        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
        Map<Group, String> boltIds = genBoltIds(mergedGroups);

        for (SpoutNode sn : spoutNodes) {
            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

            // Spout-specific resources override the topology-wide defaults.
            Map<String, Number> spoutRes = new HashMap<String, Number>(_resourceDefaults);
            spoutRes.putAll(sn.getResources());
            Number onHeap = spoutRes.get(onHeapKey);
            Number offHeap = spoutRes.get(offHeapKey);
            Number cpuLoad = spoutRes.get(cpuKey);

            SpoutDeclarer spoutDeclarer;
            if (sn.type == SpoutNode.SpoutType.DRPC) {
                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId, (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
            } else {
                ITridentSpout s;
                if (sn.spout instanceof IBatchSpout) {
                    s = new BatchSpoutExecutor((IBatchSpout) sn.spout);
                } else if (sn.spout instanceof ITridentSpout) {
                    s = (ITridentSpout) sn.spout;
                } else {
                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                }
                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
            }

            if (onHeap != null) {
                if (offHeap != null) {
                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
                } else {
                    spoutDeclarer.setMemoryLoad(onHeap);
                }
            }
            if (cpuLoad != null) {
                spoutDeclarer.setCPULoad(cpuLoad);
            }
        }

        for (Group g : mergedGroups) {
            if (isSpoutGroup(g)) {
                continue;
            }
            Integer p = parallelisms.get(g);
            Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
            Map<String, Number> groupRes = g.getResources(_resourceDefaults);
            Number onHeap = groupRes.get(onHeapKey);
            Number offHeap = groupRes.get(offHeapKey);
            Number cpuLoad = groupRes.get(cpuKey);

            BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p, committerBatches(g, batchGroupMap), streamToGroup);

            if (onHeap != null) {
                if (offHeap != null) {
                    d.setMemoryLoad(onHeap, offHeap);
                } else {
                    d.setMemoryLoad(onHeap);
                }
            }
            if (cpuLoad != null) {
                d.setCPULoad(cpuLoad);
            }

            // Subscribe the bolt to the component (spout or bolt) that emits each input partition.
            Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
            for (PartitionNode n : inputs) {
                Node parent = TridentUtils.getParent(graph, n);
                String componentId = parent instanceof SpoutNode ? spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
            }
        }

        // Master-coordinator resources override the topology-wide defaults.
        Map<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
        combinedMasterCoordResources.putAll(_masterCoordResources);
        return builder.buildTopology(combinedMasterCoordResources);
    }

    /**
     * Merges component resources with defaults for the on-heap memory, off-heap memory and CPU keys.
     *
     * <p>For each key: if {@code res} is {@code null} or has no value, the default is used; if both
     * a value and a default exist, the larger of the two (compared as doubles) is stored as a
     * {@code Double}; if only the component value exists it is kept as-is instead of failing with a
     * {@code NullPointerException}. A key missing on both sides is stored with a {@code null} value.
     *
     * @param res           component resources, may be {@code null}
     * @param defaultConfig map holding the default values; must not be {@code null}
     * @return a new map containing exactly the three resource keys
     */
    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
        String[] resourceKeys = {
            "topology.component.resources.onheap.memory.mb",
            "topology.component.resources.offheap.memory.mb",
            "topology.component.cpu.pcore.percent"
        };
        Map<String, Number> ret = new HashMap<String, Number>();
        for (String key : resourceKeys) {
            Number fallback = (Number) defaultConfig.get(key);
            Number requested = res == null ? null : res.get(key);
            Number merged;
            if (requested == null) {
                merged = fallback;
            } else if (fallback == null) {
                // No default configured for this key: keep the component's own value.
                merged = requested;
            } else {
                merged = Double.valueOf(Math.max(requested.doubleValue(), fallback.doubleValue()));
            }
            ret.put(key, merged);
        }
        return ret;
    }

    /**
     * Validates every connected component of the graph and, for each component that contains a DRPC
     * spout, adds a {@link ReturnResultsReducer} that combines the spout's {@code "return-info"}
     * field with the most recently created node of that component. Both inputs are routed through
     * {@code batchGlobal()}.
     *
     * @param graph    graph to inspect and extend
     * @param colocate colocation map handed to the helper topology
     * @param gen      id generator handed to the helper topology
     */
    private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
        List<Set<Node>> components = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
        for (Set<Node> component : components) {
            checkValidJoins(component);
        }

        // The helper shares graph, colocation map and id generator, so nodes it adds land in `graph`.
        TridentTopology helper = new TridentTopology(graph, colocate, gen);
        for (Set<Node> component : components) {
            SpoutNode drpcSpout = getDRPCSpoutNode(component);
            if (drpcSpout != null) {
                Stream results = new Stream(helper, null, getLastAddedNode(component));
                Stream requests = new Stream(helper, null, drpcSpout);
                Stream returnInfo = requests.project(new Fields("return-info")).batchGlobal();
                Stream globalResults = results.batchGlobal();
                MultiReducer reducer = new ReturnResultsReducer();
                helper.multiReduce(returnInfo, globalResults, reducer, new Fields());
            }
        }
    }

    /**
     * Returns the node with the highest {@code creationIndex}; on ties the first one encountered wins.
     *
     * @param g nodes to search
     * @return the latest node, or {@code null} if the collection is empty
     */
    private static Node getLastAddedNode(Collection<Node> g) {
        Node latest = null;
        for (Node candidate : g) {
            if (latest == null || candidate.creationIndex > latest.creationIndex) {
                latest = candidate;
            }
        }
        return latest;
    }

    /**
     * Finds the first spout node of type {@code DRPC} in iteration order.
     *
     * @param g nodes to search
     * @return the DRPC spout node, or {@code null} if there is none
     */
    private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
        for (Node candidate : g) {
            if (candidate instanceof SpoutNode) {
                SpoutNode spoutNode = (SpoutNode) candidate;
                if (spoutNode.type == SpoutNode.SpoutType.DRPC) {
                    return spoutNode;
                }
            }
        }
        return null;
    }

    /**
     * Rejects node sets that contain both a {@code BATCH} spout and a {@code DRPC} spout.
     *
     * @param g nodes to check
     * @throws RuntimeException if both spout types are present
     */
    private static void checkValidJoins(Collection<Node> g) {
        boolean sawBatch = false;
        boolean sawDrpc = false;
        for (Node node : g) {
            if (node instanceof SpoutNode) {
                SpoutNode.SpoutType kind = ((SpoutNode) node).type;
                if (kind == SpoutNode.SpoutType.BATCH) {
                    sawBatch = true;
                } else if (kind == SpoutNode.SpoutType.DRPC) {
                    sawDrpc = true;
                }
            }
        }
        if (sawDrpc && sawBatch) {
            throw new RuntimeException("Cannot join DRPC stream with streams originating from other spouts");
        }
    }

    /**
     * Tells whether the group consists of exactly one node and that node is a {@link SpoutNode}.
     *
     * @param g group to test
     * @return {@code true} for a single-spout group
     */
    private static boolean isSpoutGroup(Group g) {
        if (g.nodes.size() != 1) {
            return false;
        }
        Object onlyNode = g.nodes.iterator().next();
        return onlyNode instanceof SpoutNode;
    }

    /**
     * Collapses subscriptions to one partition node per stream id (the last one seen is kept).
     *
     * @param subscriptions partition nodes to deduplicate
     * @return one partition node per distinct stream id
     * @throws RuntimeException if two nodes share a stream id but have different groupings
     */
    private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
        Map<String, PartitionNode> byStream = new HashMap<String, PartitionNode>();
        for (PartitionNode subscription : subscriptions) {
            // put() hands back the node previously registered for this stream, if any.
            PartitionNode previous = byStream.put(subscription.streamId, subscription);
            if (previous != null && !previous.thriftGrouping.equals(subscription.thriftGrouping)) {
                throw new RuntimeException("Multiple subscriptions to the same stream with different groupings. Should be impossible since that is explicitly guarded against.");
            }
        }
        return byStream.values();
    }

    /**
     * Generates a component id for every spout node: {@code "spout-" + txId} for {@code BATCH}
     * spouts, {@code "spout-" + function + counter} for {@code DRPC} spouts and
     * {@code "spout" + counter} otherwise. The counter advances only for non-batch spouts.
     *
     * @param spoutNodes spout nodes to name
     * @return map from spout node to generated id
     */
    private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
        Map<Node, String> ids = new HashMap<Node, String>();
        int counter = 0;
        for (SpoutNode spoutNode : spoutNodes) {
            String id;
            if (spoutNode.type == SpoutNode.SpoutType.BATCH) {
                id = "spout-" + spoutNode.txId;
            } else if (spoutNode.type == SpoutNode.SpoutType.DRPC) {
                id = "spout-" + ((DRPCSpout) spoutNode.spout).get_function() + counter;
                counter++;
            } else {
                id = "spout" + counter;
                counter++;
            }
            ids.put(spoutNode, id);
        }
        return ids;
    }

    /**
     * Generates a component id for every non-spout group: {@code "b-" + counter}, followed by
     * {@code "-" + groupName} when the group has a non-empty name. Spout groups are skipped and do
     * not advance the counter.
     *
     * @param groups groups to name
     * @return map from group to generated bolt id
     */
    private static Map<Group, String> genBoltIds(Collection<Group> groups) {
        Map<Group, String> ret = new HashMap<Group, String>();
        int ctr = 0;
        for (Group g : groups) {
            if (isSpoutGroup(g)) {
                continue;
            }
            List<String> parts = new ArrayList<String>();
            parts.add("b");
            parts.add(String.valueOf(ctr));
            // Compute the group name once and reuse it instead of deriving it a second time.
            String groupName = getGroupName(g);
            if (groupName != null && !groupName.isEmpty()) {
                parts.add(groupName);
            }
            ret.put(g, Utils.join(parts, "-"));
            ++ctr;
        }
        return ret;
    }

    /**
     * Builds a group name from the non-null node names, ordered by {@code creationIndex}, with
     * consecutive duplicates collapsed, joined by {@code "-"}.
     *
     * @param g group whose nodes supply the names
     * @return the joined name as produced by {@code Utils.join}
     */
    private static String getGroupName(Group g) {
        TreeMap<Integer, String> namesByCreation = new TreeMap<Integer, String>();
        for (Node node : g.nodes) {
            if (node.name != null) {
                namesByCreation.put(node.creationIndex, node.name);
            }
        }

        List<String> collapsed = new ArrayList<String>();
        String last = null;
        for (String current : namesByCreation.values()) {
            // current is never null here, so equals(null) simply yields false for the first entry.
            if (!current.equals(last)) {
                collapsed.add(current);
                last = current;
            }
        }
        return Utils.join(collapsed, "-");
    }

    /**
     * Maps the stream id of each node returned by {@code externalGroupOutputs(g)} to that node's
     * batch group.
     *
     * @param g             group whose external outputs are inspected
     * @param batchGroupMap batch group id per node
     * @return map from output stream id to batch group id
     */
    private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
        Map<String, String> streamToBatchGroup = new HashMap<String, String>();
        for (PartitionNode output : externalGroupOutputs(g)) {
            String batchGroup = batchGroupMap.get(output);
            streamToBatchGroup.put(output.streamId, batchGroup);
        }
        return streamToBatchGroup;
    }

    /**
     * Collects the batch groups of all processor nodes in the group whose {@code committer} flag is set.
     *
     * @param g             group to inspect
     * @param batchGroupMap batch group id per node
     * @return set of batch group ids looked up for the committer nodes
     */
    private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
        Set<String> committerGroups = new HashSet<String>();
        for (Node node : g.nodes) {
            if (node instanceof ProcessorNode) {
                ProcessorNode processor = (ProcessorNode) node;
                if (processor.committer) {
                    committerGroups.add(batchGroupMap.get(node));
                }
            }
        }
        return committerGroups;
    }

    /**
     * Computes a parallelism for every group. Groups connected through an identity partition are
     * linked and receive the same value. Within each linked set the value is the largest
     * {@code parallelismHint} (at least 1), capped by the set's max parallelism if there is one, and
     * replaced by the set's fixed parallelism if there is one.
     *
     * @param graph   planning graph used to look up the parent of each partition node
     * @param grouper grouper used to resolve a node's group
     * @param groups  groups to compute parallelism for
     * @return map from group to parallelism
     * @throws RuntimeException if a set's max parallelism is lower than its fixed parallelism
     */
    private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
        UndirectedGraph<Group, Object> linkedGroups = new Pseudograph<Group, Object>(Object.class);
        for (Group group : groups) {
            linkedGroups.addVertex(group);
        }
        for (Group group : groups) {
            for (PartitionNode input : externalGroupInputs(group)) {
                if (!isIdentityPartition(input)) {
                    continue;
                }
                Node upstream = TridentUtils.getParent(graph, input);
                Group upstreamGroup = grouper.nodeGroup(upstream);
                if (upstreamGroup != null && !upstreamGroup.equals(group)) {
                    linkedGroups.addEdge(upstreamGroup, group);
                }
            }
        }

        Map<Group, Integer> result = new HashMap<Group, Integer>();
        List<Set<Group>> clusters = new ConnectivityInspector<Group, Object>(linkedGroups).connectedSets();
        for (Set<Group> cluster : clusters) {
            Integer fixed = getFixedParallelism(cluster);
            Integer cap = getMaxParallelism(cluster);
            if (fixed != null && cap != null && cap < fixed) {
                throw new RuntimeException("Parallelism is fixed to " + fixed + " but max parallelism is less than that: " + cap);
            }

            int chosen = 1;
            for (Group member : cluster) {
                for (Node node : member.nodes) {
                    if (node.parallelismHint != null) {
                        chosen = Math.max(chosen, node.parallelismHint);
                    }
                }
            }
            if (cap != null) {
                chosen = Math.min(cap, chosen);
            }
            // A fixed parallelism overrides both the hints and the cap.
            if (fixed != null) {
                chosen = fixed;
            }

            Integer boxed = Integer.valueOf(chosen);
            for (Group member : cluster) {
                result.put(member, boxed);
            }
        }
        return result;
    }

    /**
     * Finds the smallest {@code topology.max.task.parallelism} configured by the spouts of
     * the given groups.
     *
     * <p>Only groups accepted by {@code isSpoutGroup} are inspected; for those, the first node
     * of the group is treated as the {@code SpoutNode} and its spout's component configuration
     * is consulted.
     *
     * @param groups groups to inspect
     * @return the minimum configured value, or {@code null} when no inspected spout sets one
     */
    private static Integer getMaxParallelism(Set<Group> groups) {
        Integer ret = null;
        for (Group g : groups) {
            if (!TridentTopology.isSpoutGroup(g)) {
                continue;
            }
            SpoutNode n = (SpoutNode) g.nodes.iterator().next();
            // Declared as Map (not HashMap) to match what getSpoutComponentConfig returns.
            Map<?, ?> conf = TridentTopology.getSpoutComponentConfig(n.spout);
            if (conf == null) {
                // A spout without a component configuration cannot set a cap.
                continue;
            }
            Number maxP = (Number) conf.get("topology.max.task.parallelism");
            if (maxP == null) {
                continue;
            }
            ret = (ret == null) ? maxP.intValue() : Math.min(ret, maxP.intValue());
        }
        return ret;
    }

    /**
     * Returns the component configuration of a spout object.
     *
     * <p>The spout is handled as an {@code IRichSpout} or an {@code IBatchSpout} when it is an
     * instance of one of those; anything else is cast to {@code ITridentSpout}.
     *
     * @param spout the spout to query
     * @return whatever the spout's {@code getComponentConfiguration()} returns
     * @throws ClassCastException if {@code spout} is none of the three supported types
     */
    private static Map getSpoutComponentConfig(Object spout) {
        final Map componentConf;
        if (spout instanceof IRichSpout) {
            componentConf = ((IRichSpout) spout).getComponentConfiguration();
        } else if (spout instanceof IBatchSpout) {
            componentConf = ((IBatchSpout) spout).getComponentConfiguration();
        } else {
            componentConf = ((ITridentSpout) spout).getComponentConfiguration();
        }
        return componentConf;
    }

    /**
     * Determines the parallelism required by the state specs of the nodes in the given groups.
     *
     * @param groups groups whose nodes are inspected
     * @return the single {@code requiredNumPartitions} value declared by the nodes' state
     *         specs, or {@code null} when no node declares one
     * @throws RuntimeException if two nodes declare different required partition counts
     */
    private static Integer getFixedParallelism(Set<Group> groups) {
        Integer fixed = null;
        for (Group group : groups) {
            for (Node node : group.nodes) {
                if (node.stateInfo != null && node.stateInfo.spec.requiredNumPartitions != null) {
                    int required = node.stateInfo.spec.requiredNumPartitions;
                    if (fixed != null && fixed.intValue() != required) {
                        throw new RuntimeException("Cannot have one group have fixed parallelism of two different values");
                    }
                    fixed = required;
                }
            }
        }
        return fixed;
    }

    /**
     * Tells whether a partition node uses an {@code IdentityGrouping}.
     *
     * @param n partition node to examine
     * @return {@code true} only when the node's thrift grouping carries a custom serialized
     *         grouping that deserializes to an {@code IdentityGrouping}
     */
    private static boolean isIdentityPartition(PartitionNode n) {
        Grouping grouping = n.thriftGrouping;
        if (!grouping.is_set_custom_serialized()) {
            return false;
        }
        Object decoded = Utils.javaDeserialize(grouping.get_custom_serialized(), Serializable.class);
        CustomStreamGrouping custom = (CustomStreamGrouping) decoded;
        return custom instanceof IdentityGrouping;
    }

    /**
     * Adds an {@code IndexedEdge} between two vertices of a graph.
     *
     * @param g graph to add the edge to
     * @param source vertex the edge starts at
     * @param target vertex the edge ends at
     * @param index index stored on the created edge
     */
    private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
        IndexedEdge<Object> edge = new IndexedEdge<Object>(source, target, index);
        g.addEdge(source, target, edge);
    }

    /**
     * Creates a processor node that runs an {@code EachProcessor} over a {@code TrueFilter}.
     *
     * <p>The node gets a freshly generated stream id, a {@code null} second constructor
     * argument, and empty {@code Fields} both on the node and on the processor.
     *
     * @param allOutputFields output fields passed to the new node
     * @return the newly created {@code ProcessorNode}
     */
    private Node makeIdentityNode(Fields allOutputFields) {
        String streamId = getUniqueStreamId();
        Fields emptyNodeFields = new Fields(new String[0]);
        Fields emptyProcessorFields = new Fields(new String[0]);
        FilterExecutor passThrough = new FilterExecutor(new TrueFilter());
        EachProcessor processor = new EachProcessor(emptyProcessorFields, passThrough);
        return new ProcessorNode(streamId, null, allOutputFields, emptyNodeFields, processor);
    }

    /**
     * Finds the partition inputs of a group whose grouping conflicts with another input on
     * the same stream.
     *
     * <p>The external partition inputs are bucketed by {@code streamId}. In each bucket the
     * first node serves as the anchor, and every later node whose {@code thriftGrouping}
     * differs from the anchor's is reported. The inputs come from a {@code HashSet}, so which
     * node ends up as the anchor depends on that set's iteration order.
     *
     * @param g group whose external inputs are examined
     * @return the partition nodes whose grouping differs from their bucket's anchor; empty
     *         when there are none
     */
    private static List<PartitionNode> extraPartitionInputs(Group g) {
        List<PartitionNode> ret = new ArrayList<PartitionNode>();
        Set<PartitionNode> inputs = TridentTopology.externalGroupInputs(g);
        // Typed map so values() can be iterated as lists of PartitionNode without casts.
        Map<String, List<PartitionNode>> grouped = new HashMap<String, List<PartitionNode>>();
        for (PartitionNode n : inputs) {
            List<PartitionNode> bucket = grouped.get(n.streamId);
            if (bucket == null) {
                bucket = new ArrayList<PartitionNode>();
                grouped.put(n.streamId, bucket);
            }
            bucket.add(n);
        }
        for (List<PartitionNode> group : grouped.values()) {
            PartitionNode anchor = group.get(0);
            for (int i = 1; i < group.size(); ++i) {
                PartitionNode n = group.get(i);
                if (!n.thriftGrouping.equals(anchor.thriftGrouping)) {
                    ret.add(n);
                }
            }
        }
        return ret;
    }

    /**
     * Returns the partition nodes among a group's incoming nodes.
     *
     * @param g group to inspect
     * @return a new set with every {@code PartitionNode} returned by {@code g.incomingNodes()}
     */
    private static Set<PartitionNode> externalGroupInputs(Group g) {
        Set<PartitionNode> partitions = new HashSet<PartitionNode>();
        for (Node incoming : g.incomingNodes()) {
            if (incoming instanceof PartitionNode) {
                partitions.add((PartitionNode) incoming);
            }
        }
        return partitions;
    }

    /**
     * Returns the partition nodes among a group's outgoing nodes.
     *
     * @param g group to inspect
     * @return a new set with every {@code PartitionNode} returned by {@code g.outgoingNodes()}
     */
    private static Set<PartitionNode> externalGroupOutputs(Group g) {
        Set<PartitionNode> partitions = new HashSet<PartitionNode>();
        for (Node outgoing : g.outgoingNodes()) {
            if (outgoing instanceof PartitionNode) {
                partitions.add((PartitionNode) outgoing);
            }
        }
        return partitions;
    }

    /**
     * Creates a partition node that copies its stream id, name and output fields from
     * {@code basis} and uses a Java-serialized {@code IdentityGrouping} as its custom grouping.
     *
     * @param basis node whose stream id, name and output fields are reused
     * @return the new {@code PartitionNode}
     */
    private static PartitionNode makeIdentityPartition(Node basis) {
        Grouping identity = Grouping.custom_serialized(Utils.javaSerialize(new IdentityGrouping()));
        return new PartitionNode(basis.streamId, basis.name, basis.allOutputFields, identity);
    }

    /**
     * Obtains a stream id from this topology's id generator.
     *
     * @return the value produced by {@code _gen.getUniqueStreamId()}
     */
    protected String getUniqueStreamId() {
        final String streamId = _gen.getUniqueStreamId();
        return streamId;
    }

    /**
     * Obtains a state id from this topology's id generator.
     *
     * @return the value produced by {@code _gen.getUniqueStateId()}
     */
    protected String getUniqueStateId() {
        final String stateId = _gen.getUniqueStateId();
        return stateId;
    }

    /**
     * Obtains a window id from this topology's id generator.
     *
     * @return the value produced by {@code _gen.getUniqueWindowId()}
     */
    protected String getUniqueWindowId() {
        final String windowId = _gen.getUniqueWindowId();
        return windowId;
    }

    /**
     * Adds a node to the topology graph and, when it carries state info, records it in the
     * colocation map under that state's id.
     *
     * @param n node to register
     */
    protected void registerNode(Node n) {
        _graph.addVertex(n);
        if (n.stateInfo == null) {
            return;
        }
        String stateId = n.stateInfo.id;
        // First node seen for this state id: start its list.
        if (!_colocate.containsKey(stateId)) {
            _colocate.put(stateId, new ArrayList());
        }
        _colocate.get(stateId).add(n);
    }

    /**
     * Registers a node and wraps it in a stream.
     *
     * @param n node to register
     * @return a new {@code Stream} over {@code n}, created with {@code n.name}
     */
    protected Stream addNode(Node n) {
        registerNode(n);
        Stream stream = new Stream(this, n.name, n);
        return stream;
    }

    /**
     * Registers a node and connects each source stream's node to it.
     *
     * <p>One {@code IndexedEdge} is added per source; its index is the source's position in
     * {@code sources}, starting at 0.
     *
     * @param sources streams feeding the new node, in index order
     * @param newNode node to register and connect
     */
    protected void registerSourcedNode(List<Stream> sources, Node newNode) {
        registerNode(newNode);
        int position = 0;
        for (Stream source : sources) {
            Node origin = source._node;
            IndexedEdge<Node> edge = new IndexedEdge<Node>(origin, newNode, position);
            _graph.addEdge(origin, newNode, edge);
            position++;
        }
    }

    /**
     * Registers a node fed by several source streams and wraps it in a stream.
     *
     * @param sources streams feeding the new node
     * @param newNode node to register
     * @return a new {@code Stream} over {@code newNode}, created with {@code newNode.name}
     */
    protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
        registerSourcedNode(sources, newNode);
        Stream stream = new Stream(this, newNode.name, newNode);
        return stream;
    }

    /**
     * Registers a node fed by several source streams and wraps it in a {@code TridentState}.
     *
     * @param sources streams feeding the new node
     * @param newNode node to register
     * @return a new {@code TridentState} over {@code newNode}
     */
    protected TridentState addSourcedStateNode(List<Stream> sources, Node newNode) {
        registerSourcedNode(sources, newNode);
        TridentState state = new TridentState(this, newNode);
        return state;
    }

    /**
     * Single-source convenience form of {@code addSourcedNode(List, Node)}.
     *
     * @param source the only stream feeding the new node
     * @param newNode node to register
     * @return the stream returned by the list-based overload
     */
    protected Stream addSourcedNode(Stream source, Node newNode) {
        List<Stream> onlySource = Arrays.asList(source);
        return addSourcedNode(onlySource, newNode);
    }

    /**
     * Single-source convenience form of {@code addSourcedStateNode(List, Node)}.
     *
     * @param source the only stream feeding the new node
     * @param newNode node to register
     * @return the state returned by the list-based overload
     */
    protected TridentState addSourcedStateNode(Stream source, Node newNode) {
        List<Stream> onlySource = Arrays.asList(source);
        return addSourcedStateNode(onlySource, newNode);
    }

    /**
     * Gathers the output fields of every stream in a list.
     *
     * @param streams elements that are each cast to {@code IAggregatableStream}
     * @return the result of {@code getOutputFields()} for each element, in list order
     * @throws ClassCastException if an element is not an {@code IAggregatableStream}
     */
    private static List<Fields> getAllOutputFields(List streams) {
        List<Fields> fieldsPerStream = new ArrayList<Fields>();
        for (Object element : streams) {
            IAggregatableStream stream = (IAggregatableStream) element;
            fieldsPerStream.add(stream.getOutputFields());
        }
        return fieldsPerStream;
    }

    /**
     * Groups each stream by the fields at the same position in {@code joinFields}.
     *
     * @param streams streams to group
     * @param joinFields grouping fields, one entry per stream, matched by index
     * @return the grouped streams, in the order of {@code streams}
     * @throws IndexOutOfBoundsException if {@code joinFields} is shorter than {@code streams}
     */
    private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
        List<GroupedStream> grouped = new ArrayList<GroupedStream>();
        for (int pos = 0; pos < streams.size(); pos++) {
            Stream stream = streams.get(pos);
            Fields keyFields = joinFields.get(pos);
            grouped.add(stream.groupBy(keyFields));
        }
        return grouped;
    }

    /**
     * Computes, for each stream, its output fields with the matching join fields subtracted.
     *
     * @param streams streams whose output fields are read
     * @param joinFields fields to subtract, one entry per stream, matched by index
     * @return the results of {@code TridentUtils.fieldsSubtract}, in the order of {@code streams}
     * @throws IndexOutOfBoundsException if {@code joinFields} is shorter than {@code streams}
     */
    private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
        List<Fields> stripped = new ArrayList<Fields>();
        for (int pos = 0; pos < streams.size(); pos++) {
            Fields allFields = streams.get(pos).getOutputFields();
            Fields keyFields = joinFields.get(pos);
            stripped.add(TridentUtils.fieldsSubtract(allFields, keyFields));
        }
        return stripped;
    }

    /**
     * Builds a mutable list containing the same join type {@code n} times.
     *
     * @param n number of copies; zero or a negative value yields an empty list
     * @param type value to repeat
     * @return a new {@code ArrayList} holding {@code type} {@code n} times
     */
    private static List<JoinType> repeat(int n, JoinType type) {
        List<JoinType> copies = new ArrayList<JoinType>();
        for (int remaining = n; remaining > 0; remaining--) {
            copies.add(type);
        }
        return copies;
    }
}

