/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor.spout;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.executor.spout.SpoutExecutor;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Utils;

public class SpoutOutputCollectorImpl
implements ISpoutOutputCollector {
    private final SpoutExecutor executor;
    private final Task taskData;
    private final int taskId;
    private final MutableLong emittedCount;
    private final boolean hasAckers;
    private final Random random;
    private final Boolean isEventLoggers;
    private final Boolean isDebug;
    private final RotatingMap<Long, TupleInfo> pending;

    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId, MutableLong emittedCount, boolean hasAckers, Random random, Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
        this.executor = executor;
        this.taskData = taskData;
        this.taskId = taskId;
        this.emittedCount = emittedCount;
        this.hasAckers = hasAckers;
        this.random = random;
        this.isEventLoggers = isEventLoggers;
        this.isDebug = isDebug;
        this.pending = pending;
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return this.sendSpoutMsg(streamId, tuple, messageId, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        this.sendSpoutMsg(streamId, tuple, messageId, taskId);
    }

    @Override
    public long getPendingCount() {
        return this.pending.size();
    }

    @Override
    public void reportError(Throwable error) {
        this.executor.getReportError().report(error);
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
        TupleInfo info;
        this.emittedCount.increment();
        List<Integer> outTasks = outTaskId != null ? this.taskData.getOutgoingTasks(outTaskId, stream, values) : this.taskData.getOutgoingTasks(stream, values);
        ArrayList<Long> ackSeq = new ArrayList<Long>();
        boolean needAck = messageId != null && this.hasAckers;
        long rootId = MessageId.generateId(this.random);
        for (Integer t : outTasks) {
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(this.random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }
            TupleImpl tuple = new TupleImpl(this.executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
            this.executor.getExecutorTransfer().transfer(t, tuple);
        }
        if (this.isEventLoggers.booleanValue()) {
            this.executor.sendToEventLogger(this.executor, this.taskData, values, this.executor.getComponentId(), messageId, this.random);
        }
        boolean sample = false;
        try {
            sample = this.executor.getSampler().call();
        }
        catch (Exception t) {
            // empty catch block
        }
        if (needAck) {
            info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (this.isDebug.booleanValue()) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }
            this.pending.put(rootId, info);
            Values ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            this.executor.sendUnanchored(this.taskData, "__ack_init", ackInitTuple, this.executor.getExecutorTransfer());
        } else if (messageId != null) {
            info = new TupleInfo();
            info.setStream(stream);
            info.setValues(values);
            info.setMessageId(messageId);
            info.setTimestamp(0L);
            Long timeDelta = sample ? Long.valueOf(0L) : null;
            info.setId("0:");
            this.executor.ackSpoutMsg(this.executor, this.taskData, timeDelta, info);
        }
        return outTasks;
    }
}

