/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import com.vesoft.nebula.client.meta.MetaClient;
import java.io.Flushable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.shaded.com.facebook.thrift.TException;
import org.apache.flink.connector.nebula.shaded.org.slf4j.Logger;
import org.apache.flink.connector.nebula.shaded.org.slf4j.LoggerFactory;
import org.apache.flink.connector.nebula.sink.NebulaBatchExecutor;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/**
 * Base output format that buffers records in a {@link NebulaBatchExecutor} and submits them
 * to the graph service in batches.
 *
 * <p>A batch is committed when the number of buffered rows reaches the configured batch size,
 * when the optional periodic flush task fires, on {@link #flush()}, and on {@link #close()}.
 * {@link #writeRecord(Object)}, {@link #flush()}, {@link #close()} and the periodic flush task
 * all synchronize on this instance, so batch submission never overlaps.
 *
 * @param <T> type of the records written by this format
 * @param <OptionsT> concrete {@link ExecutionOptions} type used by the subclass
 */
public abstract class NebulaBatchOutputFormat<T, OptionsT extends ExecutionOptions>
extends RichOutputFormat<T>
implements Flushable {
    private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchOutputFormat.class);
    private static final long serialVersionUID = 8846672119763512586L;

    /** Meta client obtained from {@link #metaProvider} in {@link #open(int, int)}. */
    protected MetaClient metaClient;
    protected final NebulaMetaConnectionProvider metaProvider;
    protected final NebulaGraphConnectionProvider graphProvider;
    protected final OptionsT executionOptions;

    /** Batch buffer/executor created by {@link #createNebulaBatchExecutor()} during open. */
    protected NebulaBatchExecutor<T> nebulaBatchExecutor;

    /** Rows added since the last commit; {@code null} until {@link #open(int, int)} has run. */
    private volatile AtomicLong numPendingRow;
    private NebulaPool nebulaPool;
    private Session session;

    /** Non-null results returned by batch executions; logged as errors on close. */
    private final List<String> errorBuffer = new ArrayList<>();
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient boolean closed = false;

    /**
     * Stores the connection providers and options; no connection is opened here.
     *
     * @param graphProvider supplies the graph connection pool and credentials
     * @param metaProvider supplies the meta client
     * @param executionOptions write options (graph space, batch size, batch interval)
     */
    public NebulaBatchOutputFormat(NebulaGraphConnectionProvider graphProvider, NebulaMetaConnectionProvider metaProvider, OptionsT executionOptions) {
        this.graphProvider = graphProvider;
        this.metaProvider = metaProvider;
        this.executionOptions = executionOptions;
    }

    /** No-op: this format takes all of its settings from the constructor arguments. */
    public void configure(Configuration configuration) {
    }

    /**
     * Acquires a graph session, switches to the configured graph space, obtains the meta
     * client, creates the batch executor and, if enabled, starts the periodic flush task.
     *
     * <p>The periodic task is started only when the batch interval is non-zero and the batch
     * size is not 1.
     *
     * @param i unused by this implementation
     * @param i1 unused by this implementation
     * @throws IOException if the session or meta client cannot be obtained, or the
     *     {@code USE} statement fails with an I/O error
     * @throws RuntimeException if the {@code USE} statement executes but does not succeed
     */
    public void open(int i, int i1) throws IOException {
        try {
            this.nebulaPool = this.graphProvider.getNebulaPool();
            this.session = this.nebulaPool.getSession(this.graphProvider.getUserName(), this.graphProvider.getPassword(), true);
        }
        catch (AuthFailedException | ClientServerIncompatibleException | IOErrorException | NotValidConnectionException | UnknownHostException e) {
            LOG.error("failed to get graph session, ", e);
            throw new IOException("get graph session error, ", e);
        }

        ResultSet resultSet;
        try {
            resultSet = this.session.execute("USE " + this.executionOptions.getGraphSpace());
        }
        catch (IOErrorException e) {
            LOG.error("switch space error, ", e);
            throw new IOException("switch space error,", e);
        }
        if (!resultSet.isSucceeded()) {
            LOG.error("switch space failed, {}", resultSet.getErrorMessage());
            throw new RuntimeException("switch space failed, " + resultSet.getErrorMessage());
        }

        try {
            this.metaClient = this.metaProvider.getMetaClient();
        }
        catch (ClientServerIncompatibleException | TException e) {
            LOG.error("failed to get meta client, ", e);
            throw new IOException("get metaClient error, ", e);
        }

        this.numPendingRow = new AtomicLong(0L);
        this.nebulaBatchExecutor = this.createNebulaBatchExecutor();

        long intervalMs = this.executionOptions.getBatchIntervalMs();
        if (intervalMs != 0 && this.executionOptions.getBatchSize() != 1) {
            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("nebula-write-output-format"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                synchronized (this) {
                    if (this.closed) {
                        return;
                    }
                    try {
                        this.commit();
                    }
                    catch (RuntimeException e) {
                        // An exception escaping a scheduled task suppresses every later run,
                        // so log it here and let the next tick try again.
                        LOG.error("scheduled batch commit failed, ", e);
                    }
                }
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Creates the executor that buffers rows and turns them into batch statements.
     *
     * @return the batch executor used by this format; called once from {@link #open(int, int)}
     */
    protected abstract NebulaBatchExecutor<T> createNebulaBatchExecutor();

    /**
     * Adds a row to the current batch and commits the batch once the number of pending rows
     * reaches the configured batch size.
     *
     * @param row the record to buffer
     */
    public final synchronized void writeRecord(T row) {
        this.nebulaBatchExecutor.addToBatch(row);
        if (this.numPendingRow.incrementAndGet() >= (long)this.executionOptions.getBatchSize()) {
            this.commit();
        }
    }

    /**
     * Executes the buffered batch on the session, records a non-null result in the error
     * buffer, and resets the pending-row counter.
     */
    private synchronized void commit() {
        String errorExec = this.nebulaBatchExecutor.executeBatch(this.session);
        if (errorExec != null) {
            this.errorBuffer.add(errorExec);
        }
        // Every writer of the counter holds this instance's monitor, so a plain reset is safe.
        this.numPendingRow.set(0L);
    }

    /**
     * Stops the periodic flush task, commits any pending rows, logs collected error
     * statements and releases the session, pool and meta client. Calls after the first are
     * no-ops.
     *
     * <p>The connections are released even when the final commit throws.
     */
    public final synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        try {
            if (this.numPendingRow != null && this.numPendingRow.get() > 0L) {
                this.commit();
            }
            if (!this.errorBuffer.isEmpty()) {
                LOG.error("insert error statements: {}", this.errorBuffer);
            }
        }
        finally {
            this.releaseConnections();
        }
    }

    /** Releases the session, pool and meta client, attempting each even if an earlier one fails. */
    private void releaseConnections() {
        try {
            if (this.session != null) {
                this.session.release();
            }
        }
        finally {
            try {
                if (this.nebulaPool != null) {
                    this.nebulaPool.close();
                }
            }
            finally {
                if (this.metaClient != null) {
                    this.metaClient.close();
                }
            }
        }
    }

    /**
     * Commits the buffered rows, if any. Does nothing when the format has not been opened.
     *
     * @throws IOException declared by {@link Flushable}; not thrown by this implementation
     */
    @Override
    public synchronized void flush() throws IOException {
        // numPendingRow is only assigned in open(); without it there is nothing to flush.
        if (this.numPendingRow == null) {
            return;
        }
        while (this.numPendingRow.get() != 0L) {
            this.commit();
        }
    }
}

