/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.source;

import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.storage.StorageClient;
import com.vesoft.nebula.client.storage.data.BaseTableRow;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider;
import org.apache.flink.connector.nebula.shaded.org.slf4j.Logger;
import org.apache.flink.connector.nebula.shaded.org.slf4j.LoggerFactory;
import org.apache.flink.connector.nebula.source.NebulaEdgeSource;
import org.apache.flink.connector.nebula.source.NebulaSource;
import org.apache.flink.connector.nebula.source.NebulaVertexSource;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.PartitionUtils;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class NebulaSourceFunction
extends RichParallelSourceFunction<BaseTableRow> {
    private static final Logger LOG = LoggerFactory.getLogger(NebulaSourceFunction.class);
    private static final long serialVersionUID = -4864517634021753949L;
    private StorageClient storageClient;
    private MetaClient metaClient;
    private final NebulaStorageConnectionProvider storageConnectionProvider;
    private final NebulaMetaConnectionProvider metaConnectionProvider;
    private ExecutionOptions executionOptions;
    private int numPart;

    public NebulaSourceFunction(NebulaStorageConnectionProvider storageConnectionProvider) {
        this.storageConnectionProvider = storageConnectionProvider;
        NebulaClientOptions nebulaClientOptions = storageConnectionProvider.getNebulaClientOptions();
        this.metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.storageClient = this.storageConnectionProvider.getStorageClient();
        this.metaClient = this.metaConnectionProvider.getMetaClient();
        this.numPart = this.metaClient.getPartsAlloc(this.executionOptions.getGraphSpace()).size();
    }

    public void close() throws Exception {
        try {
            if (this.storageClient != null) {
                this.storageClient.close();
            }
            if (this.metaClient != null) {
                this.metaClient.close();
            }
        }
        catch (Exception e) {
            LOG.error("cancel exception:{}", (Object)e.getMessage(), (Object)e);
        }
    }

    public void run(SourceFunction.SourceContext<BaseTableRow> sourceContext) throws Exception {
        RuntimeContext runtimeContext = this.getRuntimeContext();
        List<Integer> scanParts = PartitionUtils.getScanParts(runtimeContext.getIndexOfThisSubtask() + 1, this.numPart, runtimeContext.getNumberOfParallelSubtasks());
        NebulaSource nebulaSource = this.executionOptions.getDataType().isVertex() ? new NebulaVertexSource(this.storageClient, this.executionOptions, scanParts) : new NebulaEdgeSource(this.storageClient, this.executionOptions, scanParts);
        while (nebulaSource.hasNext()) {
            BaseTableRow row = nebulaSource.next();
            sourceContext.collect((Object)row);
        }
    }

    public void cancel() {
        try {
            if (this.storageClient != null) {
                this.storageClient.close();
            }
            if (this.metaClient != null) {
                this.metaClient.close();
            }
        }
        catch (Exception e) {
            LOG.error("cancel exception:{}", (Object)e.getMessage(), (Object)e);
        }
    }

    public NebulaSourceFunction setExecutionOptions(ExecutionOptions executionOptions) {
        this.executionOptions = executionOptions;
        return this;
    }
}

