/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.transactional.partitioned;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout;
import org.apache.storm.transactional.state.RotatingTransactionalState;
import org.apache.storm.transactional.state.TransactionalState;

public class PartitionedTransactionalSpoutExecutor
implements ITransactionalSpout<Integer> {
    IPartitionedTransactionalSpout _spout;

    public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout spout) {
        this._spout = spout;
    }

    public IPartitionedTransactionalSpout getPartitionedSpout() {
        return this._spout;
    }

    @Override
    public ITransactionalSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ITransactionalSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
        return new Emitter(conf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this._spout.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    class Emitter
    implements ITransactionalSpout.Emitter<Integer> {
        private IPartitionedTransactionalSpout.Emitter _emitter;
        private TransactionalState _state;
        private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
        private int _index;
        private int _numTasks;

        public Emitter(Map<String, Object> conf, TopologyContext context) {
            this._emitter = PartitionedTransactionalSpoutExecutor.this._spout.getEmitter(conf, context);
            this._state = TransactionalState.newUserState(conf, (String)conf.get("topology.transactional.id"), PartitionedTransactionalSpoutExecutor.this.getComponentConfiguration());
            this._index = context.getThisTaskIndex();
            this._numTasks = context.getComponentTasks(context.getThisComponentId()).size();
        }

        @Override
        public void emitBatch(final TransactionAttempt tx, Integer partitions, final BatchOutputCollector collector) {
            for (int i = this._index; i < partitions; i += this._numTasks) {
                if (!this._partitionStates.containsKey(i)) {
                    this._partitionStates.put(i, new RotatingTransactionalState(this._state, "" + i));
                }
                RotatingTransactionalState state = this._partitionStates.get(i);
                final int partition = i;
                Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer(){

                    @Override
                    public Object init(BigInteger txid, Object lastState) {
                        return Emitter.this._emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
                    }
                });
                if (meta == null) continue;
                this._emitter.emitPartitionBatch(tx, collector, partition, meta);
            }
        }

        @Override
        public void cleanupBefore(BigInteger txid) {
            for (RotatingTransactionalState state : this._partitionStates.values()) {
                state.cleanupBefore(txid);
            }
        }

        @Override
        public void close() {
            this._state.close();
            this._emitter.close();
        }
    }

    class Coordinator
    implements ITransactionalSpout.Coordinator<Integer> {
        private IPartitionedTransactionalSpout.Coordinator _coordinator;

        public Coordinator(Map<String, Object> conf, TopologyContext context) {
            this._coordinator = PartitionedTransactionalSpoutExecutor.this._spout.getCoordinator(conf, context);
        }

        @Override
        public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) {
            return this._coordinator.numPartitions();
        }

        @Override
        public boolean isReady() {
            return this._coordinator.isReady();
        }

        @Override
        public void close() {
            this._coordinator.close();
        }
    }
}

