/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.catalog;

import com.vesoft.nebula.PropertyType;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.ColumnDef;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.Schema;
import com.vesoft.nebula.meta.TagItem;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.connector.nebula.catalog.AbstractNebulaCatalog;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.shaded.com.facebook.thrift.TException;
import org.apache.flink.connector.nebula.shaded.org.apache.commons.collections.map.HashedMap;
import org.apache.flink.connector.nebula.shaded.org.slf4j.Logger;
import org.apache.flink.connector.nebula.shaded.org.slf4j.LoggerFactory;
import org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory;
import org.apache.flink.connector.nebula.utils.DataTypeEnum;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaSpace;
import org.apache.flink.connector.nebula.utils.NebulaSpaces;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

/**
 * Flink catalog backed by a NebulaGraph cluster.
 *
 * <p>Graph spaces are exposed as databases. Tags and edge types are exposed as tables whose
 * names are {@code "VERTEX" + NebulaConstant.POINT + tagName} and
 * {@code "EDGE" + NebulaConstant.POINT + edgeName} respectively.
 *
 * <p>Metadata is read through a {@link MetaClient}; statements (such as space creation) are
 * executed through a graph {@link Session} obtained from a {@link NebulaPool}. All three are
 * created in {@link #open()} and released in {@link #close()}.
 */
