/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.nimbus;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
import org.apache.storm.StormTimer;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.BlobSynchronizer;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.KeySequenceNumber;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.nimbus.TopologyActions;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.daemon.nimbus.TopologyStateTransition;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.BeginDownloadResult;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.CommonAggregateStats;
import org.apache.storm.generated.ComponentAggregateStats;
import org.apache.storm.generated.ComponentPageInfo;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GetInfoOptions;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.LSTopoHistory;
import org.apache.storm.generated.ListBlobsResult;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.LogLevel;
import org.apache.storm.generated.LogLevelAction;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.NumErrorsChoice;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.generated.SupervisorPageInfo;
import org.apache.storm.generated.SupervisorSummary;
import org.apache.storm.generated.TopologyActionOptions;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologyStatus;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metric.api.DataPoint;
import org.apache.storm.metric.api.IClusterMetricsConsumer;
import org.apache.storm.nimbus.DefaultTopologyValidator;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ICredentialsRenewer;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.security.auth.IPrincipalToLocal;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.utils.BufferInputStream;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TimeCacheMap;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.zookeeper.Zookeeper;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Nimbus
implements Nimbus.Iface,
Shutdownable,
DaemonCommon {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(Nimbus.class);
    // Call-count meters, each registered with StormMetricsRegistry under the name
    // "nimbus:num-<operation>-calls". The places where they are marked are outside this view.
    private static final Meter submitTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
    private static final Meter submitTopologyCalls = StormMetricsRegistry.registerMeter("nimbus:num-submitTopology-calls");
    private static final Meter killTopologyWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-killTopologyWithOpts-calls");
    private static final Meter killTopologyCalls = StormMetricsRegistry.registerMeter("nimbus:num-killTopology-calls");
    private static final Meter rebalanceCalls = StormMetricsRegistry.registerMeter("nimbus:num-rebalance-calls");
    private static final Meter activateCalls = StormMetricsRegistry.registerMeter("nimbus:num-activate-calls");
    private static final Meter deactivateCalls = StormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls");
    private static final Meter debugCalls = StormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
    private static final Meter setWorkerProfilerCalls = StormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
    private static final Meter getComponentPendingProfileActionsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
    private static final Meter setLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
    private static final Meter uploadNewCredentialsCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
    private static final Meter beginFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
    private static final Meter uploadChunkCalls = StormMetricsRegistry.registerMeter("nimbus:num-uploadChunk-calls");
    private static final Meter finishFileUploadCalls = StormMetricsRegistry.registerMeter("nimbus:num-finishFileUpload-calls");
    private static final Meter beginFileDownloadCalls = StormMetricsRegistry.registerMeter("nimbus:num-beginFileDownload-calls");
    private static final Meter downloadChunkCalls = StormMetricsRegistry.registerMeter("nimbus:num-downloadChunk-calls");
    private static final Meter getNimbusConfCalls = StormMetricsRegistry.registerMeter("nimbus:num-getNimbusConf-calls");
    private static final Meter getLogConfigCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLogConfig-calls");
    private static final Meter getTopologyConfCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyConf-calls");
    private static final Meter getTopologyCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopology-calls");
    private static final Meter getUserTopologyCalls = StormMetricsRegistry.registerMeter("nimbus:num-getUserTopology-calls");
    private static final Meter getClusterInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
    private static final Meter getLeaderCalls = StormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
    private static final Meter isTopologyNameAllowedCalls = StormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
    private static final Meter getTopologyInfoWithOptsCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
    private static final Meter getTopologyInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
    private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
    private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
    private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
    private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
    // Version string reported by VersionInfo at class-load time.
    private static final String STORM_VERSION = VersionInfo.getVersion();
    // Two ACL entries: the first CREATOR_ALL_ACL entry, plus permission bits 5 for ANYONE_ID_UNSAFE.
    // NOTE(review): 5 is presumably ZooDefs.Perms.READ | ZooDefs.Perms.CREATE (1 | 4) - confirm
    // against the ZooKeeper constants.
    @VisibleForTesting
    public static final List<ACL> ZK_ACLS = Arrays.asList((ACL)ZooDefs.Ids.CREATOR_ALL_ACL.get(0), new ACL(5, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    // Subject passed to the blob store by the "*AsNimbus" helpers and by rmBlobKey.
    private static final Subject NIMBUS_SUBJECT = new Subject();
    // The blank finals below have no initializer here, so they must be assigned in a
    // static initializer block that is outside this view.
    private static final TopologyStateTransition NOOP_TRANSITION;
    private static final TopologyStateTransition INACTIVE_TRANSITION;
    private static final TopologyStateTransition ACTIVE_TRANSITION;
    private static final TopologyStateTransition KILL_TRANSITION;
    private static final TopologyStateTransition REBALANCE_TRANSITION;
    private static final TopologyStateTransition STARTUP_WHEN_KILLED_TRANSITION;
    private static final TopologyStateTransition REMOVE_TRANSITION;
    private static final TopologyStateTransition STARTUP_WHEN_REBALANCING_TRANSITION;
    private static final TopologyStateTransition DO_REBALANCE_TRANSITION;
    // Keyed by current topology status, then by requested action.
    private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS;
    // Fallback values handed to Utils.OR in topoIdsToClean.
    private static final List<String> EMPTY_STRING_LIST;
    private static final Set<String> EMPTY_STRING_SET;
    // Pattern a topology name must fully match (see validateTopologyName).
    private static final Pattern TOPOLOGY_NAME_REGEX;
    // Instance state. Final fields without an initializer are assigned by a constructor
    // that is outside this view.
    private final Map<String, Object> conf;
    private final NimbusInfo nimbusHostPortInfo;
    private final INimbus inimbus;
    // Not final, unlike the impersonation handler below.
    private IAuthorizer authorizationHandler;
    private final IAuthorizer impersonationAuthorizationHandler;
    private final AtomicLong submittedCount;
    private final IStormClusterState stormClusterState;
    // Dedicated monitor objects; the code that synchronizes on them is outside this view.
    private final Object submitLock = new Object();
    private final Object schedLock = new Object();
    private final Object credUpdateLock = new Object();
    // Nested map: String -> (List<Integer> -> (String -> Object)).
    // NOTE(review): presumably topology id -> executor -> heartbeat fields - confirm.
    private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache;
    // Expiring caches of open streams/channels, keyed by String.
    private final TimeCacheMap<String, BufferInputStream> downloaders;
    private final TimeCacheMap<String, WritableByteChannel> uploaders;
    private final BlobStore blobStore;
    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
    private final TimeCacheMap<String, OutputStream> blobUploaders;
    private final TimeCacheMap<String, Iterator<String>> blobListers;
    private final Utils.UptimeComputer uptime;
    private final ITopologyValidator validator;
    private final StormTimer timer;
    private final IScheduler scheduler;
    private final ILeaderElector leaderElector;
    // Atomically replaceable snapshot maps keyed by String.
    private final AtomicReference<Map<String, String>> idToSchedStatus;
    private final AtomicReference<Map<String, Double[]>> nodeIdToResources;
    private final AtomicReference<Map<String, TopologyResources>> idToResources;
    private final AtomicReference<Map<String, Map<WorkerSlot, WorkerResources>>> idToWorkerResources;
    private final Collection<ICredentialsRenewer> credRenewers;
    private final Object topologyHistoryLock;
    private final LocalState topologyHistoryState;
    private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins;
    private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier;
    // NOTE(review): field name is misspelled ("Exceutors"); left as-is because other code references it.
    private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors;
    private final IGroupMappingServiceProvider groupMapper;
    private final IPrincipalToLocal principalToLocal;

    /**
     * Creates a {@link StormBase} that carries the given status and empty
     * component-executor and component-debug maps.
     *
     * @param status the status to store on the new base
     * @return the newly created base
     */
    private static StormBase make(TopologyStatus status) {
        StormBase base = new StormBase();
        base.set_component_debug(Collections.emptyMap());
        base.set_component_executors(Collections.emptyMap());
        base.set_status(status);
        return base;
    }

    /**
     * Creates an expiring cache of closeable resources whose entries are closed on expiry.
     *
     * <p>The expiration time is read from {@code nimbus.file.copy.expiration.secs}
     * (600 when the key is absent). The map is created with explicit type arguments so
     * that the expiry callback sees {@code stream} as {@code T} and can call
     * {@code close()}; with a raw {@code TimeCacheMap} the lambda parameter would be
     * an {@code Object}.
     *
     * @param conf the configuration to read the expiration from
     * @param <T> the resource type stored in the cache
     * @return a new, empty cache
     */
    private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) {
        int expirationSecs = ObjectReader.getInt((Object)conf.get("nimbus.file.copy.expiration.secs"), (Integer)600).intValue();
        return new TimeCacheMap<String, T>(expirationSecs, (id, stream) -> {
            try {
                stream.close();
            }
            catch (Exception e) {
                // The callback cannot throw checked exceptions, so wrap and keep the cause.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Returns a new map holding every entry of {@code first}, overlaid with the entries of
     * {@code other}. Neither argument is modified.
     *
     * @param first the base entries; must not be null
     * @param other entries that take precedence over {@code first}; may be null, in which
     *     case the result is a plain copy of {@code first}
     * @return a new mutable map with the combined entries
     */
    private static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
        // The local must be Map<K, V>: a wildcard-typed map accepts no putAll and
        // cannot be returned as Map<K, V>.
        Map<K, V> ret = new HashMap<K, V>(first);
        if (other != null) {
            ret.putAll(other);
        }
        return ret;
    }

    /**
     * Returns the entries of {@code second} whose value differs from the value that
     * {@code first} holds for the same key. Keys missing from {@code first} are included.
     *
     * <p>Null values are tolerated: a null value in {@code second} counts as unchanged
     * only when {@code first} explicitly maps the same key to null.
     *
     * @param first the baseline map; must not be null
     * @param second the map whose changed or added entries are reported; must not be null
     * @return a new mutable map of the changed or added entries of {@code second}
     */
    private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
        Map<K, V> ret = new HashMap<K, V>();
        // The entry type must keep the wildcards to match second.entrySet().
        for (Map.Entry<? extends K, ? extends V> entry : second.entrySet()) {
            K key = entry.getKey();
            V value = entry.getValue();
            Object previous = first.get(key);
            boolean unchanged = value == null ? previous == null && first.containsKey(key) : value.equals(previous);
            if (!unchanged) {
                ret.put(key, value);
            }
        }
        return ret;
    }

    /**
     * Chooses, prepares and returns the scheduler to use.
     *
     * <p>Precedence: the scheduler forced by {@code inimbus} (when {@code inimbus} is
     * non-null and returns one), then the class named by {@code storm.scheduler},
     * then {@link DefaultScheduler}. The chosen scheduler is prepared with {@code conf}
     * before it is returned.
     *
     * @param conf the configuration; also passed to {@code prepare}
     * @param inimbus optional provider of a forced scheduler; may be null
     * @return the prepared scheduler
     */
    private static IScheduler makeScheduler(Map<String, Object> conf, INimbus inimbus) {
        String schedClass = (String)conf.get("storm.scheduler");
        IScheduler scheduler = inimbus == null ? null : inimbus.getForcedScheduler();
        if (scheduler != null) {
            LOG.info("Using forced scheduler from INimbus {} {}", scheduler.getClass(), (Object)scheduler);
        } else if (schedClass != null) {
            LOG.info("Using custom scheduler: {}", (Object)schedClass);
            scheduler = (IScheduler)ReflectionUtils.newInstance((String)schedClass);
        } else {
            LOG.info("Using default scheduler");
            scheduler = new DefaultScheduler();
        }
        scheduler.prepare(conf);
        return scheduler;
    }

    /**
     * Creates an expiring cache of blob streams whose entries are released on expiry.
     *
     * <p>The expiration time is read from {@code nimbus.blobstore.expiration.secs}
     * (600 when the key is absent). On expiry an {@link AtomicOutputStream} is cancelled;
     * any other resource is closed. The map is created with explicit type arguments so
     * that the callback sees {@code stream} as {@code T} rather than {@code Object}.
     *
     * @param conf the configuration to read the expiration from
     * @param <T> the resource type stored in the cache
     * @return a new, empty cache
     */
    private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) {
        int expirationSecs = ObjectReader.getInt((Object)conf.get("nimbus.blobstore.expiration.secs"), (Integer)600).intValue();
        return new TimeCacheMap<String, T>(expirationSecs, (id, stream) -> {
            try {
                if (stream instanceof AtomicOutputStream) {
                    ((AtomicOutputStream)stream).cancel();
                } else {
                    stream.close();
                }
            }
            catch (Exception e) {
                // The callback cannot throw checked exceptions, so wrap and keep the cause.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Creates an expiring cache of blob-key iterators with no expiry callback.
     * The expiration time is read from {@code nimbus.blobstore.expiration.secs}
     * (600 when the key is absent).
     *
     * @param conf the configuration to read the expiration from
     * @return a new, empty cache
     */
    private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) {
        Integer expirationSecs = ObjectReader.getInt((Object)conf.get("nimbus.blobstore.expiration.secs"), (Integer)600);
        return new TimeCacheMap<String, Iterator<String>>(expirationSecs.intValue());
    }

    /**
     * Instantiates and prepares the plugin named by
     * {@code nimbus.topology.action.notifier.plugin.class}.
     *
     * @param conf the configuration; also passed to the plugin's {@code prepare}
     * @return the prepared plugin, or null when no class is configured or when
     *     {@code prepare} throws (the failure is logged and otherwise ignored)
     */
    private static ITopologyActionNotifierPlugin createTopologyActionNotifier(Map<String, Object> conf) {
        String clazz = (String)conf.get("nimbus.topology.action.notifier.plugin.class");
        if (clazz == null || clazz.isEmpty()) {
            return null;
        }
        ITopologyActionNotifierPlugin notifier = (ITopologyActionNotifierPlugin)ReflectionUtils.newInstance((String)clazz);
        try {
            notifier.prepare(conf);
            return notifier;
        }
        catch (Exception e) {
            LOG.warn("Ignoring exception, Could not initialize {}", (Object)clazz, (Object)e);
            return null;
        }
    }

    /**
     * Builds one {@link ClusterMetricsConsumerExecutor} per entry registered under
     * {@code storm.cluster.metrics.consumer.register}.
     *
     * <p>Each registered entry is a map; its {@code "class"} and {@code "argument"} values
     * are passed to the executor's constructor.
     *
     * @param conf the configuration to read the registrations from
     * @return a new list of executors; empty when nothing is registered
     */
    private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
        // Typed view of the config value: a raw Collection cannot be iterated with a Map loop variable.
        @SuppressWarnings("unchecked")
        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>)conf.get("storm.cluster.metrics.consumer.register");
        List<ClusterMetricsConsumerExecutor> ret = new ArrayList<ClusterMetricsConsumerExecutor>();
        if (consumers != null) {
            for (Map<String, Object> consumer : consumers) {
                ret.add(new ClusterMetricsConsumerExecutor((String)consumer.get("class"), consumer.get("argument")));
            }
        }
        return ret;
    }

    /**
     * Returns the subject of the current {@link ReqContext}.
     *
     * @return whatever {@code ReqContext.context().subject()} yields
     */
    private static Subject getSubject() {
        ReqContext current = ReqContext.context();
        return current.subject();
    }

    /**
     * Reads a topology's configuration from the blob store as the subject of the
     * current request.
     *
     * @param topoId the topology id
     * @param blobStore the store to read from
     * @return the topology configuration
     * @throws KeyNotFoundException propagated from the blob store
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException propagated from the blob store
     */
    static Map<String, Object> readTopoConf(String topoId, BlobStore blobStore) throws KeyNotFoundException, AuthorizationException, IOException {
        Subject requester = Nimbus.getSubject();
        return blobStore.readTopologyConf(topoId, requester);
    }

    /**
     * Lists the blob keys belonging to a topology: the code key and the conf key, plus
     * the jar key unless {@code conf} indicates local mode.
     *
     * @param conf the configuration checked for local mode
     * @param id the topology id
     * @return a new mutable list with two or three keys
     */
    static List<String> getKeyListFromId(Map<String, Object> conf, String id) {
        List<String> keys = new ArrayList<String>(3);
        keys.add(ConfigUtils.masterStormCodeKey(id));
        keys.add(ConfigUtils.masterStormConfKey(id));
        if (ConfigUtils.isLocalMode(conf)) {
            return keys;
        }
        keys.add(ConfigUtils.masterStormJarKey(id));
        return keys;
    }

    /**
     * Obtains the sequence number for a blob key via {@link KeySequenceNumber}.
     *
     * @param key the blob key
     * @param nimbusInfo the nimbus identity passed to {@code KeySequenceNumber}
     * @param conf the configuration passed to {@code getKeySequenceNumber}
     * @return the sequence number reported for the key
     * @throws KeyNotFoundException propagated from {@code getKeySequenceNumber}
     */
    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, Map<String, Object> conf) throws KeyNotFoundException {
        return new KeySequenceNumber(key, nimbusInfo).getKeySequenceNumber(conf);
    }

    /**
     * Reads a topology from the blob store as the subject of the current request.
     *
     * @param topoId the topology id
     * @param store the store to read from
     * @return the stored topology
     * @throws KeyNotFoundException propagated from the blob store
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException propagated from the blob store
     */
    private static StormTopology readStormTopology(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
        Subject requester = Nimbus.getSubject();
        return store.readTopology(topoId, requester);
    }

    /**
     * Reads a topology's configuration from the blob store using {@code NIMBUS_SUBJECT}.
     *
     * @param topoId the topology id
     * @param store the store to read from
     * @return the topology configuration
     * @throws KeyNotFoundException propagated from the blob store
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException propagated from the blob store
     */
    private static Map<String, Object> readTopoConfAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
        Map<String, Object> topoConf = store.readTopologyConf(topoId, NIMBUS_SUBJECT);
        return topoConf;
    }

    /**
     * Reads a topology from the blob store using {@code NIMBUS_SUBJECT}.
     *
     * @param topoId the topology id
     * @param store the store to read from
     * @return the stored topology
     * @throws KeyNotFoundException propagated from the blob store
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException propagated from the blob store
     */
    private static StormTopology readStormTopologyAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
        StormTopology topology = store.readTopology(topoId, NIMBUS_SUBJECT);
        return topology;
    }

    /**
     * Converts scheduler assignments into plain collections.
     *
     * <p>For every topology, each executor becomes a two-element list
     * {@code [startTask, endTask]} (as {@code Long}s) mapped to a two-element list
     * {@code [nodeId, port]} where the port is a {@code Long}.
     *
     * @param schedAssignments scheduler assignments keyed by topology id
     * @return a new map from topology id to the executor-to-node/port map
     */
    private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) {
        Map<String, Map<List<Long>, List<Object>>> topoToExecToNodePort = new HashMap<String, Map<List<Long>, List<Object>>>();
        for (Map.Entry<String, SchedulerAssignment> topoAndAssignment : schedAssignments.entrySet()) {
            Map<List<Long>, List<Object>> execToNodePort = new HashMap<List<Long>, List<Object>>();
            Map<ExecutorDetails, WorkerSlot> execToSlot = topoAndAssignment.getValue().getExecutorToSlot();
            for (Map.Entry<ExecutorDetails, WorkerSlot> execAndSlot : execToSlot.entrySet()) {
                ExecutorDetails exec = execAndSlot.getKey();
                WorkerSlot slot = execAndSlot.getValue();

                List<Long> taskRange = new ArrayList<Long>(2);
                taskRange.add(Long.valueOf(exec.getStartTask()));
                taskRange.add(Long.valueOf(exec.getEndTask()));

                List<Object> nodeAndPort = new ArrayList<Object>(2);
                nodeAndPort.add(slot.getNodeId());
                nodeAndPort.add(Long.valueOf(slot.getPort()));

                execToNodePort.put(taskRange, nodeAndPort);
            }
            topoToExecToNodePort.put(topoAndAssignment.getKey(), execToNodePort);
        }
        return topoToExecToNodePort;
    }

    /**
     * Counts the slots of an assignment.
     *
     * @param assignment the assignment to inspect; may be null
     * @return the size of {@code assignment.getSlots()}, or 0 for a null assignment
     */
    private static int numUsedWorkers(SchedulerAssignment assignment) {
        return assignment == null ? 0 : assignment.getSlots().size();
    }

    /**
     * Converts scheduler assignments into per-slot resource lists.
     *
     * <p>For every topology, each slot used by an executor is keyed as a two-element list
     * {@code [nodeId, port]} (port as {@code Long}) and mapped to a three-element list
     * {@code [memOnHeap, memOffHeap, cpu]} taken from the slot's allocated values.
     *
     * @param schedAssignments scheduler assignments keyed by topology id
     * @return a new map from topology id to the node/port-to-resources map
     */
    private static Map<String, Map<List<Object>, List<Double>>> computeTopoToNodePortToResources(Map<String, SchedulerAssignment> schedAssignments) {
        Map<String, Map<List<Object>, List<Double>>> topoToResources = new HashMap<String, Map<List<Object>, List<Double>>>();
        for (Map.Entry<String, SchedulerAssignment> topoAndAssignment : schedAssignments.entrySet()) {
            Map<List<Object>, List<Double>> nodePortToResources = new HashMap<List<Object>, List<Double>>();
            // One pass per executor; executors sharing a slot simply overwrite the same key.
            for (WorkerSlot slot : topoAndAssignment.getValue().getExecutorToSlot().values()) {
                List<Object> nodeAndPort = new ArrayList<Object>(2);
                nodeAndPort.add(slot.getNodeId());
                nodeAndPort.add(Long.valueOf(slot.getPort()));

                List<Double> allocated = new ArrayList<Double>(3);
                allocated.add(slot.getAllocatedMemOnHeap());
                allocated.add(slot.getAllocatedMemOffHeap());
                allocated.add(slot.getAllocatedCpu());

                nodePortToResources.put(nodeAndPort, allocated);
            }
            topoToResources.put(topoAndAssignment.getKey(), nodePortToResources);
        }
        return topoToResources;
    }

    /**
     * Computes the executor-to-node/port mapping for the new scheduler assignments and
     * logs which executors moved relative to the existing assignments.
     *
     * <p>The returned map is exactly the result of {@code computeTopoToExecToNodePort};
     * the comparison against {@code existingAssignments} only drives logging.
     *
     * @param schedAssignments new scheduler assignments keyed by topology id
     * @param existingAssignments current assignments keyed by topology id; may be null or empty
     * @return the map from topology id to the executor-to-node/port map
     */
    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
        Map<String, Map<List<Long>, List<Object>>> topoToExecToNodePort = Nimbus.computeTopoToExecToNodePort(schedAssignments);
        if (existingAssignments == null || existingAssignments.isEmpty()) {
            return topoToExecToNodePort;
        }
        for (Map.Entry<String, Map<List<Long>, List<Object>>> topoEntry : topoToExecToNodePort.entrySet()) {
            String topoId = topoEntry.getKey();
            Map<List<Long>, List<Object>> execToNodePort = topoEntry.getValue();
            Assignment existing = existingAssignments.get(topoId);
            if (existing == null) {
                continue;
            }
            Map previous = existing.get_executor_node_port();
            Map<List<Long>, List<Object>> reassigned = new HashMap<List<Long>, List<Object>>();
            for (Map.Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
                List<Long> exec = execAndNodePort.getKey();
                List<Object> nodePort = execAndNodePort.getValue();
                NodeInfo oldAssigned = (NodeInfo)previous.get(exec);
                String node = (String)nodePort.get(0);
                Long port = (Long)nodePort.get(1);
                // Unchanged only when the executor stays on the same node and the same (first) port.
                boolean unchanged = oldAssigned != null && oldAssigned.get_node().equals(node) && port.equals(oldAssigned.get_port_iterator().next());
                if (!unchanged) {
                    reassigned.put(exec, nodePort);
                }
            }
            if (!reassigned.isEmpty()) {
                int slotCount = new HashSet<List<Object>>(execToNodePort.values()).size();
                LOG.info("Reassigning {} to {} slots", (Object)topoId, (Object)slotCount);
                LOG.info("Reassign executors: {}", (Object)reassigned.keySet());
            }
        }
        return topoToExecToNodePort;
    }

    /**
     * Lists the executors that sit on a slot whose executor set differs between the old
     * and the new assignment.
     *
     * <p>Both inputs are inverted into slot-to-executors maps keyed by a two-element list
     * {@code [nodeId, port]}, with the executors sorted by start task. The result is the
     * concatenation of the new executor lists for every slot that is new or whose sorted
     * executor list changed.
     *
     * <p>The old-side key stores the port as the {@code Long} read from the
     * {@link NodeInfo}; the new-side keys built by {@code computeTopoToExecToNodePort}
     * also hold a {@code Long} port. A String-typed key would never compare equal.
     *
     * @param map the existing executor-to-node assignment; may be null
     * @param newExecToNodePort the new executor-to-{@code [nodeId, port]} assignment; may be null
     * @return a new list of executors, each a {@code [startTask, endTask]} list
     */
    private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>, List<Object>> newExecToNodePort) {
        Map<NodeInfo, List<List<Long>>> oldSlotToExecs = map == null ? new HashMap<NodeInfo, List<List<Long>>>() : Utils.reverseMap(map);
        Map<List<Object>, List<List<Long>>> slotAssigned = new HashMap<List<Object>, List<List<Long>>>();
        for (Map.Entry<NodeInfo, List<List<Long>>> entry : oldSlotToExecs.entrySet()) {
            NodeInfo ni = entry.getKey();
            List<Object> nodePort = new ArrayList<Object>(2);
            nodePort.add(ni.get_node());
            nodePort.add(ni.get_port_iterator().next());
            List<List<Long>> execs = new ArrayList<List<Long>>(entry.getValue());
            // Sort so that list equality does not depend on map iteration order.
            execs.sort((a, b) -> a.get(0).compareTo(b.get(0)));
            slotAssigned.put(nodePort, execs);
        }
        Map<List<Object>, List<List<Long>>> newSlotToExecs = newExecToNodePort == null ? new HashMap<List<Object>, List<List<Long>>>() : Utils.reverseMap(newExecToNodePort);
        Map<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<List<Object>, List<List<Long>>>();
        for (Map.Entry<List<Object>, List<List<Long>>> entry : newSlotToExecs.entrySet()) {
            List<List<Long>> execs = new ArrayList<List<Long>>(entry.getValue());
            execs.sort((a, b) -> a.get(0).compareTo(b.get(0)));
            newSlotAssigned.put(entry.getKey(), execs);
        }
        Map<List<Object>, List<List<Long>>> diff = Nimbus.mapDiff(slotAssigned, newSlotAssigned);
        List<List<Long>> ret = new ArrayList<List<Long>>();
        for (List<List<Long>> execs : diff.values()) {
            ret.addAll(execs);
        }
        return ret;
    }

    /**
     * Finds the slots used by {@code current} that {@code old} did not use.
     *
     * <p>Slots are compared as {@link NodeInfo} values; each remaining one is converted
     * to a {@link WorkerSlot} using its node and the first port of its port iterator.
     *
     * @param old the previous assignment
     * @param current the new assignment
     * @return a new set of the slots present only in {@code current}
     */
    private static Set<WorkerSlot> newlyAddedSlots(Assignment old, Assignment current) {
        // Typed sets: a raw HashSet cannot be iterated with a NodeInfo loop variable.
        Set<NodeInfo> oldSlots = new HashSet<NodeInfo>(old.get_executor_node_port().values());
        Set<NodeInfo> addedSlots = new HashSet<NodeInfo>(current.get_executor_node_port().values());
        addedSlots.removeAll(oldSlots);
        Set<WorkerSlot> ret = new HashSet<WorkerSlot>();
        for (NodeInfo ni : addedSlots) {
            ret.add(new WorkerSlot(ni.get_node(), (Number)ni.get_port_iterator().next()));
        }
        return ret;
    }

    /**
     * Builds a {@link SupervisorDetails} for every supervisor known to the cluster state.
     *
     * <p>Each entry carries the supervisor's hostname, scheduler meta and resources map;
     * the ports argument of the constructor is passed as null.
     *
     * @param state the cluster state to read supervisor info from
     * @return a new map from supervisor id to its details
     */
    private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormClusterState state) {
        Map<String, SupervisorDetails> detailsById = new HashMap<String, SupervisorDetails>();
        for (Map.Entry<String, SupervisorInfo> idAndInfo : state.allSupervisorInfo().entrySet()) {
            String supervisorId = idAndInfo.getKey();
            SupervisorInfo info = idAndInfo.getValue();
            SupervisorDetails details = new SupervisorDetails(supervisorId, info.get_hostname(), (Object)info.get_scheduler_meta(), null, info.get_resources_map());
            detailsById.put(supervisorId, details);
        }
        return detailsById;
    }

    /**
     * Tells whether the cluster state knows an id for the given topology name.
     *
     * @param state the cluster state to query
     * @param topoName the topology name to look up
     * @return true when {@code state.getTopoId(topoName)} is present
     */
    private static boolean isTopologyActive(IStormClusterState state, String topoName) {
        boolean hasId = state.getTopoId(topoName).isPresent();
        return hasId;
    }

    /**
     * Reads a topology's configuration as nimbus, translating a missing key into
     * {@link NotAliveException}.
     *
     * @param topoId the topology id
     * @param store the store to read from
     * @return the topology configuration
     * @throws NotAliveException when the store has no key for a non-null {@code topoId}
     * @throws NullPointerException when the key is missing and {@code topoId} is null
     * @throws AuthorizationException propagated from the blob store
     * @throws IOException propagated from the blob store
     */
    private static Map<String, Object> tryReadTopoConf(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
        try {
            return Nimbus.readTopoConfAsNimbus(topoId, store);
        }
        catch (KeyNotFoundException e) {
            if (topoId != null) {
                throw new NotAliveException(topoId);
            }
            throw new NullPointerException();
        }
    }

    /**
     * Collects the topology ids that still have leftover state but are no longer active.
     *
     * <p>The candidates are the union of the heartbeat, error, stored-blob and
     * backpressure topology ids; every id reported by {@code activeStorms()} is then
     * removed. A null result from any source is treated as empty.
     *
     * @param state the cluster state to query
     * @param store the blob store to query for stored topology ids
     * @return a new set of ids that are candidates for cleanup
     */
    @VisibleForTesting
    public static Set<String> topoIdsToClean(IStormClusterState state, BlobStore store) {
        Set<String> candidates = new HashSet<String>();

        Object heartbeats = Utils.OR((Object)state.heartbeatStorms(), EMPTY_STRING_LIST);
        candidates.addAll((Collection)heartbeats);

        Object errors = Utils.OR((Object)state.errorTopologies(), EMPTY_STRING_LIST);
        candidates.addAll((Collection)errors);

        Object stored = Utils.OR((Object)store.storedTopoIds(), EMPTY_STRING_SET);
        candidates.addAll((Collection)stored);

        Object backpressure = Utils.OR((Object)state.backpressureTopologies(), EMPTY_STRING_LIST);
        candidates.addAll((Collection)backpressure);

        Object active = Utils.OR((Object)state.activeStorms(), EMPTY_STRING_LIST);
        candidates.removeAll((Collection)active);

        return candidates;
    }

    /**
     * Returns the upper-cased name of the status stored on a {@link StormBase}.
     *
     * @param base the base to inspect; may be null
     * @return the upper-cased status name, or null when {@code base} or its status is null
     */
    private static String extractStatusStr(StormBase base) {
        if (base == null) {
            return null;
        }
        TopologyStatus status = base.get_status();
        if (status == null) {
            return null;
        }
        return status.name().toUpperCase();
    }

    /**
     * Computes the number of tasks for a component.
     *
     * <p>The topology conf is overlaid with the component's own conf. The task count is
     * {@code topology.tasks}, defaulting to the component's starting executor count, and
     * is capped by {@code topology.max.task.parallelism} when that key is set.
     *
     * @param topoConf the topology-level configuration
     * @param component the component whose parallelism is computed
     * @return the (possibly capped) number of tasks
     * @throws InvalidTopologyException declared by the signature; propagated from the helpers
     */
    private static int componentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
        Map<String, Object> combinedConf = Nimbus.merge(topoConf, StormCommon.componentConf((Object)component));
        Object configuredTasks = combinedConf.get("topology.tasks");
        int numTasks = ObjectReader.getInt(configuredTasks, (Integer)StormCommon.numStartExecutors((Object)component));
        Integer maxParallel = ObjectReader.getInt((Object)combinedConf.get("topology.max.task.parallelism"), null);
        return maxParallel == null ? numTasks : Math.min(maxParallel, numTasks);
    }

    /**
     * Returns a deep copy of the topology in which every component's JSON conf has
     * {@code topology.tasks} set to the value computed by {@code componentParallelism}.
     * The input topology is not modified.
     *
     * @param topoConf the topology-level configuration
     * @param topology the topology to normalize
     * @return the normalized copy
     * @throws InvalidTopologyException propagated from {@code componentParallelism}
     */
    private static StormTopology normalizeTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        StormTopology normalized = topology.deepCopy();
        for (Object component : StormCommon.allComponents((StormTopology)normalized).values()) {
            Map componentConf = StormCommon.componentConf(component);
            int parallelism = Nimbus.componentParallelism(topoConf, component);
            componentConf.put("topology.tasks", parallelism);
            String json = JSONValue.toJSONString((Object)componentConf);
            StormCommon.getComponentCommon(component).set_json_conf(json);
        }
        return normalized;
    }

    /**
     * Adds every configured decorator name to the accumulating set.
     *
     * @param decorators the set to add to
     * @param conf the configured decorator names; null means nothing to add
     */
    private static void addToDecorators(Set<String> decorators, List<String> conf) {
        if (conf == null) {
            return;
        }
        decorators.addAll(conf);
    }

    /**
     * Adds configured serializer registrations to the accumulating map.
     *
     * <p>An element that is a {@link Map} contributes all of its entries; any other
     * element is cast to {@code String} and registered with a null value.
     *
     * @param ser the map to add to
     * @param conf the configured registrations; null means nothing to add
     */
    private static void addToSerializers(Map<String, String> ser, List<Object> conf) {
        if (conf == null) {
            return;
        }
        for (Object registration : conf) {
            if (!(registration instanceof Map)) {
                ser.put((String)registration, null);
            } else {
                ser.putAll((Map)registration);
            }
        }
    }

    /**
     * Produces the normalized topology configuration.
     *
     * <p>Kryo decorators and registrations are gathered from every component's conf and
     * from the topology conf (falling back to the daemon conf for a key the topology conf
     * lacks). The result is a copy of {@code topoConf} with the gathered kryo settings,
     * plus the acker-executor, eventlogger-executor and max-task-parallelism values taken
     * from the daemon conf overlaid with the topology conf.
     *
     * @param conf the daemon configuration
     * @param topoConf the topology configuration
     * @param topology the topology whose components are inspected
     * @return a new mutable map with the normalized configuration
     */
    private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
        List<Map> componentConfs = new ArrayList<Map>();
        for (Object component : StormCommon.allComponents((StormTopology)topology).values()) {
            componentConfs.add(StormCommon.componentConf(component));
        }

        Set<String> decorators = new HashSet<String>();
        Map<String, String> serializers = new HashMap<String, String>();
        for (Map componentConf : componentConfs) {
            Nimbus.addToDecorators(decorators, (List)componentConf.get("topology.kryo.decorators"));
            Nimbus.addToSerializers(serializers, (List)componentConf.get("topology.kryo.register"));
        }
        Nimbus.addToDecorators(decorators, (List)topoConf.getOrDefault("topology.kryo.decorators", conf.get("topology.kryo.decorators")));
        Nimbus.addToSerializers(serializers, (List)topoConf.getOrDefault("topology.kryo.register", conf.get("topology.kryo.register")));

        Map<String, Object> mergedConf = Nimbus.merge(conf, topoConf);
        Map<String, Object> normalized = new HashMap<String, Object>(topoConf);
        normalized.put("topology.kryo.register", serializers);
        normalized.put("topology.kryo.decorators", new ArrayList<String>(decorators));
        normalized.put("topology.acker.executors", mergedConf.get("topology.acker.executors"));
        normalized.put("topology.eventlogger.executors", mergedConf.get("topology.eventlogger.executors"));
        normalized.put("topology.max.task.parallelism", mergedConf.get("topology.max.task.parallelism"));
        return normalized;
    }

    /**
     * Best-effort removal of a blob: deletes it from the store as the nimbus
     * subject and, for a {@link LocalFsBlobStore}, also removes the key from
     * cluster state. Any exception is logged and not propagated.
     *
     * @param store blob store to delete from
     * @param key blob key to remove
     * @param state cluster state holding blobstore keys
     */
    private static void rmBlobKey(BlobStore store, String key, IStormClusterState state) {
        try {
            store.deleteBlob(key, NIMBUS_SUBJECT);
            boolean keyTrackedInClusterState = store instanceof LocalFsBlobStore;
            if (keyTrackedInClusterState) {
                state.removeBlobstoreKey(key);
            }
        }
        catch (Exception failure) {
            LOG.info("Exception {}", (Throwable)failure);
        }
    }

    /**
     * Deletes regular files in {@code dirLoc} whose last-modified time is at
     * least {@code seconds} old.
     *
     * <p>If the directory cannot be listed (it does not exist, is not a
     * directory, or an I/O error occurs), a warning is logged and nothing is
     * deleted instead of failing with a {@link NullPointerException}.
     *
     * @param dirLoc path of the inbox directory
     * @param seconds minimum age, in seconds, for a file to be deleted
     */
    @VisibleForTesting
    public static void cleanInbox(String dirLoc, int seconds) {
        long now = Time.currentTimeMillis();
        long ms = Time.secsToMillis((int)seconds);
        File dir = new File(dirLoc);
        // listFiles returns null, not an empty array, when the path cannot be listed.
        File[] expired = dir.listFiles(f -> f.isFile() && f.lastModified() + ms <= now);
        if (expired == null) {
            LOG.warn("Cleaning inbox ... unable to list files in: {}", (Object)dirLoc);
            return;
        }
        for (File f2 : expired) {
            if (f2.delete()) {
                LOG.info("Cleaning inbox ... deleted: {}", (Object)f2.getName());
                continue;
            }
            LOG.error("Cleaning inbox ... error deleting: {}", (Object)f2.getName());
        }
    }

    /**
     * Converts a two-element executor id list into an {@code ExecutorInfo}.
     *
     * @param exec list whose elements 0 and 1 are passed, narrowed to int, to the constructor
     * @return the corresponding {@code ExecutorInfo}
     */
    private static ExecutorInfo toExecInfo(List<Long> exec) {
        int first = exec.get(0).intValue();
        int second = exec.get(1).intValue();
        return new ExecutorInfo(first, second);
    }

    /**
     * Checks that a topology name fully matches {@code TOPOLOGY_NAME_REGEX}.
     *
     * @param name candidate topology name
     * @throws InvalidTopologyException if the name does not match
     */
    private static void validateTopologyName(String name) throws InvalidTopologyException {
        boolean valid = TOPOLOGY_NAME_REGEX.matcher(name).matches();
        if (valid) {
            return;
        }
        throw new InvalidTopologyException("Topology name must match " + TOPOLOGY_NAME_REGEX);
    }

    /**
     * Reads a topology from the blob store, reporting a missing key as
     * {@code NotAliveException} carrying the topology id.
     *
     * @param topoId topology id
     * @param store blob store to read from
     * @return the stored topology
     * @throws NotAliveException if the blob store reports the key as not found
     */
    private static StormTopology tryReadTopology(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
        StormTopology topology;
        try {
            topology = Nimbus.readStormTopologyAsNimbus(topoId, store);
        }
        catch (KeyNotFoundException missing) {
            throw new NotAliveException(topoId);
        }
        return topology;
    }

    /**
     * Rejects a topology that asks for more executors or workers than nimbus allows.
     *
     * <p>The worker request comes from {@code topology.workers} (default 1); the
     * executor request is the sum of {@code StormCommon.numStartExecutors} over all
     * components. A limit that is not configured is not enforced. The executor
     * limit is checked before the worker limit.
     *
     * @param topoConf topology configuration
     * @param nimbusConf nimbus configuration holding the per-topology limits
     * @param topology topology being submitted
     * @throws InvalidTopologyException if either limit is exceeded
     */
    private static void validateTopologySize(Map<String, Object> topoConf, Map<String, Object> nimbusConf, StormTopology topology) throws InvalidTopologyException {
        int requestedWorkers = ObjectReader.getInt((Object)topoConf.get("topology.workers"), (Integer)1);
        Integer workerLimit = ObjectReader.getInt((Object)nimbusConf.get("nimbus.slots.perTopology"), null);

        int requestedExecutors = 0;
        for (Object component : StormCommon.allComponents((StormTopology)topology).values()) {
            requestedExecutors += StormCommon.numStartExecutors(component);
        }
        Integer executorLimit = ObjectReader.getInt((Object)nimbusConf.get("nimbus.executors.perTopology"), null);

        boolean tooManyExecutors = executorLimit != null && requestedExecutors > executorLimit;
        if (tooManyExecutors) {
            throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " + executorLimit + " executors.");
        }
        boolean tooManyWorkers = workerLimit != null && requestedWorkers > workerLimit;
        if (tooManyWorkers) {
            throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " + workerLimit + " workers.");
        }
    }

    /**
     * Sets or clears the reset epoch of a log level from its timeout.
     *
     * <p>A positive timeout sets the epoch to the current time in seconds plus
     * the timeout; otherwise the epoch is unset.
     *
     * @param level log level to update in place
     */
    private static void setLoggerTimeouts(LogLevel level) {
        int timeoutSecs = level.get_reset_log_level_timeout_secs();
        if (timeoutSecs <= 0) {
            level.unset_reset_log_level_timeout_epoch();
            return;
        }
        level.set_reset_log_level_timeout_epoch((long)(Time.currentTimeSecs() + timeoutSecs));
    }

    /**
     * Lists the topologies that have at least one executor assigned to a supervisor.
     *
     * @param assignments topology id to assignment
     * @param supervisorId supervisor (node) id to look for
     * @return distinct topology ids, in no particular order
     */
    @VisibleForTesting
    public static List<String> topologiesOnSupervisor(Map<String, Assignment> assignments, String supervisorId) {
        HashSet<String> matched = new HashSet<String>();
        for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
            boolean runsOnSupervisor = false;
            for (NodeInfo nodeInfo : entry.getValue().get_executor_node_port().values()) {
                if (supervisorId.equals(nodeInfo.get_node())) {
                    // One hit is enough for this topology.
                    runsOnSupervisor = true;
                    break;
                }
            }
            if (runsOnSupervisor) {
                matched.add(entry.getKey());
            }
        }
        return new ArrayList<String>(matched);
    }

    /**
     * Creates a {@code ClusterInfo} from the current time in seconds.
     *
     * @return a new cluster info instance
     */
    private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() {
        return new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs());
    }

    /**
     * Derives cluster-wide data points from a cluster summary.
     *
     * <p>Emits, in order: supervisors, topologies, slotsTotal, slotsUsed,
     * slotsFree, executorsTotal, tasksTotal.
     *
     * @param summ cluster summary to aggregate
     * @return the data points in the order above
     */
    private static List<DataPoint> extractClusterMetrics(ClusterSummary summ) {
        ArrayList<DataPoint> points = new ArrayList<DataPoint>();
        points.add(new DataPoint("supervisors", summ.get_supervisors_size()));
        points.add(new DataPoint("topologies", summ.get_topologies_size()));

        int slotsTotal = 0;
        int slotsUsed = 0;
        for (SupervisorSummary supervisor : summ.get_supervisors()) {
            slotsUsed += supervisor.get_num_used_workers();
            slotsTotal += supervisor.get_num_workers();
        }
        int slotsFree = slotsTotal - slotsUsed;
        points.add(new DataPoint("slotsTotal", slotsTotal));
        points.add(new DataPoint("slotsUsed", slotsUsed));
        points.add(new DataPoint("slotsFree", slotsFree));

        int executors = 0;
        int tasks = 0;
        for (TopologySummary topology : summ.get_topologies()) {
            executors += topology.get_num_executors();
            tasks += topology.get_num_tasks();
        }
        points.add(new DataPoint("executorsTotal", executors));
        points.add(new DataPoint("tasksTotal", tasks));
        return points;
    }

    /**
     * Builds per-supervisor data points from a cluster summary.
     *
     * <p>Each supervisor gets slotsTotal, slotsUsed, totalMem, totalCpu, usedMem
     * and usedCpu, keyed by a {@code SupervisorInfo} built from its host, id and
     * the current time in seconds.
     *
     * @param summ cluster summary to read supervisors from
     * @return supervisor info to its data points
     */
    private static Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> extractSupervisorMetrics(ClusterSummary summ) {
        HashMap<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> bySupervisor = new HashMap<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>>();
        for (SupervisorSummary supervisor : summ.get_supervisors()) {
            String host = supervisor.get_host();
            String id = supervisor.get_supervisor_id();
            IClusterMetricsConsumer.SupervisorInfo key = new IClusterMetricsConsumer.SupervisorInfo(host, id, Time.currentTimeSecs());

            ArrayList<DataPoint> points = new ArrayList<DataPoint>();
            points.add(new DataPoint("slotsTotal", supervisor.get_num_workers()));
            points.add(new DataPoint("slotsUsed", supervisor.get_num_used_workers()));
            points.add(new DataPoint("totalMem", supervisor.get_total_resources().get("supervisor.memory.capacity.mb")));
            points.add(new DataPoint("totalCpu", supervisor.get_total_resources().get("supervisor.cpu.capacity")));
            points.add(new DataPoint("usedMem", supervisor.get_used_mem()));
            points.add(new DataPoint("usedCpu", supervisor.get_used_cpu()));

            bySupervisor.put(key, points);
        }
        return bySupervisor;
    }

    /**
     * Returns the resource map for a component after passing it through
     * {@code ResourceUtils.checkIntialization}.
     *
     * <p>When the component has no entry, a fresh map is used; that new map is
     * returned but not stored back into {@code compResourcesMap}.
     *
     * @param compResourcesMap component id to resource map
     * @param compId component to look up
     * @param topoConf topology configuration handed to the initialization check
     * @return the existing or newly created resource map
     */
    private static Map<String, Double> setResourcesDefaultIfNotSet(Map<String, Map<String, Double>> compResourcesMap, String compId, Map<String, Object> topoConf) {
        Map<String, Double> existing = compResourcesMap.get(compId);
        Map<String, Double> resources = existing != null ? existing : new HashMap<String, Double>();
        ResourceUtils.checkIntialization(resources, (String)compId, topoConf);
        return resources;
    }

    /**
     * Verifies that the configured {@code nimbus.thrift.port} can be bound.
     *
     * <p>The port is bound and released immediately. If binding fails with a
     * {@link BindException}, an error is logged and the JVM exits with status 0.
     *
     * @param conf configuration supplying {@code nimbus.thrift.port}
     * @throws IOException if opening or closing the probe socket fails for a
     *     reason other than the port being in use
     */
    private static void validatePortAvailable(Map<String, Object> conf) throws IOException {
        int port = ObjectReader.getInt((Object)conf.get("nimbus.thrift.port"));
        try (ServerSocket probe = new ServerSocket(port)) {
            // Binding succeeded, so the port is free; the socket is closed on exit.
        }
        catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", (Object)port, (Object)port);
            // NOTE(review): exit status 0 signals success to the caller; kept as-is
            // because launch scripts may depend on it — confirm before changing.
            System.exit(0);
        }
    }

    /**
     * Validates the environment, creates a {@code Nimbus}, and serves it over thrift.
     *
     * <p>Steps, in order: validate distributed mode, validate the thrift port,
     * construct and launch the nimbus instance, wrap it in a {@code ThriftServer},
     * register a shutdown hook, then call {@code server.serve()}.
     *
     * @param conf daemon configuration
     * @param inimbus nimbus plugin passed to the {@code Nimbus} constructor
     * @return the nimbus instance, once {@code server.serve()} returns
     * @throws Exception if validation, construction or serving fails
     */
    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormCommon.validateDistributedMode(conf);
        Nimbus.validatePortAvailable(conf);
        Nimbus nimbus = new Nimbus(conf, inimbus);
        nimbus.launchServer();
        ThriftServer server = new ThriftServer(conf, (TProcessor)new Nimbus.Processor((Nimbus.Iface)nimbus), ThriftConnectionType.NIMBUS);
        // On JVM shutdown, stop nimbus first and then the thrift server.
        Utils.addShutdownHookWithForceKillIn1Sec(() -> {
            nimbus.shutdown();
            server.stop();
        });
        LOG.info("Starting nimbus server for storm version '{}'", (Object)STORM_VERSION);
        server.serve();
        return nimbus;
    }

    /**
     * Launches nimbus using the storm config merged with the optional
     * {@code storm-cluster-auth.yaml}.
     *
     * @param inimbus nimbus plugin to run with
     * @return the launched nimbus instance
     * @throws Exception if configuration loading or server launch fails
     */
    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> stormConf = Utils.readStormConfig();
        // The auth file is read with mustExist=false.
        Map<String, Object> authConf = ConfigUtils.readYamlConfig((String)"storm-cluster-auth.yaml", (boolean)false);
        Map<String, Object> conf = Nimbus.merge(stormConf, authConf);
        return Nimbus.launchServer(conf, inimbus);
    }

    /**
     * Process entry point: installs the default uncaught-exception handler and
     * launches nimbus with a {@code StandaloneINimbus}.
     *
     * @param args command-line arguments; not used
     * @throws Exception if launching fails
     */
    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        Nimbus.launch(new StandaloneINimbus());
    }

    /**
     * Creates the cluster state for the nimbus daemon.
     *
     * <p>{@code ZK_ACLS} are passed only when
     * {@code Utils.isZkAuthenticationConfiguredStormServer(conf)} is true;
     * otherwise {@code null} ACLs are passed.
     *
     * @param conf daemon configuration
     * @return the cluster state
     * @throws Exception if the cluster state cannot be created
     */
    private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception {
        List<ACL> acls = Utils.isZkAuthenticationConfiguredStormServer(conf) ? ZK_ACLS : null;
        ClusterStateContext context = new ClusterStateContext(DaemonType.NIMBUS);
        return ClusterUtils.mkStormClusterState(conf, acls, (ClusterStateContext)context);
    }

    /**
     * Creates a nimbus with every optional collaborator left {@code null}, so
     * the full constructor builds each of them from {@code conf}.
     *
     * @param conf daemon configuration
     * @param inimbus nimbus plugin
     * @throws Exception if the full constructor fails
     */
    public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
        this(conf, inimbus, null, null, null, null, null);
    }

    /**
     * Creates a nimbus, building from {@code conf} any collaborator passed as {@code null}.
     *
     * @param conf daemon configuration
     * @param inimbus nimbus plugin; when non-null it is prepared with the master inimbus directory
     * @param stormClusterState cluster state, or {@code null} to create one from {@code conf}
     * @param hostPortInfo this nimbus's host/port info, or {@code null} to derive it from {@code conf}
     * @param blobStore blob store, or {@code null} to obtain the nimbus blob store
     * @param leaderElector leader elector, or {@code null} to create a ZooKeeper-based one
     * @param groupMapper group mapping provider, or {@code null} to load the configured plugin
     * @throws Exception if any collaborator cannot be created
     */
    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo, BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
        this.conf = conf;
        if (hostPortInfo == null) {
            hostPortInfo = NimbusInfo.fromConf(conf);
        }
        this.nimbusHostPortInfo = hostPortInfo;
        if (inimbus != null) {
            inimbus.prepare(conf, ServerConfigUtils.masterInimbusDir(conf));
        }
        this.inimbus = inimbus;
        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String)((String)conf.get("nimbus.authorizer")), conf);
        this.impersonationAuthorizationHandler = StormCommon.mkAuthorizationHandler((String)((String)conf.get("nimbus.impersonation.authorizer")), conf);
        this.submittedCount = new AtomicLong(0L);
        if (stormClusterState == null) {
            stormClusterState = Nimbus.makeStormClusterState(conf);
        }
        this.stormClusterState = stormClusterState;
        this.heartbeatsCache = new AtomicReference(new HashMap());
        this.downloaders = Nimbus.fileCacheMap(conf);
        this.uploaders = Nimbus.fileCacheMap(conf);
        if (blobStore == null) {
            // Uses the host/port info resolved above.
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo);
        }
        this.blobStore = blobStore;
        this.blobDownloaders = Nimbus.makeBlobCacheMap(conf);
        this.blobUploaders = Nimbus.makeBlobCacheMap(conf);
        this.blobListers = Nimbus.makeBlobListCachMap(conf);
        this.uptime = Utils.makeUptimeComputer();
        // Validator class comes from conf, defaulting to DefaultTopologyValidator.
        this.validator = (ITopologyValidator)ReflectionUtils.newInstance((String)((String)conf.getOrDefault("nimbus.topology.validator", DefaultTopologyValidator.class.getName())));
        // The timer's error callback logs the error and exits the process with status 20.
        this.timer = new StormTimer(null, (t, e) -> {
            LOG.error("Error while processing event", e);
            Utils.exitProcess((int)20, (String)"Error while processing event");
        });
        this.scheduler = Nimbus.makeScheduler(conf, inimbus);
        if (leaderElector == null) {
            // Uses the blob store resolved above.
            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore);
        }
        this.leaderElector = leaderElector;
        this.idToSchedStatus = new AtomicReference(new HashMap());
        this.nodeIdToResources = new AtomicReference(new HashMap());
        this.idToResources = new AtomicReference(new HashMap());
        this.idToWorkerResources = new AtomicReference(new HashMap());
        this.credRenewers = AuthUtils.GetCredentialRenewers(conf);
        this.topologyHistoryLock = new Object();
        this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(conf);
        this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf);
        this.nimbusTopologyActionNotifier = Nimbus.createTopologyActionNotifier(conf);
        this.clusterConsumerExceutors = Nimbus.makeClusterMetricsConsumerExecutors(conf);
        if (groupMapper == null) {
            groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
        }
        this.groupMapper = groupMapper;
        this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
    }

    /**
     * Returns the configuration map this nimbus was constructed with (the same
     * instance, not a copy).
     */
    Map<String, Object> getConf() {
        return this.conf;
    }

    /**
     * Replaces the authorization handler; exposed for tests.
     *
     * @param authorizationHandler handler to use from now on
     */
    @VisibleForTesting
    public void setAuthorizationHandler(IAuthorizer authorizationHandler) {
        this.authorizationHandler = authorizationHandler;
    }

    /** Returns the cluster state used by this nimbus. */
    private IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    /**
     * Returns the heartbeat cache reference itself (not a copy); exposed for tests.
     *
     * <p>The outer map is keyed by topology id (see {@code updateHeartbeats});
     * the inner map is keyed by executor id lists.
     */
    @VisibleForTesting
    public AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> getHeartbeatsCache() {
        return this.heartbeatsCache;
    }

    /** Returns the blob store used by this nimbus. */
    private BlobStore getBlobStore() {
        return this.blobStore;
    }

    /**
     * Asks the leader elector whether this nimbus is currently the leader.
     *
     * @return the elector's answer
     * @throws Exception if the elector's check fails
     */
    private boolean isLeader() throws Exception {
        return this.leaderElector.isLeader();
    }

    /**
     * Fails unless this nimbus is the current leader.
     *
     * @throws RuntimeException naming the current leader when this nimbus is not the leader
     * @throws Exception if the leadership check itself fails
     */
    private void assertIsLeader() throws Exception {
        if (this.isLeader()) {
            return;
        }
        NimbusInfo currentLeader = this.leaderElector.getLeader();
        throw new RuntimeException("not a leader, current leader is " + currentLeader);
    }

    /**
     * Returns the master inbox location resolved from this nimbus's configuration.
     *
     * @throws IOException if {@code ServerConfigUtils.masterInbox} fails
     */
    private String getInbox() throws IOException {
        return ServerConfigUtils.masterInbox(this.conf);
    }

    /**
     * Schedules a topology transition on the nimbus timer after a delay.
     *
     * <p>The delayed transition runs with {@code errorOnNoTransition} false; any
     * exception it throws is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param topoId topology to transition
     * @param delaySecs delay in seconds
     * @param event transition event to apply
     * @param args argument passed to the transition
     */
    void delayEvent(String topoId, int delaySecs, TopologyActions event, Object args) {
        LOG.info("Delaying event {} for {} secs for {}", new Object[]{event, delaySecs, topoId});
        this.timer.schedule(delaySecs, () -> {
            try {
                // The three-argument overload supplies errorOnNoTransition=false.
                this.transition(topoId, event, args);
            }
            catch (Exception failure) {
                throw new RuntimeException(failure);
            }
        });
    }

    /**
     * Applies the rebalance options stored on a topology's base and recomputes assignments.
     *
     * <p>The update clears the pending action options, resets component debug
     * settings to an empty map, and sets the worker and per-component executor
     * counts only when the rebalance options carry them.
     *
     * @param topoId topology being rebalanced
     * @param stormBase current base carrying the rebalance options
     * @throws Exception if updating cluster state or making assignments fails
     */
    void doRebalance(String topoId, StormBase stormBase) throws Exception {
        RebalanceOptions options = stormBase.get_topology_action_options().get_rebalance_options();
        StormBase changes = new StormBase();
        if (options.is_set_num_workers()) {
            changes.set_num_workers(options.get_num_workers());
        }
        if (options.is_set_num_executors()) {
            changes.set_component_executors(options.get_num_executors());
        }
        changes.set_component_debug(Collections.emptyMap());
        changes.set_topology_action_options(null);
        this.stormClusterState.updateStorm(topoId, changes);
        this.mkAssignments(topoId);
    }

    /**
     * Resolves a topology name to its id through cluster state.
     *
     * @param topoName topology name
     * @return the topology id
     * @throws NotAliveException if cluster state has no id for the name
     */
    private String toTopoId(String topoName) throws NotAliveException {
        return (String)this.stormClusterState.getTopoId(topoName).orElseThrow(() -> new NotAliveException(topoName + " is not alive"));
    }

    /**
     * Applies a transition to a topology identified by name.
     *
     * @param topoName topology name, resolved to an id first
     * @param event transition event
     * @param eventArg argument passed to the transition
     * @param errorOnNoTransition whether a missing transition is an error
     * @throws Exception if the name cannot be resolved or the transition fails
     */
    private void transitionName(String topoName, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception {
        String topoId = this.toTopoId(topoName);
        this.transition(topoId, event, eventArg, errorOnNoTransition);
    }

    /**
     * Applies a transition with {@code errorOnNoTransition} set to false.
     *
     * @param topoId topology id
     * @param event transition event
     * @param eventArg argument passed to the transition
     * @throws Exception if the transition fails
     */
    private void transition(String topoId, TopologyActions event, Object eventArg) throws Exception {
        this.transition(topoId, event, eventArg, false);
    }

    /**
     * Applies a state transition to a topology while holding {@code submitLock}.
     *
     * <p>Requires leadership. If the topology's base or status is gone, the event
     * is logged and dropped. When no transition is registered for the current
     * status and event, either a {@link RuntimeException} is thrown
     * ({@code errorOnNoTransition}) or {@code NOOP_TRANSITION} is used; the miss
     * is logged unless the event is {@code STARTUP}. A non-null result of the
     * transition is written to cluster state.
     *
     * @param topoId topology id
     * @param event transition event
     * @param eventArg argument passed to the transition
     * @param errorOnNoTransition whether a missing transition is an error
     * @throws Exception if the leadership check or the transition fails
     */
    private void transition(String topoId, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception {
        LOG.info("TRANSITION: {} {} {} {}", new Object[]{topoId, event, eventArg, errorOnNoTransition});
        this.assertIsLeader();
        synchronized (this.submitLock) {
            IStormClusterState clusterState = this.stormClusterState;
            StormBase base = clusterState.stormBase(topoId, null);
            if (base == null || base.get_status() == null) {
                LOG.info("Cannot apply event {} to {} because topology no longer exists", (Object)event, (Object)topoId);
                return;
            }
            TopologyStatus status = base.get_status();
            TopologyStateTransition handler = TOPO_STATE_TRANSITIONS.get(status).get((Object)event);
            if (handler == null) {
                String message = "No transition for event: " + event + ", status: " + status + " storm-id: " + topoId;
                if (errorOnNoTransition) {
                    throw new RuntimeException(message);
                }
                if (TopologyActions.STARTUP != event) {
                    LOG.info(message);
                }
                handler = NOOP_TRANSITION;
            }
            StormBase updates = handler.transition(eventArg, this, topoId, base);
            if (updates != null) {
                clusterState.updateStorm(topoId, updates);
            }
        }
    }

    /**
     * Stores a topology's jar, conf and serialized code in the blob store.
     *
     * <p>The jar is stored only when {@code tmpJarLocation} is non-null. For a
     * {@link LocalFsBlobStore}, each stored key is also registered in cluster
     * state with its version. Blobs are created in the order jar, conf, code.
     *
     * @param conf daemon configuration, used when resolving key versions
     * @param topoId topology id the blob keys are derived from
     * @param tmpJarLocation path of the uploaded jar, or {@code null}
     * @param topoConf topology configuration to store
     * @param topology topology to serialize and store
     * @throws Exception if reading the jar or any blob/cluster-state operation fails
     */
    private void setupStormCode(Map<String, Object> conf, String topoId, String tmpJarLocation, Map<String, Object> topoConf, StormTopology topology) throws Exception {
        Subject subject = Nimbus.getSubject();
        IStormClusterState clusterState = this.stormClusterState;
        BlobStore store = this.blobStore;
        String jarKey = ConfigUtils.masterStormJarKey((String)topoId);
        String codeKey = ConfigUtils.masterStormCodeKey((String)topoId);
        String confKey = ConfigUtils.masterStormConfKey((String)topoId);
        NimbusInfo hostPortInfo = this.nimbusHostPortInfo;
        boolean localFs = store instanceof LocalFsBlobStore;

        if (tmpJarLocation != null) {
            try (FileInputStream jarStream = new FileInputStream(tmpJarLocation);){
                store.createBlob(jarKey, (InputStream)jarStream, new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
            }
            if (localFs) {
                clusterState.setupBlobstore(jarKey, hostPortInfo, Integer.valueOf(Nimbus.getVersionForKey(jarKey, hostPortInfo, conf)));
            }
        }

        store.createBlob(confKey, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
        if (localFs) {
            clusterState.setupBlobstore(confKey, hostPortInfo, Integer.valueOf(Nimbus.getVersionForKey(confKey, hostPortInfo, conf)));
        }

        store.createBlob(codeKey, Utils.serialize((Object)topology), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
        if (localFs) {
            clusterState.setupBlobstore(codeKey, hostPortInfo, Integer.valueOf(Nimbus.getVersionForKey(codeKey, hostPortInfo, conf)));
        }
    }

    /**
     * Looks up the replication count of a blob as the nimbus subject.
     *
     * @param key blob key
     * @return the replication count, or {@code null} when there is no blob store
     * @throws Exception if the blob store lookup fails
     */
    private Integer getBlobReplicationCount(String key) throws Exception {
        BlobStore store = this.blobStore;
        if (store == null) {
            return null;
        }
        return store.getBlobReplication(key, NIMBUS_SUBJECT);
    }

    /**
     * Blocks until the topology's jar, code and conf blobs have all reached
     * {@code topology.min.replication.count}, or the maximum wait time elapses.
     *
     * <p>Counts are polled once per second and leadership is re-asserted on each
     * round. In local mode the jar is not checked. A
     * {@code topology.max.replication.wait.time.sec} that is not positive
     * disables the time limit.
     *
     * @param topoConf topology configuration holding the replication settings
     * @param topoId topology id the blob keys are derived from
     * @throws Exception if a replication lookup fails or this nimbus loses leadership
     */
    private void waitForDesiredCodeReplication(Map<String, Object> topoConf, String topoId) throws Exception {
        int minReplicationCount = ObjectReader.getInt((Object)topoConf.get("topology.min.replication.count"));
        int maxWaitTime = ObjectReader.getInt((Object)topoConf.get("topology.max.replication.wait.time.sec"));
        // In local mode there is no jar blob, so treat the jar as already replicated.
        int jarCount = minReplicationCount;
        if (!ConfigUtils.isLocalMode(topoConf)) {
            jarCount = this.getBlobReplicationCount(ConfigUtils.masterStormJarKey((String)topoId));
        }
        int codeCount = this.getBlobReplicationCount(ConfigUtils.masterStormCodeKey((String)topoId));
        int confCount = this.getBlobReplicationCount(ConfigUtils.masterStormConfKey((String)topoId));
        long totalWaitTime = 0L;
        if (this.blobStore != null) {
            // Keep waiting while ANY blob is under-replicated; with '&&' the loop
            // ended as soon as a single blob reached the minimum.
            while (jarCount < minReplicationCount || codeCount < minReplicationCount || confCount < minReplicationCount) {
                if (maxWaitTime > 0 && totalWaitTime > (long)maxWaitTime) {
                    LOG.info("desired replication count of {} not achieved but we have hit the max wait time {} so moving on with replication count for conf key = {} for code key = {} for jar key = {}", new Object[]{minReplicationCount, maxWaitTime, confCount, codeCount, jarCount});
                    return;
                }
                LOG.debug("Checking if I am still the leader");
                this.assertIsLeader();
                LOG.info("WAITING... storm-id {}, {} <? {} {} {}", new Object[]{topoId, minReplicationCount, jarCount, codeCount, confCount});
                LOG.info("WAITING... {} <? {}", (Object)totalWaitTime, (Object)maxWaitTime);
                Time.sleepSecs((long)1L);
                ++totalWaitTime;
                if (!ConfigUtils.isLocalMode(topoConf)) {
                    jarCount = this.getBlobReplicationCount(ConfigUtils.masterStormJarKey((String)topoId));
                }
                codeCount = this.getBlobReplicationCount(ConfigUtils.masterStormCodeKey((String)topoId));
                confCount = this.getBlobReplicationCount(ConfigUtils.masterStormConfKey((String)topoId));
            }
        }
        LOG.info("desired replication count {} achieved, current-replication-count for conf key = {}, current-replication-count for code key = {}, current-replication-count for jar key = {}", new Object[]{minReplicationCount, confCount, codeCount, jarCount});
    }

    /**
     * Assembles the {@code TopologyDetails} for a topology from the blob store and its base.
     *
     * @param topoId topology id; must not be null
     * @param base topology base supplying worker count and launch time; must not be null
     * @return details containing conf, topology, worker count, executor-to-component map and launch time
     */
    private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
        assert (base != null);
        assert (topoId != null);
        BlobStore store = this.blobStore;
        Map<String, Object> topoConf = Nimbus.readTopoConfAsNimbus(topoId, store);
        StormTopology topo = Nimbus.readStormTopologyAsNimbus(topoId, store);
        Map<List<Integer>, String> rawExecToComponent = this.computeExecutorToComponent(topoId, base);

        // Re-key the [first, last] executor lists as ExecutorDetails.
        HashMap<ExecutorDetails, String> executorsToComponent = new HashMap<ExecutorDetails, String>();
        rawExecToComponent.forEach((range, component) -> {
            int first = range.get(0).intValue();
            int last = range.get(1).intValue();
            executorsToComponent.put(new ExecutorDetails(first, last), component);
        });
        return new TopologyDetails(topoId, topoConf, topo, base.get_num_workers(), executorsToComponent, base.get_launch_time_secs());
    }

    /**
     * Refreshes the cached heartbeats of one topology.
     *
     * <p>Reads executor beats from cluster state for the executors in the
     * existing assignment, combines them with the currently cached entry using
     * {@code nimbus.task.timeout.secs}, and stores the result in the heartbeat
     * cache under the topology id.
     *
     * @param topoId topology id
     * @param allExecutors all executors of the topology
     * @param existingAssignment current assignment of the topology
     */
    private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
        LOG.debug("Updating heartbeats for {} {}", (Object)topoId, allExecutors);
        IStormClusterState state = this.stormClusterState;
        Map executorBeats = StatsUtil.convertExecutorBeats((Map)state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
        Map<List<Integer>, Map<String, Object>> previous = this.heartbeatsCache.get().get(topoId);
        Integer timeoutSecs = ObjectReader.getInt((Object)this.conf.get("nimbus.task.timeout.secs"));
        Map refreshed = StatsUtil.updateHeartbeatCache(previous, (Map)executorBeats, allExecutors, (Integer)timeoutSecs);
        this.heartbeatsCache.getAndUpdate(new Assoc<String, Map>(topoId, refreshed));
    }

    /**
     * Refreshes the cached heartbeats of every topology that has an assignment.
     *
     * @param existingAssignments topology id to current assignment
     * @param topologyToExecutors topology id to all of its executors
     */
    private void updateAllHeartbeats(Map<String, Assignment> existingAssignments, Map<String, Set<List<Integer>>> topologyToExecutors) {
        existingAssignments.forEach((topoId, assignment) ->
            this.updateHeartbeats(topoId, topologyToExecutors.get(topoId), assignment));
    }

    /**
     * Selects the executors of a topology that are considered alive.
     *
     * <p>An executor is alive when it has a start time in the assignment and
     * either it started less than {@code nimbus.task.launch.secs} ago or its
     * cached heartbeat is not flagged {@code is-timed-out}. Executors that fail
     * the test are logged.
     *
     * @param td topology whose id selects the heartbeat cache entry
     * @param allExecutors executors to classify
     * @param assignment current assignment supplying executor start times
     * @return the alive subset of {@code allExecutors}
     */
    private Set<List<Integer>> aliveExecutors(TopologyDetails td, Set<List<Integer>> allExecutors, Assignment assignment) {
        String topoId = td.getId();
        Map<List<Integer>, Map<String, Object>> hbCache = this.heartbeatsCache.get().get(topoId);
        LOG.debug("NEW  Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}", new Object[]{topoId, allExecutors, assignment, hbCache});
        int taskLaunchSecs = ObjectReader.getInt((Object)this.conf.get("nimbus.task.launch.secs"));
        HashSet<List<Integer>> alive = new HashSet<List<Integer>>();
        Map execToStartTimes = assignment.get_executor_start_time_secs();
        for (List<Integer> exec : allExecutors) {
            // Start times are keyed by the Long form of the executor id.
            ArrayList<Long> longExec = new ArrayList<Long>(exec.size());
            for (Integer num : exec) {
                longExec.add(num.longValue());
            }
            Long startTime = (Long)execToStartTimes.get(longExec);
            Boolean isTimedOut = (Boolean)hbCache.get(StatsUtil.convertExecutor(longExec)).get("is-timed-out");
            Integer delta = startTime == null ? null : Integer.valueOf(Time.deltaSecs((int)startTime.intValue()));
            boolean isAlive = startTime != null && (delta < taskLaunchSecs || !isTimedOut.booleanValue());
            if (isAlive) {
                alive.add(exec);
            } else {
                LOG.info("Executor {}:{} not alive", (Object)topoId, exec);
            }
        }
        return alive;
    }

    /**
     * Computes the executors of a topology as {@code [firstTask, lastTask]} pairs.
     *
     * <p>For each component that has an executor count in the base, its sorted
     * task ids are split with {@code Utils.partitionFixed} and the first and last
     * task of every partition form one executor. Returns an empty list when the
     * base has no component executor map.
     *
     * @param topoId topology id
     * @param base topology base supplying per-component executor counts; must not be null
     * @return the executor ranges
     */
    private List<List<Integer>> computeExecutors(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
        BlobStore store = this.blobStore;
        assert (base != null);
        Map<String, Integer> compToExecutors = base.get_component_executors();
        Map<String, Object> topoConf = Nimbus.readTopoConfAsNimbus(topoId, store);
        StormTopology topology = Nimbus.readStormTopologyAsNimbus(topoId, store);
        ArrayList<List<Integer>> executors = new ArrayList<List<Integer>>();
        if (compToExecutors == null) {
            return executors;
        }
        Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo((StormTopology)topology, topoConf);
        HashMap<String, List<Integer>> compToTasks = Utils.reverseMap(taskToComponent);
        for (Map.Entry<String, List<Integer>> entry : compToTasks.entrySet()) {
            List<Integer> tasks = entry.getValue();
            // Natural ordering, so partitions are contiguous task ranges.
            tasks.sort(null);
            Integer numExecutors = compToExecutors.get(entry.getKey());
            if (numExecutors == null) {
                continue;
            }
            List<List<Integer>> partitions = Utils.partitionFixed((int)numExecutors, tasks);
            for (List<Integer> partition : partitions) {
                Integer firstTask = partition.get(0);
                Integer lastTask = partition.get(partition.size() - 1);
                executors.add(Arrays.asList(firstTask, lastTask));
            }
        }
        return executors;
    }

    /**
     * Maps each executor of a topology to its component, looked up through the
     * executor's first task id.
     *
     * @param topoId topology id
     * @param base topology base used to compute the executors
     * @return executor range to component id
     */
    private Map<List<Integer>, String> computeExecutorToComponent(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
        BlobStore store = this.blobStore;
        List<List<Integer>> executors = this.computeExecutors(topoId, base);
        StormTopology topology = Nimbus.readStormTopologyAsNimbus(topoId, store);
        Map<String, Object> topoConf = Nimbus.readTopoConfAsNimbus(topoId, store);
        Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo((StormTopology)topology, topoConf);
        HashMap<List<Integer>, String> execToComponent = new HashMap<List<Integer>, String>();
        for (List<Integer> executor : executors) {
            Integer firstTask = executor.get(0);
            execToComponent.put(executor, taskToComponent.get(firstTask));
        }
        return execToComponent;
    }

    /**
     * Computes the executor set of every topology in {@code bases}.
     *
     * @param bases topology id to base, or {@code null}
     * @return topology id to executor set; empty when {@code bases} is null
     */
    private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Map<String, StormBase> bases) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
        HashMap<String, Set<List<Integer>>> byTopology = new HashMap<String, Set<List<Integer>>>();
        if (bases == null) {
            return byTopology;
        }
        for (Map.Entry<String, StormBase> entry : bases.entrySet()) {
            String topoId = entry.getKey();
            List<List<Integer>> executors = this.computeExecutors(topoId, entry.getValue());
            byTopology.put(topoId, new HashSet<List<Integer>>(executors));
        }
        return byTopology;
    }

    /**
     * Computes the alive executors of every assigned topology.
     *
     * <p>For the scratch topology all executors are treated as alive and the
     * executor set is reused as-is; for every other topology a copy of the
     * {@code aliveExecutors} result is stored.
     *
     * @param existingAssignment topology id to current assignment
     * @param topologies topology details lookup
     * @param topologyToExecutors topology id to all of its executors
     * @param scratchTopologyId id of the scratch topology; may match no topology
     * @return topology id to alive executors
     */
    private Map<String, Set<List<Integer>>> computeTopologyToAliveExecutors(Map<String, Assignment> existingAssignment, Topologies topologies, Map<String, Set<List<Integer>>> topologyToExecutors, String scratchTopologyId) {
        HashMap<String, Set<List<Integer>>> aliveByTopology = new HashMap<String, Set<List<Integer>>>();
        for (Map.Entry<String, Assignment> entry : existingAssignment.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            TopologyDetails td = topologies.getById(topoId);
            Set<List<Integer>> allExecutors = topologyToExecutors.get(topoId);
            Set<List<Integer>> alive;
            if (topoId.equals(scratchTopologyId)) {
                alive = allExecutors;
            } else {
                alive = new HashSet<List<Integer>>(this.aliveExecutors(td, allExecutors, assignment));
            }
            aliveByTopology.put(topoId, alive);
        }
        return aliveByTopology;
    }

    /**
     * Converts the first two elements of a {@code Long} executor id into a new,
     * mutable two-element {@code Integer} list.
     *
     * @param exec executor id with at least two elements
     * @return a new list holding elements 0 and 1 narrowed to int
     */
    private static List<Integer> asIntExec(List<Long> exec) {
        int first = exec.get(0).intValue();
        int last = exec.get(1).intValue();
        return new ArrayList<Integer>(Arrays.asList(first, last));
    }

    /**
     * Collects, per supervisor, the ports that host executors not considered alive.
     *
     * <p>Dead executors are all executors of a topology minus its alive ones;
     * the ports of every assigned dead executor are added under its node id.
     *
     * @param existingAssignments topology id to current assignment
     * @param topologyToExecutors topology id to all of its executors
     * @param topologyToAliveExecutors topology id to alive executors
     * @return supervisor id to ports hosting dead executors
     */
    private Map<String, Set<Long>> computeSupervisorToDeadPorts(Map<String, Assignment> existingAssignments, Map<String, Set<List<Integer>>> topologyToExecutors, Map<String, Set<List<Integer>>> topologyToAliveExecutors) {
        HashMap<String, Set<Long>> deadPortsBySupervisor = new HashMap<String, Set<Long>>();
        for (Map.Entry<String, Assignment> entry : existingAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Set<List<Integer>> allExecutors = topologyToExecutors.get(topoId);
            Set<List<Integer>> aliveExecutors = topologyToAliveExecutors.get(topoId);
            HashSet<List<Integer>> deadExecutors = new HashSet<List<Integer>>(allExecutors);
            deadExecutors.removeAll(aliveExecutors);

            Map<List<Long>, NodeInfo> execToNodePort = assignment.get_executor_node_port();
            for (Map.Entry<List<Long>, NodeInfo> assigned : execToNodePort.entrySet()) {
                if (!deadExecutors.contains(Nimbus.asIntExec(assigned.getKey()))) {
                    continue;
                }
                NodeInfo info = assigned.getValue();
                String superId = info.get_node();
                Set<Long> ports = deadPortsBySupervisor.computeIfAbsent(superId, id -> new HashSet<Long>());
                ports.addAll(info.get_port());
            }
        }
        return deadPortsBySupervisor;
    }

    /**
     * Converts stored assignments into scheduler assignments restricted to alive executors.
     *
     * <p>For each topology a {@code WorkerSlot} is built for every entry of the
     * assignment's worker resources (node, first port, on-heap memory, off-heap
     * memory, cpu); each alive executor is then mapped to the slot of its node/port.
     *
     * @param existingAssignments topology id to current assignment
     * @param topologyToAliveExecutors topology id to alive executors
     * @return topology id to scheduler assignment
     */
    private Map<String, SchedulerAssignmentImpl> computeTopologyToSchedulerAssignment(Map<String, Assignment> existingAssignments, Map<String, Set<List<Integer>>> topologyToAliveExecutors) {
        HashMap<String, SchedulerAssignmentImpl> schedulerAssignments = new HashMap<String, SchedulerAssignmentImpl>();
        for (Map.Entry<String, Assignment> entry : existingAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Set<List<Integer>> aliveExecutors = topologyToAliveExecutors.get(topoId);
            Map<List<Long>, NodeInfo> execToNodePort = assignment.get_executor_node_port();
            Map<NodeInfo, WorkerResources> workerToResources = assignment.get_worker_resources();

            // One slot per worker that has resources recorded in the assignment.
            HashMap<NodeInfo, WorkerSlot> nodePortToSlot = new HashMap<NodeInfo, WorkerSlot>();
            for (Map.Entry<NodeInfo, WorkerResources> worker : workerToResources.entrySet()) {
                NodeInfo info = worker.getKey();
                WorkerResources resources = worker.getValue();
                WorkerSlot slot = new WorkerSlot(info.get_node(), (Number)info.get_port_iterator().next(), resources.get_mem_on_heap(), resources.get_mem_off_heap(), resources.get_cpu());
                nodePortToSlot.put(info, slot);
            }

            HashMap<ExecutorDetails, WorkerSlot> execToSlot = new HashMap<ExecutorDetails, WorkerSlot>();
            for (Map.Entry<List<Long>, NodeInfo> placed : execToNodePort.entrySet()) {
                List<Integer> exec = Nimbus.asIntExec(placed.getKey());
                NodeInfo info = placed.getValue();
                if (aliveExecutors.contains(exec)) {
                    ExecutorDetails details = new ExecutorDetails(exec.get(0).intValue(), exec.get(1).intValue());
                    execToSlot.put(details, nodePortToSlot.get(info));
                }
            }
            schedulerAssignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot));
        }
        return schedulerAssignments;
    }

    /**
     * Builds the supervisor details handed to the scheduler, with dead ports hidden.
     *
     * <p>The ports of each supervisor are the slots returned by
     * {@code inimbus.allSlotsAvailableForScheduling(...)} for that supervisor, minus the
     * ports listed for it in {@code superToDeadPorts}. A supervisor with no available slot
     * is still returned, with an empty port set.
     *
     * @param superToDeadPorts supervisor id to ports to exclude; supervisors may be absent
     * @param topologies topologies passed through to the slot query
     * @param missingAssignmentTopologies ids of topologies passed through to the slot query
     * @return supervisor id to its details, for every supervisor known to the cluster state
     */
    private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<Long>> superToDeadPorts, Topologies topologies, Collection<String> missingAssignmentTopologies) {
        Map<String, SupervisorDetails> ret = new HashMap<>();
        Map<String, SupervisorInfo> superInfos = this.stormClusterState.allSupervisorInfo();

        // Details built from the supervisors' own metadata; only used to query available slots.
        List<SupervisorDetails> superDetails = new ArrayList<>();
        for (Map.Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
            SupervisorInfo info = entry.getValue();
            superDetails.add(new SupervisorDetails(entry.getKey(), (Object)info.get_meta(), info.get_resources_map()));
        }

        // Group the available slots by supervisor.
        Map<String, Set<Long>> superToPorts = new HashMap<>();
        for (WorkerSlot slot : this.inimbus.allSlotsAvailableForScheduling(superDetails, topologies, new HashSet<String>(missingAssignmentTopologies))) {
            superToPorts.computeIfAbsent(slot.getNodeId(), id -> new HashSet<Long>()).add(Long.valueOf(slot.getPort()));
        }

        for (Map.Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
            String superId = entry.getKey();
            SupervisorInfo info = entry.getValue();
            String hostname = info.get_hostname();
            Set<Long> deadPorts = superToDeadPorts.get(superId);
            Set<Long> availablePorts = superToPorts.get(superId);
            // Always work on a copy so removing dead ports never alters superToPorts.
            Set<Long> allPorts = availablePorts == null ? new HashSet<Long>() : new HashSet<Long>(availablePorts);
            if (deadPorts != null) {
                allPorts.removeAll(deadPorts);
            }
            ret.put(superId, new SupervisorDetails(superId, hostname, (Object)info.get_scheduler_meta(), allPorts, info.get_resources_map()));
        }
        return ret;
    }

    /**
     * Runs the scheduler over the current cluster state and returns the assignments it produced.
     *
     * <p>Steps, in order: compute the executors of every topology, refresh heartbeats, derive
     * alive executors and dead ports, build the scheduler's view of the existing assignments,
     * determine which topologies still need scheduling, run the scheduler, and finally publish
     * the scheduling status and resource usage into this instance's caches
     * ({@code idToSchedStatus}, {@code nodeIdToResources}, {@code idToResources},
     * {@code idToWorkerResources}).
     *
     * @param existingAssignments topology id to the assignment currently stored for it
     * @param topologies the topologies to schedule
     * @param bases topology id to its base information
     * @param scratchTopologyId id passed through to the alive-executor computation; may be {@code null}
     * @return the assignments held by the cluster object after scheduling
     */
    private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments, Topologies topologies, Map<String, StormBase> bases, String scratchTopologyId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
        Map<String, Set<List<Integer>>> topoToExec = this.computeTopologyToExecutors(bases);
        this.updateAllHeartbeats(existingAssignments, topoToExec);
        Map<String, Set<List<Integer>>> topoToAliveExecutors = this.computeTopologyToAliveExecutors(existingAssignments, topologies, topoToExec, scratchTopologyId);
        Map<String, Set<Long>> supervisorToDeadPorts = this.computeSupervisorToDeadPorts(existingAssignments, topoToExec, topoToAliveExecutors);
        Map<String, SchedulerAssignmentImpl> topoToSchedAssignment = this.computeTopologyToSchedulerAssignment(existingAssignments, topoToAliveExecutors);

        // A topology is fully assigned only when it has executors, all of them are alive,
        // and it uses at least as many workers as it asked for. Everything else needs scheduling.
        Set<String> missingAssignmentTopologies = new HashSet<>();
        for (TopologyDetails topo : topologies.getTopologies()) {
            String id = topo.getId();
            Set<List<Integer>> allExecs = topoToExec.get(id);
            Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
            int numDesiredWorkers = topo.getNumWorkers();
            int numAssignedWorkers = Nimbus.numUsedWorkers((SchedulerAssignment)topoToSchedAssignment.get(id));
            boolean fullyAssigned = allExecs != null && !allExecs.isEmpty() && allExecs.equals(aliveExecs) && numDesiredWorkers <= numAssignedWorkers;
            if (!fullyAssigned) {
                missingAssignmentTopologies.add(id);
            }
        }

        Map<String, SupervisorDetails> supervisors = this.readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
        Cluster cluster = new Cluster(this.inimbus, supervisors, topoToSchedAssignment, this.conf);
        cluster.setStatusMap(this.idToSchedStatus.get());
        this.scheduler.schedule(topologies, cluster);
        this.idToSchedStatus.set(Nimbus.merge(this.idToSchedStatus.get(), cluster.getStatusMap()));
        this.nodeIdToResources.set(cluster.getSupervisorsResourcesMap());
        if (!ObjectReader.getBoolean((Object)this.conf.get("scheduler.display.resource"), false)) {
            cluster.updateAssignedMemoryForTopologyAndSupervisor(topologies);
        }

        // The cluster reports per-topology resources as six-element arrays; wrap them.
        HashMap<String, TopologyResources> resources = new HashMap<>();
        for (Map.Entry<?, ?> entry : cluster.getTopologyResourcesMap().entrySet()) {
            Double[] r = (Double[])entry.getValue();
            resources.put((String)entry.getKey(), new TopologyResources(r[0], r[1], r[2], r[3], r[4], r[5]));
        }
        this.idToResources.getAndAccumulate(resources, (orig, update) -> Nimbus.merge(orig, update));

        // Per-worker resources arrive as arrays: [0] on-heap memory, [1] off-heap memory, [2] cpu.
        HashMap<String, Map<WorkerSlot, WorkerResources>> workerResources = new HashMap<>();
        for (Map.Entry<?, ?> topoAndSlots : cluster.getWorkerResourcesMap().entrySet()) {
            Map<WorkerSlot, WorkerResources> slotToResources = new HashMap<>();
            for (Map.Entry<?, ?> slotAndUsage : ((Map<?, ?>)topoAndSlots.getValue()).entrySet()) {
                Double[] r = (Double[])slotAndUsage.getValue();
                WorkerResources wr = new WorkerResources();
                wr.set_mem_on_heap(r[0].doubleValue());
                wr.set_mem_off_heap(r[1].doubleValue());
                wr.set_cpu(r[2].doubleValue());
                slotToResources.put((WorkerSlot)slotAndUsage.getKey(), wr);
            }
            workerResources.put((String)topoAndSlots.getKey(), slotToResources);
        }
        this.idToWorkerResources.getAndAccumulate(workerResources, (orig, update) -> Nimbus.merge(orig, update));
        return cluster.getAssignments();
    }

    /**
     * Returns the resource summary of a topology, preferring the cached value.
     *
     * <p>On a cache miss the summary is rebuilt from the topology's requested resources and
     * the sums of the cpu, off-heap and on-heap values set on its assigned workers. The
     * rebuilt value is not written back to the cache here.
     *
     * @param topoId id of the topology
     * @param base base information of the topology, used to read its details
     * @return the cached or rebuilt summary; an all-zero summary when a
     *     {@code KeyNotFoundException} is raised while rebuilding
     */
    private TopologyResources getResourcesForTopology(String topoId, StormBase base) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
        TopologyResources cached = this.idToResources.get().get(topoId);
        if (cached != null) {
            return cached;
        }
        try {
            IStormClusterState clusterState = this.stormClusterState;
            TopologyDetails topoDetails = this.readTopologyDetails(topoId, base);
            double assignedOnHeap = 0.0;
            double assignedOffHeap = 0.0;
            double assignedCpu = 0.0;
            Assignment current = clusterState.assignmentInfo(topoId, null);
            if (current != null && current.is_set_worker_resources()) {
                // Only fields that are explicitly set contribute to the totals.
                for (WorkerResources worker : current.get_worker_resources().values()) {
                    if (worker.is_set_cpu()) {
                        assignedCpu += worker.get_cpu();
                    }
                    if (worker.is_set_mem_off_heap()) {
                        assignedOffHeap += worker.get_mem_off_heap();
                    }
                    if (worker.is_set_mem_on_heap()) {
                        assignedOnHeap += worker.get_mem_on_heap();
                    }
                }
            }
            return new TopologyResources(topoDetails.getTotalRequestedMemOnHeap(), topoDetails.getTotalRequestedMemOffHeap(), topoDetails.getTotalRequestedCpu(), assignedOnHeap, assignedOffHeap, assignedCpu);
        }
        catch (KeyNotFoundException e) {
            LOG.error("Failed to get topology details", (Throwable)e);
            return new TopologyResources(0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
        }
    }

    /**
     * Returns the per-worker resources of a topology, preferring the cached value.
     *
     * <p>On a cache miss the map is rebuilt from the stored assignment. It is written back
     * to the cache only when the assignment exists and has worker resources set; otherwise
     * an empty, uncached map is returned.
     *
     * @param topoId id of the topology
     * @return worker slot to the resources recorded for that worker
     */
    private Map<WorkerSlot, WorkerResources> getWorkerResourcesForTopology(String topoId) {
        Map<WorkerSlot, WorkerResources> cached = this.idToWorkerResources.get().get(topoId);
        if (cached != null) {
            return cached;
        }
        Map<WorkerSlot, WorkerResources> slotToResources = new HashMap<WorkerSlot, WorkerResources>();
        Assignment current = this.stormClusterState.assignmentInfo(topoId, null);
        if (current == null || !current.is_set_worker_resources()) {
            return slotToResources;
        }
        for (Map.Entry<NodeInfo, WorkerResources> nodeAndResources : current.get_worker_resources().entrySet()) {
            NodeInfo node = nodeAndResources.getKey();
            // The first port of the node info identifies the worker's slot.
            WorkerSlot slot = new WorkerSlot(node.get_node(), (Number)node.get_port_iterator().next());
            slotToResources.put(slot, nodeAndResources.getValue());
        }
        this.idToWorkerResources.getAndUpdate(new Assoc<String, Map<WorkerSlot, WorkerResources>>(topoId, slotToResources));
        return slotToResources;
    }

    /**
     * Runs an assignment pass without a scratch topology, i.e. delegates to
     * {@link #mkAssignments(String)} with {@code null}.
     *
     * @throws Exception whatever the delegated call throws
     */
    private void mkAssignments() throws Exception {
        String noScratchTopology = null;
        this.mkAssignments(noScratchTopology);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    private void mkAssignments(String scratchTopoId) throws Exception {
        Map bases;
        if (!this.isLeader()) {
            LOG.info("not a leader, skipping assignments");
            return;
        }
        IStormClusterState state = this.stormClusterState;
        HashMap<String, TopologyDetails> tds = new HashMap<String, TopologyDetails>();
        Object object = this.submitLock;
        synchronized (object) {
            bases = state.topologyBases();
            Iterator it = bases.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = it.next();
                String id = (String)entry.getKey();
                try {
                    tds.put(id, this.readTopologyDetails(id, (StormBase)entry.getValue()));
                }
                catch (KeyNotFoundException e) {
                    it.remove();
                }
            }
        }
        Topologies topologies = new Topologies(tds);
        List assignedTopologyIds = state.assignments(null);
        HashMap<String, Assignment> existingAssignments = new HashMap<String, Assignment>();
        for (String id : assignedTopologyIds) {
            if (id.equals(scratchTopoId)) continue;
            existingAssignments.put(id, state.assignmentInfo(id, null));
        }
        Map<String, SchedulerAssignment> newSchedulerAssignments = null;
        Object object2 = this.schedLock;
        synchronized (object2) {
            Assignment existingAssignment;
            String topoId;
            newSchedulerAssignments = this.computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
            Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = Nimbus.computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
            for (String id : assignedTopologyIds) {
                if (topologyToExecutorToNodePort.containsKey(id)) continue;
                topologyToExecutorToNodePort.put(id, null);
            }
            Map<String, Map<List<Object>, List<Double>>> newAssignedWorkerToResources = Nimbus.computeTopoToNodePortToResources(newSchedulerAssignments);
            int nowSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> basicSupervisorDetailsMap = Nimbus.basicSupervisorDetailsMap(state);
            HashMap<String, Assignment> newAssignments = new HashMap<String, Assignment>();
            for (Map.Entry<String, Map<List<Long>, List<Object>>> entry : topologyToExecutorToNodePort.entrySet()) {
                void var22_28;
                topoId = entry.getKey();
                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
                existingAssignment = (Assignment)existingAssignments.get(topoId);
                HashSet<String> allNodes = new HashSet<String>();
                if (execToNodePort != null) {
                    for (List<Object> list : execToNodePort.values()) {
                        allNodes.add((String)list.get(0));
                    }
                }
                HashMap<String, String> allNodeHost = new HashMap<String, String>();
                if (existingAssignment != null) {
                    allNodeHost.putAll(existingAssignment.get_node_host());
                }
                for (String node : allNodes) {
                    String host = this.inimbus.getHostName(basicSupervisorDetailsMap, node);
                    if (host == null) continue;
                    allNodeHost.put(node, host);
                }
                Object var22_32 = null;
                if (existingAssignment != null) {
                    Map map = existingAssignment.get_executor_node_port();
                }
                List<List<Long>> reassignExecutors = Nimbus.changedExecutors((Map<List<Long>, NodeInfo>)var22_28, execToNodePort);
                HashMap<List<Long>, Long> startTimes = new HashMap<List<Long>, Long>();
                if (existingAssignment != null) {
                    startTimes.putAll(existingAssignment.get_executor_start_time_secs());
                }
                for (List<Long> id : reassignExecutors) {
                    startTimes.put(id, Long.valueOf(nowSecs));
                }
                Map<List<Object>, List<Double>> workerToResources = newAssignedWorkerToResources.get(topoId);
                Assignment newAssignment = new Assignment((String)this.conf.get("storm.local.dir"));
                HashMap justAssignedKeys = new HashMap(allNodeHost);
                justAssignedKeys.keySet().retainAll(allNodes);
                newAssignment.set_node_host(justAssignedKeys);
                HashMap<List<Long>, NodeInfo> execToNodeInfo = new HashMap<List<Long>, NodeInfo>();
                for (Map.Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
                    List<Object> nodePort3 = execAndNodePort.getValue();
                    NodeInfo ni = new NodeInfo();
                    ni.set_node((String)nodePort3.get(0));
                    ni.add_to_port(((Long)nodePort3.get(1)).longValue());
                    execToNodeInfo.put(execAndNodePort.getKey(), ni);
                }
                newAssignment.set_executor_node_port(execToNodeInfo);
                newAssignment.set_executor_start_time_secs(startTimes);
                HashMap<NodeInfo, WorkerResources> workerResources = new HashMap<NodeInfo, WorkerResources>();
                for (Map.Entry<List<Object>, List<Double>> wr : workerToResources.entrySet()) {
                    List<Object> nodePort4 = wr.getKey();
                    NodeInfo ni = new NodeInfo();
                    ni.set_node((String)nodePort4.get(0));
                    ni.add_to_port(((Long)nodePort4.get(1)).longValue());
                    List<Double> r = wr.getValue();
                    WorkerResources resources = new WorkerResources();
                    resources.set_mem_on_heap(r.get(0).doubleValue());
                    resources.set_mem_off_heap(r.get(1).doubleValue());
                    resources.set_cpu(r.get(2).doubleValue());
                    workerResources.put(ni, resources);
                }
                newAssignment.set_worker_resources(workerResources);
                newAssignments.put(topoId, newAssignment);
            }
            if (!newAssignments.equals(existingAssignments)) {
                LOG.debug("RESETTING id->resources and id->worker-resources cache!");
                this.idToResources.set(new HashMap());
                this.idToWorkerResources.set(new HashMap());
            }
            for (Map.Entry<String, Map<List<Long>, List<Object>>> entry : newAssignments.entrySet()) {
                topoId = entry.getKey();
                Assignment assignment = (Assignment)entry.getValue();
                if (assignment.equals(existingAssignment = (Assignment)existingAssignments.get(topoId))) {
                    LOG.debug("Assignment for {} hasn't changed", (Object)topoId);
                    continue;
                }
                LOG.info("Setting new assignment for topology id {}: {}", (Object)topoId, (Object)assignment);
                state.setAssignment(topoId, assignment);
            }
            HashMap<String, Set<WorkerSlot>> addedSlots = new HashMap<String, Set<WorkerSlot>>();
            for (Map.Entry entry : newAssignments.entrySet()) {
                String topoId2 = (String)entry.getKey();
                Assignment assignment = (Assignment)entry.getValue();
                Assignment existingAssignment2 = (Assignment)existingAssignments.get(topoId2);
                if (existingAssignment2 == null) {
                    existingAssignment2 = new Assignment();
                    existingAssignment2.set_executor_node_port(new HashMap());
                    existingAssignment2.set_executor_start_time_secs(new HashMap());
                }
                Set<WorkerSlot> newSlots = Nimbus.newlyAddedSlots(existingAssignment2, assignment);
                addedSlots.put(topoId2, newSlots);
            }
            this.inimbus.assignSlots(topologies, addedSlots);
        }
    }

    /**
     * Forwards a topology action to the configured notifier plugin, if there is one.
     * Any exception thrown by the plugin is logged at warn level and otherwise ignored.
     *
     * @param topoId value passed to the plugin as the topology identifier
     * @param action name of the action being reported
     */
    private void notifyTopologyActionListener(String topoId, String action) {
        ITopologyActionNotifierPlugin plugin = this.nimbusTopologyActionNotifier;
        if (plugin == null) {
            return;
        }
        try {
            plugin.notify(topoId, action);
        }
        catch (Exception e) {
            LOG.warn("Ignoring exception from Topology action notifier for storm-Id {}", (Object)topoId, (Object)e);
        }
    }

    /**
     * Registers a topology in the cluster state with the given initial status.
     *
     * <p>Reads the topology's configuration and code from the blob store, derives the number
     * of executors to start for every component of the system topology, stores the resulting
     * {@link StormBase} through {@code activateStorm} and then notifies the topology action
     * listener with the action {@code "activate"}.
     *
     * @param topoName name of the topology
     * @param topoId id of the topology
     * @param initStatus initial status; asserted to be ACTIVE or INACTIVE
     */
    private void startTopology(String topoName, String topoId, TopologyStatus initStatus) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
        assert (TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus);
        IStormClusterState clusterState = this.stormClusterState;
        BlobStore blobs = this.blobStore;
        Map<String, Object> topoConf = Nimbus.readTopoConf(topoId, blobs);
        StormTopology userTopology = (StormTopology)Nimbus.readStormTopology(topoId, blobs);
        StormTopology systemTopology = StormCommon.systemTopology(topoConf, userTopology);

        HashMap executorsPerComponent = new HashMap();
        for (Map.Entry component : StormCommon.allComponents((StormTopology)systemTopology).entrySet()) {
            executorsPerComponent.put(component.getKey(), StormCommon.numStartExecutors(component.getValue()));
        }

        LOG.info("Activating {}: {}", (Object)topoName, (Object)topoId);
        StormBase topoBase = new StormBase();
        topoBase.set_name(topoName);
        topoBase.set_launch_time_secs(Time.currentTimeSecs());
        topoBase.set_status(initStatus);
        int requestedWorkers = ObjectReader.getInt((Object)topoConf.get("topology.workers"), (Integer)0).intValue();
        topoBase.set_num_workers(requestedWorkers);
        topoBase.set_component_executors(executorsPerComponent);
        String owner = (String)topoConf.get("topology.submitter.user");
        topoBase.set_owner(owner);
        topoBase.set_component_debug(new HashMap());
        clusterState.activateStorm(topoId, topoBase);
        this.notifyTopologyActionListener(topoName, "activate");
    }

    /**
     * Verifies that the activity of a topology matches what the caller expects.
     *
     * @param topoName name of the topology to check
     * @param expectActive whether the topology is expected to be active
     * @throws NotAliveException if the topology should be active but is not
     * @throws AlreadyAliveException if the topology should not be active but is
     */
    private void assertTopoActive(String topoName, boolean expectActive) throws NotAliveException, AlreadyAliveException {
        boolean active = Nimbus.isTopologyActive(this.stormClusterState, topoName);
        if (active == expectActive) {
            return;
        }
        if (!expectActive) {
            throw new AlreadyAliveException(topoName + " is already alive");
        }
        throw new NotAliveException(topoName + " is not alive");
    }

    /**
     * Resolves a topology name to its id and reads that topology's configuration
     * from the blob store.
     *
     * @param topoName name of the topology
     * @return the topology configuration
     * @throws NotAliveException if no topology id is known for {@code topoName}
     */
    private Map<String, Object> tryReadTopoConfFromName(String topoName) throws NotAliveException, AuthorizationException, IOException {
        String resolvedId = (String)this.stormClusterState.getTopoId(topoName).orElseThrow(() -> new NotAliveException(topoName + " is not alive"));
        return Nimbus.tryReadTopoConf(resolvedId, this.blobStore);
    }

    /**
     * Checks authorization without an explicit request context; the four-argument
     * overload is called with a {@code null} context.
     *
     * @param topoName name of the topology the operation targets; may be {@code null}
     * @param topoConf configuration of the topology; may be {@code null}
     * @param operation name of the operation being attempted
     * @throws AuthorizationException if the operation is not permitted
     */
    @VisibleForTesting
    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation) throws AuthorizationException {
        ReqContext noExplicitContext = null;
        this.checkAuthorization(topoName, topoConf, operation, noExplicitContext);
    }

    /**
     * Checks whether the caller may perform an operation, including impersonation rules.
     *
     * <p>The configuration handed to the authorizers is a copy of {@code topoConf}; when
     * {@code topoConf} is {@code null} it contains only the topology name (if given). If the
     * request is impersonating and an impersonation authorizer is configured, that authorizer
     * is consulted first. The regular authorizer, when configured, is consulted afterwards and
     * its decision is written to the access log.
     *
     * @param topoName name of the topology the operation targets; may be {@code null}
     * @param topoConf configuration of the topology; may be {@code null}
     * @param operation name of the operation being attempted
     * @param context request context; when {@code null}, {@code ReqContext.context()} is used
     * @throws AuthorizationException if impersonation or the operation itself is denied
     */
    @VisibleForTesting
    public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation, ReqContext context) throws AuthorizationException {
        IAuthorizer operationAuthorizer = this.authorizationHandler;
        IAuthorizer impersonationAuthorizer = this.impersonationAuthorizationHandler;
        ReqContext ctx = context != null ? context : ReqContext.context();

        HashMap<String, Object> checkConf = new HashMap<String, Object>();
        if (topoConf != null) {
            checkConf.putAll(topoConf);
        } else if (topoName != null) {
            checkConf.put("topology.name", topoName);
        }

        if (ctx.isImpersonating()) {
            LOG.warn("principal: {} is trying to impersonate principal: {}", (Object)ctx.realPrincipal(), (Object)ctx.principal());
            if (impersonationAuthorizer == null) {
                LOG.warn("impersonation attempt but {} has no authorizer configured. potential security risk, please see SECURITY.MD to learn how to configure impersonation authorizer.", (Object)"nimbus.impersonation.authorizer");
            } else {
                boolean mayImpersonate = impersonationAuthorizer.permit(ctx, operation, checkConf);
                if (!mayImpersonate) {
                    ThriftAccessLogger.logAccess(ctx.requestID(), ctx.remoteAddress(), ctx.principal(), operation, topoName, "access-denied");
                    throw new AuthorizationException("principal " + ctx.realPrincipal() + " is not authorized to impersonate principal " + ctx.principal() + " from host " + ctx.remoteAddress() + " Please see SECURITY.MD to learn how to configure impersonation acls.");
                }
            }
        }

        // Without a configured authorizer nothing is checked or logged for the operation itself.
        if (operationAuthorizer == null) {
            return;
        }
        boolean permitted = operationAuthorizer.permit(ctx, operation, checkConf);
        if (!permitted) {
            ThriftAccessLogger.logAccess(ctx.requestID(), ctx.remoteAddress(), ctx.principal(), operation, topoName, "access-denied");
            throw new AuthorizationException(operation + (topoName != null ? " on topology " + topoName : "") + " is not authorized");
        }
        ThriftAccessLogger.logAccess(ctx.requestID(), ctx.remoteAddress(), ctx.principal(), operation, topoName, "access-granted");
    }

    /**
     * Tells whether the current caller may perform an operation on a topology.
     *
     * <p>The topology's configuration is read from the blob store; failures of that read
     * propagate to the caller. A denial from {@code checkAuthorization} is turned into
     * {@code false}.
     *
     * @param operation name of the operation being attempted
     * @param topoId id of the topology
     * @return {@code true} if the authorization check passed, {@code false} if it was denied
     */
    private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
        Map<String, Object> conf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
        String name = (String)conf.get("topology.name");
        try {
            this.checkAuthorization(name, conf, operation);
        }
        catch (AuthorizationException denied) {
            return false;
        }
        return true;
    }

    /**
     * Keeps only the topologies on which the current caller may perform an operation.
     *
     * @param operation name of the operation being attempted
     * @param topoIds ids of the candidate topologies
     * @return the subset of {@code topoIds} for which {@code isAuthorized} returned {@code true}
     */
    @VisibleForTesting
    public Set<String> filterAuthorized(String operation, Collection<String> topoIds) throws NotAliveException, AuthorizationException, IOException {
        HashSet<String> permitted = new HashSet<String>();
        for (String candidate : topoIds) {
            if (this.isAuthorized(operation, candidate)) {
                permitted.add(candidate);
            }
        }
        return permitted;
    }

    /**
     * Removes the blobs of all dependency jars referenced by a topology.
     *
     * <p>This is best effort: any exception raised while reading the topology or removing a
     * blob is logged together with the topology id and then swallowed, so the caller is
     * never interrupted by a failure here.
     *
     * @param topoId id of the topology whose dependency jars should be removed
     */
    @VisibleForTesting
    public void rmDependencyJarsInTopology(String topoId) {
        try {
            BlobStore store = this.blobStore;
            IStormClusterState state = this.stormClusterState;
            StormTopology topo = Nimbus.readStormTopologyAsNimbus(topoId, store);
            List<String> dependencyJars = topo.get_dependency_jars();
            LOG.info("Removing dependency jars from blobs - {}", (Object)dependencyJars);
            if (dependencyJars != null && !dependencyJars.isEmpty()) {
                for (String key : dependencyJars) {
                    Nimbus.rmBlobKey(store, key, state);
                }
            }
        }
        catch (Exception e) {
            // Deliberately not rethrown: cleanup of the remaining topology data must go on.
            LOG.info("Exception while removing dependency jars of topology {}", topoId, e);
        }
    }

    /**
     * Removes the jar, conf and code blobs of a topology, in that order.
     *
     * @param topoId id of the topology whose blobs should be removed
     */
    @VisibleForTesting
    public void rmTopologyKeys(String topoId) {
        BlobStore blobs = this.blobStore;
        IStormClusterState clusterState = this.stormClusterState;

        String jarKey = ConfigUtils.masterStormJarKey((String)topoId);
        Nimbus.rmBlobKey(blobs, jarKey, clusterState);

        String confKey = ConfigUtils.masterStormConfKey((String)topoId);
        Nimbus.rmBlobKey(blobs, confKey, clusterState);

        String codeKey = ConfigUtils.masterStormCodeKey((String)topoId);
        Nimbus.rmBlobKey(blobs, codeKey, clusterState);
    }

    /**
     * Force-deletes the path returned by {@code ServerConfigUtils.masterStormDistRoot}
     * for the given topology.
     *
     * @param topoId id of the topology
     * @throws IOException if the deletion fails
     */
    @VisibleForTesting
    public void forceDeleteTopoDistDir(String topoId) throws IOException {
        String distRoot = (String)ServerConfigUtils.masterStormDistRoot(this.conf, topoId);
        Utils.forceDelete(distRoot);
    }

    /**
     * Removes the leftover state of topologies that are due for cleanup.
     *
     * <p>Does nothing unless this instance is the leader. The set of topologies to clean is
     * computed while holding {@code submitLock}; the cleanup itself runs outside the lock.
     * For each topology this tears down heartbeats, topology errors and backpressure in the
     * cluster state, removes its dependency jars, its distribution directory and its blobs,
     * and drops it from the heartbeat cache.
     *
     * @throws Exception if any cleanup step fails
     */
    @VisibleForTesting
    public void doCleanup() throws Exception {
        if (!this.isLeader()) {
            LOG.info("not a leader, skipping cleanup");
            return;
        }
        IStormClusterState state = this.stormClusterState;
        Set<String> toClean;
        synchronized (this.submitLock) {
            toClean = Nimbus.topoIdsToClean(state, this.blobStore);
        }
        if (toClean == null) {
            return;
        }
        for (String topoId : toClean) {
            LOG.info("Cleaning up {}", (Object)topoId);
            state.teardownHeartbeats(topoId);
            state.teardownTopologyErrors(topoId);
            state.removeBackpressure(topoId);
            this.rmDependencyJarsInTopology(topoId);
            this.forceDeleteTopoDistDir(topoId);
            this.rmTopologyKeys(topoId);
            this.heartbeatsCache.getAndUpdate(new Dissoc(topoId));
        }
    }

    /**
     * Drops topology history entries older than the given age.
     *
     * <p>The cutoff is the current time in seconds minus {@code mins * 60}; the filtering
     * runs while holding {@code topologyHistoryLock}.
     *
     * @param mins maximum age, in minutes, of the entries to keep
     */
    private void cleanTopologyHistory(int mins) {
        int cutoffAgeSecs = Time.currentTimeSecs() - mins * 60;
        synchronized (this.topologyHistoryLock) {
            this.topologyHistoryState.filterOldTopologies((long)cutoffAgeSecs);
        }
    }

    /**
     * Reconciles the local blob store with the keys that are active in the cluster state.
     *
     * <p>Local blobs whose key is not active are deleted. Every local blob whose key is
     * active is registered in the cluster state for this nimbus together with its version;
     * if that raises a {@code KeyNotFoundException} the local blob is deleted instead.
     *
     * @throws AuthorizationException if a blob may not be deleted
     * @throws KeyNotFoundException if a blob to delete does not exist
     */
    private void setupBlobstore() throws AuthorizationException, KeyNotFoundException {
        IStormClusterState state = this.stormClusterState;
        BlobStore store = this.blobStore;

        Set<String> localKeys = new HashSet<>();
        Iterator<String> it = store.listKeys();
        while (it.hasNext()) {
            localKeys.add(it.next());
        }
        Set<String> activeKeys = new HashSet<String>(state.activeKeys());

        // Local keys that are also active in the cluster state.
        Set<String> activeLocalKeys = new HashSet<>(localKeys);
        activeLocalKeys.retainAll(activeKeys);

        // Local keys that are unknown to the cluster state.
        Set<String> keysToDelete = new HashSet<>(localKeys);
        keysToDelete.removeAll(activeKeys);

        NimbusInfo nimbusInfo = this.nimbusHostPortInfo;
        LOG.debug("Deleting keys not on the zookeeper {}", keysToDelete);
        for (String toDelete : keysToDelete) {
            store.deleteBlob(toDelete, NIMBUS_SUBJECT);
        }
        LOG.debug("Creating list of key entries for blobstore inside zookeeper {} local {}", activeKeys, activeLocalKeys);
        for (String key : activeLocalKeys) {
            try {
                state.setupBlobstore(key, nimbusInfo, Integer.valueOf(Nimbus.getVersionForKey(key, nimbusInfo, this.conf)));
            }
            catch (KeyNotFoundException e) {
                // The key could not be resolved while registering it, so drop the local copy.
                store.deleteBlob(key, NIMBUS_SUBJECT);
            }
        }
    }

    /**
     * Appends a topology to the locally stored topology history.
     *
     * <p>The entry records the topology id, the current time in seconds and the log users
     * and groups taken from the topology configuration. The write happens while holding
     * {@code topologyHistoryLock}.
     *
     * @param topoId id of the topology
     * @param topoConf configuration of the topology
     */
    private void addTopoToHistoryLog(String topoId, Map<String, Object> topoConf) {
        LOG.info("Adding topo to history log: {}", (Object)topoId);
        LocalState historyState = this.topologyHistoryState;
        List<String> logUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
        List<String> logGroups = ServerConfigUtils.getTopoLogsGroups(topoConf);
        synchronized (this.topologyHistoryLock) {
            LSTopoHistory record = new LSTopoHistory(topoId, (long)Time.currentTimeSecs(), logUsers, logGroups);
            historyState.addTopologyHistory(record);
        }
    }

    /**
     * Looks up the groups of a user through the configured group mapper.
     *
     * @param user name of the user; may be {@code null} or empty
     * @return the user's groups, or an empty set when {@code user} is {@code null} or empty
     * @throws IOException if the group lookup fails
     */
    private Set<String> userGroups(String user) throws IOException {
        boolean hasUser = user != null && !user.isEmpty();
        if (hasUser) {
            return this.groupMapper.getGroups(user);
        }
        return Collections.emptySet();
    }

    /**
     * Tells whether a user belongs to at least one of the given groups.
     *
     * @param user name of the user
     * @param groupsToCheck groups to test membership against
     * @return {@code true} if the user's groups and {@code groupsToCheck} have a group in common
     * @throws IOException if the group lookup fails
     */
    private boolean isUserPartOf(String user, Collection<String> groupsToCheck) throws IOException {
        // Intersect on a private copy so the set returned by the lookup is never modified.
        Set<String> sharedGroups = new HashSet<String>(this.userGroups(user));
        sharedGroups.retainAll(groupsToCheck);
        return sharedGroups.size() > 0;
    }

    /**
     * Lists the ids of the historical topologies a user is allowed to see.
     *
     * <p>An entry is visible when {@code user} is {@code null}, when the user is one of
     * {@code adminUsers}, when the user belongs to one of the entry's groups, or when the
     * user is listed among the entry's users. The checks are made in that order.
     *
     * @param user name of the requesting user; {@code null} disables filtering
     * @param adminUsers users that may see every entry
     * @return ids of the visible topologies, in history order
     * @throws IOException if a group lookup fails
     */
    private List<String> readTopologyHistory(String user, Collection<String> adminUsers) throws IOException {
        List<String> visibleIds = new ArrayList<String>();
        for (LSTopoHistory entry : this.topologyHistoryState.getTopoHistoryList()) {
            boolean visible = user == null
                || adminUsers.contains(user)
                || this.isUserPartOf(user, entry.get_groups())
                || entry.get_users().contains(user);
            if (visible) {
                visibleIds.add(entry.get_topology_id());
            }
        }
        return visibleIds;
    }

    /**
     * Lets every configured credentials renewer refresh the credentials of all active
     * topologies and stores the credentials that actually changed.
     *
     * <p>Does nothing unless this instance is the leader. For each topology, the stored
     * credentials are read, renewed on a copy and compared with the original map while
     * holding {@code credUpdateLock}; they are written back only when the renewers modified
     * them. Topologies without stored credentials are skipped.
     *
     * @throws Exception if reading a topology configuration or renewing credentials fails
     */
    private void renewCredentials() throws Exception {
        if (!this.isLeader()) {
            LOG.info("not a leader, skipping credential renewal.");
            return;
        }
        IStormClusterState state = this.stormClusterState;
        BlobStore store = this.blobStore;
        Collection<ICredentialsRenewer> renewers = this.credRenewers;
        Object lock = this.credUpdateLock;
        List<String> assignedIds = state.activeStorms();
        if (assignedIds == null) {
            return;
        }
        for (String id : assignedIds) {
            Map<String, Object> topoConf = Collections.unmodifiableMap(Nimbus.tryReadTopoConf(id, store));
            synchronized (lock) {
                Credentials origCreds = state.credentials(id, null);
                if (origCreds == null) {
                    continue;
                }
                Map<String, String> origCredsMap = origCreds.get_creds();
                // Renewers mutate the copy, leaving the original map available for comparison.
                Map<String, String> newCredsMap = new HashMap<String, String>(origCredsMap);
                for (ICredentialsRenewer renewer : renewers) {
                    LOG.info("Renewing Creds For {} with {}", (Object)id, (Object)renewer);
                    renewer.renew(newCredsMap, topoConf);
                }
                // Compare map with map; comparing against the Credentials wrapper is never equal.
                if (!newCredsMap.equals(origCredsMap)) {
                    state.setCredentials(id, new Credentials(newCredsMap), topoConf);
                }
            }
        }
    }

    /**
     * Synchronizes the local blob store with the keys registered in the cluster state.
     *
     * <p>Only runs when the cluster mode is {@code "distributed"} and this instance is not
     * the leader. The callback handed to {@code state.blobstore(...)} runs this method again,
     * wrapping any exception in a {@code RuntimeException}.
     *
     * @throws Exception if listing keys or synchronizing blobs fails
     */
    private void blobSync() throws Exception {
        if (!"distributed".equals(this.conf.get("storm.cluster.mode")) || this.isLeader()) {
            return;
        }
        IStormClusterState clusterState = this.stormClusterState;
        NimbusInfo self = this.nimbusHostPortInfo;
        BlobStore blobs = this.blobStore;

        HashSet<String> localKeys = new HashSet<String>();
        for (Iterator keyIt = blobs.listKeys(); keyIt.hasNext(); ) {
            localKeys.add((String)keyIt.next());
        }
        HashSet<String> zkKeys = new HashSet<String>(clusterState.blobstore(() -> {
            try {
                this.blobSync();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
        LOG.debug("blob-sync blob-store-keys {} zookeeper-keys {}", localKeys, zkKeys);

        BlobSynchronizer synchronizer = new BlobSynchronizer(blobs, this.conf);
        synchronizer.setNimbusInfo(self);
        synchronizer.setBlobStoreKeySet(localKeys);
        synchronizer.setZookeeperKeySet(zkKeys);
        synchronizer.syncBlobs();
    }

    /**
     * Builds a {@code SupervisorSummary} for one supervisor.
     *
     * <p>Port counts default to 0 when the corresponding field is not set on {@code info}. Used
     * memory and CPU are taken from indices 2 and 3 of the supervisor's entry in
     * {@code nodeIdToResources}, when such an entry exists.
     *
     * @param supervisorId id of the supervisor being summarized
     * @param info the supervisor's recorded info
     * @return the populated summary
     */
    private SupervisorSummary makeSupervisorSummary(String supervisorId, SupervisorInfo info) {
        LOG.debug("INFO: {} ID: {}", info, supervisorId);
        final int portCount = info.is_set_meta() ? info.get_meta_size() : 0;
        final int usedPortCount = info.is_set_used_ports() ? info.get_used_ports_size() : 0;
        LOG.debug("NUM PORTS: {}", (Object)portCount);

        SupervisorSummary summary = new SupervisorSummary(info.get_hostname(), (int)info.get_uptime_secs(), portCount, usedPortCount, supervisorId);
        summary.set_total_resources(info.get_resources_map());

        Double[] used = this.nodeIdToResources.get().get(supervisorId);
        if (used != null) {
            summary.set_used_mem(Utils.nullToZero((Double)used[2]));
            summary.set_used_cpu(Utils.nullToZero((Double)used[3]));
        }
        if (info.is_set_version()) {
            summary.set_version(info.get_version());
        }
        return summary;
    }

    /**
     * Assembles a {@code ClusterSummary} from the current cluster state.
     *
     * <p>Produces one supervisor summary per entry of {@code allSupervisorInfo()}, refreshes the
     * uptime and leader flag on every nimbus summary returned by {@code nimbuses()}, and builds
     * one topology summary per non-null entry of {@code topologyBases()}.
     *
     * @return the cluster summary, with nimbus uptime taken from {@code this.uptime}
     * @throws Exception if anything called from here fails
     */
    private ClusterSummary getClusterInfoImpl() throws Exception {
        IStormClusterState state = this.stormClusterState;

        Map<String, SupervisorInfo> infos = state.allSupervisorInfo();
        List<SupervisorSummary> summaries = new ArrayList<>(infos.size());
        for (Map.Entry<String, SupervisorInfo> entry : infos.entrySet()) {
            summaries.add(this.makeSupervisorSummary(entry.getKey(), entry.getValue()));
        }

        int uptime = this.uptime.upTime();
        Map<String, StormBase> bases = state.topologyBases();
        List<NimbusSummary> nimbuses = state.nimbuses();
        // NOTE(review): leader is dereferenced without a null check — confirm getLeader() cannot
        // return null at this point.
        NimbusInfo leader = this.leaderElector.getLeader();
        for (NimbusSummary nimbusSummary : nimbuses) {
            // Presumably converts the stored start timestamp into an elapsed duration — confirm.
            nimbusSummary.set_uptime_secs(Time.deltaSecs((int)nimbusSummary.get_uptime_secs()));
            nimbusSummary.set_isLeader(leader.getHost().equals(nimbusSummary.get_host()) && leader.getPort() == nimbusSummary.get_port());
        }

        List<TopologySummary> topologySummaries = new ArrayList<>();
        for (Map.Entry<String, StormBase> entry : bases.entrySet()) {
            StormBase base = entry.getValue();
            if (base == null) {
                continue;
            }
            String topoId = entry.getKey();
            Assignment assignment = state.assignmentInfo(topoId, null);

            // Counts stay at 0 when the topology has no executor assignment.
            int numTasks = 0;
            int numExecutors = 0;
            int numWorkers = 0;
            if (assignment != null && assignment.is_set_executor_node_port()) {
                for (List<Long> ids : assignment.get_executor_node_port().keySet()) {
                    numTasks += StormCommon.executorIdToTasks(ids).size();
                }
                numExecutors = assignment.get_executor_node_port_size();
                // Distinct node/port values = number of workers.
                numWorkers = new HashSet<>(assignment.get_executor_node_port().values()).size();
            }

            TopologySummary summary = new TopologySummary(topoId, base.get_name(), numTasks, numExecutors, numWorkers, Time.deltaSecs((int)base.get_launch_time_secs()), Nimbus.extractStatusStr(base));
            if (base.is_set_owner()) {
                summary.set_owner(base.get_owner());
            }
            String status = this.idToSchedStatus.get().get(topoId);
            if (status != null) {
                summary.set_sched_status(status);
            }
            TopologyResources resources = this.getResourcesForTopology(topoId, base);
            if (resources != null) {
                summary.set_requested_memonheap(resources.getRequestedMemOnHeap().doubleValue());
                summary.set_requested_memoffheap(resources.getRequestedMemOffHeap().doubleValue());
                summary.set_requested_cpu(resources.getRequestedCpu().doubleValue());
                summary.set_assigned_memonheap(resources.getAssignedMemOnHeap().doubleValue());
                summary.set_assigned_memoffheap(resources.getAssignedMemOffHeap().doubleValue());
                summary.set_assigned_cpu(resources.getAssignedCpu().doubleValue());
            }
            summary.set_replication_count(this.getBlobReplicationCount(ConfigUtils.masterStormCodeKey((String)topoId)).intValue());
            topologySummaries.add(summary);
        }

        ClusterSummary ret = new ClusterSummary(summaries, topologySummaries, nimbuses);
        ret.set_nimbus_uptime_secs(uptime);
        return ret;
    }

    /**
     * Pushes the current cluster-level and per-supervisor metrics to every configured
     * cluster metrics consumer executor.
     *
     * @throws Exception if building the cluster summary or extracting metrics fails
     */
    private void sendClusterMetricsToExecutors() throws Exception {
        IClusterMetricsConsumer.ClusterInfo cluster = Nimbus.mkClusterInfo();
        ClusterSummary summary = this.getClusterInfoImpl();
        List<DataPoint> clusterPoints = Nimbus.extractClusterMetrics(summary);
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> pointsBySupervisor = Nimbus.extractSupervisorMetrics(summary);

        for (ClusterMetricsConsumerExecutor executor : this.clusterConsumerExceutors) {
            // Cluster-wide data points first, then one call per supervisor.
            executor.handleDataPoints(cluster, clusterPoints);
            for (Map.Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> perSupervisor : pointsBySupervisor.entrySet()) {
                IClusterMetricsConsumer.SupervisorInfo supervisor = perSupervisor.getKey();
                Collection<DataPoint> points = perSupervisor.getValue();
                executor.handleDataPoints(supervisor, points);
            }
        }
    }

    /**
     * Loads the pieces of topology state that several request handlers need, after checking
     * that the caller is authorized for {@code operation}.
     *
     * <p>The topology conf is read first so that the authorization check can use the topology
     * name and conf; the remaining fields are only populated if that check passes.
     *
     * @param topoId id of the topology to look up
     * @param operation operation name passed to the authorization check
     * @return a populated {@code CommonTopoInfo}; {@code launchTimeSecs} is 0 when there is no
     *     base or the base has no launch time set, and {@code beats} falls back to an empty map
     */
    private CommonTopoInfo getCommonTopoInfo(String topoId, String operation) throws NotAliveException, AuthorizationException, IOException, InvalidTopologyException {
        final BlobStore blobs = this.blobStore;
        final IStormClusterState clusterState = this.stormClusterState;
        final CommonTopoInfo info = new CommonTopoInfo();

        info.topoConf = Nimbus.tryReadTopoConf(topoId, blobs);
        info.topoName = (String)info.topoConf.get("topology.name");
        this.checkAuthorization(info.topoName, info.topoConf, operation);

        info.topology = Nimbus.tryReadTopology(topoId, blobs);
        info.taskToComponent = StormCommon.stormTaskInfo((StormTopology)info.topology, info.topoConf);
        info.base = clusterState.stormBase(topoId, null);
        if (info.base != null && info.base.is_set_launch_time_secs()) {
            info.launchTimeSecs = info.base.get_launch_time_secs();
        } else {
            info.launchTimeSecs = 0;
        }
        info.assignment = clusterState.assignmentInfo(topoId, null);
        info.beats = (Map)Utils.OR(this.heartbeatsCache.get().get(topoId), Collections.emptyMap());
        info.allComponents = new HashSet<String>(info.taskToComponent.values());
        return info;
    }

    /**
     * Starts this nimbus: prepares the validator, registers the nimbus host in cluster state,
     * adds it to the leader lock queue, and schedules the recurring background tasks on
     * {@code this.timer}.
     *
     * <p>Every scheduled task wraps checked exceptions in a {@link RuntimeException}.
     *
     * <p>If start-up fails, the exception is rethrown when its cause chain contains an
     * {@link InterruptedException} or {@link InterruptedIOException}; any other failure is
     * logged and {@code Utils.exitProcess(13, ...)} is called.
     *
     * @throws Exception an interruption-caused failure during start-up
     */
    @VisibleForTesting
    public void launchServer() throws Exception {
        try {
            Integer interval;
            BlobStore store = this.blobStore;
            IStormClusterState state = this.stormClusterState;
            NimbusInfo hpi = this.nimbusHostPortInfo;
            LOG.info("Starting Nimbus with conf {}", this.conf);
            this.validator.prepare(this.conf);
            // Register this nimbus (initially flagged as not leader) before queueing for leadership.
            state.addNimbusHost(hpi.getHost(), new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            this.leaderElector.addToLeaderLockQueue();
            // Only a LocalFsBlobStore gets the blob-sync callback and blobstore setup.
            if (store instanceof LocalFsBlobStore) {
                state.blobstore(() -> {
                    try {
                        this.blobSync();
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                this.setupBlobstore();
            }
            // NOTE(review): iterated here without a null check, but null-checked further down
            // before scheduling the metrics task — confirm whether the field can be null.
            for (ClusterMetricsConsumerExecutor exec : this.clusterConsumerExceutors) {
                exec.prepare();
            }
            // When already leader, run the STARTUP transition for every active topology.
            if (this.isLeader()) {
                for (String topoId : state.activeStorms()) {
                    this.transition(topoId, TopologyActions.STARTUP, null);
                }
            }
            // Recurring: make assignments (unless NIMBUS-DO-NOT-REASSIGN is set) and clean up.
            boolean doNotReassign = (Boolean)this.conf.getOrDefault("NIMBUS-DO-NOT-REASSIGN", false);
            this.timer.scheduleRecurring(0, ObjectReader.getInt((Object)this.conf.get("nimbus.monitor.freq.secs")).intValue(), () -> {
                try {
                    if (!doNotReassign) {
                        this.mkAssignments();
                    }
                    this.doCleanup();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // Recurring: clean the inbox, using the configured jar expiration.
            int jarExpSecs = ObjectReader.getInt((Object)this.conf.get("nimbus.inbox.jar.expiration.secs"));
            this.timer.scheduleRecurring(0, ObjectReader.getInt((Object)this.conf.get("nimbus.cleanup.inbox.freq.secs")).intValue(), () -> {
                try {
                    Nimbus.cleanInbox(this.getInbox(), jarExpSecs);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // Recurring: blob sync, only for a LocalFsBlobStore.
            if (store instanceof LocalFsBlobStore) {
                this.timer.scheduleRecurring(0, ObjectReader.getInt((Object)this.conf.get("nimbus.code.sync.freq.secs")).intValue(), () -> {
                    try {
                        this.blobSync();
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            // Recurring: topology history cleanup, only when a cleanup interval is configured.
            if ((interval = ObjectReader.getInt((Object)this.conf.get("logviewer.cleanup.interval.secs"), null)) != null) {
                int lvCleanupAgeMins = ObjectReader.getInt((Object)this.conf.get("logviewer.cleanup.age.mins"));
                this.timer.scheduleRecurring(0, interval.intValue(), () -> {
                    try {
                        this.cleanTopologyHistory(lvCleanupAgeMins);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            // Recurring: credential renewal.
            this.timer.scheduleRecurring(0, ObjectReader.getInt((Object)this.conf.get("nimbus.credential.renewers.freq.secs")).intValue(), () -> {
                try {
                    this.renewCredentials();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null));
            StormMetricsRegistry.startMetricsReporters(this.conf);
            // Recurring: publish cluster metrics; the task itself only sends while this nimbus is leader.
            if (this.clusterConsumerExceutors != null) {
                this.timer.scheduleRecurring(0, ObjectReader.getInt((Object)this.conf.get("storm.cluster.metrics.consumer.publish.interval.secs")).intValue(), () -> {
                    try {
                        if (this.isLeader()) {
                            this.sendClusterMetricsToExecutors();
                        }
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
        catch (Exception e) {
            // Interruptions propagate to the caller; anything else is fatal for the process.
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, (Throwable)e)) {
                throw e;
            }
            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, (Throwable)e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", (Throwable)e);
            Utils.exitProcess((int)13, (String)"Error on initialization of nimbus");
        }
    }

    /**
     * Submits a topology with default options, i.e. an initial status of {@code ACTIVE}.
     *
     * @param name topology name
     * @param uploadedJarLocation location of the uploaded jar
     * @param jsonConf topology configuration as JSON
     * @param topology the topology to submit
     */
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        submitTopologyCalls.mark();
        SubmitOptions activeByDefault = new SubmitOptions(TopologyInitialStatus.ACTIVE);
        this.submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, activeByDefault);
    }

    /**
     * Validates and submits a topology with explicit submit options.
     *
     * <p>In order: asserts this nimbus is leader, validates the name, authorization and that no
     * topology with this name is active, parses and validates the JSON conf, generates the
     * topology id, normalizes conf and topology, fills in submitter/ACL entries, and finally,
     * under {@code submitLock}, stores credentials, sets up code, heartbeats and (optionally)
     * backpressure, and starts the topology with the requested initial status.
     *
     * <p>Any failure is logged at WARN; {@code TException}s are rethrown as-is and everything
     * else is wrapped in a {@link RuntimeException}.
     *
     * <p>Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param topoName name of the topology
     * @param uploadedJarLocation location of the uploaded jar
     * @param jsonConf topology configuration as JSON
     * @param topology the topology to submit
     * @param options submit options; only checked for null via {@code assert}
     */
    public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        try {
            Map blobMap;
            submitTopologyWithOptsCalls.mark();
            this.assertIsLeader();
            assert (options != null);
            Nimbus.validateTopologyName(topoName);
            this.checkAuthorization(topoName, null, "submitTopology");
            this.assertTopoActive(topoName, false);
            Map<String, Object> topoConf = (Map<String, Object>)JSONValue.parse((String)jsonConf);
            // Field validation failures are reported to the client as InvalidTopologyException.
            try {
                ConfigValidation.validateFields((Map)topoConf);
            }
            catch (IllegalArgumentException ex) {
                throw new InvalidTopologyException(ex.getMessage());
            }
            this.validator.validate(topoName, topoConf, topology);
            // With symlinks disabled, a non-empty topology.blobstore.map is rejected.
            if (((Boolean)this.conf.getOrDefault("storm.disable.symlinks", false)).booleanValue() && (blobMap = (Map)topoConf.get("topology.blobstore.map")) != null && !blobMap.isEmpty()) {
                throw new InvalidTopologyException("symlinks are disabled so blobs are not supported but topology.blobstore.map = " + blobMap);
            }
            Utils.validateTopologyBlobStoreMap((Map)topoConf, (Set)Sets.newHashSet((Iterator)this.blobStore.listKeys()));
            // Topology id = <name>-<submission counter>-<current time in seconds>.
            long uniqueNum = this.submittedCount.incrementAndGet();
            String topoId = topoName + "-" + uniqueNum + "-" + Time.currentTimeSecs();
            Map creds = null;
            if (options.is_set_creds()) {
                creds = options.get_creds().get_creds();
            }
            topoConf.put("storm.id", topoId);
            topoConf.put("topology.name", topoName);
            topoConf = Nimbus.normalizeConf(this.conf, topoConf, topology);
            ReqContext req = ReqContext.context();
            Principal principal = req.principal();
            String submitterPrincipal = principal == null ? null : principal.toString();
            String submitterUser = this.principalToLocal.toLocal(principal);
            String systemUser = System.getProperty("user.name");
            // The submitter's principal and local user are added to the topology's user list.
            // NOTE(review): either value may be null here and is still added to the set — confirm
            // downstream consumers of topology.users tolerate a null entry.
            HashSet topoAcl = new HashSet(topoConf.getOrDefault("topology.users", Collections.emptyList()));
            topoAcl.add(submitterPrincipal);
            topoAcl.add(submitterUser);
            topoConf.put("topology.submitter.principal", Utils.OR((Object)submitterPrincipal, (Object)""));
            topoConf.put("topology.submitter.user", Utils.OR((Object)submitterUser, (Object)systemUser));
            topoConf.put("topology.users", new ArrayList(topoAcl));
            topoConf.put("storm.zookeeper.superACL", this.conf.get("storm.zookeeper.superACL"));
            // Topology-level ZK auth settings are dropped when the server has no ZK auth configured.
            if (!Utils.isZkAuthenticationConfiguredStormServer(this.conf)) {
                topoConf.remove("storm.zookeeper.topology.auth.scheme");
                topoConf.remove("storm.zookeeper.topology.auth.payload");
            }
            if (!((Boolean)this.conf.getOrDefault("storm.topology.classpath.beginning.enabled", false)).booleanValue()) {
                topoConf.remove("topology.classpath.beginning");
            }
            Map<String, Object> totalConf = Nimbus.merge(this.conf, topoConf);
            topology = Nimbus.normalizeTopology(totalConf, topology);
            IStormClusterState state = this.stormClusterState;
            // Auto-credential plugins only run when the submission carried credentials; they see a
            // read-only view of the topology conf.
            if (creds != null) {
                Map<String, Object> finalConf = Collections.unmodifiableMap(topoConf);
                for (INimbusCredentialPlugin autocred : this.nimbusAutocredPlugins) {
                    autocred.populateCredentials(creds, finalConf);
                }
            }
            if (ObjectReader.getBoolean((Object)this.conf.get("supervisor.run.worker.as.user"), (boolean)false) && (submitterUser == null || submitterUser.isEmpty())) {
                throw new AuthorizationException("Could not determine the user to run this topology as.");
            }
            StormCommon.systemTopology(totalConf, (StormTopology)topology);
            Nimbus.validateTopologySize(topoConf, this.conf, topology);
            if (Utils.isZkAuthenticationConfiguredStormServer(this.conf) && !Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
                throw new IllegalArgumentException("The cluster is configured for zookeeper authentication, but no payload was provided.");
            }
            // The ZK auth payload is redacted before the conf is logged.
            LOG.info("Received topology submission for {} with conf {}", (Object)topoName, (Object)Utils.redactValue(topoConf, (String)"storm.zookeeper.topology.auth.payload"));
            Object object = this.submitLock;
            synchronized (object) {
                // Re-check under the lock that no topology with this name is active.
                this.assertTopoActive(topoName, false);
                if (creds != null) {
                    state.setCredentials(topoId, new Credentials(creds), topoConf);
                }
                LOG.info("uploadedJar {}", (Object)uploadedJarLocation);
                this.setupStormCode(this.conf, topoId, uploadedJarLocation, totalConf, topology);
                this.waitForDesiredCodeReplication(totalConf, topoId);
                state.setupHeatbeats(topoId);
                if (ObjectReader.getBoolean((Object)totalConf.get("topology.backpressure.enable"), (boolean)false)) {
                    state.setupBackpressure(topoId);
                }
                this.notifyTopologyActionListener(topoName, "submitTopology");
                // Only INACTIVE and ACTIVE are accepted as initial statuses.
                TopologyStatus status = null;
                switch (options.get_initial_status()) {
                    case INACTIVE: {
                        status = TopologyStatus.INACTIVE;
                        break;
                    }
                    case ACTIVE: {
                        status = TopologyStatus.ACTIVE;
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Inital Status of " + options.get_initial_status() + " is not allowed.");
                    }
                }
                this.startTopology(topoName, topoId, status);
            }
        }
        catch (Exception e) {
            LOG.warn("Topology submission exception. (topology name='{}')", (Object)topoName, (Object)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Kills the named topology using default {@code KillOptions}.
     *
     * @param name name of the topology to kill
     */
    public void killTopology(String name) throws NotAliveException, AuthorizationException, TException {
        killTopologyCalls.mark();
        KillOptions defaults = new KillOptions();
        this.killTopologyWithOpts(name, defaults);
    }

    /**
     * Kills the named topology, optionally after a wait.
     *
     * <p>The topology must be active (checked before the try block, so that failure is not
     * logged here). After authorization, the KILL transition is applied with the wait time
     * from {@code options} (or {@code null} when unset), the action listener is notified and
     * the topology is added to the history log.
     *
     * @param topoName name of the topology to kill
     * @param options kill options; {@code wait_secs} is used when set
     */
    public void killTopologyWithOpts(String topoName, KillOptions options) throws NotAliveException, AuthorizationException, TException {
        killTopologyWithOptsCalls.mark();
        this.assertTopoActive(topoName, true);
        final String operation = "killTopology";
        try {
            Map<String, Object> topoConf = this.tryReadTopoConfFromName(topoName);
            this.checkAuthorization(topoName, topoConf, operation);
            Integer waitSecs = options.is_set_wait_secs() ? Integer.valueOf(options.get_wait_secs()) : null;
            this.transitionName(topoName, TopologyActions.KILL, waitSecs, true);
            this.notifyTopologyActionListener(topoName, operation);
            this.addTopoToHistoryLog((String)topoConf.get("storm.id"), topoConf);
        }
        catch (Exception e) {
            LOG.warn("Kill topology exception. (topology name='{}')", topoName, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Applies the ACTIVATE transition to the named topology after an authorization check,
     * then notifies the topology action listener.
     *
     * @param topoName name of the topology to activate
     */
    public void activate(String topoName) throws NotAliveException, AuthorizationException, TException {
        activateCalls.mark();
        final String operation = "activate";
        try {
            Map<String, Object> topoConf = this.tryReadTopoConfFromName(topoName);
            this.checkAuthorization(topoName, topoConf, operation);
            this.transitionName(topoName, TopologyActions.ACTIVATE, null, true);
            this.notifyTopologyActionListener(topoName, operation);
        }
        catch (Exception e) {
            LOG.warn("Activate topology exception. (topology name='{}')", topoName, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Applies the INACTIVATE transition to the named topology after an authorization check,
     * then notifies the topology action listener.
     *
     * @param topoName name of the topology to deactivate
     */
    public void deactivate(String topoName) throws NotAliveException, AuthorizationException, TException {
        deactivateCalls.mark();
        final String operation = "deactivate";
        try {
            Map<String, Object> topoConf = this.tryReadTopoConfFromName(topoName);
            this.checkAuthorization(topoName, topoConf, operation);
            this.transitionName(topoName, TopologyActions.INACTIVATE, null, true);
            this.notifyTopologyActionListener(topoName, operation);
        }
        catch (Exception e) {
            LOG.warn("Deactivate topology exception. (topology name='{}')", topoName, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Rebalances the named topology.
     *
     * <p>The topology must be active (checked before the try block). After authorization,
     * every executor-count override in {@code options} must be a positive, non-null number;
     * otherwise an {@code InvalidTopologyException} is thrown. The REBALANCE transition is then
     * applied and the topology action listener notified.
     *
     * @param topoName name of the topology to rebalance
     * @param options rebalance options; {@code num_executors} is validated when set
     */
    public void rebalance(String topoName, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
        rebalanceCalls.mark();
        this.assertTopoActive(topoName, true);
        final String operation = "rebalance";
        try {
            Map<String, Object> topoConf = this.tryReadTopoConfFromName(topoName);
            this.checkAuthorization(topoName, topoConf, operation);
            // Typed map: iterating a raw Map's values as Integer does not type-check.
            Map<String, Integer> execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.<String, Integer>emptyMap();
            for (Integer value : execOverrides.values()) {
                if (value == null || value <= 0) {
                    throw new InvalidTopologyException("Number of executors must be greater than 0");
                }
            }
            this.transitionName(topoName, TopologyActions.REBALANCE, options, true);
            this.notifyTopologyActionListener(topoName, operation);
        }
        catch (Exception e) {
            LOG.warn("rebalance topology exception. (topology name='{}')", (Object)topoName, (Object)e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Merges the given log configuration into the one stored for a topology and writes the
     * result back to cluster state.
     *
     * <p>All previously stored named loggers are first marked {@code UNCHANGED}. Each named
     * logger in {@code config} is then applied according to its action: {@code UPDATE} sets
     * timeouts and stores the level, {@code REMOVE} deletes the stored entry; any other action
     * is ignored.
     *
     * @param topoId id of the topology
     * @param config requested log configuration changes
     * @throws TException if a Thrift exception is raised; other failures are wrapped in a
     *     {@link RuntimeException} (including an empty logger name)
     */
    public void setLogConfig(String topoId, LogConfig config) throws TException {
        try {
            setLogConfigCalls.mark();
            Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
            String topoName = (String)topoConf.get("topology.name");
            this.checkAuthorization(topoName, topoConf, "setLogConfig");
            IStormClusterState state = this.stormClusterState;
            LogConfig mergedLogConfig = state.topologyLogConfig(topoId, null);
            if (mergedLogConfig == null) {
                mergedLogConfig = new LogConfig();
            }
            // The named-logger map can be null (the REMOVE branch below already guards for
            // that), e.g. on a freshly created LogConfig, so guard before iterating.
            Map<String, LogLevel> namedLoggers = mergedLogConfig.get_named_logger_level();
            if (namedLoggers != null) {
                for (LogLevel logLevel : namedLoggers.values()) {
                    logLevel.set_action(LogLevelAction.UNCHANGED);
                }
            }
            if (config.is_set_named_logger_level()) {
                for (Map.Entry<String, LogLevel> entry : config.get_named_logger_level().entrySet()) {
                    LogLevel logConfig = entry.getValue();
                    String loggerName = entry.getKey();
                    LogLevelAction action = logConfig.get_action();
                    if (loggerName.isEmpty()) {
                        throw new RuntimeException("Named loggers need a valid name. Use ROOT for the root logger");
                    }
                    switch (action) {
                        case UPDATE: {
                            Nimbus.setLoggerTimeouts(logConfig);
                            mergedLogConfig.put_to_named_logger_level(loggerName, logConfig);
                            break;
                        }
                        case REMOVE: {
                            Map<String, LogLevel> nl = mergedLogConfig.get_named_logger_level();
                            if (nl != null) {
                                nl.remove(loggerName);
                            }
                            break;
                        }
                        default: {
                            // Other actions leave the stored configuration untouched.
                            break;
                        }
                    }
                }
            }
            LOG.info("Setting log config for {}:{}", (Object)topoName, (Object)mergedLogConfig);
            state.setTopologyLogConfig(topoId, mergedLogConfig);
        }
        catch (Exception e) {
            LOG.warn("set log config topology exception. (topology id='{}')", (Object)topoId, (Object)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the log configuration stored for a topology, after an authorization check.
     *
     * @param topoId id of the topology
     * @return the stored configuration, or a new empty {@code LogConfig} when none is stored
     * @throws TException if a Thrift exception is raised; other failures are wrapped in a
     *     {@link RuntimeException}
     */
    public LogConfig getLogConfig(String topoId) throws TException {
        try {
            getLogConfigCalls.mark();
            Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
            this.checkAuthorization((String)topoConf.get("topology.name"), topoConf, "getLogConfig");
            LogConfig stored = this.stormClusterState.topologyLogConfig(topoId, null);
            return stored != null ? stored : new LogConfig();
        }
        catch (Exception e) {
            LOG.warn("get log conf topology exception. (topology id='{}')", topoId, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Enables or disables debug sampling for a whole topology or a single component.
     *
     * <p>The sampling percentage is clamped to the range 0..100 and is only written when
     * {@code enable} is true. When {@code componentId} is null or empty the setting is keyed
     * by the topology id, otherwise by the component id. The update is applied to cluster
     * state while holding {@code submitLock}.
     *
     * <p>Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param topoName name of the topology
     * @param componentId component to target, or null/empty for the whole topology
     * @param enable whether debugging is switched on
     * @param samplingPercentage requested sampling percentage
     * @throws NotAliveException if no topology id can be resolved for {@code topoName}
     */
    public void debug(String topoName, String componentId, boolean enable, double samplingPercentage) throws NotAliveException, AuthorizationException, TException {
        debugCalls.mark();
        try {
            IStormClusterState state = this.stormClusterState;
            String topoId = this.toTopoId(topoName);
            // Resolve liveness before the id is used: a null id must not reach the conf read.
            if (topoId == null) {
                throw new NotAliveException(topoName);
            }
            Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
            double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
            this.checkAuthorization(topoName, topoConf, "debug");
            boolean hasCompId = componentId != null && !componentId.isEmpty();
            DebugOptions options = new DebugOptions();
            options.set_enable(enable);
            if (enable) {
                options.set_samplingpct(spct);
            }
            StormBase updates = new StormBase();
            updates.set_component_executors(Collections.emptyMap());
            String key = hasCompId ? componentId : topoId;
            updates.put_to_component_debug(key, options);
            LOG.info("Nimbus setting debug to {} for storm-name '{}' storm-id '{}' sanpling pct '{}'" + (hasCompId ? " component-id '" + componentId + "'" : ""), new Object[]{enable, topoName, topoId, spct});
            Object object = this.submitLock;
            synchronized (object) {
                state.updateStorm(topoId, updates);
            }
        }
        catch (Exception e) {
            LOG.warn("debug topology exception. (topology name='{}')", (Object)topoName, (Object)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Records a worker profile request for a topology in cluster state, after an
     * authorization check.
     *
     * @param topoId id of the topology
     * @param profileRequest the request to store
     * @throws TException if a Thrift exception is raised; other failures are wrapped in a
     *     {@link RuntimeException}
     */
    public void setWorkerProfiler(String topoId, ProfileRequest profileRequest) throws TException {
        try {
            setWorkerProfilerCalls.mark();
            Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
            this.checkAuthorization((String)topoConf.get("topology.name"), topoConf, "setWorkerProfiler");
            this.stormClusterState.setWorkerProfileRequest(topoId, profileRequest);
        }
        catch (Exception e) {
            LOG.warn("set worker profiler topology exception. (topology id='{}')", topoId, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Finds, for every worker running the given component, the most recent pending profile
     * request of the given action type.
     *
     * <p>A request matches a worker when its node and first port equal the worker's
     * {@code "host"} and {@code "port"} entries and its action equals {@code action}; among
     * matches the one with the largest timestamp wins.
     *
     * @param id topology id
     * @param componentId component whose workers are inspected
     * @param action the profile action to look for
     * @return the newest matching request per worker; workers without a match contribute nothing
     * @throws TException if a Thrift exception is raised; other failures are wrapped in a
     *     {@link RuntimeException}
     */
    public List<ProfileRequest> getComponentPendingProfileActions(String id, String componentId, ProfileAction action) throws TException {
        try {
            getComponentPendingProfileActionsCalls.mark();
            CommonTopoInfo info = this.getCommonTopoInfo(id, "getComponentPendingProfileActions");

            // Map every executor to the [host, port] of the worker it is assigned to.
            Map nodeToHost = info.assignment.get_node_host();
            HashMap execToHostPort = new HashMap();
            for (Map.Entry entry : info.assignment.get_executor_node_port().entrySet()) {
                NodeInfo slot = (NodeInfo)entry.getValue();
                Object host = nodeToHost.get(slot.get_node());
                int port = ((Long)slot.get_port_iterator().next()).intValue();
                List<Object> hostPort = Arrays.asList(host, port);
                execToHostPort.put(entry.getKey(), hostPort);
            }

            List nodeInfos = StatsUtil.extractNodeInfosFromHbForComp(execToHostPort, info.taskToComponent, (boolean)false, (String)componentId);
            ArrayList<ProfileRequest> latest = new ArrayList<ProfileRequest>();
            for (Object element : nodeInfos) {
                Map ni = (Map)element;
                String niHost = (String)ni.get("host");
                int niPort = (Integer)ni.get("port");

                ProfileRequest newest = null;
                long newestStamp = -1L;
                for (ProfileRequest req : this.stormClusterState.getTopologyProfileRequests(id)) {
                    String expectedHost = req.get_nodeInfo().get_node();
                    int expectedPort = ((Long)req.get_nodeInfo().get_port_iterator().next()).intValue();
                    ProfileAction expectedAction = req.get_action();
                    boolean sameWorker = niHost.equals(expectedHost) && niPort == expectedPort;
                    if (sameWorker && action == expectedAction) {
                        long stamp = req.get_time_stamp();
                        if (stamp > newestStamp) {
                            newestStamp = stamp;
                            newest = req;
                        }
                    }
                }
                if (newest != null) {
                    latest.add(newest);
                }
            }
            LOG.info("Latest profile actions for topology {} component {} {}", new Object[]{id, componentId, latest});
            return latest;
        }
        catch (Exception e) {
            LOG.warn("Get comp actions topology exception. (topology id='{}')", id, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Replaces the credentials stored for a running topology.
     *
     * <p>A {@code null} {@code credentials} argument is treated as an empty credentials map.
     * The write to cluster state happens while holding {@code credUpdateLock}.
     *
     * <p>Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param topoName name of the topology
     * @param credentials the new credentials, or null for none
     * @throws NotAliveException if no topology id can be resolved for {@code topoName}
     */
    public void uploadNewCredentials(String topoName, Credentials credentials) throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
        try {
            uploadNewCredentialsCalls.mark();
            IStormClusterState clusterState = this.stormClusterState;
            String topoId = this.toTopoId(topoName);
            if (topoId == null) {
                throw new NotAliveException(topoName + " is not alive");
            }
            Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, this.blobStore);
            Credentials toStore = credentials != null ? credentials : new Credentials(Collections.emptyMap());
            this.checkAuthorization(topoName, topoConf, "uploadNewCredentials");
            synchronized (this.credUpdateLock) {
                clusterState.setCredentials(topoId, toStore, topoConf);
            }
        }
        catch (Exception e) {
            LOG.warn("Upload Creds topology exception. (topology name='{}')", topoName, e);
            // Thrift exceptions go back to the client unchanged; everything else is wrapped.
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException)e;
        }
    }

    /**
     * Opens an upload session that creates a new blob.
     *
     * <p>Asks the blob store to create the blob on behalf of the current subject and registers the
     * resulting stream in {@code blobUploaders} under a freshly generated session id.
     *
     * @param key key of the blob to create
     * @param meta metadata for the new blob
     * @return the id of the new upload session
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public String beginCreateBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException, TException {
        try {
            String uploadSession = Utils.uuid();
            this.blobUploaders.put(uploadSession, this.blobStore.createBlob(key, meta, Nimbus.getSubject()));
            LOG.info("Created blob for {}", key);
            return uploadSession;
        } catch (Exception failure) {
            LOG.warn("begin create blob exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Opens an upload session that overwrites an existing blob.
     *
     * <p>Asks the blob store for an update stream on behalf of the current subject and registers it
     * in {@code blobUploaders} under a freshly generated session id.
     *
     * @param key key of the blob to update
     * @return the id of the new upload session
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public String beginUpdateBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            String uploadSession = Utils.uuid();
            this.blobUploaders.put(uploadSession, this.blobStore.updateBlob(key, Nimbus.getSubject()));
            LOG.info("Created upload session for {}", key);
            return uploadSession;
        } catch (Exception failure) {
            LOG.warn("begin update blob exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Appends one chunk of data to an open blob upload session.
     *
     * <p>The readable bytes of {@code chunk} (from its position to its limit) are written to the
     * session's output stream; the buffer's position is not advanced. Buffers without an accessible
     * backing array (direct or read-only buffers) are copied into a temporary array first, because
     * {@link ByteBuffer#array()} throws for them.
     *
     * @param session id returned by a begin-create or begin-update call
     * @param chunk data to append
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown session, is logged and wrapped in a {@link RuntimeException}
     */
    public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, TException {
        try {
            OutputStream os = (OutputStream)this.blobUploaders.get(session);
            if (os == null) {
                throw new RuntimeException("Blob for session " + session + " does not exist (or timed out)");
            }
            int remaining = chunk.remaining();
            if (chunk.hasArray()) {
                // Fast path: write straight out of the backing array without copying.
                os.write(chunk.array(), chunk.arrayOffset() + chunk.position(), remaining);
            } else {
                // duplicate() keeps the caller's position untouched, matching the fast path.
                byte[] copy = new byte[remaining];
                chunk.duplicate().get(copy);
                os.write(copy, 0, remaining);
            }
            // NOTE(review): re-putting the stream presumably refreshes the session's expiry - confirm.
            this.blobUploaders.put(session, os);
        }
        catch (Exception e) {
            LOG.warn("upload blob chunk exception.", (Throwable)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Completes a blob upload session by closing its stream and forgetting the session.
     *
     * @param session id of the upload session to finish
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown session, is logged and wrapped in a {@link RuntimeException}
     */
    public void finishBlobUpload(String session) throws AuthorizationException, TException {
        try {
            OutputStream uploadStream = (OutputStream)this.blobUploaders.get(session);
            if (uploadStream != null) {
                uploadStream.close();
                LOG.info("Finished uploading blob for session {}. Closing session.", session);
                this.blobUploaders.remove(session);
                return;
            }
            throw new RuntimeException("Blob for session " + session + " does not exist (or timed out)");
        } catch (Exception failure) {
            LOG.warn("finish blob upload exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Aborts a blob upload session by cancelling its stream and forgetting the session.
     *
     * @param session id of the upload session to cancel
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown session, is logged and wrapped in a {@link RuntimeException}
     */
    public void cancelBlobUpload(String session) throws AuthorizationException, TException {
        try {
            AtomicOutputStream os = (AtomicOutputStream)this.blobUploaders.get((Object)session);
            if (os == null) {
                throw new RuntimeException("Blob for session " + session + " does not exist (or timed out)");
            }
            os.cancel();
            LOG.info("Canceled uploading blob for session {}. Closing session.", (Object)session);
            this.blobUploaders.remove((Object)session);
        }
        catch (Exception e) {
            // The message used to say "finish", which made cancel failures look like finish failures.
            LOG.warn("cancel blob upload exception.", (Throwable)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the metadata of a blob on behalf of the current subject.
     *
     * @param key key of the blob
     * @return the metadata reported by the blob store
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            ReadableBlobMeta blobMeta = this.blobStore.getBlobMeta(key, Nimbus.getSubject());
            return blobMeta;
        } catch (Exception failure) {
            LOG.warn("get blob meta exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Replaces the metadata of a blob on behalf of the current subject.
     *
     * @param key key of the blob
     * @param meta metadata to store
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            this.blobStore.setBlobMeta(key, meta, Nimbus.getSubject());
        } catch (Exception failure) {
            LOG.warn("set blob meta exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Opens a download session for a blob.
     *
     * <p>Fetches the blob on behalf of the current subject, wraps it in a {@code BufferInputStream}
     * and registers it in {@code blobDownloaders} under a freshly generated session id. The buffer
     * size comes from the {@code storm.blobstore.inputstream.buffer.size.bytes} setting (default
     * 65536) and is read through {@code ObjectReader.getInt}, so a value that is not stored as an
     * {@code Integer} no longer fails with a {@link ClassCastException}.
     *
     * @param key key of the blob to download
     * @return the blob version, the new session id and the blob's length
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public BeginDownloadResult beginBlobDownload(String key) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            InputStreamWithMeta is = this.blobStore.getBlob(key, Nimbus.getSubject());
            String sessionId = Utils.uuid();
            BeginDownloadResult ret = new BeginDownloadResult(is.getVersion(), sessionId);
            ret.set_data_size(is.getFileLength());
            this.blobDownloaders.put((Object)sessionId, (Object)new BufferInputStream((InputStream)is, ObjectReader.getInt((Object)this.conf.get("storm.blobstore.inputstream.buffer.size.bytes"), (Integer)65536)));
            LOG.info("Created download session for {}", (Object)key);
            return ret;
        }
        catch (Exception e) {
            LOG.warn("begin blob download exception.", (Throwable)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the next chunk of a blob download session.
     *
     * <p>An empty chunk marks the end of the blob: the stream is closed and the session is removed.
     * Otherwise the stream is put back into {@code blobDownloaders} under the same session id.
     *
     * @param session id of the download session
     * @return the bytes read, wrapped in a buffer; empty once the blob is exhausted
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown session, is logged and wrapped in a {@link RuntimeException}
     */
    public ByteBuffer downloadBlobChunk(String session) throws AuthorizationException, TException {
        try {
            BufferInputStream blobStream = (BufferInputStream)this.blobDownloaders.get(session);
            if (blobStream == null) {
                throw new RuntimeException("Blob for session " + session + " does not exist (or timed out)");
            }
            byte[] data = blobStream.read();
            if (data.length > 0) {
                this.blobDownloaders.put(session, blobStream);
            } else {
                blobStream.close();
                this.blobDownloaders.remove(session);
            }
            LOG.debug("Sending {} bytes", Integer.valueOf(data.length));
            return ByteBuffer.wrap(data);
        } catch (Exception failure) {
            LOG.warn("download blob chunk exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Deletes a blob on behalf of the current subject.
     *
     * <p>When the blob store is a {@code LocalFsBlobStore}, the key's blobstore entry and key
     * version are also removed from cluster state.
     *
     * @param key key of the blob to delete
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            this.blobStore.deleteBlob(key, Nimbus.getSubject());
            boolean localStore = this.blobStore instanceof LocalFsBlobStore;
            if (localStore) {
                this.stormClusterState.removeBlobstoreKey(key);
                this.stormClusterState.removeKeyVersion(key);
            }
            LOG.info("Deleted blob for key {}", key);
        } catch (Exception failure) {
            LOG.warn("delete blob exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Lists blob keys in pages of at most 100 entries.
     *
     * <p>A {@code null} or empty {@code session} starts a new listing under a freshly generated
     * session id; any other value continues the listing stored under that id in
     * {@code blobListers}. When the iterator is exhausted the session is removed and an empty page
     * is returned.
     *
     * @param session id of a listing in progress, or {@code null}/empty to start a new one
     * @return the next page of keys together with the session id to continue with
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown session, is logged and wrapped in a {@link RuntimeException}
     */
    public ListBlobsResult listBlobs(String session) throws TException {
        try {
            boolean startNew = session == null || session.isEmpty();
            Iterator keys = startNew ? this.blobStore.listKeys() : (Iterator)this.blobListers.get(session);
            String activeSession = startNew ? Utils.uuid() : session;
            if (keys == null) {
                throw new RuntimeException("Blob list for session " + activeSession + " does not exist (or timed out)");
            }
            if (keys.hasNext()) {
                ArrayList page = new ArrayList();
                while (page.size() < 100 && keys.hasNext()) {
                    page.add(keys.next());
                }
                this.blobListers.put(activeSession, keys);
                LOG.info("Downloading {} entries", Integer.valueOf(page.size()));
                return new ListBlobsResult(page, activeSession);
            }
            this.blobListers.remove(activeSession);
            LOG.info("No more blobs to list for session {}", activeSession);
            return new ListBlobsResult(Collections.emptyList(), activeSession);
        } catch (Exception failure) {
            LOG.warn("list blobs exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Returns the replication value the blob store reports for a blob.
     *
     * @param key key of the blob
     * @return the value returned by the blob store for the current subject
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            int currentReplication = this.blobStore.getBlobReplication(key, Nimbus.getSubject());
            return currentReplication;
        } catch (Exception failure) {
            LOG.warn("get blob replication exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Asks the blob store to change the replication of a blob.
     *
     * @param key key of the blob
     * @param replication requested replication value
     * @return the value returned by the blob store for the current subject
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            int updatedReplication = this.blobStore.updateBlobReplication(key, replication, Nimbus.getSubject());
            return updatedReplication;
        } catch (Exception failure) {
            LOG.warn("update blob replication exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Registers a blob key in cluster state for this nimbus.
     *
     * <p>Only acts when the blob store is a {@code LocalFsBlobStore}; in that case the key is set
     * up in cluster state with this nimbus' host/port info and the version obtained from
     * {@code getVersionForKey}.
     *
     * @param key blob key to register
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public void createStateInZookeeper(String key) throws TException {
        try {
            IStormClusterState clusterState = this.stormClusterState;
            BlobStore blobs = this.blobStore;
            NimbusInfo self = this.nimbusHostPortInfo;
            boolean localStore = blobs instanceof LocalFsBlobStore;
            if (localStore) {
                clusterState.setupBlobstore(key, self, Integer.valueOf(Nimbus.getVersionForKey(key, self, this.conf)));
            }
            LOG.debug("Created state in zookeeper {} {} {}", clusterState, blobs, self);
        } catch (Exception failure) {
            LOG.warn("Exception while creating state in zookeeper - key: " + key, failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Opens a jar upload into the nimbus inbox.
     *
     * <p>After authorizing the "fileUpload" operation, creates a file named
     * {@code stormjar-<uuid>.jar} under the inbox directory and registers a channel to it in
     * {@code uploaders}, keyed by the file location.
     *
     * @return the location of the file being uploaded, which doubles as the upload handle
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public String beginFileUpload() throws AuthorizationException, TException {
        try {
            beginFileUploadCalls.mark();
            this.checkAuthorization(null, null, "fileUpload");
            String target = new StringBuilder()
                    .append(this.getInbox())
                    .append("/stormjar-")
                    .append(Utils.uuid())
                    .append(".jar")
                    .toString();
            this.uploaders.put(target, Channels.newChannel(new FileOutputStream(target)));
            LOG.info("Uploading file from client to {}", target);
            return target;
        } catch (Exception failure) {
            LOG.warn("Begin file upload exception", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Appends one chunk of data to an open file upload.
     *
     * <p>The channel is written to until {@code chunk} is fully drained, because a single
     * {@link WritableByteChannel#write} call is allowed to write fewer bytes than remain in the
     * buffer. At least one write is always attempted, even for an empty chunk.
     *
     * @param location upload handle returned by {@code beginFileUpload}
     * @param chunk data to append
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown location, is logged and wrapped in a {@link RuntimeException}
     */
    public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, TException {
        try {
            uploadChunkCalls.mark();
            this.checkAuthorization(null, null, "fileUpload");
            WritableByteChannel channel = (WritableByteChannel)this.uploaders.get((Object)location);
            if (channel == null) {
                throw new RuntimeException("File for that location does not exist (or timed out)");
            }
            // Guard against partial writes: keep writing until nothing is left in the buffer.
            do {
                channel.write(chunk);
            } while (chunk.hasRemaining());
            // NOTE(review): re-putting the channel presumably refreshes the upload's expiry - confirm.
            this.uploaders.put((Object)location, (Object)channel);
        }
        catch (Exception e) {
            LOG.warn("uploadChunk exception.", (Throwable)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Completes a file upload by closing its channel and forgetting the upload.
     *
     * @param location upload handle returned by {@code beginFileUpload}
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown location, is logged and wrapped in a {@link RuntimeException}
     */
    public void finishFileUpload(String location) throws AuthorizationException, TException {
        try {
            finishFileUploadCalls.mark();
            this.checkAuthorization(null, null, "fileUpload");
            WritableByteChannel uploadChannel = (WritableByteChannel)this.uploaders.get(location);
            if (uploadChannel != null) {
                uploadChannel.close();
                LOG.info("Finished uploading file from client: {}", location);
                this.uploaders.remove(location);
                return;
            }
            throw new RuntimeException("File for that location does not exist (or timed out)");
        } catch (Exception failure) {
            LOG.warn("finish file upload exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Opens a download session for a file held in the blob store.
     *
     * <p>After authorizing the "fileDownload" operation, fetches the blob (passing {@code null} as
     * the subject), wraps it in a {@code BufferInputStream} sized by the
     * {@code storm.blobstore.inputstream.buffer.size.bytes} setting (default 65536) and registers it
     * in {@code downloaders} under a freshly generated id.
     *
     * @param file blob key of the file to download
     * @return the id of the new download session
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public String beginFileDownload(String file) throws AuthorizationException, TException {
        try {
            beginFileDownloadCalls.mark();
            this.checkAuthorization(null, null, "fileDownload");
            InputStream blob = this.blobStore.getBlob(file, null);
            Integer bufferSize = ObjectReader.getInt(this.conf.get("storm.blobstore.inputstream.buffer.size.bytes"), Integer.valueOf(65536));
            BufferInputStream buffered = new BufferInputStream(blob, bufferSize);
            String downloadId = Utils.uuid();
            this.downloaders.put(downloadId, buffered);
            return downloadId;
        } catch (Exception failure) {
            LOG.warn("begin file download exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Reads the next chunk of a file download session.
     *
     * <p>An empty chunk marks the end of the file: the stream is closed and the session is removed
     * from {@code downloaders}.
     *
     * @param id id returned by {@code beginFileDownload}
     * @return the bytes read, wrapped in a buffer; empty once the file is exhausted
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception,
     *     including an unknown id, is logged and wrapped in a {@link RuntimeException}
     */
    public ByteBuffer downloadChunk(String id) throws AuthorizationException, TException {
        try {
            downloadChunkCalls.mark();
            this.checkAuthorization(null, null, "fileDownload");
            BufferInputStream fileStream = (BufferInputStream)this.downloaders.get(id);
            if (fileStream == null) {
                throw new RuntimeException("Could not find input stream for id " + id);
            }
            byte[] data = fileStream.read();
            boolean exhausted = data.length == 0;
            if (exhausted) {
                fileStream.close();
                this.downloaders.remove(id);
            }
            return ByteBuffer.wrap(data);
        } catch (Exception failure) {
            LOG.warn("download chunk exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Returns the nimbus configuration serialized as JSON.
     *
     * @return {@code conf} rendered by {@code JSONValue.toJSONString}
     * @throws TException Thrift exceptions (such as a failed "getNimbusConf" authorization) are
     *     logged and rethrown unchanged; any other exception is logged and wrapped in a
     *     {@link RuntimeException}
     */
    public String getNimbusConf() throws AuthorizationException, TException {
        try {
            getNimbusConfCalls.mark();
            this.checkAuthorization(null, null, "getNimbusConf");
            String confJson = JSONValue.toJSONString(this.conf);
            return confJson;
        } catch (Exception failure) {
            LOG.warn("get nimbus conf exception.", failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Returns topology information including all recorded errors.
     *
     * <p>Delegates to {@code getTopologyInfoWithOpts} with the error choice set to
     * {@code NumErrorsChoice.ALL}.
     *
     * @param id id of the topology
     * @return the topology information produced by {@code getTopologyInfoWithOpts}
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyInfoCalls.mark();
            GetInfoOptions options = new GetInfoOptions();
            options.set_num_err_choice(NumErrorsChoice.ALL);
            return this.getTopologyInfoWithOpts(id, options);
        }
        catch (Exception e) {
            // Message previously read "ino", which made the log line hard to grep for.
            LOG.warn("get topology info exception. (topology id={})", (Object)id, (Object)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the {@code TopologyInfo} for a topology, honouring the caller's error-detail choice.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>load the common topology info (authorized as "getTopologyInfo") and fail if the
     *       topology has no base;</li>
     *   <li>collect per-component errors: none, only the last one, or all of them, depending on
     *       {@code options.get_num_err_choice()} (defaulting to {@code ALL});</li>
     *   <li>build one {@code ExecutorSummary} per assigned executor, attaching stats when the
     *       heartbeat carries them;</li>
     *   <li>fill in owner, scheduler status, resources, component debug options and the
     *       replication count of the topology code blob.</li>
     * </ol>
     *
     * @param topoId id of the topology
     * @param options options selecting how many errors to return per component
     * @return the assembled topology information
     * @throws NotAliveException if the topology has no base
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfoWithOpts(String topoId, GetInfoOptions options) throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyInfoWithOptsCalls.mark();
            CommonTopoInfo common = this.getCommonTopoInfo(topoId, "getTopologyInfo");
            if (common.base == null) {
                throw new NotAliveException(topoId);
            }
            IStormClusterState state = this.stormClusterState;
            NumErrorsChoice numErrChoice = (NumErrorsChoice)Utils.OR((Object)options.get_num_err_choice(), (Object)NumErrorsChoice.ALL);
            HashMap<String, List> errors = new HashMap<String, List>();
            for (String component : common.allComponents) {
                switch (numErrChoice) {
                    case NONE: {
                        errors.put(component, Collections.emptyList());
                        break;
                    }
                    case ONE: {
                        ArrayList<ErrorInfo> lastOnly = new ArrayList<ErrorInfo>();
                        ErrorInfo info = state.lastError(topoId, component);
                        if (info != null) {
                            lastOnly.add(info);
                        }
                        errors.put(component, lastOnly);
                        break;
                    }
                    case ALL: {
                        errors.put(component, state.errors(topoId, component));
                        break;
                    }
                    default: {
                        // Unknown choice: warn and fall back to returning every error.
                        LOG.warn("Got invalid NumErrorsChoice '{}'", (Object)numErrChoice);
                        errors.put(component, state.errors(topoId, component));
                        break;
                    }
                }
            }
            ArrayList<ExecutorSummary> summaries = new ArrayList<ExecutorSummary>();
            if (common.assignment != null) {
                for (Map.Entry entry : common.assignment.get_executor_node_port().entrySet()) {
                    NodeInfo ni = (NodeInfo)entry.getValue();
                    ExecutorInfo execInfo = Nimbus.toExecInfo((List)entry.getKey());
                    Map<String, Object> heartbeat = common.beats.get(StatsUtil.convertExecutor((List)((List)entry.getKey())));
                    if (heartbeat == null) {
                        // No beat recorded for this executor: report it with zero uptime and no stats.
                        heartbeat = Collections.emptyMap();
                    }
                    ExecutorSummary summ = new ExecutorSummary(execInfo, common.taskToComponent.get(execInfo.get_task_start()), ni.get_node(), ((Long)ni.get_port_iterator().next()).intValue(), ((Integer)heartbeat.getOrDefault("uptime", 0)).intValue());
                    Map hb = (Map)heartbeat.get("heartbeat");
                    if (hb != null) {
                        Map execStats = (Map)hb.get("stats");
                        if (execStats != null) {
                            summ.set_stats(StatsUtil.thriftifyExecutorStats((Map)execStats));
                        }
                    }
                    summaries.add(summ);
                }
            }
            TopologyInfo topologyInfo = new TopologyInfo(topoId, common.topoName, Time.deltaSecs((int)common.launchTimeSecs), summaries, Nimbus.extractStatusStr(common.base), errors);
            if (common.base.is_set_owner()) {
                topologyInfo.set_owner(common.base.get_owner());
            }
            String schedStatus = this.idToSchedStatus.get().get(topoId);
            if (schedStatus != null) {
                topologyInfo.set_sched_status(schedStatus);
            }
            TopologyResources resources = this.getResourcesForTopology(topoId, common.base);
            if (resources != null) {
                topologyInfo.set_requested_memonheap(resources.getRequestedMemOnHeap().doubleValue());
                topologyInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap().doubleValue());
                topologyInfo.set_requested_cpu(resources.getRequestedCpu().doubleValue());
                topologyInfo.set_assigned_memonheap(resources.getAssignedMemOnHeap().doubleValue());
                topologyInfo.set_assigned_memoffheap(resources.getAssignedMemOffHeap().doubleValue());
                topologyInfo.set_assigned_cpu(resources.getAssignedCpu().doubleValue());
            }
            if (common.base.is_set_component_debug()) {
                topologyInfo.set_component_debug(common.base.get_component_debug());
            }
            topologyInfo.set_replication_count(this.getBlobReplicationCount(ConfigUtils.masterStormCodeKey((String)topoId)).intValue());
            return topologyInfo;
        }
        catch (Exception e) {
            LOG.warn("Get topo info exception. (topology id='{}')", (Object)topoId, (Object)e);
            if (e instanceof TException) {
                throw (TException)e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Assembles the data behind a topology's page: aggregated executor and worker stats plus
     * owner, scheduler status, resources, configuration and debug options.
     *
     * <p>The common topology info is loaded with the "getTopologyPageInfo" operation. Worker
     * summaries are only produced when the topology has an assignment; component stats are
     * aggregated either way and each spout/bolt gets its resource map filled in through
     * {@code setResourcesDefaultIfNotSet}.
     *
     * @param topoId id of the topology
     * @param window stats window passed through to the aggregation
     * @param includeSys flag passed through to the stats aggregation calls
     * @return the assembled page info
     * @throws NotAliveException if the topology has no base
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys) throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = this.getCommonTopoInfo(topoId, "getTopologyPageInfo");
            IStormClusterState clusterState = this.stormClusterState;
            StormBase base = common.base;
            if (base == null) {
                throw new NotAliveException(topoId);
            }
            String topoName = common.topoName;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = common.topoConf;

            // Executor -> [node, port], plus worker summaries when anything is assigned.
            Map<WorkerSlot, WorkerResources> workerToResources = this.getWorkerResourcesForTopology(topoId);
            HashMap exec2NodePort = new HashMap();
            List workerSummaries = null;
            if (assignment != null) {
                Map nodeToHost = assignment.get_node_host();
                for (Object rawEntry : assignment.get_executor_node_port().entrySet()) {
                    Map.Entry execEntry = (Map.Entry)rawEntry;
                    NodeInfo nodeInfo = (NodeInfo)execEntry.getValue();
                    List<Object> nodePort = Arrays.asList(nodeInfo.get_node(), nodeInfo.get_port_iterator().next());
                    exec2NodePort.put(execEntry.getKey(), nodePort);
                }
                workerSummaries = StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, exec2NodePort, nodeToHost, workerToResources, includeSys, true);
            }

            TopologyPageInfo pageInfo = StatsUtil.aggTopoExecsStats(topoId, exec2NodePort, taskToComp, beats, topology, window, includeSys, clusterState);

            Map spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
            for (Map.Entry spoutEntry : pageInfo.get_id_to_spout_agg_stats().entrySet()) {
                CommonAggregateStats spoutStats = ((ComponentAggregateStats)spoutEntry.getValue()).get_common_stats();
                spoutStats.set_resources_map(Nimbus.setResourcesDefaultIfNotSet(spoutResources, (String)spoutEntry.getKey(), topoConf));
            }
            Map boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
            for (Map.Entry boltEntry : pageInfo.get_id_to_bolt_agg_stats().entrySet()) {
                CommonAggregateStats boltStats = ((ComponentAggregateStats)boltEntry.getValue()).get_common_stats();
                boltStats.set_resources_map(Nimbus.setResourcesDefaultIfNotSet(boltResources, (String)boltEntry.getKey(), topoConf));
            }

            if (workerSummaries != null) {
                pageInfo.set_workers(workerSummaries);
            }
            if (base.is_set_owner()) {
                pageInfo.set_owner(base.get_owner());
            }
            String schedStatus = this.idToSchedStatus.get().get(topoId);
            if (schedStatus != null) {
                pageInfo.set_sched_status(schedStatus);
            }
            TopologyResources resources = this.getResourcesForTopology(topoId, base);
            if (resources != null) {
                pageInfo.set_requested_memonheap(resources.getRequestedMemOnHeap().doubleValue());
                pageInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap().doubleValue());
                pageInfo.set_requested_cpu(resources.getRequestedCpu().doubleValue());
                pageInfo.set_assigned_memonheap(resources.getAssignedMemOnHeap().doubleValue());
                pageInfo.set_assigned_memoffheap(resources.getAssignedMemOffHeap().doubleValue());
                pageInfo.set_assigned_cpu(resources.getAssignedCpu().doubleValue());
            }
            pageInfo.set_name(topoName);
            pageInfo.set_status(Nimbus.extractStatusStr(base));
            pageInfo.set_uptime_secs(Time.deltaSecs((int)common.launchTimeSecs));
            pageInfo.set_topoConf(JSONValue.toJSONString(topoConf));
            pageInfo.set_replication_count(this.getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId)).intValue());
            if (base.is_set_component_debug()) {
                // The debug options for the page are looked up under the topology id.
                DebugOptions debug = (DebugOptions)base.get_component_debug().get(topoId);
                if (debug != null) {
                    pageInfo.set_debug_options(debug);
                }
            }
            return pageInfo;
        } catch (Exception failure) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Assembles the data behind a supervisor's page.
     *
     * <p>When {@code superId} is given only that supervisor is reported; otherwise every
     * supervisor whose hostname equals {@code host} is. For each selected supervisor a summary is
     * added, followed by the worker summaries of every topology running on it. Whether the caller
     * passed the "getTopology" authorization filter for a topology is handed to the worker
     * aggregation as a flag.
     *
     * @param superId id of the supervisor, or {@code null} to select by host
     * @param host hostname used when {@code superId} is {@code null}
     * @param includeSys flag passed through to the worker stats aggregation
     * @return the assembled page info
     * @throws TException Thrift exceptions are logged and rethrown unchanged; any other exception is
     *     logged and wrapped in a {@link RuntimeException}
     */
    public SupervisorPageInfo getSupervisorPageInfo(String superId, String host, boolean includeSys) throws NotAliveException, AuthorizationException, TException {
        try {
            getSupervisorPageInfoCalls.mark();
            IStormClusterState clusterState = this.stormClusterState;
            Map superInfos = clusterState.allSupervisorInfo();

            // Index supervisor ids by the hostname they report.
            HashMap hostToSuperId = new HashMap();
            for (Object rawEntry : superInfos.entrySet()) {
                Map.Entry superEntry = (Map.Entry)rawEntry;
                String hostname = ((SupervisorInfo)superEntry.getValue()).get_hostname();
                ArrayList idsOnHost = (ArrayList)hostToSuperId.computeIfAbsent(hostname, unused -> new ArrayList());
                idsOnHost.add(superEntry.getKey());
            }

            List<String> supervisorIds;
            if (superId != null) {
                supervisorIds = Arrays.asList(superId);
            } else {
                supervisorIds = (List<String>)hostToSuperId.get(host);
            }

            SupervisorPageInfo pageInfo = new SupervisorPageInfo();
            Map topoToAssignment = clusterState.topologyAssignments();
            for (String sid : supervisorIds) {
                SupervisorInfo info = (SupervisorInfo)superInfos.get(sid);
                LOG.info("SIDL {} SI: {} ALL: {}", sid, info, superInfos);
                pageInfo.add_to_supervisor_summaries(this.makeSupervisorSummary(sid, info));

                List<String> superTopologies = Nimbus.topologiesOnSupervisor(topoToAssignment, sid);
                Set<String> userTopologies = this.filterAuthorized("getTopology", superTopologies);
                for (String topoId : superTopologies) {
                    CommonTopoInfo common = this.getCommonTopoInfo(topoId, "getSupervisorPageInfo");
                    String topoName = common.topoName;
                    Assignment assignment = common.assignment;
                    Map<List<Integer>, Map<String, Object>> beats = common.beats;
                    Map<Integer, String> taskToComp = common.taskToComponent;

                    HashMap exec2NodePort = new HashMap();
                    Map nodeToHost = Collections.emptyMap();
                    if (assignment != null) {
                        for (Object rawExec : assignment.get_executor_node_port().entrySet()) {
                            Map.Entry execEntry = (Map.Entry)rawExec;
                            NodeInfo ni = (NodeInfo)execEntry.getValue();
                            List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                            exec2NodePort.put(execEntry.getKey(), nodePort);
                        }
                        nodeToHost = assignment.get_node_host();
                    }

                    Map<WorkerSlot, WorkerResources> workerResources = this.getWorkerResourcesForTopology(topoId);
                    boolean isAllowed = userTopologies.contains(topoId);
                    for (WorkerSummary workerSummary : StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, exec2NodePort, nodeToHost, workerResources, includeSys, isAllowed, sid)) {
                        pageInfo.add_to_worker_summaries(workerSummary);
                    }
                }
            }
            return pageInfo;
        } catch (Exception failure) {
            LOG.warn("Get super page info exception. (super id='{}')", superId, failure);
            if (!(failure instanceof TException)) {
                throw new RuntimeException(failure);
            }
            throw (TException)failure;
        }
    }

    /**
     * Builds the component page summary for one component of a topology.
     *
     * <p>The aggregated executor stats come from {@code StatsUtil.aggCompExecsStats}; this method then
     * attaches the component's resource map (spout or bolt, depending on the reported component type),
     * the topology name and status, the component's errors, its debug options when present, and the
     * host/port of the {@code __eventlogger} task chosen for this component when one can be located
     * in the current assignment.
     *
     * @param topoId      id of the topology
     * @param componentId id of the component inside that topology
     * @param window      stats window, passed through to the stats aggregation
     * @param includeSys  passed through to the stats aggregation
     * @return the populated page info
     * @throws NotAliveException when the common topology info has no {@code StormBase}
     * @throws TException        any Thrift exception raised while gathering the data is rethrown as-is;
     *                           every other exception is wrapped in a {@link RuntimeException}
     */
    public ComponentPageInfo getComponentPageInfo(String topoId, String componentId, String window, boolean includeSys) throws NotAliveException, AuthorizationException, TException {
        try {
            getComponentPageInfoCalls.mark();
            CommonTopoInfo info = this.getCommonTopoInfo(topoId, "getComponentPageInfo");
            if (info.base == null) {
                throw new NotAliveException(topoId);
            }
            StormTopology topology = info.topology;
            Map<String, Object> topoConf = info.topoConf;
            Assignment assignment = info.assignment;

            // Executor (key as stored in the assignment) -> [host, port]. Stays empty without an assignment.
            Map<List<Long>, List<Object>> exec2HostPort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Map.Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    // Only the first port reported by the NodeInfo is used.
                    List<Object> hostPort = Arrays.<Object>asList(nodeToHost.get(ni.get_node()), ni.get_port_iterator().next());
                    exec2HostPort.put(entry.getKey(), hostPort);
                }
            }

            ComponentPageInfo compPageInfo = StatsUtil.aggCompExecsStats(exec2HostPort, info.taskToComponent, info.beats, window, includeSys, topoId, topology, componentId);
            if (compPageInfo.get_component_type() == ComponentType.SPOUT) {
                compPageInfo.set_resources_map(Nimbus.setResourcesDefaultIfNotSet(ResourceUtils.getSpoutsResources(topology, topoConf), componentId, topoConf));
            } else {
                compPageInfo.set_resources_map(Nimbus.setResourcesDefaultIfNotSet(ResourceUtils.getBoltsResources(topology, topoConf), componentId, topoConf));
            }
            compPageInfo.set_topology_name(info.topoName);
            compPageInfo.set_errors(this.stormClusterState.errors(topoId, componentId));
            compPageInfo.set_topology_status(Nimbus.extractStatusStr(info.base));
            if (info.base.is_set_component_debug()) {
                DebugOptions debug = (DebugOptions) info.base.get_component_debug().get(componentId);
                if (debug != null) {
                    compPageInfo.set_debug_options(debug);
                }
            }

            // Locate the event logger task selected for this component and report where it runs.
            Map<String, List<Integer>> compToTasks = Utils.reverseMap(info.taskToComponent);
            if (compToTasks.containsKey("__eventlogger")) {
                List<Integer> tasks = compToTasks.get("__eventlogger");
                tasks.sort(null);
                int taskIndex = TupleUtils.chooseTaskIndex(Collections.singletonList(componentId), tasks.size());
                int taskId = tasks.get(taskIndex);
                String host = null;
                Integer port = null;
                for (Map.Entry<List<Long>, List<Object>> entry : exec2HostPort.entrySet()) {
                    // The first two key elements are treated as an inclusive [start, end] task id range.
                    int start = entry.getKey().get(0).intValue();
                    int end = entry.getKey().get(1).intValue();
                    if (taskId >= start && taskId <= end) {
                        host = (String) entry.getValue().get(0);
                        port = ((Number) entry.getValue().get(1)).intValue();
                        break;
                    }
                }
                if (host != null && port != null) {
                    compPageInfo.set_eventlog_host(host);
                    compPageInfo.set_eventlog_port(port);
                }
            }
            return compPageInfo;
        } catch (Exception e) {
            LOG.warn("getComponentPageInfo exception. (topo id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the stored configuration of a topology serialized as a JSON string.
     *
     * <p>The conf is read from the blob store first; its {@code topology.name} entry is then used to
     * check authorization for the {@code getTopologyConf} operation.
     *
     * @param id topology id
     * @return the topology conf as JSON
     * @throws TException Thrift exceptions are rethrown unchanged; anything else is wrapped in a
     *                    {@link RuntimeException}
     */
    public String getTopologyConf(String id) throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyConfCalls.mark();
            final Map<String, Object> storedConf = Nimbus.tryReadTopoConf(id, this.blobStore);
            this.checkAuthorization((String) storedConf.get("topology.name"), storedConf, "getTopologyConf");
            return JSONValue.toJSONString(storedConf);
        } catch (Exception ex) {
            LOG.warn("Get topo conf exception. (topology id='{}')", id, ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the topology for {@code id} after passing it through {@code StormCommon.systemTopology}.
     *
     * <p>Authorization for the {@code getTopology} operation is checked against the stored conf
     * before the topology itself is read from the blob store.
     *
     * @param id topology id
     * @return the result of {@code StormCommon.systemTopology} for the stored conf and topology
     * @throws TException Thrift exceptions are rethrown unchanged; anything else is wrapped in a
     *                    {@link RuntimeException}
     */
    public StormTopology getTopology(String id) throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyCalls.mark();
            final Map<String, Object> storedConf = Nimbus.tryReadTopoConf(id, this.blobStore);
            this.checkAuthorization((String) storedConf.get("topology.name"), storedConf, "getTopology");
            final StormTopology userTopology = Nimbus.tryReadTopology(id, this.blobStore);
            return StormCommon.systemTopology(storedConf, userTopology);
        } catch (Exception ex) {
            LOG.warn("Get topology exception. (topology id='{}')", id, ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the topology for {@code id} exactly as read from the blob store.
     *
     * <p>Authorization for the {@code getUserTopology} operation is checked against the stored conf
     * before the topology is read.
     *
     * @param id topology id
     * @return the stored topology
     * @throws TException Thrift exceptions are rethrown unchanged; anything else is wrapped in a
     *                    {@link RuntimeException}
     */
    public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException {
        try {
            getUserTopologyCalls.mark();
            final Map<String, Object> storedConf = Nimbus.tryReadTopoConf(id, this.blobStore);
            this.checkAuthorization((String) storedConf.get("topology.name"), storedConf, "getUserTopology");
            return Nimbus.tryReadTopology(id, this.blobStore);
        } catch (Exception ex) {
            LOG.warn("Get user topology exception. (topology id='{}')", id, ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Lists the topology ids visible to {@code user}.
     *
     * <p>A currently assigned topology is included when {@code user} is {@code null}, is listed under
     * {@code nimbus.admins}, belongs to one of the topology's log groups, or is one of the topology's
     * log users. The ids returned by {@code readTopologyHistory} are then merged in, and duplicates
     * are dropped because the ids are collected in a set.
     *
     * @param user the user to filter for; {@code null} disables the per-topology filter
     * @return the collected topology ids
     * @throws TException Thrift exceptions are rethrown unchanged; anything else is wrapped in a
     *                    {@link RuntimeException}
     */
    public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException {
        try {
            @SuppressWarnings("unchecked")
            List<String> adminUsers = (List<String>) this.conf.getOrDefault("nimbus.admins", Collections.emptyList());
            IStormClusterState clusterState = this.stormClusterState;
            BlobStore blobs = this.blobStore;
            List<String> assignedIds = clusterState.assignments(null);
            Set<String> visibleIds = new HashSet<>();
            boolean isAdmin = adminUsers.contains(user);
            for (String topoId : assignedIds) {
                Map<String, Object> topoConf = Nimbus.tryReadTopoConf(topoId, blobs);
                List<String> logGroups = ServerConfigUtils.getTopoLogsGroups(topoConf);
                List<String> logUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
                boolean visible = user == null
                        || isAdmin
                        || this.isUserPartOf(user, logGroups)
                        || logUsers.contains(user);
                if (visible) {
                    visibleIds.add(topoId);
                }
            }
            visibleIds.addAll(this.readTopologyHistory(user, adminUsers));
            return new TopologyHistoryInfo(new ArrayList<>(visibleIds));
        } catch (Exception ex) {
            LOG.warn("Get topology history. (user='{}')", user, ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the cluster summary produced by {@code getClusterInfoImpl} after checking
     * authorization for the {@code getClusterInfo} operation.
     *
     * @return the cluster summary
     * @throws TException Thrift exceptions are rethrown unchanged; anything else is wrapped in a
     *                    {@link RuntimeException}
     */
    public ClusterSummary getClusterInfo() throws AuthorizationException, TException {
        try {
            getClusterInfoCalls.mark();
            this.checkAuthorization(null, null, "getClusterInfo");
            final ClusterSummary summary = this.getClusterInfoImpl();
            return summary;
        } catch (Exception ex) {
            LOG.warn("Get cluster info exception.", ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the summary of the nimbus that the leader elector currently reports as leader.
     *
     * <p>Authorization is checked under the {@code getClusterInfo} operation name. The registered
     * nimbus summaries are scanned for the one whose host and port match the elected leader; that
     * summary gets its uptime replaced by {@code Time.deltaSecs} of the stored value and is flagged
     * as leader before being returned.
     *
     * @return the leader's summary, or {@code null} when no registered nimbus matches the leader
     */
    public NimbusSummary getLeader() throws AuthorizationException, TException {
        getLeaderCalls.mark();
        this.checkAuthorization(null, null, "getClusterInfo");
        List<NimbusSummary> summaries = this.stormClusterState.nimbuses();
        NimbusInfo elected = this.leaderElector.getLeader();
        for (NimbusSummary candidate : summaries) {
            if (elected.getHost().equals(candidate.get_host()) && elected.getPort() == candidate.get_port()) {
                candidate.set_uptime_secs(Time.deltaSecs((int) candidate.get_uptime_secs()));
                candidate.set_isLeader(true);
                return candidate;
            }
        }
        return null;
    }

    /**
     * Reports whether {@code name} passes the topology name checks.
     *
     * <p>Runs the authorization check (under the {@code getClusterInfo} operation name),
     * {@code validateTopologyName} and {@code assertTopoActive(name, false)}. An
     * {@code InvalidTopologyException} or {@code AlreadyAliveException} from those calls is
     * translated into {@code false}; any other exception propagates to the caller.
     *
     * @param name the candidate topology name
     * @return {@code true} when all checks pass, {@code false} when one of them rejects the name
     */
    public boolean isTopologyNameAllowed(String name) throws AuthorizationException, TException {
        isTopologyNameAllowedCalls.mark();
        boolean allowed;
        try {
            this.checkAuthorization(name, null, "getClusterInfo");
            Nimbus.validateTopologyName(name);
            this.assertTopoActive(name, false);
            allowed = true;
        } catch (InvalidTopologyException | AlreadyAliveException rejected) {
            allowed = false;
        }
        return allowed;
    }

    /**
     * Shuts this Nimbus instance down.
     *
     * <p>Components are released in a fixed order: timer, cluster state connection, blob
     * downloaders and uploaders, blob store, leader elector and finally the topology action notifier
     * when one is configured. Any exception raised along the way aborts the remaining steps and is
     * rethrown wrapped in a {@link RuntimeException}.
     */
    public void shutdown() {
        shutdownCalls.mark();
        try {
            LOG.info("Shutting down master");

            this.timer.close();
            this.stormClusterState.disconnect();

            this.downloaders.cleanup();
            this.uploaders.cleanup();
            this.blobStore.shutdown();

            this.leaderElector.close();

            // The notifier plugin is optional.
            final ITopologyActionNotifierPlugin notifier = this.nimbusTopologyActionNotifier;
            if (null != notifier) {
                notifier.cleanup();
            }

            LOG.info("Shut down master");
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Reports the waiting state of the Nimbus timer.
     *
     * @return whatever {@code isTimerWaiting()} returns for this instance's timer
     */
    public boolean isWaiting() {
        final boolean timerWaiting = this.timer.isTimerWaiting();
        return timerWaiting;
    }

    // Static initialization: locks down the Nimbus subject, defines the topology state transition
    // callbacks, wires them into the status -> action -> transition table, and sets up a few
    // shared constants.
    static {
        // Register the Nimbus principal on the shared subject, then make the subject read-only.
        NIMBUS_SUBJECT.getPrincipals().add((Principal)new NimbusPrincipal());
        NIMBUS_SUBJECT.setReadOnly();
        // Transition that does nothing and returns null.
        // NOTE(review): null presumably tells the caller to leave the stored state untouched - confirm in the transition caller.
        NOOP_TRANSITION = (arg, nimbus, topoId, base) -> null;
        // Transitions that only yield the result of Nimbus.make for the target status.
        INACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
        ACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
        // KILL: schedules a delayed REMOVE and returns a StormBase in status KILLED that records the delay.
        KILL_TRANSITION = (killTime, nimbus, topoId, base) -> {
            int delay = 0;
            // Use the supplied kill time when given, otherwise the topology's topology.message.timeout.secs.
            delay = killTime != null ? ((Number)killTime).intValue() : ObjectReader.getInt((Object)Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get("topology.message.timeout.secs")).intValue();
            nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
            StormBase sb = new StormBase();
            sb.set_status(TopologyStatus.KILLED);
            TopologyActionOptions tao = new TopologyActionOptions();
            KillOptions opts = new KillOptions();
            // The delay is stored so STARTUP_WHEN_KILLED_TRANSITION can re-schedule the REMOVE later.
            opts.set_wait_secs(delay);
            tao.set_kill_options(opts);
            sb.set_topology_action_options(tao);
            sb.set_component_executors(Collections.emptyMap());
            sb.set_component_debug(Collections.emptyMap());
            return sb;
        };
        // REBALANCE: schedules a delayed DO_REBALANCE and returns a StormBase in status REBALANCING
        // that carries a copy of the rebalance options and the status the topology had before.
        REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
            // Work on a deep copy so the caller's options object is not modified.
            RebalanceOptions rbo = ((RebalanceOptions)args).deepCopy();
            int delay = 0;
            // Use the requested wait when set, otherwise the topology's topology.message.timeout.secs.
            delay = rbo.is_set_wait_secs() ? rbo.get_wait_secs() : ObjectReader.getInt((Object)Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get("topology.message.timeout.secs")).intValue();
            nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
            rbo.set_wait_secs(delay);
            if (!rbo.is_set_num_executors()) {
                rbo.set_num_executors(Collections.emptyMap());
            }
            StormBase sb = new StormBase();
            sb.set_status(TopologyStatus.REBALANCING);
            // Remembered so DO_REBALANCE_TRANSITION can return to this status afterwards.
            sb.set_prev_status(base.get_status());
            TopologyActionOptions tao = new TopologyActionOptions();
            tao.set_rebalance_options(rbo);
            sb.set_topology_action_options(tao);
            sb.set_component_executors(Collections.emptyMap());
            sb.set_component_debug(Collections.emptyMap());
            return sb;
        };
        // STARTUP while KILLED: re-schedules the REMOVE using the wait time stored in the kill options.
        STARTUP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> {
            int delay = base.get_topology_action_options().get_kill_options().get_wait_secs();
            nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
            return null;
        };
        // REMOVE: removes the topology from cluster state; when the blob store is a LocalFsBlobStore,
        // the blobstore key and key version entries of the topology's keys are removed as well.
        REMOVE_TRANSITION = (args, nimbus, topoId, base) -> {
            LOG.info("Killing topology: {}", (Object)topoId);
            IStormClusterState state = nimbus.getStormClusterState();
            state.removeStorm(topoId);
            BlobStore store = nimbus.getBlobStore();
            if (store instanceof LocalFsBlobStore) {
                for (String key : Nimbus.getKeyListFromId(nimbus.getConf(), topoId)) {
                    state.removeBlobstoreKey(key);
                    state.removeKeyVersion(key);
                }
            }
            return null;
        };
        // STARTUP while REBALANCING: re-schedules DO_REBALANCE using the wait time stored in the rebalance options.
        STARTUP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> {
            int delay = base.get_topology_action_options().get_rebalance_options().get_wait_secs();
            nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
            return null;
        };
        // DO_REBALANCE: performs the rebalance, then returns Nimbus.make for the status recorded as prev_status.
        DO_REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
            nimbus.doRebalance(topoId, base);
            return Nimbus.make(base.get_prev_status());
        };
        // Transition table keyed by current status, then by action:
        //   ACTIVE      : INACTIVATE -> INACTIVE, ACTIVATE -> NOOP, REBALANCE -> REBALANCE, KILL -> KILL
        //   INACTIVE    : ACTIVATE -> ACTIVE, INACTIVATE -> NOOP, REBALANCE -> REBALANCE, KILL -> KILL
        //   KILLED      : STARTUP -> STARTUP_WHEN_KILLED, KILL -> KILL, REMOVE -> REMOVE
        //   REBALANCING : STARTUP -> STARTUP_WHEN_REBALANCING, KILL -> KILL, DO_REBALANCE -> DO_REBALANCE
        // Status/action pairs not listed here have no entry in the map.
        TOPO_STATE_TRANSITIONS = new ImmutableMap.Builder().put((Object)TopologyStatus.ACTIVE, (Object)new ImmutableMap.Builder().put((Object)TopologyActions.INACTIVATE, (Object)INACTIVE_TRANSITION).put((Object)TopologyActions.ACTIVATE, (Object)NOOP_TRANSITION).put((Object)TopologyActions.REBALANCE, (Object)REBALANCE_TRANSITION).put((Object)TopologyActions.KILL, (Object)KILL_TRANSITION).build()).put((Object)TopologyStatus.INACTIVE, (Object)new ImmutableMap.Builder().put((Object)TopologyActions.ACTIVATE, (Object)ACTIVE_TRANSITION).put((Object)TopologyActions.INACTIVATE, (Object)NOOP_TRANSITION).put((Object)TopologyActions.REBALANCE, (Object)REBALANCE_TRANSITION).put((Object)TopologyActions.KILL, (Object)KILL_TRANSITION).build()).put((Object)TopologyStatus.KILLED, (Object)new ImmutableMap.Builder().put((Object)TopologyActions.STARTUP, (Object)STARTUP_WHEN_KILLED_TRANSITION).put((Object)TopologyActions.KILL, (Object)KILL_TRANSITION).put((Object)TopologyActions.REMOVE, (Object)REMOVE_TRANSITION).build()).put((Object)TopologyStatus.REBALANCING, (Object)new ImmutableMap.Builder().put((Object)TopologyActions.STARTUP, (Object)STARTUP_WHEN_REBALANCING_TRANSITION).put((Object)TopologyActions.KILL, (Object)KILL_TRANSITION).put((Object)TopologyActions.DO_REBALANCE, (Object)DO_REBALANCE_TRANSITION).build()).build();
        // Shared unmodifiable empty collections.
        EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList());
        EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet());
        // Matches a non-empty string that contains none of the characters '/', '.', ':' or '\'.
        TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$");
    }

    /**
     * Plain mutable holder for the per-topology data shared by several Nimbus query methods.
     * Instances are only created inside the enclosing class and filled in field by field.
     */
    private static class CommonTopoInfo {
        // --- stored definition of the topology ---
        /** Topology name. */
        public String topoName;
        /** Topology configuration map. */
        public Map<String, Object> topoConf;
        /** The topology structure. */
        public StormTopology topology;

        // --- cluster state ---
        /** Base record of the topology; {@code null} makes getComponentPageInfo throw NotAliveException. */
        public StormBase base;
        /** NOTE(review): presumably the launch time in seconds taken from the base record - confirm at the populating site. */
        public int launchTimeSecs;
        /** Current assignment; callers must handle {@code null}. */
        public Assignment assignment;

        // --- derived runtime views ---
        /** Task id to component id. */
        public Map<Integer, String> taskToComponent;
        /** Heartbeat data keyed by a list of integers (NOTE(review): presumably the executor's task id range - confirm). */
        public Map<List<Integer>, Map<String, Object>> beats;
        /** NOTE(review): presumably the ids of all components of the topology - confirm at the populating site. */
        public HashSet<String> allComponents;

        private CommonTopoInfo() {
        }
    }

    /**
     * {@code INimbus} implementation that needs no external resource manager: every port listed in a
     * supervisor's metadata is offered as a slot, and host names come straight from the supervisor
     * details.
     */
    @VisibleForTesting
    public static class StandaloneINimbus implements INimbus {

        /** No preparation is required. */
        public void prepare(Map<String, Object> topoConf, String schedulerLocalDir) {
            // intentionally empty
        }

        /**
         * Offers one {@code WorkerSlot} per (supervisor id, port) pair, where the ports are read from
         * each supervisor's metadata collection.
         */
        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> supervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
            Set<WorkerSlot> slots = new HashSet<>();
            for (SupervisorDetails supervisor : supervisors) {
                String supervisorId = supervisor.getId();
                @SuppressWarnings("unchecked")
                Collection<Number> ports = (Collection<Number>) supervisor.getMeta();
                for (Number port : ports) {
                    slots.add(new WorkerSlot(supervisorId, port));
                }
            }
            return slots;
        }

        /** Nothing to do when slots are assigned. */
        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
            // intentionally empty
        }

        /**
         * Looks up the host of the supervisor registered under {@code nodeId}.
         *
         * @return the supervisor's host, or {@code null} when the node id is unknown
         */
        public String getHostName(Map<String, SupervisorDetails> supervisors, String nodeId) {
            SupervisorDetails supervisor = supervisors.get(nodeId);
            return supervisor == null ? null : supervisor.getHost();
        }

        /** No scheduler is forced; always returns {@code null}. */
        public IScheduler getForcedScheduler() {
            return null;
        }
    }

    /**
     * Map update that produces a copy of its input without one fixed key. The input map itself is
     * never modified.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
        /** The key dropped from every copy. */
        private final K key;

        public Dissoc(K key) {
            this.key = key;
        }

        /**
         * @param t the map to copy
         * @return a new {@code HashMap} holding every entry of {@code t} except the one for the key
         */
        @Override
        public Map<K, V> apply(Map<K, V> t) {
            final Map<K, V> copy = new HashMap<>(t);
            copy.remove(key);
            return copy;
        }
    }

    /**
     * Map update that produces a copy of its input with one fixed key bound to one fixed value. An
     * existing mapping for that key is replaced in the copy. The input map itself is never modified.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
        /** The key written into every copy. */
        private final K key;
        /** The value stored under {@link #key}. */
        private final V value;

        public Assoc(K key, V value) {
            this.key = key;
            this.value = value;
        }

        /**
         * @param t the map to copy
         * @return a new {@code HashMap} holding every entry of {@code t} plus the key/value pair
         */
        @Override
        public Map<K, V> apply(Map<K, V> t) {
            final Map<K, V> copy = new HashMap<>(t);
            copy.put(key, value);
            return copy;
        }
    }
}

