/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor.bolt;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.bolt.BoltExecutor;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.stats.BoltExecutorStats;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoltOutputCollectorImpl
implements IOutputCollector {
    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
    private final BoltExecutor executor;
    private final Task taskData;
    private final int taskId;
    private final Random random;
    private final boolean isEventLoggers;
    private final boolean isDebug;

    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, boolean isEventLoggers, boolean isDebug) {
        this.executor = executor;
        this.taskData = taskData;
        this.taskId = taskId;
        this.random = random;
        this.isEventLoggers = isEventLoggers;
        this.isDebug = isDebug;
    }

    @Override
    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        return this.boltEmit(streamId, anchors, tuple, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        this.boltEmit(streamId, anchors, tuple, taskId);
    }

    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
        List<Integer> outTasks = targetTaskId != null ? this.taskData.getOutgoingTasks(targetTaskId, streamId, values) : this.taskData.getOutgoingTasks(streamId, values);
        for (Integer t : outTasks) {
            HashMap<Long, Long> anchorsToIds = new HashMap<Long, Long>();
            if (anchors != null) {
                for (Tuple a : anchors) {
                    Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
                    if (rootIds.size() <= 0) continue;
                    long edgeId = MessageId.generateId(this.random);
                    ((TupleImpl)a).updateAckVal(edgeId);
                    for (Long root_id : rootIds) {
                        this.putXor(anchorsToIds, root_id, edgeId);
                    }
                }
            }
            MessageId msgId = MessageId.makeId(anchorsToIds);
            TupleImpl tupleExt = new TupleImpl(this.executor.getWorkerTopologyContext(), values, this.taskId, streamId, msgId);
            this.executor.getExecutorTransfer().transfer(t, tupleExt);
        }
        if (this.isEventLoggers) {
            this.executor.sendToEventLogger(this.executor, this.taskData, values, this.executor.getComponentId(), null, this.random);
        }
        return outTasks;
    }

    @Override
    public void ack(Tuple input) {
        long ackValue = ((TupleImpl)input).getAckVal();
        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
            this.executor.sendUnanchored(this.taskData, "__ack_ack", new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)), this.executor.getExecutorTransfer());
        }
        long delta = this.tupleTimeDelta((TupleImpl)input);
        if (this.isDebug) {
            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", new Object[]{this.taskId, delta, input});
        }
        BoltAckInfo boltAckInfo = new BoltAckInfo(input, this.taskId, delta);
        boltAckInfo.applyOn(this.taskData.getUserContext());
        if (delta >= 0L) {
            ((BoltExecutorStats)this.executor.getStats()).boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
        }
    }

    @Override
    public void fail(Tuple input) {
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            this.executor.sendUnanchored(this.taskData, "__ack_fail", new Values(root), this.executor.getExecutorTransfer());
        }
        long delta = this.tupleTimeDelta((TupleImpl)input);
        if (this.isDebug) {
            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", new Object[]{this.taskId, delta, input});
        }
        BoltFailInfo boltFailInfo = new BoltFailInfo(input, this.taskId, delta);
        boltFailInfo.applyOn(this.taskData.getUserContext());
        if (delta >= 0L) {
            ((BoltExecutorStats)this.executor.getStats()).boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
        }
    }

    @Override
    public void resetTimeout(Tuple input) {
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            this.executor.sendUnanchored(this.taskData, "__ack_reset_timeout", new Values(root), this.executor.getExecutorTransfer());
        }
    }

    @Override
    public void reportError(Throwable error) {
        this.executor.getReportError().report(error);
    }

    private long tupleTimeDelta(TupleImpl tuple) {
        Long ms = tuple.getProcessSampleStartTime();
        if (ms != null) {
            return Time.deltaMs(ms);
        }
        return -1L;
    }

    private void putXor(Map<Long, Long> pending, Long key, Long id) {
        Long curr = pending.get(key);
        if (curr == null) {
            curr = 0L;
        }
        pending.put(key, Utils.bitXor(curr, id));
    }
}

