/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hbase.trident.state;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Serializable;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
import org.apache.storm.hbase.trident.mapper.TridentHBaseMapMapper;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
import org.apache.storm.trident.state.JSONOpaqueSerializer;
import org.apache.storm.trident.state.JSONTransactionalSerializer;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.Serializer;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.MapState;
import org.apache.storm.trident.state.map.NonTransactionalMap;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.SnapshottableMap;
import org.apache.storm.trident.state.map.TransactionalMap;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseMapState<T>
implements IBackingMap<T> {
    private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
    private int partitionNum;
    private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
    private Options<T> options;
    private Serializer<T> serializer;
    private HTable table;

    public HBaseMapState(final Options<T> options, Map map, int partitionNum) {
        this.options = options;
        this.serializer = options.serializer;
        this.partitionNum = partitionNum;
        final Configuration hbConfig = HBaseConfiguration.create();
        Map conf = (Map)map.get(options.configKey);
        if (conf == null) {
            LOG.info("HBase configuration not found using key '" + options.configKey + "'");
            LOG.info("Using HBase config from first hbase-site.xml found on classpath.");
        } else {
            if (conf.get("hbase.rootdir") == null) {
                LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
            }
            for (String key : conf.keySet()) {
                hbConfig.set(key, String.valueOf(conf.get(key)));
            }
        }
        try {
            UserProvider provider = HBaseSecurityUtil.login((Map)map, (Configuration)hbConfig);
            this.table = (HTable)provider.getCurrent().getUGI().doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<HTable>(){

                @Override
                public HTable run() throws IOException {
                    return new HTable(hbConfig, options.tableName);
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
        }
    }

    public static StateFactory opaque() {
        Options<OpaqueValue> options = new Options<OpaqueValue>();
        return HBaseMapState.opaque(options);
    }

    public static StateFactory opaque(Options<OpaqueValue> opts) {
        return new Factory(StateType.OPAQUE, opts);
    }

    public static StateFactory transactional() {
        Options<TransactionalValue> options = new Options<TransactionalValue>();
        return HBaseMapState.transactional(options);
    }

    public static StateFactory transactional(Options<TransactionalValue> opts) {
        return new Factory(StateType.TRANSACTIONAL, opts);
    }

    public static StateFactory nonTransactional() {
        Options<Object> options = new Options<Object>();
        return HBaseMapState.nonTransactional(options);
    }

    public static StateFactory nonTransactional(Options<Object> opts) {
        return new Factory(StateType.NON_TRANSACTIONAL, opts);
    }

    public List<T> multiGet(List<List<Object>> keys) {
        String qualifier;
        ArrayList<Get> gets = new ArrayList<Get>();
        for (List<Object> key : keys) {
            byte[] hbaseKey = this.options.mapMapper.rowKey(key);
            qualifier = this.options.mapMapper.qualifier(key);
            LOG.info("Partition: {}, GET: {}", (Object)this.partitionNum, (Object)new String(hbaseKey));
            Get get = new Get(hbaseKey);
            get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes());
            gets.add(get);
        }
        ArrayList<Object> retval = new ArrayList<Object>();
        try {
            Result[] results = this.table.get(gets);
            for (int i = 0; i < keys.size(); ++i) {
                qualifier = this.options.mapMapper.qualifier(keys.get(i));
                Result result = results[i];
                byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());
                if (value != null) {
                    retval.add(this.serializer.deserialize(value));
                    continue;
                }
                retval.add(null);
            }
        }
        catch (IOException e) {
            throw new FailedException("IOException while reading from HBase.", (Throwable)e);
        }
        return retval;
    }

    public void multiPut(List<List<Object>> keys, List<T> values) {
        ArrayList<Put> puts = new ArrayList<Put>(keys.size());
        for (int i = 0; i < keys.size(); ++i) {
            byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));
            String qualifier = this.options.mapMapper.qualifier(keys.get(i));
            LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});
            Put put = new Put(hbaseKey);
            T val = values.get(i);
            put.add(this.options.columnFamily.getBytes(), qualifier.getBytes(), this.serializer.serialize(val));
            puts.add(put);
        }
        try {
            this.table.put(puts);
        }
        catch (InterruptedIOException e) {
            throw new FailedException("Interrupted while writing to HBase", (Throwable)e);
        }
        catch (RetriesExhaustedWithDetailsException e) {
            throw new FailedException("Retries exhaused while writing to HBase", (Throwable)e);
        }
        catch (IOException e) {
            throw new FailedException("IOException while writing to HBase", (Throwable)e);
        }
    }

    static {
        DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, (Serializer)new JSONNonTransactionalSerializer());
        DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, (Serializer)new JSONTransactionalSerializer());
        DEFAULT_SERIALZERS.put(StateType.OPAQUE, (Serializer)new JSONOpaqueSerializer());
    }

    protected static class Factory
    implements StateFactory {
        private StateType stateType;
        private Options options;

        public Factory(StateType stateType, Options options) {
            this.stateType = stateType;
            this.options = options;
            if (this.options.serializer == null) {
                this.options.serializer = (Serializer)DEFAULT_SERIALZERS.get(stateType);
            }
            if (this.options.serializer == null) {
                throw new RuntimeException("Serializer should be specified for type: " + stateType);
            }
            if (this.options.mapMapper == null) {
                throw new RuntimeException("MapMapper should be specified for type: " + stateType);
            }
        }

        public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            MapState mapState;
            LOG.info("Preparing HBase State for partition {} of {}.", (Object)(partitionIndex + 1), (Object)numPartitions);
            CachedMap state = new HBaseMapState(this.options, conf, partitionIndex);
            if (this.options.cacheSize > 0) {
                state = new CachedMap(state, this.options.cacheSize);
            }
            switch (this.stateType) {
                case NON_TRANSACTIONAL: {
                    mapState = NonTransactionalMap.build((IBackingMap)state);
                    break;
                }
                case OPAQUE: {
                    mapState = OpaqueMap.build((IBackingMap)state);
                    break;
                }
                case TRANSACTIONAL: {
                    mapState = TransactionalMap.build((IBackingMap)state);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown state type: " + this.stateType);
                }
            }
            return new SnapshottableMap(mapState, (List)new Values(new Object[]{this.options.globalKey}));
        }
    }

    public static class Options<T>
    implements Serializable {
        public Serializer<T> serializer = null;
        public int cacheSize = 5000;
        public String globalKey = "$HBASE_STATE_GLOBAL$";
        public String configKey = "hbase.config";
        public String tableName;
        public String columnFamily;
        public TridentHBaseMapMapper mapMapper;
    }
}

