/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.spark.sql.connector.write;

import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.spark.sql.connector.config.WriteConfig;
import com.mongodb.spark.sql.connector.exceptions.DataException;
import com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter;
import com.mongodb.spark.sql.connector.write.MongoWriterCommitMessage;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class MongoDataWriter
implements DataWriter<InternalRow> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDataWriter.class);
    private final int partitionId;
    private final long taskId;
    private final RowToBsonDocumentConverter rowToBsonDocumentConverter;
    private final WriteConfig writeConfig;
    private final long epochId;
    private final BulkWriteOptions bulkWriteOptions;
    private final List<WriteModel<BsonDocument>> writeModelList = new ArrayList<WriteModel<BsonDocument>>();
    private MongoClient mongoClient;

    MongoDataWriter(int partitionId, long taskId, StructType schema, WriteConfig writeConfig, long epochId) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.rowToBsonDocumentConverter = new RowToBsonDocumentConverter(schema, writeConfig.convertJson());
        this.writeConfig = writeConfig;
        this.epochId = epochId;
        this.bulkWriteOptions = new BulkWriteOptions().ordered(writeConfig.isOrdered());
    }

    public void write(InternalRow row) {
        BsonDocument bsonDocument = this.rowToBsonDocumentConverter.fromRow(row);
        this.writeModelList.add(this.getWriteModel(bsonDocument));
        if (this.writeModelList.size() >= this.writeConfig.getMaxBatchSize()) {
            this.writeModels();
        }
    }

    public WriterCommitMessage commit() {
        this.writeModels();
        LOGGER.debug("Finished all writes for: PartitionId: {}, TaskId: {}.", (Object)this.partitionId, (Object)this.taskId);
        return new MongoWriterCommitMessage(this.partitionId, this.taskId, this.epochId);
    }

    public void abort() {
        LOGGER.debug("Aborting write for: PartitionId: {}, TaskId: {}.", (Object)this.partitionId, (Object)this.taskId);
        this.releaseClient();
        throw new DataException(String.format("Write aborted for: PartitionId: %s, TaskId: %s. Manual data clean up may be required.", this.partitionId, this.taskId));
    }

    public void close() {
        LOGGER.debug("Closing PartitionId: {}, TaskId: {}.", (Object)this.partitionId, (Object)this.taskId);
        this.releaseClient();
    }

    private WriteModel<BsonDocument> getWriteModel(BsonDocument bsonDocument) {
        if (!this.hasIdFields(bsonDocument)) {
            return new InsertOneModel((Object)bsonDocument);
        }
        switch (this.writeConfig.getOperationType()) {
            case INSERT: {
                return new InsertOneModel((Object)bsonDocument);
            }
            case REPLACE: {
                return new ReplaceOneModel((Bson)this.getIdFieldDocument(bsonDocument), (Object)bsonDocument, new ReplaceOptions().upsert(this.writeConfig.isUpsert()));
            }
            case UPDATE: {
                BsonDocument idFields = this.getIdFieldDocument(bsonDocument);
                idFields.keySet().forEach(arg_0 -> ((BsonDocument)bsonDocument).remove(arg_0));
                BsonDocument setDocument = new BsonDocument("$set", (BsonValue)bsonDocument);
                return new UpdateOneModel((Bson)idFields, (Bson)setDocument, new UpdateOptions().upsert(this.writeConfig.isUpsert()));
            }
        }
        throw new DataException("Unsupported operation type: " + (Object)((Object)this.writeConfig.getOperationType()));
    }

    private boolean hasIdFields(BsonDocument bsonDocument) {
        return bsonDocument.keySet().containsAll(this.writeConfig.getIdFields());
    }

    private BsonDocument getIdFieldDocument(BsonDocument bsonDocument) {
        BsonDocument idFields = new BsonDocument();
        this.writeConfig.getIdFields().forEach(k -> {
            BsonValue v = bsonDocument.get(k);
            if (v == null) {
                throw new DataException(String.format("Missing id field: '%s' from: %s", k, bsonDocument.toJson()));
            }
            idFields.append(k, v);
        });
        return idFields;
    }

    private MongoClient getMongoClient() {
        if (this.mongoClient == null) {
            this.mongoClient = this.writeConfig.getMongoClient();
        }
        return this.mongoClient;
    }

    private void releaseClient() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
            this.mongoClient = null;
        }
    }

    private void writeModels() {
        if (this.writeModelList.size() > 0) {
            LOGGER.debug("Writing batch of {} operations to: {}. PartitionId: {}, TaskId: {}.", new Object[]{this.writeModelList.size(), this.writeConfig.getNamespace().getFullName(), this.partitionId, this.taskId});
            this.getMongoClient().getDatabase(this.writeConfig.getDatabaseName()).getCollection(this.writeConfig.getCollectionName(), BsonDocument.class).withWriteConcern(this.writeConfig.getWriteConcern()).bulkWrite(this.writeModelList, this.bulkWriteOptions);
            this.writeModelList.clear();
        }
    }
}

