/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.hbase.sink;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.hbase.sink.HbaseSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HbaseAbstractSink<T>
implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(HbaseAbstractSink.class);
    private HbaseSinkConfig hbaseSinkConfig;
    private Configuration configuration;
    private Connection connection;
    private Admin admin;
    private TableName tableName;
    private Table table;
    protected TableDefinition tableDefinition;
    private long batchTimeMs;
    private int batchSize;
    private List<Record<T>> incomingList;
    private ScheduledExecutorService flushExecutor;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.hbaseSinkConfig = HbaseSinkConfig.load(config);
        Preconditions.checkNotNull((Object)this.hbaseSinkConfig.getZookeeperQuorum(), (Object)"zookeeperQuorum property not set.");
        Preconditions.checkNotNull((Object)this.hbaseSinkConfig.getZookeeperClientPort(), (Object)"zookeeperClientPort property not set.");
        Preconditions.checkNotNull((Object)this.hbaseSinkConfig.getZookeeperZnodeParent(), (Object)"zookeeperZnodeParent property not set.");
        Preconditions.checkNotNull((Object)this.hbaseSinkConfig.getTableName(), (Object)"hbase tableName property not set.");
        this.getTable(this.hbaseSinkConfig);
        this.tableDefinition = this.getTableDefinition(this.hbaseSinkConfig);
        this.batchTimeMs = this.hbaseSinkConfig.getBatchTimeMs();
        this.batchSize = this.hbaseSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(() -> this.flush(), this.batchTimeMs, this.batchTimeMs, TimeUnit.MILLISECONDS);
    }

    public void close() throws Exception {
        if (null != this.table) {
            this.table.close();
        }
        if (null != this.admin) {
            this.admin.close();
        }
        if (null != this.connection) {
            this.connection.close();
        }
        if (null != this.flushExecutor) {
            this.flushExecutor.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(Record<T> record) throws Exception {
        int number;
        HbaseAbstractSink hbaseAbstractSink = this;
        synchronized (hbaseAbstractSink) {
            if (null != record) {
                this.incomingList.add(record);
            }
            number = this.incomingList.size();
        }
        if (number == this.batchSize) {
            this.flushExecutor.execute(() -> this.flush());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() {
        List<Record<Record>> toFlushList;
        ArrayList<Put> puts = new ArrayList<Put>();
        HbaseAbstractSink hbaseAbstractSink = this;
        synchronized (hbaseAbstractSink) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            toFlushList = this.incomingList;
            this.incomingList = Lists.newArrayList();
        }
        if (CollectionUtils.isNotEmpty(toFlushList)) {
            for (Record record : toFlushList) {
                try {
                    this.bindValue(record, puts);
                }
                catch (Exception e) {
                    record.fail();
                    toFlushList.remove(record);
                    log.warn("Record flush thread was exception ", (Throwable)e);
                }
            }
        }
        try {
            if (CollectionUtils.isNotEmpty(puts)) {
                this.table.batch(puts, new Object[puts.size()]);
            }
            toFlushList.forEach(tRecord -> tRecord.ack());
            puts.clear();
            toFlushList.clear();
        }
        catch (Exception e) {
            toFlushList.forEach(tRecord -> tRecord.fail());
            log.error("Hbase table put data exception ", (Throwable)e);
        }
    }

    public abstract void bindValue(Record<T> var1, List<Put> var2) throws Exception;

    private void getTable(HbaseSinkConfig hbaseSinkConfig) throws IOException {
        this.configuration = HBaseConfiguration.create();
        String hbaseConfigResources = hbaseSinkConfig.getHbaseConfigResources();
        if (StringUtils.isNotBlank((String)hbaseConfigResources)) {
            this.configuration.addResource(hbaseConfigResources);
        }
        this.configuration.set("hbase.zookeeper.quorum", hbaseSinkConfig.getZookeeperQuorum());
        this.configuration.set("hbase.zookeeper.property.clientPort", hbaseSinkConfig.getZookeeperClientPort());
        this.configuration.set("zookeeper.znode.parent", hbaseSinkConfig.getZookeeperZnodeParent());
        this.connection = ConnectionFactory.createConnection((Configuration)this.configuration);
        this.admin = this.connection.getAdmin();
        this.tableName = TableName.valueOf((String)hbaseSinkConfig.getTableName());
        if (!this.admin.tableExists(this.tableName)) {
            throw new IllegalArgumentException(String.valueOf(this.tableName) + " table does not exist.");
        }
        this.table = this.connection.getTable(this.tableName);
    }

    private TableDefinition getTableDefinition(HbaseSinkConfig hbaseSinkConfig) {
        Preconditions.checkNotNull((Object)hbaseSinkConfig.getRowKeyName(), (Object)"rowKeyName property not set.");
        Preconditions.checkNotNull((Object)hbaseSinkConfig.getFamilyName(), (Object)"familyName property not set.");
        Preconditions.checkNotNull(hbaseSinkConfig.getQualifierNames(), (Object)"qualifierNames property not set.");
        return TableDefinition.of(hbaseSinkConfig.getRowKeyName(), hbaseSinkConfig.getFamilyName(), hbaseSinkConfig.getQualifierNames());
    }

    public static class TableDefinition {
        private final String rowKeyName;
        private final String familyName;
        private final List<String> qualifierNames;

        private TableDefinition(String rowKeyName, String familyName, List<String> qualifierNames) {
            this.rowKeyName = rowKeyName;
            this.familyName = familyName;
            this.qualifierNames = qualifierNames;
        }

        public static TableDefinition of(String rowKeyName, String familyName, List<String> qualifierNames) {
            return new TableDefinition(rowKeyName, familyName, qualifierNames);
        }

        public String getRowKeyName() {
            return this.rowKeyName;
        }

        public String getFamilyName() {
            return this.familyName;
        }

        public List<String> getQualifierNames() {
            return this.qualifierNames;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof TableDefinition)) {
                return false;
            }
            TableDefinition other = (TableDefinition)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$rowKeyName = this.getRowKeyName();
            String other$rowKeyName = other.getRowKeyName();
            if (this$rowKeyName == null ? other$rowKeyName != null : !this$rowKeyName.equals(other$rowKeyName)) {
                return false;
            }
            String this$familyName = this.getFamilyName();
            String other$familyName = other.getFamilyName();
            if (this$familyName == null ? other$familyName != null : !this$familyName.equals(other$familyName)) {
                return false;
            }
            List<String> this$qualifierNames = this.getQualifierNames();
            List<String> other$qualifierNames = other.getQualifierNames();
            return !(this$qualifierNames == null ? other$qualifierNames != null : !((Object)this$qualifierNames).equals(other$qualifierNames));
        }

        protected boolean canEqual(Object other) {
            return other instanceof TableDefinition;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $rowKeyName = this.getRowKeyName();
            result = result * 59 + ($rowKeyName == null ? 43 : $rowKeyName.hashCode());
            String $familyName = this.getFamilyName();
            result = result * 59 + ($familyName == null ? 43 : $familyName.hashCode());
            List<String> $qualifierNames = this.getQualifierNames();
            result = result * 59 + ($qualifierNames == null ? 43 : ((Object)$qualifierNames).hashCode());
            return result;
        }

        public String toString() {
            return "HbaseAbstractSink.TableDefinition(rowKeyName=" + this.getRowKeyName() + ", familyName=" + this.getFamilyName() + ", qualifierNames=" + String.valueOf(this.getQualifierNames()) + ")";
        }
    }
}

