/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.table;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.table.NebulaDynamicTableSink;
import org.apache.flink.connector.nebula.table.NebulaDynamicTableSource;
import org.apache.flink.connector.nebula.utils.DataTypeEnum;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

/**
 * Table factory that creates {@link NebulaDynamicTableSource} and
 * {@link NebulaDynamicTableSink} instances for tables declared with the
 * {@code nebula} connector identifier.
 *
 * <p>The options {@code meta-address}, {@code graph-address}, {@code username}
 * and {@code password} are required; every other option declared here is
 * optional. The {@code data-type} option is declared optional, but it must be
 * set because it decides whether vertex or edge execution options are built.
 */
public class NebulaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Connector identifier returned by {@link #factoryIdentifier()}. */
    public static final String IDENTIFIER = "nebula";

    // ---- connection options -------------------------------------------------

    public static final ConfigOption<String> METAADDRESS = ConfigOptions.key("meta-address")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula meta server address.");

    public static final ConfigOption<String> GRAPHADDRESS = ConfigOptions.key("graph-address")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula graph server address.");

    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula server name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula server password.");

    // ---- graph / execution options ------------------------------------------

    public static final ConfigOption<String> GRAPH_SPACE = ConfigOptions.key("graph-space")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula graph space name.");

    /** Passed to the builder as the tag (vertex) or the edge name (edge). */
    public static final ConfigOption<String> LABEL_NAME = ConfigOptions.key("label-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the nebula graph space label name.");

    /** No default: must be supplied, otherwise execution options cannot be built. */
    public static final ConfigOption<DataTypeEnum> DATA_TYPE = ConfigOptions.key("data-type")
            .enumType(DataTypeEnum.class)
            .noDefaultValue()
            .withDescription("the nebula graph data type.");

    public static final ConfigOption<Integer> TIMEOUT = ConfigOptions.key("timeout")
            .intType()
            .defaultValue(1000)
            .withDescription("the nebula execute timeout duration.");

    public static final ConfigOption<Integer> ID_INDEX = ConfigOptions.key("id-index")
            .intType()
            .defaultValue(0)
            .withDescription("the nebula execute vertex index.");

    public static final ConfigOption<Integer> SRC_ID_INDEX = ConfigOptions.key("src-id-index")
            .intType()
            .defaultValue(-1)
            .withDescription("the nebula execute edge src index.");

    public static final ConfigOption<Integer> DST_ID_INDEX = ConfigOptions.key("dst-id-index")
            .intType()
            .defaultValue(-1)
            .withDescription("the nebula execute edge dst index.");

    public static final ConfigOption<Integer> RANK_ID_INDEX = ConfigOptions.key("rank-id-index")
            .intType()
            .defaultValue(-1)
            .withDescription("the nebula execute rank index.");

    public static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("batch-size")
            .intType()
            .noDefaultValue()
            .withDescription("batch size.");

    public static final ConfigOption<Integer> BATCH_INTERVAL_MS = ConfigOptions.key("batch-interval-ms")
            .intType()
            .noDefaultValue()
            .withDescription("batch commit interval in milliseconds.");

