/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.net.Session;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.nebula.sink.NebulaBatchExecutor;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;

public class NebulaTableBufferReducedExecutor
implements NebulaBatchExecutor<RowData> {
    private final DynamicTableSink.DataStructureConverter dataStructureConverter;
    private final Function<Row, Row> keyExtractor;
    private final NebulaBatchExecutor<Row> insertExecutor;
    private final NebulaBatchExecutor<Row> deleteExecutor;
    private final Map<Row, Tuple2<Boolean, Row>> reduceBuffer = new HashMap<Row, Tuple2<Boolean, Row>>();

    public NebulaTableBufferReducedExecutor(DynamicTableSink.DataStructureConverter dataStructureConverter, Function<Row, Row> keyExtractor, NebulaBatchExecutor<Row> insertExecutor, NebulaBatchExecutor<Row> deleteExecutor) {
        this.dataStructureConverter = dataStructureConverter;
        this.keyExtractor = keyExtractor;
        this.insertExecutor = insertExecutor;
        this.deleteExecutor = deleteExecutor;
    }

    @Override
    public void addToBatch(RowData record) {
        boolean isUpsert;
        switch (record.getRowKind()) {
            case INSERT: 
            case UPDATE_AFTER: {
                isUpsert = true;
                break;
            }
            case UPDATE_BEFORE: 
            case DELETE: {
                isUpsert = false;
                break;
            }
            default: {
                return;
            }
        }
        Row row = (Row)this.dataStructureConverter.toExternal((Object)record);
        Row key = this.keyExtractor.apply(row);
        this.reduceBuffer.put(key, (Tuple2<Boolean, Row>)Tuple2.of((Object)isUpsert, (Object)row));
    }

    @Override
    public String executeBatch(Session session) {
        for (Tuple2<Boolean, Row> value : this.reduceBuffer.values()) {
            boolean isUpsert = (Boolean)value.f0;
            Row row = (Row)value.f1;
            if (isUpsert) {
                this.insertExecutor.addToBatch(row);
                continue;
            }
            this.deleteExecutor.addToBatch(row);
        }
        String insertErrorStatement = this.insertExecutor.executeBatch(session);
        String deleteErrorStatement = this.deleteExecutor.executeBatch(session);
        this.reduceBuffer.clear();
        String errorStatements = Stream.of(insertErrorStatement, deleteErrorStatement).filter(Objects::nonNull).collect(Collectors.joining("; "));
        return errorStatements.isEmpty() ? null : errorStatements;
    }
}

