/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.spout;

import java.util.Map;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout that emits {@code (txid, action)} tuples on the {@link #CHECKPOINT_STREAM_ID} stream
 * and advances a persisted {@link CheckPointState} as those tuples are acked.
 *
 * <p>The current transaction state is stored in a {@link KeyValueState} under a single key and
 * is re-read in {@code open}. The spout always starts in recovery mode; once recovery reports
 * no further state transition it switches to periodic checkpointing. A failed tuple while
 * checkpointing switches the spout back to recovery mode. At most one recovery step or one
 * checkpoint step is in flight at a time, guarded by the two "in progress" flags.
 */
public class CheckpointSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);

    /** Stream on which checkpoint tuples are emitted. */
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    /** Name of the first field of a checkpoint tuple (the transaction id). */
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    /** Name of the second field of a checkpoint tuple (the action to perform). */
    public static final String CHECKPOINT_FIELD_ACTION = "action";

    /** Key under which the current {@link CheckPointState} is persisted. */
    private static final String TX_STATE_KEY = "__state";
    /** Topology configuration key holding the checkpoint interval in milliseconds. */
    private static final String CHECKPOINT_INTERVAL_CONF_KEY = "topology.state.checkpoint.interval.ms";
    /** Lower bound applied to the configured checkpoint interval. */
    private static final int MIN_CHECKPOINT_INTERVAL_MS = 100;

    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    /**
     * Initializes the spout from the topology configuration: resolves the checkpoint interval
     * and loads (or creates) the persisted checkpoint state, then delegates to the
     * package-private overload.
     *
     * @param conf      topology configuration
     * @param context   topology context of this task
     * @param collector collector used to emit checkpoint tuples
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    /**
     * Initializes the spout with an explicit interval and state store.
     *
     * @param context            topology context of this task
     * @param collector          collector used to emit checkpoint tuples
     * @param checkpointInterval checkpoint interval in milliseconds
     * @param checkpointState    store holding the current {@link CheckPointState} under
     *                           {@code TX_STATE_KEY}; the entry is read here and used unchecked
     *                           afterwards, so callers are expected to have populated it
     */
    void open(TopologyContext context, SpoutOutputCollector collector, int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        // Idle sleep is a tenth of the checkpoint interval.
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        this.lastCheckpointTs = 0L;
        this.recoveryStepInProgress = false;
        this.checkpointStepInProgress = false;
        // Every (re)start begins in recovery mode.
        this.recovering = true;
    }

    /**
     * Emits the next recovery or checkpoint tuple if one is due and none is in flight;
     * otherwise sleeps for the idle interval.
     */
    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    /**
     * Handles the ack of a previously emitted tuple. The state is only advanced when the acked
     * message id equals the current transaction id; a mismatch is logged and otherwise ignored.
     * The in-progress flag is cleared in both cases.
     *
     * @param msgId message id of the acked tuple; cast to {@link Number}
     */
    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    /**
     * Handles a failed tuple. A failure while checkpointing switches the spout to recovery
     * mode; the in-progress flag of the (possibly new) mode is then cleared.
     *
     * @param msgId message id of the failed tuple
     */
    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    /**
     * Declares the checkpoint stream with the fields {@code txid} and {@code action}.
     *
     * @param declarer declarer to register the stream with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    /**
     * Tells whether a tuple arrived on the checkpoint stream.
     *
     * @param input tuple to inspect
     * @return {@code true} if the tuple's source stream id is {@link #CHECKPOINT_STREAM_ID}
     */
    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    /**
     * Loads the state store for this task and makes sure it contains a transaction state.
     * The store namespace is {@code <componentId>-<taskId>}. When no entry exists yet, an
     * initial state with txid {@code -1} in {@code COMMITTED} is written and committed.
     *
     * @param conf topology configuration passed to the state factory
     * @param ctx  topology context of this task
     * @return the state store, guaranteed by this method to hold an entry for the state key
     */
    private KeyValueState<String, CheckPointState> loadCheckpointState(Map<String, Object> conf, TopologyContext ctx) {
        String namespace = ctx.getThisComponentId() + "-" + ctx.getThisTaskId();
        // The factory returns an untyped state; this spout only ever stores CheckPointState
        // values under String keys in its own namespace.
        @SuppressWarnings("unchecked")
        KeyValueState<String, CheckPointState> state =
                (KeyValueState<String, CheckPointState>) StateFactory.getState(namespace, conf, ctx);
        CheckPointState existing = state.get(TX_STATE_KEY);
        if (existing == null) {
            CheckPointState txState = new CheckPointState(-1L, CheckPointState.State.COMMITTED);
            state.put(TX_STATE_KEY, txState);
            state.commit();
            LOG.debug("Initialized checkpoint spout state with txState {}", txState);
        } else {
            LOG.debug("Got checkpoint spout state {}", existing);
        }
        return state;
    }

    /**
     * Reads the checkpoint interval from the topology configuration.
     *
     * @param topoConf topology configuration
     * @return the configured interval in milliseconds, never less than
     *         {@code MIN_CHECKPOINT_INTERVAL_MS}; the minimum is used when the key is absent
     *         or mapped to {@code null}
     * @throws ClassCastException if the configured value is not a {@link Number}
     */
    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        // A single lookup covers both "key absent" and "key mapped to null".
        Object configured = topoConf.get(CHECKPOINT_INTERVAL_CONF_KEY);
        if (configured != null) {
            interval = ((Number) configured).intValue();
        }
        interval = Math.max(MIN_CHECKPOINT_INTERVAL_MS, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    /** Returns {@code true} when in recovery mode and no recovery step is in flight. */
    private boolean shouldRecover() {
        return recovering && !recoveryStepInProgress;
    }

    /**
     * Returns {@code true} when not recovering, no checkpoint step is in flight, and either
     * the current transaction is not yet {@code COMMITTED} or the checkpoint interval elapsed.
     */
    private boolean shouldCheckpoint() {
        if (recovering || checkpointStepInProgress) {
            return false;
        }
        return curTxState.getState() != CheckPointState.State.COMMITTED || checkpointIntervalElapsed();
    }

    /** Returns {@code true} when more than the checkpoint interval passed since the last checkpoint start. */
    private boolean checkpointIntervalElapsed() {
        return System.currentTimeMillis() - lastCheckpointTs > (long) checkpointInterval;
    }

    /** Emits the recovery action for the current transaction state. */
    private void handleRecovery() {
        LOG.debug("In recovery");
        CheckPointState.Action action = curTxState.nextAction(true);
        emit(curTxState.getTxid(), action);
    }

    /**
     * Advances the state after a recovery tuple was acked, or leaves recovery mode when the
     * state reports no further transition.
     */
    private void handleRecoveryAck() {
        CheckPointState nextState = curTxState.nextState(true);
        // Deliberate reference comparison: getting the very same instance back is treated as
        // "nothing left to recover".
        if (curTxState != nextState) {
            saveTxState(nextState);
        } else {
            LOG.debug("Recovery complete, current state {}", curTxState);
            recovering = false;
        }
    }

    /**
     * Emits the checkpoint action for the current transaction. When the previous transaction
     * is {@code COMMITTED}, the next state is persisted first and the checkpoint timestamp is
     * refreshed.
     */
    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == CheckPointState.State.COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        CheckPointState.Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    /** Persists the next state after a checkpoint tuple was acked. */
    private void handleCheckpointAck() {
        CheckPointState nextState = curTxState.nextState(false);
        saveTxState(nextState);
    }

    /**
     * Emits a checkpoint tuple, using the transaction id as the message id so that
     * {@link #ack(Object)} can match it against the current state.
     *
     * @param txid   transaction id to emit
     * @param action action to emit
     */
    private void emit(long txid, CheckPointState.Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    /**
     * Writes and commits the given state to the store, then makes it the current state.
     *
     * @param txState state to persist
     */
    private void saveTxState(CheckPointState txState) {
        LOG.debug("saveTxState, current state {} -> new state {}", curTxState, txState);
        checkpointState.put(TX_STATE_KEY, txState);
        checkpointState.commit();
        curTxState = txState;
    }

    /** Marks a step of the currently active mode (recovery or checkpoint) as in flight. */
    private void startProgress() {
        if (recovering) {
            recoveryStepInProgress = true;
        } else {
            checkpointStepInProgress = true;
        }
    }

    /**
     * Clears the in-flight flag of the mode that is active at the time of the call. Because
     * ack/fail handling may have switched modes just before, this is the flag of the new mode,
     * not necessarily the one that was set when the tuple was emitted.
     */
    private void resetProgress() {
        if (recovering) {
            recoveryStepInProgress = false;
        } else {
            checkpointStepInProgress = false;
        }
    }
}

