/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;

import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

public class MongoDbUpdate
implements CdcOperation {
    private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
    private static final String JSON_DOC_FIELD_PATH = "patch";
    public static final String INTERNAL_OPLOG_FIELD_V = "$v";

    @Override
    public WriteModel<BsonDocument> perform(SinkDocument doc) {
        BsonDocument valueDoc = doc.getValueDoc().orElseThrow(() -> new DataException("Error: value doc must not be missing for update operation"));
        try {
            BsonDocument updateDoc = BsonDocument.parse(valueDoc.getString(JSON_DOC_FIELD_PATH).getValue());
            updateDoc.remove(INTERNAL_OPLOG_FIELD_V);
            if (updateDoc.containsKey("_id")) {
                BsonDocument filterDoc = new BsonDocument("_id", updateDoc.get("_id"));
                return new ReplaceOneModel<BsonDocument>((Bson)filterDoc, updateDoc, REPLACE_OPTIONS);
            }
            BsonDocument keyDoc = doc.getKeyDoc().orElseThrow(() -> new DataException("Error: key doc must not be missing for update operation"));
            BsonDocument filterDoc = BsonDocument.parse(String.format("{%s: %s}", "_id", keyDoc.getString("id").getValue()));
            return new UpdateOneModel<BsonDocument>((Bson)filterDoc, updateDoc);
        }
        catch (DataException exc) {
            throw exc;
        }
        catch (Exception exc) {
            throw new DataException(exc.getMessage(), (Throwable)exc);
        }
    }
}

