/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon;

import java.util.List;
import java.util.Map;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Acker
implements IBolt {
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private OutputCollector collector;
    private RotatingMap<Object, AckObject> pending;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new RotatingMap(3);
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = this.pending.rotate();
            LOG.debug("Number of timeout tuples:{}", (Object)tmp.size());
            return;
        }
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = this.pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                curr.val = input.getLong(1);
                curr.spoutTask = input.getInteger(2);
                this.pending.put(id, curr);
            } else {
                curr.updateAck(input.getLong(1));
                curr.spoutTask = input.getInteger(2);
            }
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr != null) {
                curr.updateAck(input.getLong(1));
            } else {
                curr = new AckObject();
                curr.val = input.getLong(1);
                this.pending.put(id, curr);
            }
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            this.pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
            }
            this.pending.put(id, curr);
        } else {
            LOG.warn("Unknown source stream {} from task-{}", (Object)streamId, (Object)input.getSourceTask());
            return;
        }
        Integer task = curr.spoutTask;
        if (curr != null && task != null) {
            Values tuple = new Values(id, this.getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0L) {
                this.pending.remove(id);
                this.collector.emitDirect((int)task, ACKER_ACK_STREAM_ID, (List<Object>)tuple);
            } else if (curr.failed) {
                this.pending.remove(id);
                this.collector.emitDirect((int)task, ACKER_FAIL_STREAM_ID, (List<Object>)tuple);
            } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
                this.collector.emitDirect((int)task, ACKER_RESET_TIMEOUT_STREAM_ID, (List<Object>)tuple);
            }
        }
        this.collector.ack(input);
    }

    @Override
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    private long getTimeDeltaMillis(long startTimeMillis) {
        return System.currentTimeMillis() - startTimeMillis;
    }

    private class AckObject {
        public long val = 0L;
        public Integer spoutTask = null;
        public boolean failed = false;
        public long startTime = System.currentTimeMillis();

        private AckObject() {
        }

        public void updateAck(Long value) {
            this.val = Utils.bitXor(this.val, value);
        }
    }
}

