/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.source;

import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.storage.StorageClient;
import com.vesoft.nebula.client.storage.data.BaseTableRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider;
import org.apache.flink.connector.nebula.shaded.org.slf4j.Logger;
import org.apache.flink.connector.nebula.shaded.org.slf4j.LoggerFactory;
import org.apache.flink.connector.nebula.source.NebulaConverter;
import org.apache.flink.connector.nebula.source.NebulaEdgeSource;
import org.apache.flink.connector.nebula.source.NebulaSource;
import org.apache.flink.connector.nebula.source.NebulaVertexSource;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.PartitionUtils;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

/**
 * Base Flink input format that reads rows from NebulaGraph through a storage client and
 * hands each row to a {@link NebulaConverter} to produce records of type {@code T}.
 *
 * <p>Lifecycle as implemented here: {@link #openInputFormat()} creates the storage and meta
 * clients, {@link #open(InputSplit)} builds a vertex or edge source for one split,
 * {@link #nextRecord(Object)} drains that source, {@link #close()} ends the split and
 * {@link #closeInputFormat()} releases both clients.
 *
 * <p>Subclasses are expected to assign {@link #nebulaConverter}; this class never sets it.
 *
 * @param <T> type of the records emitted by this input format
 */
public abstract class NebulaInputFormat<T> extends RichInputFormat<T, InputSplit> {
    protected static final Logger LOG = LoggerFactory.getLogger(NebulaInputFormat.class);
    private static final long serialVersionUID = 902031944252613459L;

    protected ExecutionOptions executionOptions;
    protected NebulaStorageConnectionProvider storageConnectionProvider;
    protected NebulaMetaConnectionProvider metaConnectionProvider;
    private transient StorageClient storageClient;
    private transient MetaClient metaClient;

    /** Whether the current source still has a row to hand out; refreshed after every read. */
    protected Boolean hasNext = false;
    protected List<BaseTableRow> rows;
    private NebulaSource nebulaSource;
    protected NebulaConverter<T> nebulaConverter;

    /** Rows returned by {@link #nextRecord(Object)} for the split currently being read. */
    private long scannedRows;
    /** Number of partitions reported by the meta client for the configured graph space. */
    private int numPart;
    /** Running count of {@link #nextRecord(Object)} reads, used only for logging. */
    private int times = 0;

    /**
     * Creates the input format and derives a meta connection provider from the client
     * options of the given storage connection provider.
     *
     * @param storageConnectionProvider provider used later to obtain the storage client
     * @param executionOptions          options describing what to scan
     */
    public NebulaInputFormat(NebulaStorageConnectionProvider storageConnectionProvider, ExecutionOptions executionOptions) {
        this.storageConnectionProvider = storageConnectionProvider;
        NebulaClientOptions nebulaClientOptions = storageConnectionProvider.getNebulaClientOptions();
        this.metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
        this.executionOptions = executionOptions;
    }

    /** No configuration is read from the Flink {@link Configuration}. */
    public void configure(Configuration configuration) {
    }

    /**
     * Obtains the storage and meta clients and looks up the partition count of the
     * configured graph space.
     *
     * @throws IOException if a client cannot be obtained or the partition lookup fails
     */
    public void openInputFormat() throws IOException {
        try {
            this.storageClient = this.storageConnectionProvider.getStorageClient();
            this.metaClient = this.metaConnectionProvider.getMetaClient();
            this.numPart = this.metaClient.getPartsAlloc(this.executionOptions.getGraphSpace()).size();
        }
        catch (Exception e) {
            LOG.error("connect storage client error, ", e);
            throw new IOException("connect storage client error, ", e);
        }
        this.rows = new ArrayList<BaseTableRow>();
    }

    /**
     * Closes the storage client and the meta client.
     *
     * <p>Each client is closed independently so that a failure while closing the storage
     * client does not leave the meta client open. The first failure is rethrown; a second
     * one is attached to it as a suppressed exception.
     *
     * @throws IOException if closing either client fails
     */
    public void closeInputFormat() throws IOException {
        Exception failure = null;
        if (this.storageClient != null) {
            try {
                this.storageClient.close();
            }
            catch (Exception e) {
                failure = e;
            }
        }
        if (this.metaClient != null) {
            try {
                this.metaClient.close();
            }
            catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            LOG.error("close client error,", failure);
            throw new IOException("close client error,", failure);
        }
    }