public class NebulaCatalog extends AbstractNebulaCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(NebulaCatalog.class);
    private final NebulaClientOptions nebulaClientOptions;
    private MetaClient metaClient;
    private NebulaPool nebulaPool;
    private Session session;

    /**
     * Creates the catalog and prepares the client options used later by {@link #open()}.
     *
     * @param catalogName     name of this catalog
     * @param defaultDatabase default database (graph space), may be {@code null}
     * @param username        user name used for the graph session
     * @param password        password used for the graph session
     * @param metaAddress     address of the meta service
     * @param graphAddress    address of the graph service
     */
    public NebulaCatalog(String catalogName, @Nullable String defaultDatabase, String username, String password, String metaAddress, String graphAddress) {
        super(catalogName, defaultDatabase, username, password, metaAddress);
        this.nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress(graphAddress)
                .setMetaAddress(metaAddress)
                .setUsername(username)
                .setPassword(password)
                .build();
    }

    /**
     * Opens the meta client, the graph connection pool and a graph session.
     *
     * <p>If the graph pool or session cannot be obtained, everything acquired so far is released
     * again before the failure is reported, so a failed {@code open()} does not leak connections.
     *
     * @throws CatalogException if the meta client or the graph session cannot be obtained
     */
    @Override
    public void open() throws CatalogException {
        super.open();
        NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(this.nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(this.nebulaClientOptions);
        try {
            this.metaClient = metaConnectionProvider.getMetaClient();
        } catch (ClientServerIncompatibleException | UnknownHostException e) {
            LOG.error("nebula get meta client error, ", e);
            throw new CatalogException("nebula get meta client error.", e);
        }
        try {
            this.nebulaPool = graphConnectionProvider.getNebulaPool();
            this.session = this.nebulaPool.getSession(graphConnectionProvider.getUserName(), graphConnectionProvider.getPassword(), true);
        } catch (AuthFailedException | ClientServerIncompatibleException | IOErrorException | NotValidConnectionException | UnknownHostException e) {
            LOG.error("failed to get graph session, ", e);
            // Do not leave the meta client / pool open when the catalog failed to open.
            try {
                this.releaseConnections();
            } catch (RuntimeException cleanupError) {
                e.addSuppressed(cleanupError);
            }
            throw new CatalogException("get graph session error, ", e);
        }
    }

    /**
     * Releases the graph session, the connection pool and the meta client, if present.
     *
     * @throws CatalogException declared by the overridden method
     */
    @Override
    public void close() throws CatalogException {
        super.close();
        this.releaseConnections();
    }

    /**
     * Releases session, pool and meta client in that order and clears the fields.
     *
     * <p>Each resource is released even if releasing a previous one throws, and the fields are
     * nulled so that calling this method twice does not release a resource twice.
     */
    private void releaseConnections() {
        try {
            if (this.session != null) {
                Session toRelease = this.session;
                this.session = null;
                toRelease.release();
            }
        } finally {
            try {
                if (this.nebulaPool != null) {
                    NebulaPool toClose = this.nebulaPool;
                    this.nebulaPool = null;
                    toClose.close();
                }
            } finally {
                if (this.metaClient != null) {
                    MetaClient toClose = this.metaClient;
                    this.metaClient = null;
                    toClose.close();
                }
            }
        }
    }

    /**
     * Lists the names of all graph spaces known to the meta service.
     *
     * @return the space names, decoded as UTF-8
     * @throws CatalogException if connecting to or querying the meta service fails
     */
    public List<String> listDatabases() throws CatalogException {
        List<String> spaceNames = new ArrayList<>();
        try {
            this.metaClient.connect();
            for (IdName space : this.metaClient.getSpaces()) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                spaceNames.add(new String(space.getName(), StandardCharsets.UTF_8));
            }
        } catch (ClientServerIncompatibleException | ExecuteFailedException | TException e) {
            LOG.error("failed to connect meta service vis {} ", this.address, e);
            throw new CatalogException("nebula meta service connect failed.", e);
        }
        return spaceNames;
    }

    /**
     * Returns the graph space with the given name as a catalog database.
     *
     * <p>The returned database carries a {@code spaceId} property when the id could be fetched;
     * a failure to fetch it is logged and the property is simply omitted (best effort).
     *
     * @param databaseName name of the graph space; surrounding whitespace is ignored for lookups
     * @return the catalog database describing the space
     * @throws DatabaseNotExistException if no such space exists
     * @throws CatalogException          if the spaces cannot be listed
     */
    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
        // Use the same (trimmed) name for the existence check and for the space lookup.
        String spaceName = databaseName.trim();
        if (!this.listDatabases().contains(spaceName)) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        }
        Map<String, String> props = new HashMap<>();
        try {
            props.put("spaceId", String.valueOf(this.metaClient.getSpace(spaceName).getSpace_id()));
        } catch (ExecuteFailedException | TException e) {
            // Best effort: the database is still returned, only without its spaceId property.
            LOG.error("get spaceId error, ", e);
        }
        return new CatalogDatabaseImpl(props, databaseName);
    }

    /**
     * Creates a graph space from the given catalog database definition.
     *
     * <p>Property keys are lower-cased and property values upper-cased (locale independent)
     * before they are handed to {@link NebulaSpace}. A {@code vid_type} property is mandatory.
     *
     * @param dataBaseName    name of the space to create; must not be null or blank
     * @param catalogDatabase definition (comment and properties) of the space; must not be null
     * @param ignoreIfExists  when {@code true} and the space already exists, nothing is done
     * @throws CatalogException if {@code vid_type} is missing or invalid, or if executing the
     *                          create statement fails
     */
    @Override
    public void createDatabase(String dataBaseName, CatalogDatabase catalogDatabase, boolean ignoreIfExists) throws CatalogException {
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(dataBaseName), "space name cannot be null or empty.");
        Preconditions.checkNotNull(catalogDatabase, "space cannot be null.");
        if (ignoreIfExists && this.listDatabases().contains(dataBaseName)) {
            LOG.info("Repeat to create space, {} already exists, no effect.", dataBaseName);
            return;
        }
        Map<String, String> properties = catalogDatabase.getProperties();
        // Locale.ROOT keeps the case mapping stable regardless of the JVM default locale
        // (e.g. under a Turkish locale "VID_TYPE".toLowerCase() would not yield "vid_type").
        Map<String, String> newProperties = properties.entrySet().stream().collect(Collectors.toMap(
                entry -> entry.getKey().toLowerCase(Locale.ROOT),
                entry -> entry.getValue().toUpperCase(Locale.ROOT)));
        if (!newProperties.containsKey("vid_type")) {
            LOG.error("failed to create graph space {}, missing VID type param", properties);
            throw new CatalogException("nebula create graph space failed, missing VID type.");
        }
        String vidType = newProperties.get("vid_type");
        if (!NebulaUtils.checkValidVidType(vidType)) {
            LOG.error("VID type not satisfy {}", vidType);
            throw new CatalogException("nebula graph dont support VID type.");
        }
        NebulaSpace space = new NebulaSpace(dataBaseName, catalogDatabase.getComment(), newProperties);
        String statement = new NebulaSpaces(space).getCreateStatement();
        ResultSet execResult;
        try {
            execResult = this.session.execute(statement);
        } catch (IOErrorException e) {
            LOG.error("nebula create graph space execute failed.", e);
            // Keep the root cause attached so callers can diagnose the failure.
            throw new CatalogException("nebula create graph space execute failed.", e);
        }
        if (!execResult.isSucceeded()) {
            LOG.error("create space failed: {}", execResult.getErrorMessage());
            throw new CatalogException("create space failed, " + execResult.getErrorMessage());
        }
        LOG.debug("create space success.");
    }

    /**
     * Checks whether the table named by {@code tablePath} is among the tables of its space.
     *
     * @param tablePath database (space) name plus table name
     * @return {@code true} if {@link #listTables(String)} contains the table name
     * @throws CatalogException if the space does not exist or the tables cannot be listed
     */
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        String graphSpace = tablePath.getDatabaseName();
        String table = tablePath.getObjectName();
        try {
            return this.listTables(graphSpace).contains(table);
        } catch (DatabaseNotExistException e) {
            throw new CatalogException("failed to call tableExists function, ", e);
        }
    }

    /**
     * Lists all tags and edge types of a graph space as table names.
     *
     * <p>Tags are returned as {@code "VERTEX" + NebulaConstant.POINT + name}, edge types as
     * {@code "EDGE" + NebulaConstant.POINT + name}; names are decoded as UTF-8.
     *
     * @param graphSpace name of the graph space
     * @return table names of all tags followed by all edge types
     * @throws DatabaseNotExistException if the space does not exist
     * @throws CatalogException          if the meta service cannot be reached or queried
     */
    public List<String> listTables(String graphSpace) throws DatabaseNotExistException, CatalogException {
        if (!this.databaseExists(graphSpace)) {
            throw new DatabaseNotExistException(this.getName(), graphSpace);
        }
        try {
            this.metaClient.connect();
        } catch (ClientServerIncompatibleException | TException e) {
            LOG.error("failed to connect meta service vis {} ", this.address, e);
            throw new CatalogException("nebula meta service connect failed.", e);
        }
        List<String> tables = new ArrayList<>();
        try {
            for (TagItem tag : this.metaClient.getTags(graphSpace)) {
                tables.add("VERTEX" + NebulaConstant.POINT + new String(tag.tag_name, StandardCharsets.UTF_8));
            }
            for (EdgeItem edge : this.metaClient.getEdges(graphSpace)) {
                tables.add("EDGE" + NebulaConstant.POINT + new String(edge.edge_name, StandardCharsets.UTF_8));
            }
        } catch (ExecuteFailedException | TException e) {
            LOG.error("get tags or edges error,", e);
            // Report the failure instead of returning a partial list, which would make
            // existing tables look as if they did not exist.
            throw new CatalogException("failed to list tags or edges of space " + graphSpace, e);
        }
        return tables;
    }

    /**
     * Builds a catalog table for a tag or edge type from its Nebula schema.
     *
     * <p>The table name is split on {@code NebulaConstant.SPLIT_POINT} into a type part and a
     * label part. The resulting table options contain the connector name, meta address, user
     * name, password, graph space and label name.
     *
     * @param tablePath database (space) name plus table name
     * @return the catalog table; never {@code null}
     * @throws TableNotExistException if the table is not listed or its name is malformed
     * @throws CatalogException       if the meta service cannot be reached or the schema cannot
     *                                be fetched
     */
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        if (!this.tableExists(tablePath)) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        String graphSpace = tablePath.getDatabaseName();
        // Limit 2: everything after the first separator belongs to the label.
        String[] typeAndLabel = tablePath.getObjectName().split(NebulaConstant.SPLIT_POINT, 2);
        if (typeAndLabel.length < 2) {
            LOG.warn("tablePath does not exist in nebula");
            throw new TableNotExistException(this.getName(), tablePath);
        }
        String type = typeAndLabel[0];
        String label = typeAndLabel[1];
        if (!DataTypeEnum.checkValidDataType(type)) {
            LOG.warn("tablePath does not exist in nebula");
            // Callers must never receive a null table; signal the problem explicitly.
            throw new TableNotExistException(this.getName(), tablePath);
        }
        try {
            this.metaClient.connect();
        } catch (ClientServerIncompatibleException | TException e) {
            LOG.error("failed to connect meta service vis {} ", this.address, e);
            throw new CatalogException("nebula meta service connect failed.", e);
        }
        Schema schema;
        try {
            schema = DataTypeEnum.valueOf(type).isVertex()
                    ? this.metaClient.getTag(graphSpace, label)
                    : this.metaClient.getEdge(graphSpace, label);
        } catch (ExecuteFailedException | TException e) {
            LOG.error("get tag or edge schema error, ", e);
            throw new CatalogException("get tag or edge schema error, ", e);
        }
        int columnCount = schema.columns.size();
        String[] names = new String[columnCount];
        DataType[] types = new DataType[columnCount];
        for (int i = 0; i < columnCount; ++i) {
            names[i] = new String(schema.columns.get(i).getName(), StandardCharsets.UTF_8);
            types[i] = this.fromNebulaType(schema.columns, i);
        }
        TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
        Map<String, String> props = new HashMap<>();
        props.put(FactoryUtil.CONNECTOR.key(), "nebula");
        props.put(NebulaDynamicTableFactory.METAADDRESS.key(), this.address);
        props.put(NebulaDynamicTableFactory.USERNAME.key(), this.username);
        props.put(NebulaDynamicTableFactory.PASSWORD.key(), this.password);
        props.put(NebulaDynamicTableFactory.GRAPH_SPACE.key(), tablePath.getDatabaseName());
        props.put(NebulaDynamicTableFactory.LABEL_NAME.key(), tablePath.getObjectName());
        return new CatalogTableImpl(tableSchema, props, "nebulaTableCatalog");
    }

    /**
     * Maps the Nebula property type of one column to a Flink {@link DataType}.
     *
     * @param columns  column definitions of a tag or edge schema
     * @param colIndex index of the column to map
     * @return the corresponding Flink data type
     * @throws UnsupportedOperationException if the property type has no mapping
     */
    private DataType fromNebulaType(List<ColumnDef> columns, int colIndex) {
        ColumnDef column = columns.get(colIndex);
        PropertyType propertyType = PropertyType.findByValue(column.getType().type.getValue());
        // Guard against an unmapped type code: switching on null would throw an NPE.
        if (propertyType == null) {
            throw new UnsupportedOperationException(String.format("Doesn't support nebula type '%s' yet", column.getType()));
        }
        switch (propertyType) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case VID:
                return DataTypes.BIGINT();
            case BOOL:
                return DataTypes.BOOLEAN();
            case FLOAT:
                return DataTypes.FLOAT();
            case DOUBLE:
                return DataTypes.DOUBLE();
            case TIMESTAMP:
                return DataTypes.TIMESTAMP();
            case DATE:
            case TIME:
            case DATETIME:
            case STRING:
            case FIXED_STRING:
                return DataTypes.STRING();
            case UNKNOWN:
                return DataTypes.NULL();
            default:
                throw new UnsupportedOperationException(String.format("Doesn't support nebula type '%s' yet", column.getType()));
        }
    }
}

