/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.trident.spout;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.spout.RichSpoutBatchId;
import org.apache.storm.trident.spout.RichSpoutBatchIdSerializer;
import org.apache.storm.trident.topology.TridentBoltExecutor;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.trident.util.TridentUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class RichSpoutBatchTriggerer
implements IRichSpout {
    String _stream;
    IRichSpout _delegate;
    List<Integer> _outputTasks;
    Random _rand;
    String _coordStream;
    Map<Long, Long> _msgIdToBatchId = new HashMap<Long, Long>();
    Map<Long, FinishCondition> _finishConditions = new HashMap<Long, FinishCondition>();

    public RichSpoutBatchTriggerer(IRichSpout delegate, String streamName, String batchGroup) {
        this._delegate = delegate;
        this._stream = streamName;
        this._coordStream = TridentBoltExecutor.COORD_STREAM(batchGroup);
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this._delegate.open(conf, context, new SpoutOutputCollector(new StreamOverrideCollector(collector)));
        this._outputTasks = new ArrayList<Integer>();
        for (String component : ((Map)Utils.get(context.getThisTargets(), this._coordStream, new HashMap())).keySet()) {
            this._outputTasks.addAll(context.getComponentTasks(component));
        }
        this._rand = new Random(Utils.secureRandomLong());
    }

    @Override
    public void close() {
        this._delegate.close();
    }

    @Override
    public void activate() {
        this._delegate.activate();
    }

    @Override
    public void deactivate() {
        this._delegate.deactivate();
    }

    @Override
    public void nextTuple() {
        this._delegate.nextTuple();
    }

    @Override
    public void ack(Object msgId) {
        Long batchId = this._msgIdToBatchId.remove((Long)msgId);
        FinishCondition cond = this._finishConditions.get(batchId);
        if (cond != null) {
            cond.vals.remove((Long)msgId);
            if (cond.vals.isEmpty()) {
                this._finishConditions.remove(batchId);
                this._delegate.ack(cond.msgId);
            }
        }
    }

    @Override
    public void fail(Object msgId) {
        Long batchId = this._msgIdToBatchId.remove((Long)msgId);
        FinishCondition cond = this._finishConditions.remove(batchId);
        if (cond != null) {
            this._delegate.fail(cond.msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outFields = TridentUtils.getSingleOutputStreamFields(this._delegate);
        outFields = TridentUtils.fieldsConcat(new Fields("$id$"), outFields);
        declarer.declareStream(this._stream, outFields);
        declarer.declareStream(this._coordStream, true, new Fields("id", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = this._delegate.getComponentConfiguration();
        conf = conf == null ? new HashMap<String, Object>() : new HashMap<String, Object>(conf);
        Config.registerSerialization(conf, RichSpoutBatchId.class, RichSpoutBatchIdSerializer.class);
        return conf;
    }

    class StreamOverrideCollector
    implements ISpoutOutputCollector {
        SpoutOutputCollector _collector;

        public StreamOverrideCollector(SpoutOutputCollector collector) {
            this._collector = collector;
        }

        @Override
        public List<Integer> emit(String ignore, List<Object> values, Object msgId) {
            long batchIdVal = RichSpoutBatchTriggerer.this._rand.nextLong();
            RichSpoutBatchId batchId = new RichSpoutBatchId(batchIdVal);
            FinishCondition finish = new FinishCondition();
            finish.msgId = msgId;
            List<Integer> tasks = this._collector.emit(RichSpoutBatchTriggerer.this._stream, new ConsList(batchId, values));
            HashSet<Integer> outTasksSet = new HashSet<Integer>(tasks);
            for (Integer t : RichSpoutBatchTriggerer.this._outputTasks) {
                int count = 0;
                if (outTasksSet.contains(t)) {
                    count = 1;
                }
                long r = RichSpoutBatchTriggerer.this._rand.nextLong();
                this._collector.emitDirect(t, RichSpoutBatchTriggerer.this._coordStream, new Values(batchId, count), r);
                finish.vals.add(r);
                RichSpoutBatchTriggerer.this._msgIdToBatchId.put(r, batchIdVal);
            }
            RichSpoutBatchTriggerer.this._finishConditions.put(batchIdVal, finish);
            return tasks;
        }

        @Override
        public void emitDirect(int task, String ignore, List<Object> values, Object msgId) {
            throw new RuntimeException("Trident does not support direct emits from spouts");
        }

        @Override
        public void reportError(Throwable t) {
            this._collector.reportError(t);
        }

        @Override
        public long getPendingCount() {
            return this._collector.getPendingCount();
        }
    }

    static class FinishCondition {
        Set<Long> vals = new HashSet<Long>();
        Object msgId;

        FinishCondition() {
        }
    }
}

