/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.transactional.partitioned;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.ICommitterTransactionalSpout;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import org.apache.storm.transactional.state.RotatingTransactionalState;
import org.apache.storm.transactional.state.TransactionalState;

public class OpaquePartitionedTransactionalSpoutExecutor
implements ICommitterTransactionalSpout<Object> {
    IOpaquePartitionedTransactionalSpout _spout;

    public OpaquePartitionedTransactionalSpoutExecutor(IOpaquePartitionedTransactionalSpout spout) {
        this._spout = spout;
    }

    @Override
    public ITransactionalSpout.Coordinator<Object> getCoordinator(Map<String, Object> conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ICommitterTransactionalSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
        return new Emitter(conf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this._spout.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    public class Emitter
    implements ICommitterTransactionalSpout.Emitter {
        IOpaquePartitionedTransactionalSpout.Emitter _emitter;
        TransactionalState _state;
        TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap();
        Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
        int _index;
        int _numTasks;

        public Emitter(Map<String, Object> conf, TopologyContext context) {
            this._emitter = OpaquePartitionedTransactionalSpoutExecutor.this._spout.getEmitter(conf, context);
            this._index = context.getThisTaskIndex();
            this._numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            this._state = TransactionalState.newUserState(conf, (String)conf.get("topology.transactional.id"), OpaquePartitionedTransactionalSpoutExecutor.this.getComponentConfiguration());
            List<String> existingPartitions = this._state.list("");
            for (String p : existingPartitions) {
                int partition = Integer.parseInt(p);
                if ((partition - this._index) % this._numTasks != 0) continue;
                this._partitionStates.put(partition, new RotatingTransactionalState(this._state, p));
            }
        }

        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
            HashMap<Integer, Object> metas = new HashMap<Integer, Object>();
            this._cachedMetas.put(tx.getTransactionId(), metas);
            int partitions = this._emitter.numPartitions();
            Map.Entry<BigInteger, Map<Integer, Object>> entry = this._cachedMetas.lowerEntry(tx.getTransactionId());
            Map<Integer, Object> prevCached = entry != null ? entry.getValue() : new HashMap<Integer, Object>();
            for (int i = this._index; i < partitions; i += this._numTasks) {
                RotatingTransactionalState state = this._partitionStates.get(i);
                if (state == null) {
                    state = new RotatingTransactionalState(this._state, "" + i);
                    this._partitionStates.put(i, state);
                }
                state.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(i);
                if (lastMeta == null) {
                    lastMeta = state.getLastState();
                }
                Object meta = this._emitter.emitPartitionBatch(tx, collector, i, lastMeta);
                metas.put(i, meta);
            }
        }

        @Override
        public void cleanupBefore(BigInteger txid) {
            for (RotatingTransactionalState state : this._partitionStates.values()) {
                state.cleanupBefore(txid);
            }
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            BigInteger txid = attempt.getTransactionId();
            Map<Integer, Object> metas = this._cachedMetas.remove(txid);
            for (Map.Entry<Integer, Object> entry : metas.entrySet()) {
                Integer partition = entry.getKey();
                Object meta = entry.getValue();
                this._partitionStates.get(partition).overrideState(txid, meta);
            }
        }

        @Override
        public void close() {
            this._emitter.close();
        }
    }

    public class Coordinator
    implements ITransactionalSpout.Coordinator<Object> {
        IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;

        public Coordinator(Map<String, Object> conf, TopologyContext context) {
            this._coordinator = OpaquePartitionedTransactionalSpoutExecutor.this._spout.getCoordinator(conf, context);
        }

        @Override
        public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
            return null;
        }

        @Override
        public boolean isReady() {
            return this._coordinator.isReady();
        }

        @Override
        public void close() {
            this._coordinator.close();
        }
    }
}

