/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hbase.bolt;

import backtype.storm.Config;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.AbstractHBaseBolt;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseBolt
extends AbstractHBaseBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
    boolean writeToWAL = true;
    List<Mutation> batchMutations = new LinkedList<Mutation>();
    List<Tuple> tupleBatch = new LinkedList<Tuple>();

    public HBaseBolt(String tableName, HBaseMapper mapper) {
        super(tableName, mapper);
    }

    public HBaseBolt writeToWAL(boolean writeToWAL) {
        this.writeToWAL = writeToWAL;
        return this;
    }

    public HBaseBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HBaseBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }

    public Map<String, Object> getComponentConfiguration() {
        Map conf = super.getComponentConfiguration();
        if (conf == null) {
            conf = new Config();
        }
        if (conf.containsKey("topology.message.timeout.secs") && this.flushIntervalSecs == 0) {
            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
            this.flushIntervalSecs = (int)Math.floor(topologyTimeout / 2);
            LOG.debug("Setting flush interval to [{}] based on topology.message.timeout.secs", (Object)this.flushIntervalSecs);
        }
        LOG.info("Enabling tick tuple with interval [{}]", (Object)this.flushIntervalSecs);
        conf.put("topology.tick.tuple.freq.secs", this.flushIntervalSecs);
        return conf;
    }

    public void execute(Tuple tuple) {
        boolean flush = false;
        try {
            Object rowKey;
            if (TupleUtils.isTick((Tuple)tuple)) {
                LOG.debug("TICK received! current batch status [" + this.tupleBatch.size() + "/" + this.batchSize + "]");
                flush = true;
            } else {
                rowKey = this.mapper.rowKey(tuple);
                ColumnList cols = this.mapper.columns(tuple);
                List<Mutation> mutations = this.hBaseClient.constructMutationReq((byte[])rowKey, cols, this.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
                this.batchMutations.addAll(mutations);
                this.tupleBatch.add(tuple);
                if (this.tupleBatch.size() >= this.batchSize) {
                    flush = true;
                }
            }
            if (flush && !this.tupleBatch.isEmpty()) {
                this.hBaseClient.batchMutate(this.batchMutations);
                LOG.debug("acknowledging tuples after batchMutate");
                rowKey = this.tupleBatch.iterator();
                while (rowKey.hasNext()) {
                    Tuple t = (Tuple)rowKey.next();
                    this.collector.ack(t);
                }
                this.tupleBatch.clear();
                this.batchMutations.clear();
            }
        }
        catch (Exception e) {
            this.collector.reportError((Throwable)e);
            for (Tuple t : this.tupleBatch) {
                this.collector.fail(t);
            }
            this.tupleBatch.clear();
            this.batchMutations.clear();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

