/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.cassandra.CassandraSinkConfig;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

/**
 * Base class for Pulsar sinks that write key/value pairs into a Cassandra table.
 *
 * <p>On {@link #open} the sink loads a {@link CassandraSinkConfig}, connects to the configured
 * contact points, switches to the configured keyspace and prepares a two-column
 * {@code INSERT} statement. Each record passed to {@link #write} is converted to a
 * {@link KeyValue} by the subclass and inserted asynchronously; the record is acknowledged
 * or failed once the insert completes.
 *
 * @param <K> type bound to the key column of the prepared statement
 * @param <V> type bound to the value column of the prepared statement
 */
public abstract class CassandraAbstractSink<K, V>
implements Sink<byte[]> {
    private Cluster cluster;
    private Session session;
    // Package-private in the original; left as-is in case same-package code reads it.
    CassandraSinkConfig cassandraSinkConfig;
    private PreparedStatement statement;

    /**
     * Loads the sink configuration, connects to Cassandra and prepares the insert statement.
     *
     * @param config      raw configuration map handed to {@link CassandraSinkConfig#load}
     * @param sinkContext sink context (not used by this implementation)
     * @throws IllegalArgumentException if roots, keyspace, keyname, column family or column
     *                                  name is missing from the configuration
     * @throws Exception                if loading the configuration or connecting fails
     */
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.cassandraSinkConfig = CassandraSinkConfig.load(config);
        if (this.cassandraSinkConfig.getRoots() == null
                || this.cassandraSinkConfig.getKeyspace() == null
                || this.cassandraSinkConfig.getKeyname() == null
                || this.cassandraSinkConfig.getColumnFamily() == null
                || this.cassandraSinkConfig.getColumnName() == null) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.createClient(this.cassandraSinkConfig.getRoots());
        // Table and column names are identifiers and cannot be bound as parameters, so they
        // are taken from the sink configuration; only the two values are bound per record.
        this.statement = this.session.prepare("INSERT INTO " + this.cassandraSinkConfig.getColumnFamily()
                + " (" + this.cassandraSinkConfig.getKeyname() + ", " + this.cassandraSinkConfig.getColumnName()
                + ") VALUES (?, ?)");
    }

    /**
     * Closes the session and the cluster.
     *
     * <p>Safe to call when {@link #open} never completed: fields that were never assigned are
     * skipped, and the cluster is still closed if closing the session throws.
     *
     * @throws Exception if closing the session or the cluster fails
     */
    public void close() throws Exception {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } finally {
            if (this.cluster != null) {
                this.cluster.close();
            }
        }
    }

    /**
     * Inserts the key/value pair extracted from {@code record} asynchronously.
     *
     * <p>The record is acknowledged when the insert succeeds and failed when it does not.
     * Exceptions thrown while extracting or binding the key/value propagate to the caller.
     *
     * @param record the record to persist
     */
    public void write(final Record<byte[]> record) {
        KeyValue<K, V> keyValue = this.extractKeyValue(record);
        BoundStatement bound = this.statement.bind(keyValue.getKey(), keyValue.getValue());
        ResultSetFuture future = this.session.executeAsync(bound);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {

            public void onSuccess(ResultSet result) {
                record.ack();
            }

            public void onFailure(Throwable t) {
                record.fail();
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Builds the cluster from a comma-separated list of {@code host[:port]} entries, opens a
     * session and switches it to the configured keyspace.
     *
     * <p>Entries are trimmed and blank entries are ignored. If connecting or selecting the
     * keyspace fails, the newly built cluster is closed before the failure is rethrown so it
     * is not leaked.
     *
     * @param roots comma-separated contact points
     * @throws RuntimeException if {@code roots} contains no usable host
     */
    private void createClient(String roots) {
        Cluster.Builder builder = Cluster.builder();
        int contactPoints = 0;
        for (String rawEntry : roots.split(",")) {
            String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                // Skip blanks such as "a,,b" instead of registering an empty host name.
                continue;
            }
            String[] hostPort = entry.split(":");
            String host = hostPort[0].trim();
            if (host.isEmpty()) {
                continue;
            }
            builder.addContactPoint(host);
            ++contactPoints;
            if (hostPort.length > 1) {
                // NOTE(review): withPort is set on the builder, not per host, so the last
                // parsed port presumably wins for all contact points - confirm against driver docs.
                builder.withPort(Integer.parseInt(hostPort[1].trim()));
            }
        }
        if (contactPoints == 0) {
            throw new RuntimeException("Invalid cassandra roots");
        }

        Cluster newCluster = builder.withoutJMXReporting().build();
        try {
            Session newSession = newCluster.connect();
            newSession.execute("USE " + this.cassandraSinkConfig.getKeyspace());
            // Publish to the fields only once the client is fully usable.
            this.cluster = newCluster;
            this.session = newSession;
        } catch (RuntimeException e) {
            try {
                newCluster.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Converts an incoming record into the key/value pair bound to the insert statement.
     *
     * @param var1 the record being written
     * @return the key and value to bind, in that order, to the prepared statement
     */
    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> var1);
}

