/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.pacemaker;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Reservoir;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBNodes;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.pacemaker.IServerMessageHandler;
import org.apache.storm.pacemaker.PacemakerServer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;

/**
 * In-memory heartbeat store that answers {@link HBMessage} requests.
 *
 * <p>Heartbeats are kept in a {@link ConcurrentHashMap} keyed by the pulse id / path string.
 * Read-style requests (exists, get pulse, list nodes, get all pulses) are answered with
 * {@code NOT_AUTHORIZED} when the caller is not authenticated; write-style requests
 * (create path, send pulse, delete) are handled without looking at the flag.
 */
public class Pacemaker implements IServerMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class);

    private static final Meter meterSendPulseCount = StormMetricsRegistry.registerMeter("pacemaker:send-pulse-count");
    private static final Meter meterTotalReceivedSize = StormMetricsRegistry.registerMeter("pacemaker:total-receive-size");
    // NOTE(review): "get-pulse=count" looks like a typo for "get-pulse-count", but the name is
    // externally visible to metric consumers, so it is deliberately left unchanged here.
    private static final Meter meterGetPulseCount = StormMetricsRegistry.registerMeter("pacemaker:get-pulse=count");
    private static final Meter meterTotalSentSize = StormMetricsRegistry.registerMeter("pacemaker:total-sent-size");
    private static final Histogram histogramHeartbeatSize = StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir());

    /** Latest heartbeat payload per pulse id. */
    private final Map<String, byte[]> heartbeats = new ConcurrentHashMap<String, byte[]>();
    /** Configuration handed to the metrics reporters and to the {@link PacemakerServer}. */
    private final Map<String, Object> conf;

    /**
     * Creates the handler, registers a gauge reporting the number of stored heartbeats and
     * starts the metrics reporters.
     *
     * @param conf configuration passed to the metrics reporters and later to the server
     */
    public Pacemaker(Map<String, Object> conf) {
        this.conf = conf;
        StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", new Callable<Integer>() {
            @Override
            public Integer call() {
                return Pacemaker.this.heartbeats.size();
            }
        });
        StormMetricsRegistry.startMetricsReporters(conf);
    }

    /**
     * Dispatches a request to the handler matching its message type.
     *
     * @param m             the incoming request
     * @param authenticated whether the sender is authenticated; forwarded to the read handlers
     * @return the response carrying the request's message id, or {@code null} when the
     *         message type is not one of the handled types
     */
    @Override
    public HBMessage handleMessage(HBMessage m, boolean authenticated) {
        HBMessage response = null;
        HBMessageData data = m.get_data();
        switch (m.get_type()) {
            case CREATE_PATH: {
                response = this.createPath(data.get_path());
                break;
            }
            case EXISTS: {
                response = this.pathExists(data.get_path(), authenticated);
                break;
            }
            case SEND_PULSE: {
                response = this.sendPulse(data.get_pulse());
                break;
            }
            case GET_ALL_PULSE_FOR_PATH: {
                response = this.getAllPulseForPath(data.get_path(), authenticated);
                break;
            }
            case GET_ALL_NODES_FOR_PATH: {
                response = this.getAllNodesForPath(data.get_path(), authenticated);
                break;
            }
            case GET_PULSE: {
                response = this.getPulse(data.get_path(), authenticated);
                break;
            }
            case DELETE_PATH: {
                response = this.deletePath(data.get_path());
                break;
            }
            case DELETE_PULSE_ID: {
                response = this.deletePulseId(data.get_path());
                break;
            }
            default: {
                LOG.info("Got Unexpected Type: {}", m.get_type());
            }
        }
        if (response != null) {
            // Echo the request id so the caller can correlate the reply.
            response.set_message_id(m.get_message_id());
        }
        return response;
    }

    /**
     * Acknowledges a create-path request. Nothing is stored; the path argument is unused.
     */
    private HBMessage createPath(String path) {
        return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null);
    }

    /**
     * Reports whether a heartbeat is stored under exactly {@code path}.
     *
     * @return an {@code EXISTS_RESPONSE} with the boolean result, or {@code NOT_AUTHORIZED}
     */
    private HBMessage pathExists(String path, boolean authenticated) {
        if (!authenticated) {
            return this.notAuthorized();
        }
        boolean itDoes = this.heartbeats.containsKey(path);
        LOG.debug("Checking if path [ {} ] exists... {} .", path, itDoes);
        return new HBMessage(HBServerMessageType.EXISTS_RESPONSE, HBMessageData.boolval(itDoes));
    }

    /** Builds the response sent to unauthenticated callers of read operations. */
    private HBMessage notAuthorized() {
        return new HBMessage(HBServerMessageType.NOT_AUTHORIZED, null);
    }

    /**
     * Stores (or overwrites) the heartbeat carried by {@code pulse} and updates the
     * send-count, received-size and heartbeat-size metrics.
     */
    private HBMessage sendPulse(HBPulse pulse) {
        String id = pulse.get_id();
        byte[] details = pulse.get_details();
        LOG.debug("Saving Pulse for id [ {} ] data [ {} ].", id, details);
        meterSendPulseCount.mark();
        meterTotalReceivedSize.mark(details.length);
        histogramHeartbeatSize.update(details.length);
        this.heartbeats.put(id, details);
        return new HBMessage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
    }

    /**
     * Answers a get-all-pulses request. The response carries no data and the path is unused.
     */
    private HBMessage getAllPulseForPath(String path, boolean authenticated) {
        if (authenticated) {
            return new HBMessage(HBServerMessageType.GET_ALL_PULSE_FOR_PATH_RESPONSE, null);
        }
        return this.notAuthorized();
    }

    /**
     * Lists the distinct first path segments that follow {@code path} among the stored keys.
     *
     * <p>The path is matched as a literal prefix. The earlier implementation stripped it with
     * {@code String.replaceFirst}, which interprets its argument as a regular expression and
     * therefore threw or stripped the wrong span for paths containing regex metacharacters.
     *
     * @return a {@code GET_ALL_NODES_FOR_PATH_RESPONSE} with the node names, or
     *         {@code NOT_AUTHORIZED}
     */
    private HBMessage getAllNodesForPath(String path, boolean authenticated) {
        LOG.debug("List all nodes for path {}", path);
        if (!authenticated) {
            return this.notAuthorized();
        }
        HashSet<String> pulseIds = new HashSet<String>();
        for (String key : this.heartbeats.keySet()) {
            // NOTE(review): a plain prefix test also matches siblings such as "/foobar/x" for
            // path "/foo" (unlike deletePath, which matches on a "/" boundary). Kept as-is to
            // preserve existing results - confirm whether that is intended.
            if (!key.startsWith(path)) {
                continue;
            }
            String child = firstNonEmptySegment(key.substring(path.length()));
            if (child != null) {
                pulseIds.add(child);
            }
        }
        HBMessageData hbMessageData = HBMessageData.nodes(new HBNodes(new ArrayList<String>(pulseIds)));
        return new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, hbMessageData);
    }

    /** Returns the first non-empty "/"-separated segment of {@code remainder}, or null if none. */
    private static String firstNonEmptySegment(String remainder) {
        for (String segment : remainder.split("/")) {
            if (!segment.isEmpty()) {
                return segment;
            }
        }
        return null;
    }

    /**
     * Returns the heartbeat stored under {@code path}; the pulse details are {@code null}
     * when nothing is stored. Updates the get-count and sent-size metrics.
     *
     * @return a {@code GET_PULSE_RESPONSE} wrapping the pulse, or {@code NOT_AUTHORIZED}
     */
    private HBMessage getPulse(String path, boolean authenticated) {
        if (!authenticated) {
            return this.notAuthorized();
        }
        byte[] details = this.heartbeats.get(path);
        LOG.debug("Getting Pulse for path [ {} ]...data [ {} ].", path, details);
        meterGetPulseCount.mark();
        if (details != null) {
            meterTotalSentSize.mark(details.length);
        }
        HBPulse hbPulse = new HBPulse();
        hbPulse.set_id(path);
        hbPulse.set_details(details);
        return new HBMessage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
    }

    /**
     * Removes the heartbeat stored at {@code path} and every heartbeat below it.
     *
     * <p>Both the prefix and each key are compared with a trailing "/" so that only whole
     * path segments match (deleting "/a" does not touch "/ab").
     */
    private HBMessage deletePath(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        for (String key : this.heartbeats.keySet()) {
            if ((key + "/").startsWith(prefix)) {
                this.deletePulseId(key);
            }
        }
        return new HBMessage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
    }

    /** Removes the heartbeat stored under exactly {@code path}, if any. */
    private HBMessage deletePulseId(String path) {
        LOG.debug("Deleting Pulse for id [ {} ].", path);
        this.heartbeats.remove(path);
        return new HBMessage(HBServerMessageType.DELETE_PULSE_ID_RESPONSE, null);
    }

    /** Creates the {@link PacemakerServer} that feeds requests to this handler. */
    private PacemakerServer launchServer() {
        LOG.info("Starting pacemaker server for storm version '{}'", VersionInfo.getVersion());
        return new PacemakerServer(this, this.conf);
    }

    /**
     * Entry point: redirects stdout/stderr to SLF4J, reads the storm configuration and
     * launches the server.
     */
    public static void main(String[] args) {
        SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
        Map conf = ConfigUtils.overrideLoginConfigWithSystemProperty((Map)Utils.readStormConfig());
        Pacemaker serverHandler = new Pacemaker(conf);
        serverHandler.launchServer();
    }
}