    /**
     * Creates the Nebula sink for the table described by {@code context}.
     *
     * @param context factory context carrying the catalog table and its options
     * @return a sink configured with the client and execution options read from the table
     * @throws IllegalArgumentException if {@code timeout} is negative or {@code data-type} is not set
     */
    @Override
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        helper.validate();
        validateConfigOptions(config);
        return new NebulaDynamicTableSink(
                getClientOptions(config),
                getExecutionOptions(context, config),
                producedDataType);
    }

    /**
     * Creates the Nebula source for the table described by {@code context}.
     *
     * @param context factory context carrying the catalog table and its options
     * @return a source configured with the client and execution options read from the table
     * @throws IllegalArgumentException if {@code timeout} is negative or {@code data-type} is not set
     */
    @Override
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        validateConfigOptions(config);
        // The source is constructed from a TableSchema, so the physical schema is
        // derived through TableSchemaUtils rather than from the resolved schema.
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        ExecutionOptions executionOptions = getExecutionOptions(context, config);
        NebulaClientOptions clientOptions = getClientOptions(config);
        return new NebulaDynamicTableSource(clientOptions, executionOptions, physicalSchema);
    }

    /**
     * Rejects option values this factory cannot work with.
     *
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    private void validateConfigOptions(ReadableConfig config) {
        // TIMEOUT has a non-negative default, so only an explicitly configured
        // value can trip this check.
        int timeout = config.get(TIMEOUT);
        if (timeout < 0) {
            throw new IllegalArgumentException(String.format(
                    "The value of '%s' option should not be negative, but is %s.",
                    TIMEOUT.key(), timeout));
        }
    }

    /**
     * Builds the client options from the four connection options.
     *
     * <p>NOTE(review): {@code timeout} is validated by this factory but is not
     * forwarded to the client options here; confirm whether that is intended.
     */
    private NebulaClientOptions getClientOptions(ReadableConfig config) {
        return new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setMetaAddress(config.get(METAADDRESS))
                .setGraphAddress(config.get(GRAPHADDRESS))
                .setUsername(config.get(USERNAME))
                .setPassword(config.get(PASSWORD))
                .build();
    }

    /**
     * Builds vertex or edge execution options, depending on {@code data-type}.
     *
     * @throws IllegalArgumentException if {@code data-type} is not set
     */
    private ExecutionOptions getExecutionOptions(DynamicTableFactory.Context context, ReadableConfig config) {
        DataTypeEnum dataType = config.get(DATA_TYPE);
        if (dataType == null) {
            // data-type has no default; fail with a clear message instead of an NPE.
            throw new IllegalArgumentException(String.format(
                    "The '%s' option is required to build execution options, but was not set.",
                    DATA_TYPE.key()));
        }
        List<Column> columns = context.getCatalogTable().getResolvedSchema().getColumns();
        return dataType.isVertex()
                ? buildVertexOptions(columns, config)
                : buildEdgeOptions(columns, config);
    }

    /** Builds vertex options; every column from index 1 onward becomes a property field. */
    private ExecutionOptions buildVertexOptions(List<Column> columns, ReadableConfig config) {
        List<String> fields = new ArrayList<>();
        List<Integer> positions = new ArrayList<>();
        // NOTE(review): the loop starts at 1 regardless of 'id-index'; confirm the
        // id column is always expected at position 0.
        for (int i = 1; i < columns.size(); ++i) {
            positions.add(i);
            fields.add(columns.get(i).getName());
        }
        VertexExecutionOptions.ExecutionOptionBuilder builder =
                new VertexExecutionOptions.ExecutionOptionBuilder()
                        .setFields(fields)
                        .setIdIndex(config.get(ID_INDEX))
                        .setPositions(positions)
                        .setGraphSpace(config.get(GRAPH_SPACE))
                        .setTag(config.get(LABEL_NAME));
        // Batch settings are only applied when explicitly configured.
        config.getOptional(BATCH_SIZE).ifPresent(builder::setBatchSize);
        config.getOptional(BATCH_INTERVAL_MS).ifPresent(builder::setBatchIntervalMs);
        return builder.build();
    }

    /**
     * Builds edge options; every column from index 2 onward, except the one at
     * {@code rank-id-index}, becomes a property field.
     */
    private ExecutionOptions buildEdgeOptions(List<Column> columns, ReadableConfig config) {
        List<String> fields = new ArrayList<>();
        List<Integer> positions = new ArrayList<>();
        // Read once: the value cannot change while the columns are scanned.
        int rankIndex = config.get(RANK_ID_INDEX);
        // NOTE(review): the loop starts at 2 regardless of 'src-id-index' and
        // 'dst-id-index'; confirm src/dst are always expected at positions 0 and 1.
        for (int i = 2; i < columns.size(); ++i) {
            if (i == rankIndex) {
                continue;
            }
            positions.add(i);
            fields.add(columns.get(i).getName());
        }
        EdgeExecutionOptions.ExecutionOptionBuilder builder =
                new EdgeExecutionOptions.ExecutionOptionBuilder()
                        .setFields(fields)
                        .setSrcIndex(config.get(SRC_ID_INDEX))
                        .setDstIndex(config.get(DST_ID_INDEX))
                        .setRankIndex(rankIndex)
                        .setPositions(positions)
                        .setGraphSpace(config.get(GRAPH_SPACE))
                        .setEdge(config.get(LABEL_NAME));
        // Batch settings are only applied when explicitly configured.
        config.getOptional(BATCH_SIZE).ifPresent(builder::setBatchSize);
        config.getOptional(BATCH_INTERVAL_MS).ifPresent(builder::setBatchIntervalMs);
        return builder.build();
    }

    /** @return the connector identifier, {@value #IDENTIFIER} */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** @return the options that must be present: the two addresses and the credentials */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(METAADDRESS);
        options.add(GRAPHADDRESS);
        options.add(USERNAME);
        options.add(PASSWORD);
        return options;
    }

    /** @return the options that may be present in addition to the required ones */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(GRAPH_SPACE);
        options.add(LABEL_NAME);
        options.add(DATA_TYPE);
        options.add(TIMEOUT);
        options.add(ID_INDEX);
        options.add(SRC_ID_INDEX);
        options.add(DST_ID_INDEX);
        options.add(RANK_ID_INDEX);
        options.add(BATCH_SIZE);
        options.add(BATCH_INTERVAL_MS);
        return options;
    }
}

