/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.cluster;

import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.ExecutorBeat;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StormClusterStateImpl
implements IStormClusterState {
    /** Logger shared by all instances of this class. */
    private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
    /** Backing storage that every read, write and delete in this class is delegated to. */
    private IStateStorage stateStorage;
    /** Pending callbacks registered by {@code assignmentInfo}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
    /** Pending callbacks registered by {@code assignmentInfoWithVersion}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
    /** Pending callbacks registered by {@code assignmentVersion}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
    /** Single pending callback registered by {@code supervisors}; cleared when fired. */
    private AtomicReference<Runnable> supervisorsCallback;
    /** Pending callbacks registered by {@code topologyBackpressure}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> backPressureCallback;
    /** Single pending callback registered by {@code assignments}; cleared when fired. */
    private AtomicReference<Runnable> assignmentsCallback;
    /** Pending callbacks registered by {@code stormBase}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> stormBaseCallback;
    /** Single pending callback registered by {@code blobstore}; cleared when fired. */
    private AtomicReference<Runnable> blobstoreCallback;
    /** Pending callbacks registered by {@code credentials}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> credentialsCallback;
    /** Pending callbacks registered by {@code topologyLogConfig}, keyed by storm id; removed when fired. */
    private ConcurrentHashMap<String, Runnable> logConfigCallback;
    /** ACLs passed to the storage layer on most node creations and writes (credentials use their own list). */
    private List<ACL> acls;
    /** Id returned by {@code stateStorage.register}; used to unregister in {@code disconnect}. */
    private String stateId;
    /** When true, {@code disconnect} also closes {@code stateStorage}. */
    private boolean solo;

    /**
     * Wires this cluster-state view onto the given storage.
     *
     * <p>Initialises the callback registries, registers a change listener with the storage that
     * dispatches each changed path to the matching pending callback(s), and creates the fixed set
     * of root paths.
     *
     * @param StateStorage storage backend all operations are delegated to
     * @param acls         ACLs used when creating the root paths and stored for later writes
     * @param context      not read by this constructor
     * @param solo         whether {@code disconnect} should also close the storage
     * @throws Exception declared by the signature; propagated from the storage calls
     */
    public StormClusterStateImpl(IStateStorage StateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
        this.stateStorage = StateStorage;
        this.solo = solo;
        this.acls = acls;

        // Registries of pending callbacks: per-topology maps and single-slot references.
        this.assignmentInfoCallback = new ConcurrentHashMap<String, Runnable>();
        this.assignmentInfoWithVersionCallback = new ConcurrentHashMap<String, Runnable>();
        this.assignmentVersionCallback = new ConcurrentHashMap<String, Runnable>();
        this.supervisorsCallback = new AtomicReference<Runnable>();
        this.backPressureCallback = new ConcurrentHashMap<String, Runnable>();
        this.assignmentsCallback = new AtomicReference<Runnable>();
        this.stormBaseCallback = new ConcurrentHashMap<String, Runnable>();
        this.credentialsCallback = new ConcurrentHashMap<String, Runnable>();
        this.logConfigCallback = new ConcurrentHashMap<String, Runnable>();
        this.blobstoreCallback = new AtomicReference<Runnable>();

        this.stateId = this.stateStorage.register(new ZKStateChangedCallback() {

            @Override
            public void changed(Watcher.Event.EventType type, String path) {
                List<String> segments = StormClusterStateImpl.this.tokenizePath(path);
                if (segments.isEmpty()) {
                    // Nothing to dispatch on for an empty path.
                    return;
                }
                String subtree = segments.get(0);
                boolean hasChild = segments.size() > 1;

                if (subtree.equals("assignments")) {
                    if (!hasChild) {
                        StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.assignmentsCallback);
                    } else {
                        String stormId = segments.get(1);
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentInfoCallback, stormId);
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentVersionCallback, stormId);
                        StormClusterStateImpl.this.issueMapCallback(StormClusterStateImpl.this.assignmentInfoWithVersionCallback, stormId);
                    }
                    return;
                }
                if (subtree.equals("supervisors")) {
                    StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.supervisorsCallback);
                    return;
                }
                if (subtree.equals("blobstore")) {
                    StormClusterStateImpl.this.issueCallback(StormClusterStateImpl.this.blobstoreCallback);
                    return;
                }

                // The remaining subtrees are only handled when a second path segment exists.
                if (hasChild) {
                    ConcurrentHashMap<String, Runnable> registry = null;
                    switch (subtree) {
                        case "storms":
                            registry = StormClusterStateImpl.this.stormBaseCallback;
                            break;
                        case "credentials":
                            registry = StormClusterStateImpl.this.credentialsCallback;
                            break;
                        case "logconfigs":
                            registry = StormClusterStateImpl.this.logConfigCallback;
                            break;
                        case "backpressure":
                            registry = StormClusterStateImpl.this.backPressureCallback;
                            break;
                        default:
                            break;
                    }
                    if (registry != null) {
                        StormClusterStateImpl.this.issueMapCallback(registry, segments.get(1));
                        return;
                    }
                }

                // Anything else is treated as fatal: log and terminate the JVM.
                LOG.error("{} Unknown callback for subtree {}", (Object)new RuntimeException("Unknown callback for this path"), (Object)path);
                Runtime.getRuntime().exit(30);
            }
        });

        String[] rootPaths = {"/assignments", "/storms", "/supervisors", "/workerbeats", "/errors", "/blobstore", "/nimbuses", "/logconfigs", "/backpressure"};
        for (String rootPath : rootPaths) {
            this.stateStorage.mkdirs(rootPath, acls);
        }
    }

    /**
     * Atomically takes the callback out of the given slot (leaving {@code null}) and runs it,
     * if one was present.
     *
     * @param cb slot holding at most one pending callback
     */
    protected void issueCallback(AtomicReference<Runnable> cb) {
        Runnable pending = cb.getAndSet(null);
        if (pending == null) {
            return;
        }
        pending.run();
    }

    /**
     * Removes the callback stored under {@code key}, if any, and runs it.
     *
     * @param callbackConcurrentHashMap registry of pending callbacks
     * @param key                       key whose callback should fire
     */
    protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
        Runnable pending = callbackConcurrentHashMap.remove(key);
        if (pending == null) {
            return;
        }
        pending.run();
    }

    /**
     * Lists the children of {@code /assignments}.
     *
     * @param callback optional callback; when non-null it replaces the pending assignments
     *                 callback and the storage call is made with its boolean flag set to true
     * @return child names as returned by the storage
     */
    @Override
    public List<String> assignments(Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.assignmentsCallback.set(callback);
        }
        return this.stateStorage.get_children("/assignments", hasCallback);
    }

    /**
     * Reads and deserializes the assignment stored for a topology.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the read
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored bytes
     */
    @Override
    public Assignment assignmentInfo(String stormId, Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.assignmentInfoCallback.put(stormId, callback);
        }
        String path = ClusterUtils.assignmentPath(stormId);
        return ClusterUtils.maybeDeserialize(this.stateStorage.get_data(path, hasCallback), Assignment.class);
    }

    /**
     * Reads the assignment for a topology together with its version.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the read
     * @return versioned assignment; when the storage returns no data the version is 0 and the
     *         payload is {@code null}
     */
    @Override
    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
        if (callback != null) {
            this.assignmentInfoWithVersionCallback.put(stormId, callback);
        }
        Assignment assignment = null;
        Integer version = 0;
        VersionedData<byte[]> dataWithVersion = this.stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
        if (dataWithVersion != null) {
            assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
            version = dataWithVersion.getVersion();
        }
        // The type argument must match the declared return type; VersionedData<Object> is not
        // assignable to VersionedData<Assignment>.
        return new VersionedData<Assignment>(version, assignment);
    }

    /**
     * Returns the version of the assignment node for a topology.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the read
     * @return version reported by the storage
     * @throws Exception propagated from the storage call
     */
    @Override
    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.assignmentVersionCallback.put(stormId, callback);
        }
        String path = ClusterUtils.assignmentPath(stormId);
        return this.stateStorage.get_version(path, hasCallback);
    }

    /**
     * Syncs the blobstore path for a key and then lists its children.
     *
     * @param blobKey blob key
     * @return child names under the key's blobstore path
     */
    @Override
    public List<String> blobstoreInfo(String blobKey) {
        String keyPath = ClusterUtils.blobstorePath(blobKey);
        // Sync before reading, as the original ordering does.
        this.stateStorage.sync_path(keyPath);
        List<String> entries = this.stateStorage.get_children(keyPath, false);
        return entries;
    }

    /**
     * Collects the summaries of all nimbus entries under {@code /nimbuses}.
     *
     * @return one deserialized summary per child whose data is non-null
     */
    @Override
    public List<NimbusSummary> nimbuses() {
        List<NimbusSummary> summaries = new ArrayList<NimbusSummary>();
        for (String id : this.stateStorage.get_children("/nimbuses", false)) {
            byte[] raw = this.stateStorage.get_data(ClusterUtils.nimbusPath(id), false);
            if (raw != null) {
                summaries.add(ClusterUtils.maybeDeserialize(raw, NimbusSummary.class));
            }
        }
        return summaries;
    }

    /**
     * Publishes a nimbus entry as an ephemeral node.
     *
     * <p>Any existing node at the nimbus path is deleted first, a connection-state listener is
     * installed that re-creates the entry whenever the connection state becomes
     * {@code RECONNECTED}, and finally the ephemeral node is written.
     *
     * @param nimbusId      id used to build the nimbus path
     * @param nimbusSummary summary serialized into the node (serialized again on each reconnect)
     */
    @Override
    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
        final String nimbusPath = ClusterUtils.nimbusPath(nimbusId);
        this.stateStorage.delete_node(nimbusPath);

        ConnectionStateListener republishOnReconnect = new ConnectionStateListener() {

            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", (Object)connectionState);
                if (!connectionState.equals((Object)ConnectionState.RECONNECTED)) {
                    return;
                }
                LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
                StormClusterStateImpl.this.stateStorage.delete_node(nimbusPath);
                StormClusterStateImpl.this.stateStorage.set_ephemeral_node(nimbusPath, Utils.serialize(nimbusSummary), StormClusterStateImpl.this.acls);
            }
        };
        this.stateStorage.add_listener(republishOnReconnect);

        this.stateStorage.set_ephemeral_node(nimbusPath, Utils.serialize(nimbusSummary), this.acls);
    }

    /**
     * Lists the children of {@code /storms}.
     *
     * @return child names as returned by the storage
     */
    @Override
    public List<String> activeStorms() {
        List<String> stormIds = this.stateStorage.get_children("/storms", false);
        return stormIds;
    }

    /**
     * Reads and deserializes the {@code StormBase} stored for a topology.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the read
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored bytes
     */
    @Override
    public StormBase stormBase(String stormId, Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.stormBaseCallback.put(stormId, callback);
        }
        byte[] raw = this.stateStorage.get_data(ClusterUtils.stormPath(stormId), hasCallback);
        return ClusterUtils.maybeDeserialize(raw, StormBase.class);
    }

    /**
     * Reads and deserializes the heartbeat of a single worker.
     *
     * @param stormId topology id
     * @param node    node of the worker
     * @param port    port of the worker
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored heartbeat bytes
     */
    @Override
    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
        String beatPath = ClusterUtils.workerbeatPath(stormId, node, port);
        return ClusterUtils.maybeDeserialize(this.stateStorage.get_worker_hb(beatPath, false), ClusterWorkerHeartbeat.class);
    }

    /**
     * Returns the topology's profile requests whose node info equals the given one.
     *
     * @param stormId  topology id
     * @param nodeInfo node info to match against each request
     * @return matching requests, in the order returned by {@code getTopologyProfileRequests}
     */
    @Override
    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) {
        List<ProfileRequest> matching = new ArrayList<ProfileRequest>();
        for (ProfileRequest candidate : this.getTopologyProfileRequests(stormId)) {
            if (candidate.get_nodeInfo().equals(nodeInfo)) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Loads every profile request stored under the topology's profiler-config path.
     *
     * @param stormId topology id
     * @return deserialized requests (children that deserialize to {@code null} are skipped);
     *         empty when the path does not exist
     */
    @Override
    public List<ProfileRequest> getTopologyProfileRequests(String stormId) {
        List<ProfileRequest> found = new ArrayList<ProfileRequest>();
        String root = ClusterUtils.profilerConfigPath(stormId);
        if (!this.stateStorage.node_exists(root, false)) {
            return found;
        }
        for (String child : this.stateStorage.get_children(root, false)) {
            byte[] payload = this.stateStorage.get_data(root + "/" + child, false);
            ProfileRequest parsed = ClusterUtils.maybeDeserialize(payload, ProfileRequest.class);
            if (parsed != null) {
                found.add(parsed);
            }
        }
        return found;
    }

    /**
     * Stores a profile request at the path derived from its node, first port and action.
     *
     * @param stormId        topology id
     * @param profileRequest request to serialize and store
     */
    @Override
    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
        ProfileAction action = profileRequest.get_action();
        String node = profileRequest.get_nodeInfo().get_node();
        // Only the first port of the request's node info is used to build the path.
        Long firstPort = profileRequest.get_nodeInfo().get_port_iterator().next();
        String target = ClusterUtils.profilerConfigPath(stormId, node, firstPort, action);
        byte[] payload = Utils.serialize(profileRequest);
        this.stateStorage.set_data(target, payload, this.acls);
    }

    /**
     * Deletes the node at the path derived from the request's node, first port and action.
     *
     * @param stormId        topology id
     * @param profileRequest request identifying the node to delete
     */
    @Override
    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
        ProfileAction action = profileRequest.get_action();
        String node = profileRequest.get_nodeInfo().get_node();
        // Only the first port of the request's node info is used to build the path.
        Long firstPort = profileRequest.get_nodeInfo().get_port_iterator().next();
        this.stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId, node, firstPort, action));
    }

    /**
     * Gathers executor heartbeats for a topology, one worker at a time.
     *
     * <p>The executor-to-worker map is reversed so each worker's heartbeat is read once; workers
     * without a heartbeat contribute nothing.
     *
     * @param stormId          topology id
     * @param executorNodePort executor id lists mapped to the worker (node/port) they run on
     * @return heartbeats keyed by executor info
     */
    @Override
    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
        Map<ExecutorInfo, ExecutorBeat> beats = new HashMap<ExecutorInfo, ExecutorBeat>();
        Map<NodeInfo, List<List<Long>>> executorsByWorker = Utils.reverseMap(executorNodePort);
        for (Map.Entry<NodeInfo, List<List<Long>>> workerEntry : executorsByWorker.entrySet()) {
            NodeInfo worker = workerEntry.getKey();
            String node = worker.get_node();
            Long port = worker.get_port_iterator().next();
            ClusterWorkerHeartbeat heartbeat = this.getWorkerHeartbeat(stormId, node, port);

            // Each executor is described by the first and last element of its id list.
            List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>();
            for (List<Long> range : workerEntry.getValue()) {
                int first = range.get(0).intValue();
                int last = range.get(range.size() - 1).intValue();
                executors.add(new ExecutorInfo(first, last));
            }

            if (heartbeat != null) {
                beats.putAll(ClusterUtils.convertExecutorBeats(executors, heartbeat));
            }
        }
        return beats;
    }

    /**
     * Lists the children of {@code /supervisors}.
     *
     * @param callback optional callback; when non-null it replaces the pending supervisors
     *                 callback and the storage call is made with its boolean flag set to true
     * @return child names as returned by the storage
     */
    @Override
    public List<String> supervisors(Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.supervisorsCallback.set(callback);
        }
        return this.stateStorage.get_children("/supervisors", hasCallback);
    }

    /**
     * Reads and deserializes the info stored for a supervisor.
     *
     * @param supervisorId supervisor id
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored bytes
     */
    @Override
    public SupervisorInfo supervisorInfo(String supervisorId) {
        byte[] raw = this.stateStorage.get_data(ClusterUtils.supervisorPath(supervisorId), false);
        return ClusterUtils.maybeDeserialize(raw, SupervisorInfo.class);
    }

    /**
     * Creates the worker-heartbeat root path for a topology.
     *
     * @param stormId topology id
     */
    @Override
    public void setupHeatbeats(String stormId) {
        String heartbeatRoot = ClusterUtils.workerbeatStormRoot(stormId);
        this.stateStorage.mkdirs(heartbeatRoot, this.acls);
    }

    /**
     * Deletes the worker-heartbeat root of a topology on a best-effort basis.
     *
     * <p>A failure whose cause chain contains a {@code KeeperException} is logged as a warning
     * and otherwise ignored; any other failure is rethrown. Previously the exception was
     * rethrown in both cases, which made the warning branch ineffective.
     *
     * @param stormId topology id
     */
    @Override
    public void teardownHeartbeats(String stormId) {
        try {
            this.stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
        }
        catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                // Storage-level failure: teardown is best-effort, so only warn.
                LOG.warn("Could not teardown heartbeats for {}.", (Object)stormId);
            } else {
                throw e;
            }
        }
    }

    /**
     * Deletes the error root of a topology on a best-effort basis.
     *
     * <p>A failure whose cause chain contains a {@code KeeperException} is logged as a warning
     * and otherwise ignored; any other failure is rethrown. Previously the exception was
     * rethrown in both cases, which made the warning branch ineffective.
     *
     * @param stormId topology id
     */
    @Override
    public void teardownTopologyErrors(String stormId) {
        try {
            this.stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
        }
        catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                // Storage-level failure: teardown is best-effort, so only warn.
                LOG.warn("Could not teardown errors for {}.", (Object)stormId);
            } else {
                throw e;
            }
        }
    }

    /**
     * Lists the children of {@code /workerbeats} via the worker-heartbeat storage call.
     *
     * @return child names as returned by the storage
     */
    @Override
    public List<String> heartbeatStorms() {
        List<String> stormIds = this.stateStorage.get_worker_hb_children("/workerbeats", false);
        return stormIds;
    }

    /**
     * Lists the children of {@code /errors}.
     *
     * @return child names as returned by the storage
     */
    @Override
    public List<String> errorTopologies() {
        List<String> stormIds = this.stateStorage.get_children("/errors", false);
        return stormIds;
    }

    /**
     * Lists the children of {@code /backpressure}.
     *
     * @return child names as returned by the storage
     */
    @Override
    public List<String> backpressureTopologies() {
        List<String> stormIds = this.stateStorage.get_children("/backpressure", false);
        return stormIds;
    }

    /**
     * Serializes and stores the log configuration of a topology.
     *
     * @param stormId   topology id
     * @param logConfig configuration to store
     */
    @Override
    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
        String path = ClusterUtils.logConfigPath(stormId);
        byte[] payload = Utils.serialize(logConfig);
        this.stateStorage.set_data(path, payload, this.acls);
    }

    /**
     * Reads and deserializes the log configuration of a topology.
     *
     * @param stormId topology id
     * @param cb      optional callback registered under {@code stormId} before the read
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored bytes
     */
    @Override
    public LogConfig topologyLogConfig(String stormId, Runnable cb) {
        boolean hasCallback = cb != null;
        if (hasCallback) {
            this.logConfigCallback.put(stormId, cb);
        }
        byte[] raw = this.stateStorage.get_data(ClusterUtils.logConfigPath(stormId), hasCallback);
        return ClusterUtils.maybeDeserialize(raw, LogConfig.class);
    }

    /**
     * Stores a worker heartbeat; does nothing when {@code info} is {@code null}.
     *
     * @param stormId topology id
     * @param node    node of the worker
     * @param port    port of the worker
     * @param info    heartbeat to serialize and store, may be {@code null}
     */
    @Override
    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
        if (info == null) {
            return;
        }
        String beatPath = ClusterUtils.workerbeatPath(stormId, node, port);
        this.stateStorage.set_worker_hb(beatPath, Utils.serialize(info), this.acls);
    }

    /**
     * Deletes the heartbeat node of a single worker.
     *
     * @param stormId topology id
     * @param node    node of the worker
     * @param port    port of the worker
     */
    @Override
    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
        this.stateStorage.delete_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port));
    }

    /**
     * Writes the supervisor's info as an ephemeral node at its supervisor path.
     *
     * @param supervisorId supervisor id
     * @param info         info to serialize and store
     */
    @Override
    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
        String supervisorPath = ClusterUtils.supervisorPath(supervisorId);
        byte[] payload = Utils.serialize(info);
        this.stateStorage.set_ephemeral_node(supervisorPath, payload, this.acls);
    }

    /**
     * Makes the presence of a worker's backpressure node match the requested flag.
     *
     * <p>The node is created (ephemeral, no data) when {@code on} is true and it is missing,
     * deleted when {@code on} is false and it exists, and left untouched otherwise.
     *
     * @param stormId topology id
     * @param node    node of the worker
     * @param port    port of the worker
     * @param on      desired backpressure state
     */
    @Override
    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
        String path = ClusterUtils.backpressurePath(stormId, node, port);
        boolean present = this.stateStorage.node_exists(path, false);
        if (present == on) {
            // Already in the requested state.
            return;
        }
        if (on) {
            this.stateStorage.set_ephemeral_node(path, null, this.acls);
        } else {
            this.stateStorage.delete_node(path);
        }
    }

    /**
     * Reports whether the topology's backpressure root has any children.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the check
     * @return {@code true} when the root exists and has at least one child
     */
    @Override
    public boolean topologyBackpressure(String stormId, Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.backPressureCallback.put(stormId, callback);
        }
        String root = ClusterUtils.backpressureStormRoot(stormId);
        if (!this.stateStorage.node_exists(root, false)) {
            return false;
        }
        List<String> flagged = this.stateStorage.get_children(root, hasCallback);
        return flagged.size() > 0;
    }

    /**
     * Creates the backpressure root path for a topology.
     *
     * @param stormId topology id
     */
    @Override
    public void setupBackpressure(String stormId) {
        String backpressureRoot = ClusterUtils.backpressureStormRoot(stormId);
        this.stateStorage.mkdirs(backpressureRoot, this.acls);
    }

    /**
     * Deletes the backpressure root of a topology on a best-effort basis.
     *
     * <p>A failure whose cause chain contains a {@code KeeperException} is logged as a warning
     * and otherwise ignored; any other failure is rethrown. Previously the exception was
     * rethrown in both cases, which made the warning branch ineffective.
     *
     * @param stormId topology id
     */
    @Override
    public void removeBackpressure(String stormId) {
        try {
            this.stateStorage.delete_node(ClusterUtils.backpressureStormRoot(stormId));
        }
        catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                // Storage-level failure: teardown is best-effort, so only warn.
                LOG.warn("Could not teardown backpressure node for {}.", (Object)stormId);
            } else {
                throw e;
            }
        }
    }

    /**
     * Deletes a worker's backpressure node if it exists.
     *
     * @param stormId topology id
     * @param node    node of the worker
     * @param port    port of the worker
     */
    @Override
    public void removeWorkerBackpressure(String stormId, String node, Long port) {
        String path = ClusterUtils.backpressurePath(stormId, node, port);
        if (!this.stateStorage.node_exists(path, false)) {
            return;
        }
        this.stateStorage.delete_node(path);
    }

    /**
     * Serializes and stores the {@code StormBase} of a topology at its storm path.
     *
     * @param stormId   topology id
     * @param stormBase base record to store
     */
    @Override
    public void activateStorm(String stormId, StormBase stormBase) {
        byte[] payload = Utils.serialize(stormBase);
        this.stateStorage.set_data(ClusterUtils.stormPath(stormId), payload, this.acls);
    }

    /**
     * Merges {@code newElems} with the stored {@code StormBase} of the topology and writes the
     * result back.
     *
     * <p>Component executors: entries from {@code newElems} win, stored entries fill in the rest.
     * Component debug options: merged per key (see the note in the loop). Name, status, worker
     * count, launch time, owner and topology action options fall back to the stored value when
     * {@code newElems} leaves them blank, {@code null} or zero.
     *
     * <p>Unlike the previous version, a {@code null} executor map on {@code newElems} and
     * {@code null} debug maps on either side are treated as empty instead of causing a
     * {@code NullPointerException}.
     *
     * @param stormId  topology id
     * @param newElems partial update; it is mutated in place and then serialized
     */
    @Override
    public void updateStorm(String stormId, StormBase newElems) {
        StormBase stormBase = this.stormBase(stormId, null);

        Map<String, Integer> storedExecutors = stormBase.get_component_executors();
        if (storedExecutors != null) {
            Map<String, Integer> mergedExecutors = new HashMap<String, Integer>();
            Map<String, Integer> updatedExecutors = newElems.get_component_executors();
            if (updatedExecutors != null) {
                // Copy entry by entry into a fresh HashMap rather than mutating the caller's map.
                for (Map.Entry<String, Integer> entry : updatedExecutors.entrySet()) {
                    mergedExecutors.put(entry.getKey(), entry.getValue());
                }
            }
            for (Map.Entry<String, Integer> entry : storedExecutors.entrySet()) {
                if (!mergedExecutors.containsKey(entry.getKey())) {
                    mergedExecutors.put(entry.getKey(), entry.getValue());
                }
            }
            if (mergedExecutors.size() > 0) {
                newElems.set_component_executors(mergedExecutors);
            }
        }

        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
        if (oldComponentDebug == null) {
            oldComponentDebug = Collections.<String, DebugOptions>emptyMap();
        }
        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
        if (newComponentDebug == null) {
            newComponentDebug = Collections.<String, DebugOptions>emptyMap();
        }
        HashSet<String> debugOptionsKeys = new HashSet<String>();
        debugOptionsKeys.addAll(oldComponentDebug.keySet());
        debugOptionsKeys.addAll(newComponentDebug.keySet());

        Map<String, DebugOptions> mergedDebug = new HashMap<String, DebugOptions>();
        for (String key : debugOptionsKeys) {
            boolean enable = false;
            double samplingpct = 0.0;
            if (oldComponentDebug.containsKey(key)) {
                enable = oldComponentDebug.get(key).is_enable();
                samplingpct = oldComponentDebug.get(key).get_samplingpct();
            }
            if (newComponentDebug.containsKey(key)) {
                enable = newComponentDebug.get(key).is_enable();
                // NOTE(review): the new sampling percentage is ADDED to the stored one rather than
                // replacing it. Kept as-is to preserve behavior; confirm this is intended.
                samplingpct += newComponentDebug.get(key).get_samplingpct();
            }
            DebugOptions debugOptions = new DebugOptions();
            debugOptions.set_enable(enable);
            debugOptions.set_samplingpct(samplingpct);
            mergedDebug.put(key, debugOptions);
        }
        if (mergedDebug.size() > 0) {
            newElems.set_component_debug(mergedDebug);
        }

        // Scalar fields: keep the stored value wherever the update leaves the field unset.
        if (StringUtils.isBlank((String)newElems.get_name())) {
            newElems.set_name(stormBase.get_name());
        }
        if (newElems.get_status() == null) {
            newElems.set_status(stormBase.get_status());
        }
        if (newElems.get_num_workers() == 0) {
            newElems.set_num_workers(stormBase.get_num_workers());
        }
        if (newElems.get_launch_time_secs() == 0) {
            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
        }
        if (StringUtils.isBlank((String)newElems.get_owner())) {
            newElems.set_owner(stormBase.get_owner());
        }
        if (newElems.get_topology_action_options() == null) {
            newElems.set_topology_action_options(stormBase.get_topology_action_options());
        }

        this.stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), this.acls);
    }

    /**
     * Deletes the storm-base node of a topology.
     *
     * @param stormId topology id
     */
    @Override
    public void removeStormBase(String stormId) {
        String stormPath = ClusterUtils.stormPath(stormId);
        this.stateStorage.delete_node(stormPath);
    }

    /**
     * Serializes and stores the assignment of a topology.
     *
     * @param stormId topology id
     * @param info    assignment to store
     */
    @Override
    public void setAssignment(String stormId, Assignment info) {
        String path = ClusterUtils.assignmentPath(stormId);
        byte[] payload = Utils.serialize(info);
        this.stateStorage.set_data(path, payload, this.acls);
    }

    /**
     * Registers this nimbus as a holder of the given blob key and version.
     *
     * <p>Ensures the key's blobstore path exists, calls {@code delete_node_blobstore} for this
     * nimbus host/port under that path, and then creates an ephemeral node named
     * {@code <host:port>-<version>}.
     *
     * @param key         blob key
     * @param nimbusInfo  nimbus whose host/port string names the node
     * @param versionInfo version appended to the node name
     */
    @Override
    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
        String keyPath = ClusterUtils.blobstorePath(key);
        String hostPort = nimbusInfo.toHostPortString();
        String path = keyPath + "/" + hostPort + "-" + versionInfo;
        LOG.info("set-path: {}", (Object)path);
        this.stateStorage.mkdirs(keyPath, this.acls);
        this.stateStorage.delete_node_blobstore(keyPath, hostPort);
        this.stateStorage.set_ephemeral_node(path, null, this.acls);
    }

    /**
     * Lists the children of {@code /blobstore}.
     *
     * @return child names as returned by the storage
     */
    @Override
    public List<String> activeKeys() {
        List<String> keys = this.stateStorage.get_children("/blobstore", false);
        return keys;
    }

    /**
     * Syncs {@code /blobstore} and lists its children.
     *
     * @param callback optional callback; when non-null it replaces the pending blobstore
     *                 callback and the storage call is made with its boolean flag set to true
     * @return child names as returned by the storage
     */
    @Override
    public List<String> blobstore(Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.blobstoreCallback.set(callback);
        }
        this.stateStorage.sync_path("/blobstore");
        return this.stateStorage.get_children("/blobstore", hasCallback);
    }

    /**
     * Removes a topology's assignment, credentials, log config and profiler config nodes
     * (in that order) and finally its storm base.
     *
     * @param stormId topology id
     */
    @Override
    public void removeStorm(String stormId) {
        String[] pathsToDelete = {
            ClusterUtils.assignmentPath(stormId),
            ClusterUtils.credentialsPath(stormId),
            ClusterUtils.logConfigPath(stormId),
            ClusterUtils.profilerConfigPath(stormId)
        };
        for (String path : pathsToDelete) {
            this.stateStorage.delete_node(path);
        }
        this.removeStormBase(stormId);
    }

    /**
     * Deletes the blobstore node of a key.
     *
     * @param blobKey blob key
     */
    @Override
    public void removeBlobstoreKey(String blobKey) {
        LOG.debug("remove key {}", (Object)blobKey);
        String keyPath = ClusterUtils.blobstorePath(blobKey);
        this.stateStorage.delete_node(keyPath);
    }

    /**
     * Deletes the max-key-sequence-number node of a blob key.
     *
     * @param blobKey blob key
     */
    @Override
    public void removeKeyVersion(String blobKey) {
        String sequencePath = ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey);
        this.stateStorage.delete_node(sequencePath);
    }

    /**
     * Records an error for a component and trims the stored history.
     *
     * <p>The error is written as a new sequential child under the component's error path and
     * also as the component's "last error". Afterwards the children are ordered by the number
     * following their first character and the lowest-numbered ones are deleted until at most
     * ten remain.
     *
     * @param stormId     topology id
     * @param componentId component id
     * @param node        host recorded on the error
     * @param port        port recorded on the error
     * @param error       throwable to stringify and store
     */
    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String errorDir = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);

        ErrorInfo info = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        info.set_host(node);
        info.set_port(port.intValue());
        byte[] payload = Utils.serialize(info);

        this.stateStorage.mkdirs(errorDir, this.acls);
        this.stateStorage.create_sequential(errorDir + "/" + "e", payload, this.acls);
        this.stateStorage.set_data(lastErrorPath, payload, this.acls);

        List<String> stored = this.stateStorage.get_children(errorDir, false);
        Comparator<String> bySequenceNumber = new Comparator<String>() {

            @Override
            public int compare(String left, String right) {
                long leftSeq = Long.parseLong(left.substring(1));
                long rightSeq = Long.parseLong(right.substring(1));
                return Long.compare(leftSeq, rightSeq);
            }
        };
        Collections.sort(stored, bySequenceNumber);

        // Drop from the front of the sorted list until only ten entries are left.
        for (int surplus = stored.size() - 10; surplus > 0; surplus--) {
            String oldest = stored.remove(0);
            this.stateStorage.delete_node(errorDir + "/" + oldest);
        }
    }

    /**
     * Loads the stored errors of a component, newest first.
     *
     * @param stormId     topology id
     * @param componentId component id
     * @return errors sorted by descending {@code error_time_secs}; empty when the error path
     *         does not exist
     */
    @Override
    public List<ErrorInfo> errors(String stormId, String componentId) {
        List<ErrorInfo> collected = new ArrayList<ErrorInfo>();
        String errorDir = ClusterUtils.errorPath(stormId, componentId);
        if (!this.stateStorage.node_exists(errorDir, false)) {
            return collected;
        }
        for (String child : this.stateStorage.get_children(errorDir, false)) {
            byte[] raw = this.stateStorage.get_data(errorDir + "/" + child, false);
            ErrorInfo parsed = ClusterUtils.maybeDeserialize(raw, ErrorInfo.class);
            if (parsed != null) {
                collected.add(parsed);
            }
        }
        Collections.sort(collected, new Comparator<ErrorInfo>() {

            @Override
            public int compare(ErrorInfo first, ErrorInfo second) {
                // Arguments are swapped to obtain descending order.
                return Integer.compare(second.get_error_time_secs(), first.get_error_time_secs());
            }
        });
        return collected;
    }

    /**
     * Reads the "last error" record of a component.
     *
     * @param stormId     topology id
     * @param componentId component id
     * @return the deserialized record, or {@code null} when the node does not exist
     */
    @Override
    public ErrorInfo lastError(String stormId, String componentId) {
        String path = ClusterUtils.lastErrorPath(stormId, componentId);
        if (!this.stateStorage.node_exists(path, false)) {
            return null;
        }
        byte[] raw = this.stateStorage.get_data(path, false);
        return ClusterUtils.maybeDeserialize(raw, ErrorInfo.class);
    }

    /**
     * Stores a topology's credentials using ACLs built from the topology configuration
     * (via {@code ClusterUtils.mkTopoOnlyAcls}) instead of this instance's default ACLs.
     *
     * @param stormId  topology id
     * @param creds    credentials to serialize and store
     * @param topoConf topology configuration the ACLs are derived from
     * @throws NoSuchAlgorithmException propagated from ACL construction
     */
    @Override
    public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException {
        List<ACL> topoAcls = ClusterUtils.mkTopoOnlyAcls(topoConf);
        String credentialsPath = ClusterUtils.credentialsPath(stormId);
        byte[] payload = Utils.serialize(creds);
        this.stateStorage.set_data(credentialsPath, payload, topoAcls);
    }

    /**
     * Reads and deserializes the credentials of a topology.
     *
     * @param stormId  topology id
     * @param callback optional callback registered under {@code stormId} before the read
     * @return the result of {@code ClusterUtils.maybeDeserialize} on the stored bytes
     */
    @Override
    public Credentials credentials(String stormId, Runnable callback) {
        boolean hasCallback = callback != null;
        if (hasCallback) {
            this.credentialsCallback.put(stormId, callback);
        }
        byte[] raw = this.stateStorage.get_data(ClusterUtils.credentialsPath(stormId), hasCallback);
        return ClusterUtils.maybeDeserialize(raw, Credentials.class);
    }

    /**
     * Unregisters this instance's change listener from the storage and, when this instance was
     * created with {@code solo == true}, closes the storage as well.
     */
    @Override
    public void disconnect() {
        this.stateStorage.unregister(this.stateId);
        if (!this.solo) {
            return;
        }
        this.stateStorage.close();
    }

    /**
     * Splits a path on {@code '/'} and returns its non-empty segments in order.
     *
     * @param path slash-separated path
     * @return mutable list of non-empty segments
     */
    private List<String> tokenizePath(String path) {
        List<String> segments = new ArrayList<String>();
        for (String piece : path.split("/")) {
            if (!piece.isEmpty()) {
                segments.add(piece);
            }
        }
        return segments;
    }
}

