/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.trident.spout;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ICommitterTridentSpout;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.ISpoutPartition;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.trident.topology.state.RotatingTransactionalState;
import org.apache.storm.trident.topology.state.TransactionalState;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpaquePartitionedTridentSpoutExecutor
implements ICommitterTridentSpout<Object> {
    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);
    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;

    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
        this._spout = spout;
    }

    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map<String, Object> conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Fields getOutputFields() {
        return this._spout.getOutputFields();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    public class Emitter
    implements ICommitterTridentSpout.Emitter {
        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
        TransactionalState _state;
        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap();
        Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
        int _index;
        int _numTasks;
        Object _savedCoordinatorMeta = null;
        boolean _changedMeta = false;

        public Emitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
            this._emitter = OpaquePartitionedTridentSpoutExecutor.this._spout.getEmitter(conf, context);
            this._index = context.getThisTaskIndex();
            this._numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            this._state = TransactionalState.newUserState(conf, txStateId);
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Created {}", (Object)this);
        }

        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", new Object[]{tx, coordinatorMeta, collector, this});
            if (this._savedCoordinatorMeta == null || !this._savedCoordinatorMeta.equals(coordinatorMeta)) {
                this._partitionStates.clear();
                List<ISpoutPartition> taskPartitions = this._emitter.getPartitionsForTask(this._index, this._numTasks, coordinatorMeta);
                for (ISpoutPartition partition : taskPartitions) {
                    this._partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(this._state, partition.getId()), partition));
                }
                this._emitter.refreshPartitions(this._emitter.getOrderedPartitions(coordinatorMeta));
                this._savedCoordinatorMeta = coordinatorMeta;
                this._changedMeta = true;
            }
            HashMap<String, Object> metas = new HashMap<String, Object>();
            this._cachedMetas.put(tx.getTransactionId(), metas);
            Map.Entry<Long, Map<String, Object>> entry = this._cachedMetas.lowerEntry(tx.getTransactionId());
            Map<String, Object> prevCached = entry != null ? entry.getValue() : new HashMap<String, Object>();
            for (Map.Entry<String, EmitterPartitionState> e : this._partitionStates.entrySet()) {
                String id = e.getKey();
                EmitterPartitionState s = e.getValue();
                s.rotatingState.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(id);
                if (lastMeta == null) {
                    lastMeta = s.rotatingState.getLastState();
                }
                Object meta = this._emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                metas.put(id, meta);
            }
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", new Object[]{tx, coordinatorMeta, collector, this});
        }

        @Override
        public void success(TransactionAttempt tx) {
            for (EmitterPartitionState state : this._partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Success transaction {}. [{}]", (Object)tx, (Object)this);
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Committing transaction {}. [{}]", (Object)attempt, (Object)this);
            if (this._changedMeta && this._index == 0) {
                HashSet<String> validIds = new HashSet<String>();
                for (ISpoutPartition p : this._emitter.getOrderedPartitions(this._savedCoordinatorMeta)) {
                    validIds.add(p.getId());
                }
                for (String existingPartition : this._state.list("")) {
                    if (validIds.contains(existingPartition)) continue;
                    RotatingTransactionalState s = new RotatingTransactionalState(this._state, existingPartition);
                    s.removeState(attempt.getTransactionId());
                }
                this._changedMeta = false;
            }
            Long txid = attempt.getTransactionId();
            Map<String, Object> metas = this._cachedMetas.remove(txid);
            for (Map.Entry<String, Object> entry : metas.entrySet()) {
                this._partitionStates.get((Object)entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
            }
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Exiting commit method for transaction {}. [{}]", (Object)attempt, (Object)this);
        }

        @Override
        public void close() {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Closing");
            this._emitter.close();
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Closed");
        }

        public String toString() {
            return "Emitter{, _state=" + this._state + ", _cachedMetas=" + this._cachedMetas + ", _partitionStates=" + this._partitionStates + ", _index=" + this._index + ", _numTasks=" + this._numTasks + ", _savedCoordinatorMeta=" + this._savedCoordinatorMeta + ", _changedMeta=" + this._changedMeta + '}';
        }
    }

    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;

        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            this.rotatingState = s;
            this.partition = p;
        }
    }

    public class Coordinator
    implements ITridentSpout.BatchCoordinator<Object> {
        IOpaquePartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map<String, Object> conf, TopologyContext context) {
            this._coordinator = OpaquePartitionedTridentSpoutExecutor.this._spout.getCoordinator(conf, context);
        }

        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", new Object[]{txid, prevMetadata, currMetadata});
            return this._coordinator.getPartitionsForBatch();
        }

        @Override
        public void close() {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Closing");
            this._coordinator.close();
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Closed");
        }

        @Override
        public void success(long txid) {
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("Success [txid = {}]", (Object)txid);
        }

        @Override
        public boolean isReady(long txid) {
            boolean ready = this._coordinator.isReady(txid);
            OpaquePartitionedTridentSpoutExecutor.this.LOG.debug("[isReady = {}], [txid = {}]", (Object)ready, (Object)txid);
            return ready;
        }
    }
}

