/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.worker;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.generated.TopologyStatus;
import org.apache.storm.grouping.Load;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.BaseWorkerHook;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.DeserializingConnectionCallback;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.DisruptorQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.TransferDrainer;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerState {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
    final Map<String, Object> conf;
    final IContext mqContext;
    final IConnection receiver;
    final String topologyId;
    final String assignmentId;
    final int port;
    final String workerId;
    final IStateStorage stateStorage;
    final IStormClusterState stormClusterState;
    final AtomicBoolean isWorkerActive;
    final AtomicBoolean isTopologyActive;
    final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
    final Set<List<Long>> executors;
    final List<Integer> taskIds;
    final Map<String, Object> topologyConf;
    final StormTopology topology;
    final StormTopology systemTopology;
    final Map<Integer, String> taskToComponent;
    final Map<String, Map<String, Fields>> componentToStreamToFields;
    final Map<String, List<Integer>> componentToSortedTasks;
    final ReentrantReadWriteLock endpointSocketLock;
    final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
    final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
    final Map<List<Long>, DisruptorQueue> executorReceiveQueueMap;
    final Map<Integer, DisruptorQueue> shortExecutorReceiveQueueMap;
    final Map<Integer, Integer> taskToShortExecutor;
    final Runnable suicideCallback;
    final Utils.UptimeComputer uptime;
    final Map<String, Object> defaultSharedResources;
    final Map<String, Object> userSharedResources;
    final LoadMapping loadMapping;
    final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
    final AtomicBoolean backpressure = new AtomicBoolean(false);
    final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
    final AtomicBoolean backpressureTrigger = new AtomicBoolean(false);
    final AtomicBoolean throttleOn = new AtomicBoolean(false);
    final DisruptorQueue transferQueue;
    final StormTimer heartbeatTimer = this.mkHaltingTimer("heartbeat-timer");
    final StormTimer refreshLoadTimer = this.mkHaltingTimer("refresh-load-timer");
    final StormTimer refreshConnectionsTimer = this.mkHaltingTimer("refresh-connections-timer");
    final StormTimer refreshCredentialsTimer = this.mkHaltingTimer("refresh-credentials-timer");
    final StormTimer resetLogLevelsTimer = this.mkHaltingTimer("reset-log-levels-timer");
    final StormTimer refreshActiveTimer = this.mkHaltingTimer("refresh-active-timer");
    final StormTimer executorHeartbeatTimer = this.mkHaltingTimer("executor-heartbeat-timer");
    final StormTimer refreshBackpressureTimer = this.mkHaltingTimer("refresh-backpressure-timer");
    final StormTimer userTimer = this.mkHaltingTimer("user-timer");
    private final Set<Integer> outboundTasks;
    private final AtomicLong nextUpdate = new AtomicLong(0L);
    private final boolean trySerializeLocal;
    private final TransferDrainer drainer;
    private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;

    public Map getConf() {
        return this.conf;
    }

    public IConnection getReceiver() {
        return this.receiver;
    }

    public String getTopologyId() {
        return this.topologyId;
    }

    public int getPort() {
        return this.port;
    }

    public String getWorkerId() {
        return this.workerId;
    }

    public IStateStorage getStateStorage() {
        return this.stateStorage;
    }

    public AtomicBoolean getIsTopologyActive() {
        return this.isTopologyActive;
    }

    public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() {
        return this.stormComponentToDebug;
    }

    public Set<List<Long>> getExecutors() {
        return this.executors;
    }

    public List<Integer> getTaskIds() {
        return this.taskIds;
    }

    public Map getTopologyConf() {
        return this.topologyConf;
    }

    public StormTopology getTopology() {
        return this.topology;
    }

    public StormTopology getSystemTopology() {
        return this.systemTopology;
    }

    public Map<Integer, String> getTaskToComponent() {
        return this.taskToComponent;
    }

    public Map<String, Map<String, Fields>> getComponentToStreamToFields() {
        return this.componentToStreamToFields;
    }

    public Map<String, List<Integer>> getComponentToSortedTasks() {
        return this.componentToSortedTasks;
    }

    public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() {
        return this.cachedNodeToPortSocket;
    }

    public Map<List<Long>, DisruptorQueue> getExecutorReceiveQueueMap() {
        return this.executorReceiveQueueMap;
    }

    public Runnable getSuicideCallback() {
        return this.suicideCallback;
    }

    public Utils.UptimeComputer getUptime() {
        return this.uptime;
    }

    public Map<String, Object> getDefaultSharedResources() {
        return this.defaultSharedResources;
    }

    public Map<String, Object> getUserSharedResources() {
        return this.userSharedResources;
    }

    public LoadMapping getLoadMapping() {
        return this.loadMapping;
    }

    public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() {
        return this.assignmentVersions;
    }

    public AtomicBoolean getBackpressureTrigger() {
        return this.backpressureTrigger;
    }

    public AtomicBoolean getThrottleOn() {
        return this.throttleOn;
    }

    public DisruptorQueue getTransferQueue() {
        return this.transferQueue;
    }

    public StormTimer getUserTimer() {
        return this.userTimer;
    }

    public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState) throws IOException, InvalidTopologyException {
        this.executors = new HashSet<List<Long>>(this.readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
        this.transferQueue = new DisruptorQueue("worker-transfer-queue", ObjectReader.getInt(topologyConf.get("topology.transfer.buffer.size")), (Long)topologyConf.get("topology.disruptor.wait.timeout.millis"), ObjectReader.getInt(topologyConf.get("topology.disruptor.batch.size")), (Long)topologyConf.get("topology.disruptor.batch.timeout.millis"));
        this.conf = conf;
        this.mqContext = null != mqContext ? mqContext : TransportFactory.makeContext(topologyConf);
        this.receiver = this.mqContext.bind(topologyId, port);
        this.topologyId = topologyId;
        this.assignmentId = assignmentId;
        this.port = port;
        this.workerId = workerId;
        this.stateStorage = stateStorage;
        this.stormClusterState = stormClusterState;
        this.isWorkerActive = new AtomicBoolean(false);
        this.isTopologyActive = new AtomicBoolean(false);
        this.stormComponentToDebug = new AtomicReference();
        this.executorReceiveQueueMap = this.mkReceiveQueueMap(topologyConf, this.executors);
        this.shortExecutorReceiveQueueMap = new HashMap<Integer, DisruptorQueue>();
        this.taskIds = new ArrayList<Integer>();
        for (Map.Entry<List<Long>, DisruptorQueue> entry : this.executorReceiveQueueMap.entrySet()) {
            this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
            this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
        }
        Collections.sort(this.taskIds);
        this.topologyConf = topologyConf;
        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
        this.systemTopology = StormCommon.systemTopology(topologyConf, this.topology);
        this.taskToComponent = StormCommon.stormTaskInfo(this.topology, topologyConf);
        this.componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
        for (String string : ThriftTopologyUtils.getComponentIds(this.systemTopology)) {
            HashMap<String, Fields> streamToFields = new HashMap<String, Fields>();
            for (Map.Entry<String, StreamInfo> stream : ThriftTopologyUtils.getComponentCommon(this.systemTopology, string).get_streams().entrySet()) {
                streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
            }
            this.componentToStreamToFields.put(string, streamToFields);
        }
        this.componentToSortedTasks = Utils.reverseMap(this.taskToComponent);
        this.componentToSortedTasks.values().forEach(Collections::sort);
        this.endpointSocketLock = new ReentrantReadWriteLock();
        this.cachedNodeToPortSocket = new AtomicReference(new HashMap());
        this.cachedTaskToNodePort = new AtomicReference(new HashMap());
        this.taskToShortExecutor = new HashMap<Integer, Integer>();
        for (List list : this.executors) {
            for (Integer task : StormCommon.executorIdToTasks(list)) {
                this.taskToShortExecutor.put(task, ((Long)list.get(0)).intValue());
            }
        }
        this.suicideCallback = Utils.mkSuicideFn();
        this.uptime = Utils.makeUptimeComputer();
        this.defaultSharedResources = this.makeDefaultResources();
        this.userSharedResources = this.makeUserResources();
        this.loadMapping = new LoadMapping();
        this.assignmentVersions = new AtomicReference(new HashMap());
        this.outboundTasks = this.workerOutboundTasks();
        boolean bl = this.trySerializeLocal = topologyConf.containsKey("topology.testing.always.try.serialize") && (Boolean)topologyConf.get("topology.testing.always.try.serialize") != false;
        if (this.trySerializeLocal) {
            LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", (Object)"topology.testing.always.try.serialize");
        }
        this.drainer = new TransferDrainer();
    }

    public void refreshConnections() {
        try {
            this.refreshConnections(() -> this.refreshConnectionsTimer.schedule(0, this::refreshConnections));
        }
        catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void refreshConnections(Runnable callback) throws Exception {
        Assignment assignment;
        Integer version = this.stormClusterState.assignmentVersion(this.topologyId, callback);
        version = null == version ? 0 : version;
        VersionedData<Assignment> assignmentVersion = this.assignmentVersions.get().get(this.topologyId);
        if (null != assignmentVersion && assignmentVersion.getVersion() == version.intValue()) {
            assignment = assignmentVersion.getData();
        } else {
            VersionedData<Assignment> newAssignmentVersion = new VersionedData<Assignment>(version, this.stormClusterState.assignmentInfoWithVersion(this.topologyId, callback).getData());
            this.assignmentVersions.getAndUpdate(prev -> {
                HashMap<String, VersionedData> next = new HashMap<String, VersionedData>((Map<String, VersionedData>)prev);
                next.put(this.topologyId, newAssignmentVersion);
                return next;
            });
            assignment = newAssignmentVersion.getData();
        }
        HashSet<NodeInfo> neededConnections = new HashSet<NodeInfo>();
        HashMap<Integer, NodeInfo> newTaskToNodePort = new HashMap<Integer, NodeInfo>();
        if (null != assignment) {
            Map<Integer, NodeInfo> taskToNodePort = StormCommon.taskToNodeport(assignment.get_executor_node_port());
            for (Map.Entry<Integer, NodeInfo> taskToNodePortEntry : taskToNodePort.entrySet()) {
                Integer task = taskToNodePortEntry.getKey();
                if (!this.outboundTasks.contains(task)) continue;
                newTaskToNodePort.put(task, taskToNodePortEntry.getValue());
                if (this.taskIds.contains(task)) continue;
                neededConnections.add(taskToNodePortEntry.getValue());
            }
        }
        Set<NodeInfo> currentConnections = this.cachedNodeToPortSocket.get().keySet();
        Sets.SetView newConnections = Sets.difference(neededConnections, currentConnections);
        Sets.SetView removeConnections = Sets.difference(currentConnections, neededConnections);
        this.cachedNodeToPortSocket.getAndUpdate(arg_0 -> this.lambda$refreshConnections$2((Set)newConnections, assignment, arg_0));
        try {
            this.endpointSocketLock.writeLock().lock();
            this.cachedTaskToNodePort.set(newTaskToNodePort);
        }
        finally {
            this.endpointSocketLock.writeLock().unlock();
        }
        for (NodeInfo nodeInfo : removeConnections) {
            this.cachedNodeToPortSocket.get().get(nodeInfo).close();
        }
        this.cachedNodeToPortSocket.getAndUpdate(arg_0 -> WorkerState.lambda$refreshConnections$3((Set)removeConnections, arg_0));
    }

    public void refreshStormActive() {
        this.refreshStormActive(() -> this.refreshActiveTimer.schedule(0, this::refreshStormActive));
    }

    public void refreshStormActive(Runnable callback) {
        StormBase base = this.stormClusterState.stormBase(this.topologyId, callback);
        this.isTopologyActive.set(null != base && base.get_status() == TopologyStatus.ACTIVE && this.isWorkerActive.get());
        if (null != base) {
            HashMap<String, DebugOptions> debugOptionsMap = new HashMap<String, DebugOptions>(base.get_component_debug());
            for (DebugOptions debugOptions : debugOptionsMap.values()) {
                if (!debugOptions.is_set_samplingpct()) {
                    debugOptions.set_samplingpct(10.0);
                }
                if (debugOptions.is_set_enable()) continue;
                debugOptions.set_enable(false);
            }
            this.stormComponentToDebug.set(debugOptionsMap);
            LOG.debug("Events debug options {}", this.stormComponentToDebug.get());
        }
    }

    public void refreshThrottle() {
        boolean backpressure = this.stormClusterState.topologyBackpressure(this.topologyId, this::refreshThrottle);
        this.throttleOn.set(backpressure);
    }

    public void refreshLoad() {
        Sets.SetView remoteTasks = Sets.difference(new HashSet<Integer>(this.outboundTasks), new HashSet<Integer>(this.taskIds));
        Long now = System.currentTimeMillis();
        Map<Integer, Double> localLoad = this.shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            DisruptorQueue.QueueMetrics qMetrics = ((DisruptorQueue)entry.getValue()).getMetrics();
            return (double)qMetrics.population() / (double)qMetrics.capacity();
        }));
        HashMap<Integer, Load> remoteLoad = new HashMap<Integer, Load>();
        this.cachedNodeToPortSocket.get().values().stream().forEach(arg_0 -> WorkerState.lambda$refreshLoad$6(remoteLoad, (Set)remoteTasks, arg_0));
        this.loadMapping.setLocal(localLoad);
        this.loadMapping.setRemote(remoteLoad);
        if (now > this.nextUpdate.get()) {
            this.receiver.sendLoadMetrics(localLoad);
            this.nextUpdate.set(now + 5000L);
        }
    }

    public void activateWorkerWhenAllConnectionsReady() {
        int delaySecs = 0;
        final int recurSecs = 1;
        this.refreshActiveTimer.schedule(delaySecs, new Runnable(){

            @Override
            public void run() {
                if (WorkerState.this.areAllConnectionsReady()) {
                    LOG.info("All connections are ready for worker {}:{} with id", new Object[]{WorkerState.this.assignmentId, WorkerState.this.port, WorkerState.this.workerId});
                    WorkerState.this.isWorkerActive.set(Boolean.TRUE);
                } else {
                    WorkerState.this.refreshActiveTimer.schedule(recurSecs, () -> WorkerState.this.activateWorkerWhenAllConnectionsReady(), false, 0);
                }
            }
        });
    }

    public void registerCallbacks() {
        LOG.info("Registering IConnectionCallbacks for {}:{}", (Object)this.assignmentId, (Object)this.port);
        this.receiver.registerRecv(new DeserializingConnectionCallback(this.topologyConf, this.getWorkerTopologyContext(), this::transferLocal));
    }

    public void transferLocal(List<AddressedTuple> tupleBatch) {
        HashMap<Integer, ArrayList<AddressedTuple>> grouped = new HashMap<Integer, ArrayList<AddressedTuple>>();
        for (AddressedTuple addressedTuple : tupleBatch) {
            Integer executor = this.taskToShortExecutor.get(addressedTuple.dest);
            if (null == executor) {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
                continue;
            }
            ArrayList<AddressedTuple> current = (ArrayList<AddressedTuple>)grouped.get(executor);
            if (null == current) {
                current = new ArrayList<AddressedTuple>();
                grouped.put(executor, current);
            }
            current.add(addressedTuple);
        }
        for (Map.Entry entry : grouped.entrySet()) {
            DisruptorQueue queue = this.shortExecutorReceiveQueueMap.get(entry.getKey());
            if (null != queue) {
                queue.publish(entry.getValue());
                continue;
            }
            LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
        }
    }

    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
        if (this.trySerializeLocal) {
            this.assertCanSerialize(serializer, tupleBatch);
        }
        ArrayList<AddressedTuple> local = new ArrayList<AddressedTuple>();
        HashMap remoteMap = new HashMap();
        for (AddressedTuple addressedTuple : tupleBatch) {
            int destTask = addressedTuple.getDest();
            if (this.taskIds.contains(destTask)) {
                local.add(addressedTuple);
                continue;
            }
            if (!remoteMap.containsKey(destTask)) {
                remoteMap.put(destTask, new ArrayList());
            }
            ((List)remoteMap.get(destTask)).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
        }
        if (!local.isEmpty()) {
            this.transferLocal(local);
        }
        if (!remoteMap.isEmpty()) {
            this.transferQueue.publish(remoteMap);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
        this.drainer.add(packets);
        if (batchEnd) {
            ReentrantReadWriteLock.ReadLock readLock = this.endpointSocketLock.readLock();
            try {
                readLock.lock();
                this.drainer.send(this.cachedTaskToNodePort.get(), this.cachedNodeToPortSocket.get());
            }
            finally {
                readLock.unlock();
            }
            this.drainer.clear();
        }
    }

    private void assertCanSerialize(KryoTupleSerializer serializer, List<AddressedTuple> tuples) {
        for (AddressedTuple addressedTuple : tuples) {
            serializer.serialize(addressedTuple.getTuple());
        }
    }

    public WorkerTopologyContext getWorkerTopologyContext() {
        try {
            String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(this.conf, this.topologyId));
            String pidDir = ConfigUtils.workerPidsRoot(this.conf, this.topologyId);
            return new WorkerTopologyContext(this.systemTopology, this.topologyConf, this.taskToComponent, this.componentToSortedTasks, this.componentToStreamToFields, this.topologyId, codeDir, pidDir, this.port, this.taskIds, this.defaultSharedResources, this.userSharedResources);
        }
        catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void runWorkerStartHooks() {
        WorkerTopologyContext workerContext = this.getWorkerTopologyContext();
        for (ByteBuffer hook : this.topology.get_worker_hooks()) {
            byte[] hookBytes = Utils.toByteArray(hook);
            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
            hookObject.start(this.topologyConf, workerContext);
        }
    }

    public void runWorkerShutdownHooks() {
        for (ByteBuffer hook : this.topology.get_worker_hooks()) {
            byte[] hookBytes = Utils.toByteArray(hook);
            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
            hookObject.shutdown();
        }
    }

    public void closeResources() {
        LOG.info("Shutting down default resources");
        ((ExecutorService)this.defaultSharedResources.get("executor")).shutdownNow();
        LOG.info("Shut down default resources");
    }

    public boolean areAllConnectionsReady() {
        return this.cachedNodeToPortSocket.get().values().stream().map(WorkerState::isConnectionReady).reduce((left, right) -> left != false && right != false).orElse(true);
    }

    public static boolean isConnectionReady(IConnection connection) {
        return !(connection instanceof ConnectionWithStatus) || ((ConnectionWithStatus)connection).status() == ConnectionWithStatus.Status.Ready;
    }

    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId, int port) {
        LOG.info("Reading assignments");
        ArrayList<List<Long>> executorsAssignedToThisWorker = new ArrayList<List<Long>>();
        executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
        Map<List<Long>, NodeInfo> executorToNodePort = stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
            NodeInfo nodeInfo = entry.getValue();
            if (!nodeInfo.get_node().equals(assignmentId) || nodeInfo.get_port().iterator().next() != (long)port) continue;
            executorsAssignedToThisWorker.add(entry.getKey());
        }
        return executorsAssignedToThisWorker;
    }

    private Map<List<Long>, DisruptorQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) {
        HashMap<List<Long>, DisruptorQueue> receiveQueueMap = new HashMap<List<Long>, DisruptorQueue>();
        for (List<Long> executor : executors) {
            receiveQueueMap.put(executor, new DisruptorQueue("receive-queue", ObjectReader.getInt(topologyConf.get("topology.executor.receive.buffer.size")), (Long)topologyConf.get("topology.disruptor.wait.timeout.millis"), ObjectReader.getInt(topologyConf.get("topology.disruptor.batch.size")), (Long)topologyConf.get("topology.disruptor.batch.timeout.millis")));
        }
        return receiveQueueMap;
    }

    private Map<String, Object> makeDefaultResources() {
        int threadPoolSize = ObjectReader.getInt(this.conf.get("topology.worker.shared.thread.pool.size"));
        return ImmutableMap.of((Object)"executor", (Object)Executors.newFixedThreadPool(threadPoolSize));
    }

    private Map<String, Object> makeUserResources() {
        return new HashMap<String, Object>();
    }

    private StormTimer mkHaltingTimer(String name) {
        return new StormTimer(name, (thread, exception) -> {
            LOG.error("Error when processing event", exception);
            Utils.exitProcess(20, "Error when processing an event");
        });
    }

    private Set<Integer> workerOutboundTasks() {
        WorkerTopologyContext context = this.getWorkerTopologyContext();
        HashSet<String> components = new HashSet<String>();
        for (Integer taskId : this.taskIds) {
            for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
                components.addAll(value.keySet());
            }
        }
        HashSet<Integer> outboundTasks = new HashSet<Integer>();
        for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(this.taskToComponent).entrySet()) {
            if (!components.contains(entry.getKey())) continue;
            outboundTasks.addAll((Collection<Integer>)entry.getValue());
        }
        return outboundTasks;
    }

    private static /* synthetic */ void lambda$refreshLoad$6(Map remoteLoad, Set remoteTasks, IConnection conn) {
        remoteLoad.putAll(conn.getLoad(remoteTasks));
    }

    private static /* synthetic */ Map lambda$refreshConnections$3(Set removeConnections, Map prev) {
        HashMap next = new HashMap(prev);
        removeConnections.forEach(next::remove);
        return next;
    }

    private /* synthetic */ Map lambda$refreshConnections$2(Set newConnections, Assignment assignment, Map prev) {
        HashMap<NodeInfo, IConnection> next = new HashMap<NodeInfo, IConnection>(prev);
        for (NodeInfo nodeInfo : newConnections) {
            next.put(nodeInfo, this.mqContext.connect(this.topologyId, assignment.get_node_host().get(nodeInfo.get_node()), nodeInfo.get_port().iterator().next().intValue()));
        }
        return next;
    }

    public static interface ILocalTransferCallback {
        public void transfer(List<AddressedTuple> var1);
    }
}

