/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.testing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MemoryTransactionalSpoutMeta;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Utils;

public class MemoryTransactionalSpout
implements IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
    public static final String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
    private String _id;
    private String _finishedPartitionsId;
    private int _takeAmt;
    private Fields _outFields;
    private Map<Integer, List<List<Object>>> _initialPartitions;

    public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
        this._id = RegisteredGlobalState.registerState(partitions);
        Map finished = Collections.synchronizedMap(new HashMap());
        this._finishedPartitionsId = RegisteredGlobalState.registerState(finished);
        this._takeAmt = takeAmt;
        this._outFields = outFields;
        this._initialPartitions = partitions;
    }

    public boolean isExhaustedTuples() {
        Map<Integer, Boolean> statuses = this.getFinishedStatuses();
        for (Integer partition : this.getQueues().keySet()) {
            if (statuses.containsKey(partition) && this.getFinishedStatuses().get(partition).booleanValue()) continue;
            return false;
        }
        return true;
    }

    @Override
    public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
        return new Coordinator();
    }

    @Override
    public IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(Map<String, Object> conf, TopologyContext context) {
        return new Emitter(conf);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        ArrayList<String> toDeclare = new ArrayList<String>(this._outFields.toList());
        toDeclare.add(0, TX_FIELD);
        declarer.declare(new Fields(toDeclare));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
        return conf;
    }

    public void startup() {
        this.getFinishedStatuses().clear();
    }

    public void cleanup() {
        RegisteredGlobalState.clearState(this._id);
        RegisteredGlobalState.clearState(this._finishedPartitionsId);
    }

    private Map<Integer, List<List<Object>>> getQueues() {
        Map ret = (Map)RegisteredGlobalState.getState(this._id);
        if (ret != null) {
            return ret;
        }
        return this._initialPartitions;
    }

    private Map<Integer, Boolean> getFinishedStatuses() {
        return (Map)RegisteredGlobalState.getState(this._finishedPartitionsId);
    }

    class Emitter
    implements IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
        Integer _maxSpoutPending;
        Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();

        public Emitter(Map<String, Object> conf) {
            Object c = conf.get("topology.max.spout.pending");
            this._maxSpoutPending = c == null ? Integer.valueOf(1) : ObjectReader.getInt(c);
        }

        @Override
        public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
            int index = lastPartitionMeta == null ? 0 : lastPartitionMeta.index + lastPartitionMeta.amt;
            List queue = (List)MemoryTransactionalSpout.this.getQueues().get(partition);
            int total = queue.size();
            int left = total - index;
            int toTake = Math.min(left, MemoryTransactionalSpout.this._takeAmt);
            MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
            this.emitPartitionBatch(tx, collector, partition, ret);
            if (toTake == 0) {
                Map finishedStatuses;
                int curr = Utils.get(this._emptyPartitions, partition, 0) + 1;
                this._emptyPartitions.put(partition, curr);
                if (curr > this._maxSpoutPending && (finishedStatuses = MemoryTransactionalSpout.this.getFinishedStatuses()) != null) {
                    finishedStatuses.put(partition, true);
                }
            }
            return ret;
        }

        @Override
        public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MemoryTransactionalSpoutMeta partitionMeta) {
            List queue = (List)MemoryTransactionalSpout.this.getQueues().get(partition);
            for (int i = partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; ++i) {
                ArrayList<Object> toEmit = new ArrayList<Object>((Collection)queue.get(i));
                toEmit.add(0, tx);
                collector.emit(toEmit);
            }
        }

        @Override
        public void close() {
        }
    }

    class Coordinator
    implements IPartitionedTransactionalSpout.Coordinator {
        Coordinator() {
        }

        @Override
        public int numPartitions() {
            return MemoryTransactionalSpout.this.getQueues().size();
        }

        @Override
        public boolean isReady() {
            return true;
        }

        @Override
        public void close() {
        }
    }
}

