/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.GrouperFactory;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.ExecutorShutdown;
import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.executor.bolt.BoltExecutor;
import org.apache.storm.executor.error.IReportError;
import org.apache.storm.executor.error.ReportError;
import org.apache.storm.executor.error.ReportErrorAndDie;
import org.apache.storm.executor.spout.SpoutExecutor;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.stats.BoltExecutorStats;
import org.apache.storm.stats.CommonStats;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.DisruptorBackpressureCallback;
import org.apache.storm.utils.DisruptorQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WorkerBackpressureThread;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class Executor
implements Callable,
EventHandler<Object> {
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
    protected final WorkerState workerData;
    protected final WorkerTopologyContext workerTopologyContext;
    protected final List<Long> executorId;
    protected final List<Integer> taskIds;
    protected final String componentId;
    protected final AtomicBoolean openOrPrepareWasCalled;
    protected final Map<String, Object> topoConf;
    protected final Map<String, Object> conf;
    protected final String stormId;
    protected final HashMap sharedExecutorData;
    protected final AtomicBoolean stormActive;
    protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
    protected final Runnable suicideFn;
    protected final IStormClusterState stormClusterState;
    protected final Map<Integer, String> taskToComponent;
    protected CommonStats stats;
    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
    protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
    protected final ReportErrorAndDie reportErrorDie;
    protected final Callable<Boolean> sampler;
    protected ExecutorTransfer executorTransfer;
    protected final String type;
    protected final AtomicBoolean throttleOn;
    protected final IReportError reportError;
    protected final Random rand;
    protected final DisruptorQueue transferQueue;
    protected final DisruptorQueue receiveQueue;
    protected Map<Integer, Task> idToTask;
    protected final Map<String, String> credentials;
    protected final Boolean isDebug;
    protected final Boolean hasEventLoggers;
    protected String hostname;

    protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
        this.workerData = workerData;
        this.executorId = executorId;
        this.workerTopologyContext = workerData.getWorkerTopologyContext();
        this.taskIds = StormCommon.executorIdToTasks(executorId);
        this.componentId = this.workerTopologyContext.getComponentId(this.taskIds.get(0));
        this.openOrPrepareWasCalled = new AtomicBoolean(false);
        this.topoConf = this.normalizedComponentConf(workerData.getTopologyConf(), this.workerTopologyContext, this.componentId);
        this.receiveQueue = workerData.getExecutorReceiveQueueMap().get(executorId);
        this.stormId = workerData.getTopologyId();
        this.conf = workerData.getConf();
        this.sharedExecutorData = new HashMap();
        this.stormActive = workerData.getIsTopologyActive();
        this.stormComponentDebug = workerData.getStormComponentToDebug();
        this.transferQueue = this.mkExecutorBatchQueue(this.topoConf, executorId);
        this.executorTransfer = new ExecutorTransfer(workerData, this.transferQueue, this.topoConf);
        this.suicideFn = workerData.getSuicideCallback();
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.getStateStorage(), Utils.getWorkerACL(this.topoConf), new ClusterStateContext(DaemonType.WORKER));
        }
        catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
        StormTopology topology = this.workerTopologyContext.getRawTopology();
        Map<String, SpoutSpec> spouts = topology.get_spouts();
        Map<String, Bolt> bolts = topology.get_bolts();
        if (spouts.containsKey(this.componentId)) {
            this.type = "spout";
            this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.topoConf), ObjectReader.getInt(this.topoConf.get("num.stat.buckets")));
        } else if (bolts.containsKey(this.componentId)) {
            this.type = "bolt";
            this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.topoConf), ObjectReader.getInt(this.topoConf.get("num.stat.buckets")));
        } else {
            throw new RuntimeException("Could not find " + this.componentId + " in " + topology);
        }
        this.intervalToTaskToMetricToRegistry = new HashMap<Integer, Map<Integer, Map<String, IMetric>>>();
        this.taskToComponent = workerData.getTaskToComponent();
        this.streamToComponentToGrouper = this.outboundComponents(this.workerTopologyContext, this.componentId, this.topoConf);
        this.reportError = new ReportError(this.topoConf, this.stormClusterState, this.stormId, this.componentId, this.workerTopologyContext);
        this.reportErrorDie = new ReportErrorAndDie(this.reportError, this.suicideFn);
        this.sampler = ConfigUtils.mkStatsSampler(this.topoConf);
        this.throttleOn = workerData.getThrottleOn();
        this.isDebug = ObjectReader.getBoolean(this.topoConf.get("topology.debug"), false);
        this.rand = new Random(Utils.secureRandomLong());
        this.credentials = credentials;
        this.hasEventLoggers = StormCommon.hasEventLoggers(this.topoConf);
        try {
            this.hostname = Utils.hostname();
        }
        catch (UnknownHostException ignored) {
            this.hostname = "";
        }
    }

    public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
        Executor executor;
        List<Integer> taskIds;
        String componentId;
        WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext();
        String type = Executor.getExecutorType(workerTopologyContext, componentId = workerTopologyContext.getComponentId((taskIds = StormCommon.executorIdToTasks(executorId)).get(0)));
        if ("spout".equals(type)) {
            executor = new SpoutExecutor(workerState, executorId, credentials);
            executor.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()), ObjectReader.getInt(executor.getStormConf().get("num.stat.buckets")));
        } else {
            executor = new BoltExecutor(workerState, executorId, credentials);
            executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()), ObjectReader.getInt(executor.getStormConf().get("num.stat.buckets")));
        }
        HashMap<Integer, Task> idToTask = new HashMap<Integer, Task>();
        for (Integer taskId : taskIds) {
            try {
                Task task = new Task(executor, taskId);
                executor.sendUnanchored(task, "__system", new Values("startup"), executor.getExecutorTransfer());
                idToTask.put(taskId, task);
            }
            catch (IOException ex) {
                throw Utils.wrapInRuntime(ex);
            }
        }
        executor.idToTask = idToTask;
        return executor;
    }

    private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
        StormTopology topology = workerTopologyContext.getRawTopology();
        Map<String, SpoutSpec> spouts = topology.get_spouts();
        Map<String, Bolt> bolts = topology.get_bolts();
        if (spouts.containsKey(componentId)) {
            return "spout";
        }
        if (bolts.containsKey(componentId)) {
            return "bolt";
        }
        throw new RuntimeException("Could not find " + componentId + " in " + topology);
    }

    public ExecutorShutdown execute() throws Exception {
        LOG.info("Loading executor tasks " + this.componentId + ":" + this.executorId);
        this.registerBackpressure();
        Utils.SmartThread systemThreads = Utils.asyncLoop(this.executorTransfer, this.executorTransfer.getName(), this.reportErrorDie);
        String handlerName = this.componentId + "-executor" + this.executorId;
        Utils.SmartThread handlers = Utils.asyncLoop(this, false, this.reportErrorDie, 5, true, true, handlerName);
        this.setupTicks("spout".equals(this.type));
        LOG.info("Finished loading executor " + this.componentId + ":" + this.executorId);
        return new ExecutorShutdown(this, Lists.newArrayList((Object[])new Utils.SmartThread[]{systemThreads, handlers}), this.idToTask);
    }

    public abstract void tupleActionFn(int var1, TupleImpl var2) throws Exception;

    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
        ArrayList addressedTuples = (ArrayList)event;
        for (AddressedTuple addressedTuple : addressedTuples) {
            TupleImpl tuple = (TupleImpl)addressedTuple.getTuple();
            int taskId = addressedTuple.getDest();
            if (this.isDebug.booleanValue()) {
                LOG.info("Processing received message FOR {} TUPLE: {}", (Object)taskId, (Object)tuple);
            }
            if (taskId != -2) {
                this.tupleActionFn(taskId, tuple);
                continue;
            }
            for (Integer t : this.taskIds) {
                this.tupleActionFn(t, tuple);
            }
        }
    }

    public void metricsTick(Task taskData, TupleImpl tuple) {
        try {
            Integer interval = tuple.getInteger(0);
            int taskId = taskData.getTaskId();
            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = this.intervalToTaskToMetricToRegistry.get(interval);
            Map<String, IMetric> nameToRegistry = null;
            if (taskToMetricToRegistry != null) {
                nameToRegistry = taskToMetricToRegistry.get(taskId);
            }
            if (nameToRegistry != null) {
                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(this.hostname, this.workerTopologyContext.getThisWorkerPort(), this.componentId, taskId, Time.currentTimeSecs(), interval);
                ArrayList<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<IMetricsConsumer.DataPoint>();
                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                    IMetric metric = entry.getValue();
                    Object value = metric.getValueAndReset();
                    if (value == null) continue;
                    IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                    dataPoints.add(dataPoint);
                }
                if (!dataPoints.isEmpty()) {
                    this.sendUnanchored(taskData, "__metrics", new Values(taskInfo, dataPoints), this.executorTransfer);
                }
            }
        }
        catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    protected void setupMetrics() {
        for (final Integer interval : this.intervalToTaskToMetricToRegistry.keySet()) {
            StormTimer timerTask = this.workerData.getUserTimer();
            timerTask.scheduleRecurring(interval, interval, new Runnable(){

                @Override
                public void run() {
                    TupleImpl tuple = new TupleImpl(Executor.this.workerTopologyContext, new Values(new Object[]{interval}), -1, "__metrics_tick");
                    ArrayList metricsTickTuple = Lists.newArrayList((Object[])new AddressedTuple[]{new AddressedTuple(-2, tuple)});
                    Executor.this.receiveQueue.publish(metricsTickTuple);
                }
            });
        }
    }

    public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
        Tuple tuple = task.getTuple(stream, values);
        List<Integer> tasks = task.getOutgoingTasks(stream, values);
        for (Integer t : tasks) {
            transfer.transfer(t, tuple);
        }
    }

    public void sendToEventLogger(Executor executor, Task taskData, List values, String componentId, Object messageId, Random random) {
        double spct;
        Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
        DebugOptions debugOptions = componentDebug.get(componentId);
        if (debugOptions == null) {
            debugOptions = componentDebug.get(executor.getStormId());
        }
        double d = spct = debugOptions != null && debugOptions.is_enable() ? debugOptions.get_samplingpct() : 0.0;
        if (spct > 0.0 && random.nextDouble() * 100.0 < spct) {
            this.sendUnanchored(taskData, "__eventlog", new Values(componentId, messageId, System.currentTimeMillis(), values), executor.getExecutorTransfer());
        }
    }

    private void registerBackpressure() {
        this.receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback(){

            @Override
            public void highWaterMark() throws Exception {
                LOG.debug("executor " + Executor.this.executorId + " is congested, set backpressure flag true");
                WorkerBackpressureThread.notifyBackpressureChecker(Executor.this.workerData.getBackpressureTrigger());
            }

            @Override
            public void lowWaterMark() throws Exception {
                LOG.debug("executor " + Executor.this.executorId + " is not-congested, set backpressure flag false");
                WorkerBackpressureThread.notifyBackpressureChecker(Executor.this.workerData.getBackpressureTrigger());
            }
        });
        this.receiveQueue.setHighWaterMark(ObjectReader.getDouble(this.topoConf.get("backpressure.disruptor.high.watermark")));
        this.receiveQueue.setLowWaterMark(ObjectReader.getDouble(this.topoConf.get("backpressure.disruptor.low.watermark")));
        this.receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(this.topoConf.get("topology.backpressure.enable"), false));
    }

    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(this.topoConf.get("topology.tick.tuple.freq.secs"), null);
        boolean enableMessageTimeout = (Boolean)this.topoConf.get("topology.enable.message.timeouts");
        if (tickTimeSecs != null) {
            if (Utils.isSystemId(this.componentId) || !enableMessageTimeout && isSpout) {
                LOG.info("Timeouts disabled for executor " + this.componentId + ":" + this.executorId);
            } else {
                StormTimer timerTask = this.workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable(){

                    @Override
                    public void run() {
                        TupleImpl tuple = new TupleImpl(Executor.this.workerTopologyContext, new Values(new Object[]{tickTimeSecs}), -1, "__tick");
                        ArrayList tickTuple = Lists.newArrayList((Object[])new AddressedTuple[]{new AddressedTuple(-2, tuple)});
                        Executor.this.receiveQueue.publish(tickTuple);
                    }
                });
            }
        }
    }

    private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> topoConf, List<Long> executorId) {
        int sendSize = ObjectReader.getInt(topoConf.get("topology.executor.send.buffer.size"));
        int waitTimeOutMs = ObjectReader.getInt(topoConf.get("topology.disruptor.wait.timeout.millis"));
        int batchSize = ObjectReader.getInt(topoConf.get("topology.disruptor.batch.size"));
        int batchTimeOutMs = ObjectReader.getInt(topoConf.get("topology.disruptor.batch.timeout.millis"));
        return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE, sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
    }

    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(WorkerTopologyContext workerTopologyContext, String componentId, Map<String, Object> topoConf) {
        HashMap<String, Map<String, LoadAwareCustomStreamGrouping>> ret = new HashMap<String, Map<String, LoadAwareCustomStreamGrouping>>();
        Map<String, Map<String, Grouping>> outputGroupings = workerTopologyContext.getTargets(componentId);
        for (Map.Entry<String, Map<String, Grouping>> entry : outputGroupings.entrySet()) {
            String streamId = entry.getKey();
            Map<String, Grouping> componentGrouping = entry.getValue();
            Fields outFields = workerTopologyContext.getComponentOutputFields(componentId, streamId);
            HashMap<String, LoadAwareCustomStreamGrouping> componentGrouper = new HashMap<String, LoadAwareCustomStreamGrouping>();
            for (Map.Entry<String, Grouping> cg : componentGrouping.entrySet()) {
                String component = cg.getKey();
                Grouping grouping = cg.getValue();
                List<Integer> outTasks = workerTopologyContext.getComponentTasks(component);
                LoadAwareCustomStreamGrouping grouper = GrouperFactory.mkGrouper(workerTopologyContext, componentId, streamId, outFields, grouping, outTasks, topoConf);
                componentGrouper.put(component, grouper);
            }
            if (componentGrouper.size() <= 0) continue;
            ret.put(streamId, componentGrouper);
        }
        for (String stream : workerTopologyContext.getComponentCommon(componentId).get_streams().keySet()) {
            if (ret.containsKey(stream)) continue;
            ret.put(stream, null);
        }
        return ret;
    }

    private Map normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
        Map componentConf;
        List<Object> keysToRemove = Executor.All_CONFIGS();
        keysToRemove.remove("topology.debug");
        keysToRemove.remove("topology.max.spout.pending");
        keysToRemove.remove("topology.max.task.parallelism");
        keysToRemove.remove("topology.transactional.id");
        keysToRemove.remove("topology.tick.tuple.freq.secs");
        keysToRemove.remove("topology.sleep.spout.wait.strategy.time.ms");
        keysToRemove.remove("topology.spout.wait.strategy");
        keysToRemove.remove("topology.bolts.window.length.count");
        keysToRemove.remove("topology.bolts.window.length.duration.ms");
        keysToRemove.remove("topology.bolts.window.sliding.interval.count");
        keysToRemove.remove("topology.bolts.window.sliding.interval.duration.ms");
        keysToRemove.remove("topology.bolts.tuple.timestamp.max.lag.ms");
        keysToRemove.remove("topology.bolts.message.id.field.name");
        keysToRemove.remove("topology.state.provider");
        keysToRemove.remove("topology.state.provider.config");
        keysToRemove.remove("topology.bolts.late.tuple.stream");
        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
        if (specJsonConf != null) {
            try {
                componentConf = (Map)JSONValue.parseWithException((String)specJsonConf);
            }
            catch (ParseException e) {
                throw new RuntimeException(e);
            }
            for (Object p : keysToRemove) {
                componentConf.remove(p);
            }
        } else {
            componentConf = new HashMap();
        }
        HashMap<String, Object> ret = new HashMap<String, Object>();
        ret.putAll(topoConf);
        ret.putAll(componentConf);
        return ret;
    }

    public List<Long> getExecutorId() {
        return this.executorId;
    }

    public List<Integer> getTaskIds() {
        return this.taskIds;
    }

    public String getComponentId() {
        return this.componentId;
    }

    public AtomicBoolean getOpenOrPrepareWasCalled() {
        return this.openOrPrepareWasCalled;
    }

    public Map getStormConf() {
        return this.topoConf;
    }

    public String getStormId() {
        return this.stormId;
    }

    public CommonStats getStats() {
        return this.stats;
    }

    public AtomicBoolean getThrottleOn() {
        return this.throttleOn;
    }

    public String getType() {
        return this.type;
    }

    public Boolean getIsDebug() {
        return this.isDebug;
    }

    public ExecutorTransfer getExecutorTransfer() {
        return this.executorTransfer;
    }

    public IReportError getReportError() {
        return this.reportError;
    }

    public WorkerTopologyContext getWorkerTopologyContext() {
        return this.workerTopologyContext;
    }

    public Callable<Boolean> getSampler() {
        return this.sampler;
    }

    public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() {
        return this.stormComponentDebug;
    }

    public DisruptorQueue getReceiveQueue() {
        return this.receiveQueue;
    }

    public boolean getBackpressure() {
        return this.receiveQueue.getThrottleOn();
    }

    public DisruptorQueue getTransferWorkerQueue() {
        return this.transferQueue;
    }

    public IStormClusterState getStormClusterState() {
        return this.stormClusterState;
    }

    public WorkerState getWorkerData() {
        return this.workerData;
    }

    public Map<String, Map<String, LoadAwareCustomStreamGrouping>> getStreamToComponentToGrouper() {
        return this.streamToComponentToGrouper;
    }

    public HashMap getSharedExecutorData() {
        return this.sharedExecutorData;
    }

    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
        return this.intervalToTaskToMetricToRegistry;
    }

    @VisibleForTesting
    public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
        this.executorTransfer = executorTransfer;
    }

    private static List<Object> All_CONFIGS() {
        ArrayList<Object> ret = new ArrayList<Object>();
        Config config = new Config();
        Class<?> ConfigClass = config.getClass();
        Field[] fields = ConfigClass.getFields();
        for (int i = 0; i < fields.length; ++i) {
            try {
                Object obj = fields[i].get(null);
                ret.add(obj);
                continue;
            }
            catch (IllegalArgumentException e) {
                LOG.error(e.getMessage(), (Throwable)e);
                continue;
            }
            catch (IllegalAccessException e) {
                LOG.error(e.getMessage(), (Throwable)e);
            }
        }
        return ret;
    }
}

