/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor.spout;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.executor.spout.SpoutOutputCollectorImpl;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutWaitStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executor that drives the {@link ISpout} instances of the tasks assigned to it.
 *
 * <p>{@link #init(Map)} opens every spout, {@link #call()} hands back the
 * callable that performs one pass of the spout loop, and
 * {@link #tupleActionFn(int, TupleImpl)} reacts to tuples delivered on the
 * receive queue (ticks, metrics ticks, credential updates, timeout resets and
 * ack/fail results).
 */
public class SpoutExecutor extends Executor {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);

    /** Strategy notified whenever a loop pass emitted nothing while active. */
    private final ISpoutWaitStrategy spoutWaitStrategy;
    /** "topology.max.spout.pending" multiplied by the number of tasks; 0 disables the check. */
    private Integer maxSpoutPending;
    /** Activation state observed by the previous loop pass. */
    private final AtomicBoolean lastActive;
    private List<ISpout> spouts;
    private List<SpoutOutputCollector> outputCollectors;
    /** Counter shared with the output collectors; compared before/after a loop pass. */
    private final MutableLong emittedCount;
    /** Number of consecutive active passes during which the counter did not move. */
    private final MutableLong emptyEmitStreak;
    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
    private final boolean hasAckers;
    /** Tuples waiting for an ack/fail result, keyed by the id carried in those results. */
    private RotatingMap<Long, TupleInfo> pending;
    private final boolean backPressureEnabled;

    /**
     * Builds the executor and reads its settings from the topology configuration.
     *
     * @param workerData  worker state forwarded to the base class
     * @param executorId  executor id forwarded to the base class
     * @param credentials credentials forwarded to the base class
     */
    public SpoutExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
        super(workerData, executorId, credentials);

        final String waitStrategyClass = (String) this.topoConf.get("topology.spout.wait.strategy");
        this.spoutWaitStrategy = (ISpoutWaitStrategy) ReflectionUtils.newInstance(waitStrategyClass);
        this.spoutWaitStrategy.prepare(this.topoConf);

        this.backPressureEnabled = ObjectReader.getBoolean(this.topoConf.get("topology.backpressure.enable"), false);
        this.lastActive = new AtomicBoolean(false);
        this.hasAckers = StormCommon.hasAckers(this.topoConf);
        this.emittedCount = new MutableLong(0L);
        this.emptyEmitStreak = new MutableLong(0L);
        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
    }

    /**
     * Waits until the topology is active, then opens every spout in {@code idToTask}.
     *
     * <p>For each task an output collector is created, built-in and queue
     * metrics are registered, credentials are handed to spouts implementing
     * {@link ICredentialsListener}, and {@code open} is invoked on the spout.
     *
     * @param idToTask tasks run by this executor, keyed by task id
     */
    public void init(final Map<Integer, Task> idToTask) {
        // Poll every 100 ms until the topology has been activated.
        while (!this.stormActive.get()) {
            Utils.sleep(100L);
        }

        LOG.info("Opening spout {}:{}", this.componentId, idToTask.keySet());
        this.idToTask = idToTask;

        // The configured limit is scaled by the number of tasks in this executor.
        final int pendingPerTask = ObjectReader.getInt(this.topoConf.get("topology.max.spout.pending"), 0);
        this.maxSpoutPending = pendingPerTask * idToTask.size();

        this.spouts = new ArrayList<ISpout>();
        for (Task task : idToTask.values()) {
            ISpout taskSpout = (ISpout) task.getTaskObject();
            this.spouts.add(taskSpout);
        }

        // Must exist before the collectors are built: each collector receives a reference to it.
        this.pending = this.createPendingMap(idToTask);

        Task firstTask = idToTask.values().iterator().next();
        this.spoutThrottlingMetrics.registerAll(this.topoConf, firstTask.getUserContext());

        this.outputCollectors = new ArrayList<SpoutOutputCollector>();
        for (Map.Entry<Integer, Task> taskEntry : idToTask.entrySet()) {
            this.openTask(taskEntry.getKey(), taskEntry.getValue());
        }

        this.openOrPrepareWasCalled.set(true);
        LOG.info("Opened spout {}:{}", this.componentId, idToTask.keySet());
        this.setupMetrics();
    }

    /**
     * Creates the pending map; entries that expire from it are failed with reason "TIMEOUT".
     */
    private RotatingMap<Long, TupleInfo> createPendingMap(final Map<Integer, Task> tasks) {
        RotatingMap.ExpiredCallback<Long, TupleInfo> onExpire = new RotatingMap.ExpiredCallback<Long, TupleInfo>() {

            @Override
            public void expire(Long key, TupleInfo tupleInfo) {
                final long emittedAt = tupleInfo.getTimestamp();
                Long elapsedMs;
                if (emittedAt == 0L) {
                    // A zero timestamp means no latency is reported for this tuple.
                    elapsedMs = null;
                } else {
                    elapsedMs = Time.deltaMs(emittedAt);
                }
                Task owner = tasks.get(tupleInfo.getTaskId());
                SpoutExecutor.this.failSpoutMsg(SpoutExecutor.this, owner, elapsedMs, tupleInfo, "TIMEOUT");
            }
        };
        return new RotatingMap<Long, TupleInfo>(2, onExpire);
    }

    /**
     * Wires up and opens the spout of a single task.
     */
    private void openTask(Integer taskId, Task task) {
        final ISpout spout = (ISpout) task.getTaskObject();

        SpoutOutputCollectorImpl collectorImpl = new SpoutOutputCollectorImpl(
                spout, this, task, taskId, this.emittedCount, this.hasAckers,
                this.rand, this.hasEventLoggers, this.isDebug, this.pending);
        SpoutOutputCollector collector = new SpoutOutputCollector(collectorImpl);
        this.outputCollectors.add(collector);

        task.getBuiltInMetrics().registerAll(this.topoConf, task.getUserContext());

        // Raw view on purpose: the queue type is declared outside this file.
        Map queues = ImmutableMap.of("sendqueue", this.transferQueue, "receive", this.receiveQueue);
        BuiltinMetricsUtil.registerQueueMetrics(queues, this.topoConf, task.getUserContext());

        if (spout instanceof ICredentialsListener) {
            ((ICredentialsListener) spout).setCredentials(this.credentials);
        }
        spout.open(this.topoConf, task.getUserContext(), collector);
    }

    /**
     * Initializes the executor and returns the callable that performs one
     * pass of the spout loop each time it is invoked.
     *
     * @return callable running a single loop pass; its result is always {@code 0L}
     * @throws Exception declared by the interface; {@link #init(Map)} itself is invoked directly
     */
    public Callable<Object> call() throws Exception {
        this.init(this.idToTask);
        return new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                return SpoutExecutor.this.spoutLoopIteration();
            }
        };
    }

    /**
     * One pass of the spout loop: drain the receive queue, (de)activate the
     * spouts on a state change, ask them for tuples when nothing blocks
     * emission, and update the empty-emit streak and throttling metrics.
     */
    private Object spoutLoopIteration() throws Exception {
        this.receiveQueue.consumeBatch(this);

        final long emittedBefore = this.emittedCount.get();
        final boolean throttled = this.backPressureEnabled && this.throttleOn.get();
        final boolean pendingLimitHit =
                this.maxSpoutPending != 0 && this.pending.size() >= this.maxSpoutPending;
        final boolean active = this.stormActive.get();

        if (active) {
            this.activateSpoutsIfNeeded();
            final boolean blocked = this.transferQueue.isFull() || throttled || pendingLimitHit;
            if (!blocked) {
                for (ISpout spout : this.spouts) {
                    spout.nextTuple();
                }
            }
        } else {
            this.deactivateSpoutsIfNeeded();
            Time.sleep(100L);
            this.spoutThrottlingMetrics.skippedInactive(this.stats);
        }

        final boolean nothingEmitted = emittedBefore == this.emittedCount.get();
        if (!(nothingEmitted && active)) {
            this.emptyEmitStreak.set(0L);
            return 0L;
        }

        // Active but idle: extend the streak and let the wait strategy react to it.
        this.emptyEmitStreak.increment();
        this.spoutWaitStrategy.emptyEmit(this.emptyEmitStreak.get());
        if (throttled) {
            this.spoutThrottlingMetrics.skippedThrottle(this.stats);
        } else if (pendingLimitHit) {
            this.spoutThrottlingMetrics.skippedMaxSpout(this.stats);
        }
        return 0L;
    }

    /** Calls {@code activate()} on every spout when the previous pass saw the topology inactive. */
    private void activateSpoutsIfNeeded() {
        if (this.lastActive.get()) {
            return;
        }
        this.lastActive.set(true);
        LOG.info("Activating spout {}:{}", this.componentId, this.idToTask.keySet());
        for (ISpout spout : this.spouts) {
            spout.activate();
        }
    }

    /** Calls {@code deactivate()} on every spout when the previous pass saw the topology active. */
    private void deactivateSpoutsIfNeeded() {
        if (!this.lastActive.get()) {
            return;
        }
        this.lastActive.set(false);
        LOG.info("Deactivating spout {}:{}", this.componentId, this.idToTask.keySet());
        for (ISpout spout : this.spouts) {
            spout.deactivate();
        }
    }

    /**
     * Dispatches a received tuple according to its source stream id.
     *
     * <p>"__tick" rotates the pending map, "__metrics_tick" triggers a metrics
     * tick, "__credentials" forwards new credentials to the spout,
     * "__ack_reset_timeout" re-inserts a pending entry, and every other stream
     * is treated as an ack/fail result for a pending tuple.
     *
     * @param taskId id of the task the tuple is addressed to
     * @param tuple  the received tuple
     * @throws Exception declared by the overridden method
     */
    @Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        final String streamId = tuple.getSourceStreamId();
        switch (streamId) {
            case "__tick":
                this.pending.rotate();
                break;
            case "__metrics_tick":
                this.metricsTick((Task) this.idToTask.get(taskId), tuple);
                break;
            case "__credentials":
                this.applyCredentials(taskId, tuple);
                break;
            case "__ack_reset_timeout":
                this.resetPendingTimeout(tuple);
                break;
            default:
                this.handleAckerResult(taskId, streamId, tuple);
                break;
        }
    }

    /** Passes the credentials carried in field 0 to the task's spout if it listens for them. */
    private void applyCredentials(int taskId, TupleImpl tuple) {
        Object spoutObj = ((Task) this.idToTask.get(taskId)).getTaskObject();
        if (spoutObj instanceof ICredentialsListener) {
            ((ICredentialsListener) spoutObj).setCredentials((Map) tuple.getValue(0));
        }
    }

    /** Re-inserts the pending entry named by field 0, if it is still present. */
    private void resetPendingTimeout(TupleImpl tuple) {
        Long id = (Long) tuple.getValue(0);
        TupleInfo stillPending = this.pending.get(id);
        if (stillPending != null) {
            this.pending.put(id, stillPending);
        }
    }

    /**
     * Removes the pending entry named by field 0 and acks or fails it,
     * depending on whether the stream is "__ack_ack" or "__ack_fail".
     */
    private void handleAckerResult(int taskId, String streamId, TupleImpl tuple) {
        Long id = (Long) tuple.getValue(0);
        Long reportedDeltaMs = (Long) tuple.getValue(1);
        TupleInfo tupleInfo = (TupleInfo) this.pending.remove(id);

        if (tupleInfo == null || tupleInfo.getMessageId() == null) {
            return;
        }
        if (taskId != tupleInfo.getTaskId()) {
            throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
        }

        // The reported latency is only used when the tuple carries a non-zero timestamp.
        final Long timeDelta = tupleInfo.getTimestamp() != 0L ? reportedDeltaMs : null;

        if (streamId.equals("__ack_ack")) {
            this.ackSpoutMsg(this, (Task) this.idToTask.get(taskId), timeDelta, tupleInfo);
        } else if (streamId.equals("__ack_fail")) {
            this.failSpoutMsg(this, (Task) this.idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
        }
    }

    /**
     * Acks a message on its spout, applies the ack hook info to the task's user context
     * and, when a latency is available, records it in the executor stats.
     *
     * @param executor  executor whose debug flag and stats are used
     * @param taskData  task owning the spout
     * @param timeDelta latency in milliseconds, or {@code null} to skip the stats update
     * @param tupleInfo bookkeeping entry of the acked tuple
     * @throws RuntimeException wrapping any exception raised while acking
     */
    public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            final ISpout target = (ISpout) taskData.getTaskObject();
            final int ownerTaskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            target.ack(tupleInfo.getMessageId());

            SpoutAckInfo ackInfo = new SpoutAckInfo(tupleInfo.getMessageId(), ownerTaskId, timeDelta);
            ackInfo.applyOn(taskData.getUserContext());

            if (timeDelta != null) {
                SpoutExecutorStats spoutStats = (SpoutExecutorStats) executor.getStats();
                spoutStats.spoutAckedTuple(tupleInfo.getStream(), timeDelta);
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    /**
     * Fails a message on its spout, applies the fail hook info to the task's user context
     * and, when a latency is available, records it in the executor stats.
     *
     * @param executor  executor whose debug flag and stats are used
     * @param taskData  task owning the spout
     * @param timeDelta latency in milliseconds, or {@code null} to skip the stats update
     * @param tupleInfo bookkeeping entry of the failed tuple
     * @param reason    text included in the debug log line
     * @throws RuntimeException wrapping any exception raised while failing
     */
    public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            final ISpout target = (ISpout) taskData.getTaskObject();
            final int ownerTaskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
            }
            target.fail(tupleInfo.getMessageId());

            SpoutFailInfo failInfo = new SpoutFailInfo(tupleInfo.getMessageId(), ownerTaskId, timeDelta);
            failInfo.applyOn(taskData.getUserContext());

            if (timeDelta != null) {
                SpoutExecutorStats spoutStats = (SpoutExecutorStats) executor.getStats();
                spoutStats.spoutFailedTuple(tupleInfo.getStream(), timeDelta);
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}