    /**
     * Returns the statistics object it was given, unchanged.
     *
     * @param baseStatistics statistics supplied by the caller, returned as is
     * @return {@code baseStatistics}
     */
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
        return baseStatistics;
    }

    /**
     * Creates {@code numSplit} generic splits.
     *
     * <p>Split numbers start at 1 (split {@code i} gets number {@code i + 1}); that number
     * is what {@link #open(InputSplit)} later passes to {@link PartitionUtils}.
     *
     * @param numSplit number of splits to create
     * @return an array of {@code numSplit} splits
     */
    public InputSplit[] createInputSplits(int numSplit) throws IOException {
        InputSplit[] inputSplits = new InputSplit[numSplit];
        for (int i = 0; i < numSplit; ++i) {
            inputSplits[i] = new GenericInputSplit(i + 1, numSplit);
        }
        return inputSplits;
    }

    /**
     * @param inputSplits splits to distribute
     * @return a {@link DefaultInputSplitAssigner} over the given splits
     */
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    /**
     * Prepares reading of one split: resolves the partitions to scan for it, creates a
     * vertex or edge source depending on the configured data type, and primes
     * {@link #hasNext}.
     *
     * <p>A {@code null} split is ignored apart from resetting the per-split row counter.
     *
     * @param inputSplit the split to read; must be a {@link GenericInputSplit} when non-null
     * @throws IOException if the initial {@code hasNext()} check on the source fails
     */
    public void open(InputSplit inputSplit) throws IOException {
        // The counter is reported per split in close(), so it must not carry over from a
        // previously read split when this instance is reused.
        this.scannedRows = 0L;
        if (inputSplit == null) {
            return;
        }
        GenericInputSplit split = (GenericInputSplit)inputSplit;
        List<Integer> scanParts = PartitionUtils.getScanParts(split.getSplitNumber(), this.numPart, split.getTotalNumberOfSplits());
        this.nebulaSource = this.executionOptions.getDataType().isVertex()
                ? new NebulaVertexSource(this.storageClient, this.executionOptions, scanParts)
                : new NebulaEdgeSource(this.storageClient, this.executionOptions, scanParts);
        try {
            this.hasNext = this.nebulaSource.hasNext();
        }
        catch (Exception e) {
            LOG.error("scan NebulaGraph error, ", e);
            throw new IOException("scan error, ", e);
        }
    }

    /**
     * @return {@code true} once the current source has no further rows
     */
    public boolean reachedEnd() throws IOException {
        return !this.hasNext;
    }

    /**
     * Reads the next row from the current source and converts it.
     *
     * @param reuse ignored; a converted record is returned instead
     * @return the converted record, or {@code null} if the source is exhausted
     * @throws IOException if checking the source for a following row fails
     */
    public T nextRecord(T reuse) throws IOException {
        if (!this.hasNext.booleanValue()) {
            return null;
        }
        // Logged at DEBUG: this runs once per record, so INFO would flood the task logs.
        LOG.debug("source nextRecord: {}", this.times++);
        BaseTableRow row = this.nebulaSource.next();
        try {
            this.hasNext = this.nebulaSource.hasNext();
        }
        catch (Exception e) {
            LOG.error("scan NebulaGraph error, ", e);
            throw new IOException("scan NebulaGraph error, ", e);
        }
        ++this.scannedRows;
        return this.nebulaConverter.convert(row);
    }

    /** Ends the current split; only logs how many rows were read from it. */
    public void close() {
        LOG.info("Closing split (scanned {} rows)", this.scannedRows);
    }

    /**
     * Replaces the execution options.
     *
     * @param executionOptions the new options
     * @return this input format, for chaining
     */
    public NebulaInputFormat<T> setExecutionOptions(ExecutionOptions executionOptions) {
        this.executionOptions = executionOptions;
        return this;
    }
}

