/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import org.apache.storm.ILocalCluster;
import org.apache.storm.ILocalClusterTrackedTopologyAware;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.ProcessSimulator;
import org.apache.storm.Testing;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.nimbus.Nimbus;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.executor.LocalExecutor;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.BeginDownloadResult;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentPageInfo;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.GetInfoOptions;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.ListBlobsResult;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.SupervisorPageInfo;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.local.Context;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.task.IBolt;
import org.apache.storm.testing.InProcessZookeeper;
import org.apache.storm.testing.NonRichBoltTracker;
import org.apache.storm.testing.TmpPath;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.StormCommonInstaller;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalCluster
implements ILocalClusterTrackedTopologyAware,
Nimbus.Iface {
    private static final Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
    // In-process Nimbus; may be the instance returned by Builder.nimbusWrapper.
    private final Nimbus nimbus;
    // Next supervisor slot port to hand out; seeded from Builder.supervisorSlotPortMin.
    private final AtomicInteger portCounter;
    // Effective daemon configuration (defaults overlaid with the builder's daemon conf).
    private final Map<String, Object> daemonConf;
    // Supervisors started by addSupervisor and not yet removed by killSupervisor.
    private final List<Supervisor> supervisors;
    private final IStateStorage state;
    // Either the builder-supplied cluster state or one created from daemonConf.
    private final IStormClusterState clusterState;
    // Temporary directories created for nimbus and each supervisor; closed in close().
    private final List<TmpPath> tmpDirs;
    // Non-null only when the builder's daemon conf has no "storm.zookeeper.servers" entry.
    private final InProcessZookeeper zookeeper;
    // Null when "storm.local.mode.zmq" is true in daemonConf.
    private final IContext sharedContext;
    // Non-null only when the builder requested a nimbus daemon.
    private final ThriftServer thriftServer;
    // Non-null only for tracked clusters.
    private final String trackId;
    // Non-null only when trackId is non-null.
    private final StormCommonInstaller commonInstaller;
    // Non-null only when the builder enabled simulated time.
    private final Time.SimulatedTime time;
    // Kill options whose wait_secs is set to 0 by the static initializer.
    public static final KillOptions KILL_NOW = new KillOptions();

    /**
     * Exposes {@code nimbus} through a Thrift server and starts serving on a new thread.
     *
     * @param conf   configuration handed to the Thrift server
     * @param nimbus the Nimbus instance whose interface is served
     * @return the Thrift server, already serving on its own thread
     */
    private static ThriftServer startNimbusDaemon(Map<String, Object> conf, Nimbus nimbus) {
        final TProcessor processor = (TProcessor)new Nimbus.Processor((Nimbus.Iface)nimbus);
        final ThriftServer server = new ThriftServer(conf, processor, ThriftConnectionType.NIMBUS);
        LOG.info("Starting Nimbus server...");
        final Thread serverThread = new Thread(server::serve);
        serverThread.start();
        return server;
    }

    /**
     * Creates a cluster from a default {@link Builder} whose daemon conf sets
     * {@code topology.enable.message.timeouts} to {@code true}.
     *
     * @throws Exception if the cluster cannot be started
     */
    public LocalCluster() throws Exception {
        this(new Builder().withDaemonConf("topology.enable.message.timeouts", true));
    }

    /**
     * Creates a cluster that uses the given ZooKeeper host and port.
     * Because {@code storm.zookeeper.servers} is set in the daemon conf, the
     * builder-based constructor does not start an in-process ZooKeeper.
     *
     * @param zkHost ZooKeeper host, stored as a single-element server list
     * @param zkPort ZooKeeper port, stored under {@code storm.zookeeper.port}
     * @throws Exception if the cluster cannot be started
     */
    public LocalCluster(String zkHost, Long zkPort) throws Exception {
        this(new Builder().withDaemonConf("topology.enable.message.timeouts", true).withDaemonConf("storm.zookeeper.servers", Arrays.asList(zkHost)).withDaemonConf("storm.zookeeper.port", zkPort));
    }

    /**
     * Builds and starts the in-process cluster described by {@code builder}.
     *
     * <p>Steps, in order: optional simulated time, optional tracked-metrics
     * registration, daemon configuration assembly (defaults first, then the
     * builder's daemon conf on top), optional in-process ZooKeeper, cluster
     * state, Nimbus, optional shared messaging context, optional Thrift
     * daemon, the supervisors, and finally a bounded wait for a Nimbus leader.
     *
     * <p>If anything fails before construction completes, {@link #close()} is
     * invoked to release whatever was already started.
     *
     * @param builder the settings to build the cluster from
     * @throws Exception if any component fails to start
     */
    private LocalCluster(Builder builder) throws Exception {
        this.time = builder.simulateTime ? new Time.SimulatedTime() : null;
        boolean success = false;
        try {
            this.trackId = builder.trackId;
            if (this.trackId != null) {
                ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<String, AtomicInteger>();
                metrics.put("spout-emitted", new AtomicInteger(0));
                metrics.put("transferred", new AtomicInteger(0));
                metrics.put("processed", new AtomicInteger(0));
                this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
                LOG.warn("Adding tracked metrics for ID {}", (Object)this.trackId);
                RegisteredGlobalState.setState((String)this.trackId, metrics);
                LocalExecutor.setTrackId((String)this.trackId);
            } else {
                this.commonInstaller = null;
            }
            this.tmpDirs = new ArrayList<TmpPath>();
            this.supervisors = new ArrayList<Supervisor>();
            TmpPath nimbusTmp = new TmpPath();
            this.tmpDirs.add(nimbusTmp);
            Map<String, Object> conf = ConfigUtils.readStormConfig();
            conf.put("topology.skip.missing.kryo.registrations", true);
            conf.put("topology.enable.message.timeouts", false);
            conf.put("topology.trident.batch.emit.interval.millis", 50);
            conf.put("storm.cluster.mode", "local");
            conf.put("blobstore.superuser", System.getProperty("user.name"));
            conf.put("blobstore.dir", nimbusTmp.getPath());
            // Only start an embedded ZooKeeper when the caller did not point us at one.
            InProcessZookeeper zookeeper = null;
            if (!builder.daemonConf.containsKey("storm.zookeeper.servers")) {
                zookeeper = new InProcessZookeeper();
                conf.put("storm.zookeeper.port", zookeeper.getPort());
                conf.put("storm.zookeeper.servers", Arrays.asList("localhost"));
            }
            this.zookeeper = zookeeper;
            // Builder-supplied settings win over the defaults set above.
            conf.putAll(builder.daemonConf);
            this.daemonConf = new HashMap<String, Object>(conf);
            this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
            ClusterStateContext cs = new ClusterStateContext();
            this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, (ClusterStateContext)cs);
            this.clusterState = builder.clusterState == null ? ClusterUtils.mkStormClusterState(this.daemonConf, null, (ClusterStateContext)cs) : builder.clusterState;
            // The local dir is added after daemonConf was snapshotted, so only Nimbus sees it.
            conf.put("storm.local.dir", nimbusTmp.getPath());
            Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new Nimbus.StandaloneINimbus() : builder.inimbus, this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
            if (builder.nimbusWrapper != null) {
                nimbus = (Nimbus)builder.nimbusWrapper.apply(nimbus);
            }
            this.nimbus = nimbus;
            this.nimbus.launchServer();
            Context context = null;
            if (!ObjectReader.getBoolean((Object)this.daemonConf.get("storm.local.mode.zmq"), (boolean)false)) {
                context = new Context();
                context.prepare(this.daemonConf);
            }
            this.sharedContext = context;
            this.thriftServer = builder.nimbusDaemon ? LocalCluster.startNimbusDaemon(this.daemonConf, this.nimbus) : null;
            for (int i = 0; i < builder.supervisors; ++i) {
                this.addSupervisor(builder.portsPerSupervisor, null, null);
            }
            // Best-effort wait (up to 10 seconds) for a Nimbus leader. Failures here are
            // deliberately non-fatal, but they are logged rather than silently dropped.
            try {
                long timeoutAfter = System.currentTimeMillis() + 10000L;
                while (!this.hasLeader()) {
                    // Time out only once the deadline has actually passed.
                    if (System.currentTimeMillis() > timeoutAfter) {
                        throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
                    }
                    Thread.sleep(1L);
                }
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller; construction still completes.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for nimbus to become the leader", (Throwable)e);
            }
            catch (Exception e) {
                LOG.warn("Continuing without a confirmed nimbus leader", (Throwable)e);
            }
            success = true;
        }
        finally {
            if (!success) {
                this.close();
            }
        }
    }

    /**
     * Reports whether the cluster summary returned by Nimbus lists a leader.
     *
     * @return {@code true} if the nimbuses field is set and at least one entry is flagged as leader
     * @throws AuthorizationException if Nimbus rejects the cluster-info request
     * @throws TException on any other Thrift-level failure
     */
    private boolean hasLeader() throws AuthorizationException, TException {
        final ClusterSummary info = getNimbus().getClusterInfo();
        if (!info.is_set_nimbuses()) {
            return false;
        }
        return info.get_nimbuses().stream().anyMatch(NimbusSummary::is_isLeader);
    }

    /**
     * Returns the Nimbus instance backing this cluster.
     *
     * @return the Nimbus stored at construction time
     */
    public Nimbus getNimbus() {
        return this.nimbus;
    }

    /**
     * Returns a copy of the daemon configuration.
     *
     * @return a new {@link HashMap} with the daemon conf entries; changes to it do not affect this cluster
     */
    public Map<String, Object> getDaemonConf() {
        return new HashMap<String, Object>(this.daemonConf);
    }

    /**
     * Submits a topology to the in-process Nimbus and, when a submitter hook is
     * configured under {@code storm.topology.submission.notifier.plugin.class},
     * notifies that hook.
     *
     * @param topologyName name to submit the topology under
     * @param conf         topology configuration; must be JSON-serializable
     * @param topology     the topology to run
     * @return a handle whose {@code close()} kills the topology
     * @throws IllegalArgumentException if {@code conf} is not JSON-serializable
     * @throws TException               if Nimbus rejects the submission
     */
    public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, StormTopology topology) throws TException {
        if (!Utils.isValidConf(conf)) {
            throw new IllegalArgumentException("Topology conf is not json-serializable");
        }
        getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), topology);

        final ISubmitterHook submitterHook = (ISubmitterHook)Utils.getConfiguredClass(conf, (Object)"storm.topology.submission.notifier.plugin.class");
        if (submitterHook == null) {
            return new LocalTopology(topologyName, topology);
        }

        final TopologyInfo submittedInfo = Utils.getTopologyInfo((String)topologyName, null, conf);
        try {
            submitterHook.notify(submittedInfo, conf, topology);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        return new LocalTopology(topologyName, topology);
    }

    /**
     * Submits a topology with explicit submit options to the in-process Nimbus.
     *
     * @param topologyName name to submit the topology under
     * @param conf         topology configuration; must be JSON-serializable
     * @param topology     the topology to run
     * @param submitOpts   options forwarded to Nimbus
     * @return a handle whose {@code close()} kills the topology
     * @throws IllegalArgumentException if {@code conf} is not JSON-serializable
     * @throws TException               if Nimbus rejects the submission
     */
    public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, StormTopology topology, SubmitOptions submitOpts) throws TException {
        final boolean serializable = Utils.isValidConf(conf);
        if (serializable) {
            getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), topology, submitOpts);
            return new LocalTopology(topologyName, topology);
        }
        throw new IllegalArgumentException("Topology conf is not json-serializable");
    }

    /**
     * Submits the topology wrapped by a {@link TrackedTopology}.
     *
     * @param topologyName name to submit the topology under
     * @param conf         topology configuration; must be JSON-serializable
     * @param topology     tracked wrapper whose underlying topology is submitted
     * @return a handle whose {@code close()} kills the topology
     * @throws TException if Nimbus rejects the submission
     */
    @Override
    public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology) throws TException {
        // Delegate to the StormTopology overload; its handle is discarded and a fresh one returned.
        submitTopology(topologyName, conf, topology.getTopology());
        return new LocalTopology(topologyName, topology.getTopology());
    }

    /**
     * Submits the topology wrapped by a {@link TrackedTopology} with explicit submit options.
     *
     * @param topologyName name to submit the topology under
     * @param conf         topology configuration; must be JSON-serializable
     * @param topology     tracked wrapper whose underlying topology is submitted
     * @param submitOpts   options forwarded to Nimbus
     * @return a handle whose {@code close()} kills the topology
     * @throws TException if Nimbus rejects the submission
     */
    @Override
    public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts) throws TException {
        // Delegate to the StormTopology overload; its handle is discarded and a fresh one returned.
        submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
        return new LocalTopology(topologyName, topology.getTopology());
    }

    /**
     * Forwards new credentials for a topology to Nimbus.
     *
     * @param topologyName name of the target topology
     * @param creds        credentials to upload
     * @throws TException if Nimbus reports a failure
     */
    public void uploadNewCredentials(String topologyName, Credentials creds) throws TException {
        this.getNimbus().uploadNewCredentials(topologyName, creds);
    }

    /**
     * Asks Nimbus to kill the named topology.
     *
     * @param topologyName name of the topology to kill
     * @throws TException if Nimbus reports a failure
     */
    public void killTopology(String topologyName) throws TException {
        this.getNimbus().killTopology(topologyName);
    }

    /**
     * Asks Nimbus to kill the named topology using the given options.
     *
     * @param name    name of the topology to kill
     * @param options kill options forwarded to Nimbus
     * @throws TException if Nimbus reports a failure
     */
    public void killTopologyWithOpts(String name, KillOptions options) throws TException {
        this.getNimbus().killTopologyWithOpts(name, options);
    }

    /**
     * Asks Nimbus to activate the named topology.
     *
     * @param topologyName name of the topology
     * @throws TException if Nimbus reports a failure
     */
    public void activate(String topologyName) throws TException {
        this.getNimbus().activate(topologyName);
    }

    /**
     * Asks Nimbus to deactivate the named topology.
     *
     * @param topologyName name of the topology
     * @throws TException if Nimbus reports a failure
     */
    public void deactivate(String topologyName) throws TException {
        this.getNimbus().deactivate(topologyName);
    }

    /**
     * Asks Nimbus to rebalance the named topology.
     *
     * @param name    name of the topology
     * @param options rebalance options forwarded to Nimbus
     * @throws TException if Nimbus reports a failure
     */
    public void rebalance(String name, RebalanceOptions options) throws TException {
        this.getNimbus().rebalance(name, options);
    }

    /**
     * Shuts the cluster down by calling {@link #close()}, rethrowing any
     * checked failure wrapped in a {@link RuntimeException}.
     */
    public void shutdown() {
        try {
            this.close();
        }
        catch (Exception e) {
            // Wrap so callers of this method need not declare a checked exception.
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches a topology's configuration from Nimbus.
     *
     * @param id topology id
     * @return whatever Nimbus returns for that id
     * @throws TException if Nimbus reports a failure
     */
    public String getTopologyConf(String id) throws TException {
        return this.getNimbus().getTopologyConf(id);
    }

    /**
     * Fetches a topology definition from Nimbus.
     *
     * @param id topology id
     * @return the topology returned by Nimbus
     * @throws TException if Nimbus reports a failure
     */
    public StormTopology getTopology(String id) throws TException {
        return this.getNimbus().getTopology(id);
    }

    /**
     * Fetches the cluster summary from Nimbus.
     *
     * @return the summary returned by Nimbus
     * @throws TException if Nimbus reports a failure
     */
    public ClusterSummary getClusterInfo() throws TException {
        return this.getNimbus().getClusterInfo();
    }

    /**
     * Fetches runtime information about a topology from Nimbus.
     *
     * @param id topology id
     * @return the info returned by Nimbus
     * @throws TException if Nimbus reports a failure
     */
    public TopologyInfo getTopologyInfo(String id) throws TException {
        return this.getNimbus().getTopologyInfo(id);
    }

    /**
     * Stops every component this cluster started and releases its resources.
     *
     * <p>Order: Nimbus, Thrift server, state storage, cluster state,
     * supervisors (and their workers), simulated processes, in-process
     * ZooKeeper, temporary directories, tracked-metrics registration, the
     * installed {@code StormCommon} override, and simulated time.
     *
     * <p>This method is also called from the constructor when construction
     * fails part-way, so every field is treated as possibly {@code null}.
     *
     * @throws Exception if a component fails while shutting down
     */
    public synchronized void close() throws Exception {
        if (this.nimbus != null) {
            this.nimbus.shutdown();
        }
        if (this.thriftServer != null) {
            LOG.info("shutting down thrift server");
            try {
                this.thriftServer.stop();
            }
            catch (Exception e) {
                // A failing Thrift stop must not prevent the remaining cleanup.
                LOG.info("failed to stop thrift", (Throwable)e);
            }
        }
        if (this.state != null) {
            this.state.close();
        }
        if (this.getClusterState() != null) {
            this.getClusterState().disconnect();
        }
        // Guarded: the list is unassigned if construction failed very early.
        if (this.supervisors != null) {
            for (Supervisor s : this.supervisors) {
                s.shutdownAllWorkers(null, ReadClusterState.THREAD_DUMP_ON_ERROR);
                s.close();
            }
        }
        ProcessSimulator.killAllProcesses();
        if (this.zookeeper != null) {
            LOG.info("Shutting down in process zookeeper");
            this.zookeeper.close();
            LOG.info("Done shutting down in process zookeeper");
        }
        // Guarded for the same reason as the supervisors list.
        if (this.tmpDirs != null) {
            for (TmpPath p : this.tmpDirs) {
                p.close();
            }
        }
        if (this.trackId != null) {
            LOG.warn("Clearing tracked metrics for ID {}", (Object)this.trackId);
            LocalExecutor.clearTrackId();
            RegisteredGlobalState.clearState((String)this.trackId);
        }
        if (this.commonInstaller != null) {
            this.commonInstaller.close();
        }
        if (this.time != null) {
            this.time.close();
        }
    }

    /**
     * Looks up a running supervisor by id.
     *
     * @param id supervisor id to search for
     * @return the first supervisor whose id equals {@code id}, or {@code null} if there is none
     */
    public synchronized Supervisor getSupervisor(String id) {
        return this.supervisors.stream()
            .filter(candidate -> id.equals(candidate.getId()))
            .findFirst()
            .orElse(null);
    }

    /**
     * Removes the first supervisor with the given id from this cluster and closes it.
     * Does nothing when no supervisor matches.
     *
     * @param id id of the supervisor to stop
     */
    public synchronized void killSupervisor(String id) {
        for (int idx = 0; idx < this.supervisors.size(); idx++) {
            final Supervisor candidate = this.supervisors.get(idx);
            if (id.equals(candidate.getId())) {
                // Drop it from the list first, then close it; only the first match is handled.
                this.supervisors.remove(idx);
                candidate.close();
                return;
            }
        }
    }

    /**
     * Adds a supervisor using the defaults of the three-argument overload
     * (all arguments {@code null}).
     *
     * @return the launched supervisor
     * @throws Exception if the supervisor cannot be started
     */
    public Supervisor addSupervisor() throws Exception {
        return this.addSupervisor(null, null, null);
    }

    /**
     * Adds a supervisor with the given number of slot ports.
     *
     * @param ports number of slot ports; {@code null} selects the three-argument overload's default
     * @return the launched supervisor
     * @throws Exception if the supervisor cannot be started
     */
    public Supervisor addSupervisor(Number ports) throws Exception {
        return this.addSupervisor(ports, null, null);
    }

    /**
     * Adds a supervisor with the given number of slot ports and supervisor id.
     *
     * @param ports number of slot ports; {@code null} selects the three-argument overload's default
     * @param id    supervisor id; {@code null} lets the three-argument overload generate one
     * @return the launched supervisor
     * @throws Exception if the supervisor cannot be started
     */
    public Supervisor addSupervisor(Number ports, String id) throws Exception {
        return this.addSupervisor(ports, null, id);
    }

    /**
     * Creates, launches and registers a new supervisor.
     *
     * <p>The supervisor gets its own temporary local directory, a run of
     * consecutive slot ports taken from this cluster's port counter, and a
     * configuration made of the daemon conf overlaid with {@code conf}.
     *
     * @param ports number of slot ports to allocate; {@code null} means 2
     * @param conf  extra configuration layered over the daemon conf; may be {@code null}
     * @param id    supervisor id; {@code null} generates a UUID
     * @return the launched supervisor
     * @throws IllegalArgumentException if the resulting configuration is not in local mode
     * @throws Exception                if the supervisor cannot be created or launched
     */
    public synchronized Supervisor addSupervisor(Number ports, Map<String, Object> conf, String id) throws Exception {
        final Number slotCount = (ports == null) ? Integer.valueOf(2) : ports;

        final TmpPath localDir = new TmpPath();
        this.tmpDirs.add(localDir);

        final int wanted = slotCount.intValue();
        final ArrayList<Integer> slotPorts = new ArrayList<Integer>(wanted);
        int allocated = 0;
        while (allocated < wanted) {
            slotPorts.add(this.portCounter.getAndIncrement());
            allocated++;
        }

        final HashMap<String, Object> supervisorConf = new HashMap<String, Object>(this.daemonConf);
        if (conf != null) {
            supervisorConf.putAll(conf);
        }
        // These two entries always override whatever the caller supplied.
        supervisorConf.put("storm.local.dir", localDir.getPath());
        supervisorConf.put("supervisor.slots.ports", slotPorts);

        final String superId;
        if (id == null) {
            superId = Utils.uuid();
        } else {
            superId = id;
        }
        final StandaloneSupervisor isuper = new StandaloneSupervisor(){

            @Override
            public String generateSupervisorId() {
                return superId;
            }
        };

        final boolean localMode = ConfigUtils.isLocalMode(supervisorConf);
        if (!localMode) {
            throw new IllegalArgumentException("Cannot start server in distrubuted mode!");
        }

        final Supervisor launched = new Supervisor(supervisorConf, this.sharedContext, isuper);
        launched.launch();
        this.supervisors.add(launched);
        return launched;
    }

    /**
     * Checks whether every registered supervisor reports that it is waiting.
     * Supervisors after the first non-waiting one are not queried.
     *
     * @return {@code true} if there are no supervisors or all of them are waiting
     */
    private boolean areAllSupervisorsWaiting() {
        return this.supervisors.stream().allMatch(Supervisor::isWaiting);
    }

    /**
     * Checks whether every simulated process that is a {@link DaemonCommon}
     * reports that it is waiting. Handles of other types are ignored.
     *
     * @return {@code false} as soon as one such process is not waiting, otherwise {@code true}
     */
    private static boolean areAllWorkersWaiting() {
        for (Shutdownable handle : ProcessSimulator.getAllProcessHandles()) {
            if (handle instanceof DaemonCommon && !((DaemonCommon)handle).isWaiting()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Waits for the cluster to become idle, using {@code Testing.TEST_TIMEOUT_MS} as the timeout.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void waitForIdle() throws InterruptedException {
        this.waitForIdle(Testing.TEST_TIMEOUT_MS);
    }

    /**
     * Polls until Nimbus, all supervisors and all simulated workers report
     * that they are waiting, sleeping a random 0-19 ms between polls.
     *
     * @param timeoutMs maximum time to wait, in milliseconds of wall-clock time
     * @throws AssertionError       if the cluster is still busy once the timeout has elapsed
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    public void waitForIdle(long timeoutMs) throws InterruptedException {
        final ThreadLocalRandom jitter = ThreadLocalRandom.current();
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            final boolean idle = this.nimbus.isWaiting() && areAllSupervisorsWaiting() && areAllWorkersWaiting();
            if (idle) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                // Dump threads before failing so a stuck test can be diagnosed.
                LOG.info("Cluster was not idle in {} ms", (Object)timeoutMs);
                LOG.info(Utils.threadDump());
                throw new AssertionError((Object)("Test timed out (" + timeoutMs + "ms) cluster not idle"));
            }
            Thread.sleep(jitter.nextInt(20));
        }
    }

    /**
     * Advances cluster time by {@code secs} seconds in one-second steps.
     *
     * @param secs total number of seconds to advance
     * @throws InterruptedException if interrupted while waiting for the cluster to become idle
     */
    public void advanceClusterTime(int secs) throws InterruptedException {
        this.advanceClusterTime(secs, 1);
    }

    /**
     * Advances cluster time by {@code secs} seconds in steps of at most
     * {@code incSecs} seconds, waiting for the cluster to become idle after
     * every step.
     *
     * @param secs    total number of seconds to advance; values {@code <= 0} do nothing
     * @param incSecs step size in seconds; must be positive whenever {@code secs > 0}
     * @throws IllegalArgumentException if {@code secs > 0} and {@code incSecs <= 0}
     * @throws InterruptedException     if interrupted while waiting for the cluster to become idle
     */
    public void advanceClusterTime(int secs, int incSecs) throws InterruptedException {
        // A non-positive step would never shrink the remaining amount, so the loop could not end.
        if (secs > 0 && incSecs <= 0) {
            throw new IllegalArgumentException("incSecs must be positive, but was " + incSecs);
        }
        for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
            // The final step may be shorter than incSecs.
            int diff = Math.min(incSecs, amountLeft);
            Time.advanceTimeSecs((long)diff);
            this.waitForIdle();
        }
    }

    /**
     * Returns the cluster state used by this cluster.
     *
     * @return the cluster state; may be {@code null} if construction failed before it was assigned
     */
    public IStormClusterState getClusterState() {
        return this.clusterState;
    }

    /**
     * Returns the tracking id of this cluster.
     *
     * @return the id supplied through the builder, or {@code null} when the cluster is not tracked
     */
    public String getTrackedId() {
        return this.trackId;
    }

    /**
     * Variant of topology submission that takes the configuration as a JSON string.
     * The JSON is parsed into a map and passed to the map-based overload;
     * {@code uploadedJarLocation} is not used.
     *
     * @param name                topology name
     * @param uploadedJarLocation ignored
     * @param jsonConf            topology configuration as JSON
     * @param topology            the topology to run
     * @throws RuntimeException if {@code jsonConf} cannot be parsed
     * @throws TException       if Nimbus rejects the submission
     */
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        final Object parsed;
        try {
            parsed = JSONValue.parseWithException((String)jsonConf);
        }
        catch (ParseException e) {
            throw new RuntimeException(e);
        }
        @SuppressWarnings("unchecked")
        final Map<String, Object> topoConf = (Map<String, Object>)parsed;
        submitTopology(name, topoConf, topology);
    }

    /**
     * Variant of submission with options that takes the configuration as a JSON string.
     * The JSON is parsed into a map and passed to the map-based overload;
     * {@code uploadedJarLocation} is not used.
     *
     * @param name                topology name
     * @param uploadedJarLocation ignored
     * @param jsonConf            topology configuration as JSON
     * @param topology            the topology to run
     * @param options             submit options forwarded to Nimbus
     * @throws RuntimeException if {@code jsonConf} cannot be parsed
     * @throws TException       if Nimbus rejects the submission
     */
    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        final Object parsed;
        try {
            parsed = JSONValue.parseWithException((String)jsonConf);
        }
        catch (ParseException e) {
            throw new RuntimeException(e);
        }
        @SuppressWarnings("unchecked")
        final Map<String, Object> topoConf = (Map<String, Object>)parsed;
        submitTopologyWithOpts(name, topoConf, topology, options);
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public void setLogConfig(String name, LogConfig config) throws TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public LogConfig getLogConfig(String name) throws TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public void debug(String name, String component, boolean enable, double samplingPercentage) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public void setWorkerProfiler(String id, ProfileRequest profileRequest) throws TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public List<ProfileRequest> getComponentPendingProfileActions(String id, String component_id, ProfileAction action) throws TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Blob operations are not supported here: always throws {@link RuntimeException}.
     */
    public String beginCreateBlob(String key, SettableBlobMeta meta) throws AuthorizationException, KeyAlreadyExistsException, TException {
        throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public String beginUpdateBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link RuntimeException}.
     */
    public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, TException {
        throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link RuntimeException}.
     */
    public void finishBlobUpload(String session) throws AuthorizationException, TException {
        throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link RuntimeException}.
     */
    public void cancelBlobUpload(String session) throws AuthorizationException, TException {
        throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public void setBlobMeta(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public BeginDownloadResult beginBlobDownload(String key) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link RuntimeException}.
     */
    public ByteBuffer downloadBlobChunk(String session) throws AuthorizationException, TException {
        throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Returns a blob listing whose key list is empty; the {@code session} argument is not used.
     *
     * @param session ignored
     * @return a new result with an empty, mutable key list
     */
    public ListBlobsResult listBlobs(String session) throws TException {
        final ListBlobsResult emptyListing = new ListBlobsResult();
        emptyListing.set_keys(new ArrayList<String>());
        return emptyListing;
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public int getBlobReplication(String key) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Blob operations are not supported here: always throws {@link KeyNotFoundException}.
     */
    public int updateBlobReplication(String key, int replication) throws AuthorizationException, KeyNotFoundException, TException {
        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public void createStateInZookeeper(String key) throws TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Returns a freshly generated UUID string; this method itself records no upload state.
     *
     * @return the value of {@code Utils.uuid()}
     */
    public String beginFileUpload() throws AuthorizationException, TException {
        return Utils.uuid();
    }

    /**
     * No-op: the chunk is discarded.
     */
    public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, TException {
    }

    /**
     * No-op: nothing is done to finish an upload.
     */
    public void finishFileUpload(String location) throws AuthorizationException, TException {
    }

    /**
     * File download is not supported here: always throws {@link AuthorizationException}.
     */
    public String beginFileDownload(String file) throws AuthorizationException, TException {
        throw new AuthorizationException("FILE DOWNLOAD NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * File download is not supported here: always throws {@link AuthorizationException}.
     */
    public ByteBuffer downloadChunk(String id) throws AuthorizationException, TException {
        throw new AuthorizationException("FILE DOWNLOAD NOT SUPPORTED IN LOCAL MODE");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public String getNimbusConf() throws AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Returns the leader as reported by the Nimbus held in the {@code nimbus} field.
     *
     * @return the leader summary returned by Nimbus
     */
    public NimbusSummary getLeader() throws AuthorizationException, TException {
        return this.nimbus.getLeader();
    }

    /**
     * Asks the Nimbus held in the {@code nimbus} field whether a topology name is allowed.
     *
     * @param name candidate topology name
     * @return the answer returned by Nimbus
     */
    public boolean isTopologyNameAllowed(String name) throws AuthorizationException, TException {
        return this.nimbus.isTopologyNameAllowed(name);
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public ComponentPageInfo getComponentPageInfo(String topology_id, String component_id, String window, boolean is_include_sys) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * Not implemented in this class: always throws {@link RuntimeException}.
     */
    public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException {
        throw new RuntimeException("NOT IMPLMENETED YET");
    }

    /**
     * PLACEHOLDER - the decompiler could not recover this method's body, so
     * as written here it unconditionally throws {@link IllegalStateException}
     * and never invokes {@code c}.
     *
     * <p>NOTE(review): judging by the name and by how {@link #main(String[])}
     * calls it, the real implementation presumably runs {@code c} with local
     * mode in effect and keeps the cluster alive for {@code ttlSec} seconds.
     * That is an assumption; restore the body from the original sources or
     * class file before relying on this method.
     *
     * @param c      the action to run (unused by this placeholder)
     * @param ttlSec time-to-live in seconds (unused by this placeholder)
     * @return never returns normally in this placeholder
     * @throws IllegalStateException always, in this placeholder
     */
    public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Command-line entry point: runs the {@code main} method of the class named
     * by the first argument through {@link #withLocalModeOverride}, forwarding
     * the remaining arguments, then exits the JVM with status 0.
     *
     * <p>The time-to-live passed along is read from the system property
     * {@code storm.local.sleeptime} (seconds, default 20); an unparsable value
     * falls back to 20 with a warning.
     *
     * @param args class name followed by that class's own arguments
     * @throws IllegalArgumentException if no class name is given
     * @throws Exception                anything thrown while running the target
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("No class was specified to run");
        }

        long ttl = 20L;
        final String configuredTtl = System.getProperty("storm.local.sleeptime", "20");
        try {
            ttl = Long.parseLong(configuredTtl);
        }
        catch (NumberFormatException e) {
            LOG.warn("could not parse the sleep time defaulting to {} seconds", (Object)ttl);
        }

        LocalCluster.withLocalModeOverride(() -> {
            final String targetClassName = args[0];
            final String[] forwardedArgs = Arrays.copyOfRange(args, 1, args.length);
            final Method targetMain = Class.forName(targetClassName).getDeclaredMethod("main", String[].class);
            LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", (Object)targetMain, (Object)Arrays.toString(forwardedArgs));
            // Cast to Object so the array is passed as the single String[] parameter.
            targetMain.invoke(null, (Object)forwardedArgs);
            return null;
        }, ttl);
        System.exit(0);
    }

    // Configure the shared KILL_NOW options with a wait of zero seconds.
    static {
        KILL_NOW.set_wait_secs(0);
    }

    /**
     * Handle for a topology submitted to the enclosing {@link LocalCluster}.
     * It is a copy of the submitted {@link StormTopology} that also remembers
     * the topology name, so that {@link #close()} can kill it.
     */
    public class LocalTopology
    extends StormTopology
    implements ILocalCluster.ILocalTopology {
        private static final long serialVersionUID = 6145919776650637748L;
        // Name the topology was submitted under; used to kill it on close.
        private final String topoName;

        /**
         * @param topoName name the topology was submitted under
         * @param topo     topology passed to the {@link StormTopology} copy constructor
         */
        public LocalTopology(String topoName, StormTopology topo) {
            super(topo);
            this.topoName = topoName;
        }

        /**
         * Kills the topology on the enclosing cluster using {@code KILL_NOW}.
         *
         * @throws TException if Nimbus reports a failure
         */
        public void close() throws TException {
            LocalCluster.this.killTopologyWithOpts(this.topoName, KILL_NOW);
        }
    }

    /**
     * {@link StormCommon} variant installed for tracked clusters: the acker
     * bolt it creates is wrapped in a {@link NonRichBoltTracker} tagged with
     * the cluster's tracking id.
     */
    private static class TrackedStormCommon
    extends StormCommon {
        // Tracking id passed to every tracker created by this instance.
        private final String id;

        /**
         * @param id tracking id of the owning cluster
         */
        public TrackedStormCommon(String id) {
            this.id = id;
        }

        /**
         * @return a new {@link Acker} wrapped in a tracker that carries this instance's id
         */
        public IBolt makeAckerBoltImpl() {
            return new NonRichBoltTracker((IBolt)new Acker(), this.id);
        }
    }

    /**
     * Fluent builder for {@link LocalCluster}.
     *
     * <p>Defaults: 2 supervisors, 3 ports per supervisor, an empty daemon
     * conf, slot ports starting at 1024, no Nimbus Thrift daemon, no
     * tracking, real (non-simulated) time, and no custom INimbus, group
     * mapper, Nimbus wrapper, blob store, cluster state or leader elector.
     */
    public static class Builder {
        private int supervisors = 2;
        private int portsPerSupervisor = 3;
        private Map<String, Object> daemonConf = new HashMap<String, Object>();
        private INimbus inimbus;
        private IGroupMappingServiceProvider groupMapper;
        private int supervisorSlotPortMin = 1024;
        private boolean nimbusDaemon;
        private UnaryOperator<Nimbus> nimbusWrapper;
        private BlobStore store;
        private IStormClusterState clusterState;
        private ILeaderElector leaderElector;
        private String trackId;
        private boolean simulateTime;

        /**
         * Sets how many supervisors to start.
         *
         * @param supervisors number of supervisors; must not be negative
         * @return this builder
         * @throws IllegalArgumentException if {@code supervisors} is negative
         */
        public Builder withSupervisors(int supervisors) {
            if (supervisors >= 0) {
                this.supervisors = supervisors;
                return this;
            }
            throw new IllegalArgumentException("supervisors cannot be negative");
        }

        /**
         * Sets how many slot ports each initial supervisor gets.
         *
         * @param portsPerSupervisor number of ports; must not be negative
         * @return this builder
         * @throws IllegalArgumentException if {@code portsPerSupervisor} is negative
         */
        public Builder withPortsPerSupervisor(int portsPerSupervisor) {
            if (portsPerSupervisor >= 0) {
                this.portsPerSupervisor = portsPerSupervisor;
                return this;
            }
            throw new IllegalArgumentException("supervisor ports cannot be negative");
        }

        /**
         * Replaces the whole daemon conf with a copy of {@code conf}.
         * Entries added earlier are discarded; a {@code null} argument is ignored.
         *
         * @param conf the new daemon conf, or {@code null} to leave the current one untouched
         * @return this builder
         */
        public Builder withDaemonConf(Map<String, Object> conf) {
            if (conf == null) {
                return this;
            }
            this.daemonConf = new HashMap<String, Object>(conf);
            return this;
        }

        /**
         * Adds or overwrites a single daemon conf entry.
         *
         * @param key   configuration key
         * @param value configuration value
         * @return this builder
         */
        public Builder withDaemonConf(String key, Object value) {
            this.daemonConf.put(key, value);
            return this;
        }

        /**
         * Sets the INimbus to use; when left unset the cluster uses a standalone one.
         *
         * @param inimbus the INimbus, or {@code null} for the default
         * @return this builder
         */
        public Builder withINimbus(INimbus inimbus) {
            this.inimbus = inimbus;
            return this;
        }

        /**
         * Sets the group mapping service passed to Nimbus.
         *
         * @param groupMapper the group mapper, or {@code null}
         * @return this builder
         */
        public Builder withGroupMapper(IGroupMappingServiceProvider groupMapper) {
            this.groupMapper = groupMapper;
            return this;
        }

        /**
         * Sets the first port number handed out to supervisor slots.
         *
         * @param minPort first port; {@code null} logs a warning and selects 1024
         * @return this builder
         * @throws IllegalArgumentException if the resulting port is not positive
         */
        public Builder withSupervisorSlotPortMin(Number minPort) {
            final int firstPort;
            if (minPort != null) {
                firstPort = minPort.intValue();
            } else {
                LOG.warn("Number is null... {}", (Object)minPort);
                firstPort = 1024;
            }
            if (firstPort <= 0) {
                throw new IllegalArgumentException("port must be positive");
            }
            this.supervisorSlotPortMin = firstPort;
            return this;
        }

        /**
         * Enables the Nimbus Thrift daemon.
         *
         * @return this builder
         */
        public Builder withNimbusDaemon() {
            return withNimbusDaemon(true);
        }

        /**
         * Enables or disables the Nimbus Thrift daemon.
         *
         * @param nimbusDaemon whether to start the daemon; {@code null} is treated as {@code false} with a warning
         * @return this builder
         */
        public Builder withNimbusDaemon(Boolean nimbusDaemon) {
            if (nimbusDaemon != null) {
                this.nimbusDaemon = nimbusDaemon;
                return this;
            }
            LOG.warn("nimbusDaemon is null");
            this.nimbusDaemon = false;
            return this;
        }

        /**
         * Turns simulated time on.
         *
         * @return this builder
         */
        public Builder withSimulatedTime() {
            return withSimulatedTime(true);
        }

        /**
         * Turns simulated time on or off.
         *
         * @param simulateTime whether the cluster should run on simulated time
         * @return this builder
         */
        public Builder withSimulatedTime(boolean simulateTime) {
            this.simulateTime = simulateTime;
            return this;
        }

        /**
         * Sets a function applied to the freshly created Nimbus; the cluster uses its result.
         *
         * @param nimbusWrapper wrapper function, or {@code null} for none
         * @return this builder
         */
        public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) {
            this.nimbusWrapper = nimbusWrapper;
            return this;
        }

        /**
         * Sets the blob store passed to Nimbus.
         *
         * @param store the blob store, or {@code null}
         * @return this builder
         */
        public Builder withBlobStore(BlobStore store) {
            this.store = store;
            return this;
        }

        /**
         * Sets the cluster state to use instead of one created from the daemon conf.
         *
         * @param clusterState the cluster state, or {@code null} to have one created
         * @return this builder
         */
        public Builder withClusterState(IStormClusterState clusterState) {
            this.clusterState = clusterState;
            return this;
        }

        /**
         * Sets the leader elector passed to Nimbus.
         *
         * @param leaderElector the leader elector, or {@code null}
         * @return this builder
         */
        public Builder withLeaderElector(ILeaderElector leaderElector) {
            this.leaderElector = leaderElector;
            return this;
        }

        /**
         * Makes the cluster tracked under the given id.
         *
         * @param trackId tracking id; {@code null} leaves the cluster untracked
         * @return this builder
         */
        public Builder withTracked(String trackId) {
            this.trackId = trackId;
            return this;
        }

        /**
         * Makes the cluster tracked under a newly generated UUID.
         *
         * @return this builder
         */
        public Builder withTracked() {
            this.trackId = Utils.uuid();
            return this;
        }

        /**
         * Creates and starts a {@link LocalCluster} with the current settings.
         *
         * @return the running cluster
         * @throws Exception if the cluster cannot be started
         */
        public LocalCluster build() throws Exception {
            return new LocalCluster(this);
        }
    }
}

