/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoException;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.RateLimitSettings;
import com.mongodb.kafka.connect.sink.RecordBatches;
import com.mongodb.kafka.connect.sink.converter.SinkConverter;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessors;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import com.mongodb.kafka.connect.util.ConfigHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoSinkTask
extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions();
    private MongoSinkConfig sinkConfig;
    private MongoClient mongoClient;
    private Map<String, AtomicInteger> remainingRetriesTopicMap;
    private SinkConverter sinkConverter = new SinkConverter();

    public String version() {
        return "1.0.0";
    }

    public void start(Map<String, String> props) {
        LOGGER.info("Starting MongoDB sink task");
        try {
            this.sinkConfig = new MongoSinkConfig(props);
            this.remainingRetriesTopicMap = new ConcurrentHashMap<String, AtomicInteger>(this.sinkConfig.getTopics().stream().collect(Collectors.toMap(t -> t, t -> new AtomicInteger(this.sinkConfig.getMongoSinkTopicConfig((String)t).getInt("max.num.retries")))));
        }
        catch (Exception e) {
            throw new ConnectException("Failed to start new task", (Throwable)e);
        }
        LOGGER.debug("Started MongoDB sink task");
    }

    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            LOGGER.debug("No sink records to process for current poll operation");
            return;
        }
        Map<String, RecordBatches> batchMapping = this.createSinkRecordBatchesPerTopic(records);
        batchMapping.forEach((topic, batches) -> {
            MongoSinkTopicConfig topicConfig = this.sinkConfig.getMongoSinkTopicConfig((String)topic);
            batches.getBufferedBatches().forEach(batch -> {
                this.processSinkRecords(topicConfig, (List<SinkRecord>)batch);
                RateLimitSettings rls = topicConfig.getRateLimitSettings();
                if (rls.isTriggered()) {
                    LOGGER.debug("Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}", new Object[]{rls.getTimeoutMs(), rls.getEveryN(), topic});
                    try {
                        Thread.sleep(rls.getTimeoutMs());
                    }
                    catch (InterruptedException e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            });
        });
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        LOGGER.debug("Flush called - noop");
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    private MongoClient getMongoClient() {
        if (this.mongoClient == null) {
            this.mongoClient = MongoClients.create(this.sinkConfig.getConnectionString(), ConfigHelper.getMongoDriverInformation());
        }
        return this.mongoClient;
    }

    private void processSinkRecords(MongoSinkTopicConfig config, List<SinkRecord> batch) {
        List<? extends WriteModel<BsonDocument>> writeModels = config.getCdcHandler().isPresent() ? this.buildWriteModelCDC(config, batch) : this.buildWriteModel(config, batch);
        try {
            if (!writeModels.isEmpty()) {
                LOGGER.debug("Bulk writing {} document(s) into collection [{}]", (Object)writeModels.size(), (Object)config.getNamespace().getFullName());
                BulkWriteResult result = this.getMongoClient().getDatabase(config.getNamespace().getDatabaseName()).getCollection(config.getNamespace().getCollectionName(), BsonDocument.class).bulkWrite(writeModels, BULK_WRITE_OPTIONS);
                LOGGER.debug("Mongodb bulk write result: {}", (Object)result);
            }
        }
        catch (MongoBulkWriteException e) {
            LOGGER.error("Mongodb bulk write (partially) failed", (Throwable)e);
            LOGGER.error(e.getWriteResult().toString());
            LOGGER.error(e.getWriteErrors().toString());
            LOGGER.error(e.getWriteConcernError().toString());
            this.checkRetriableException(config, e);
        }
        catch (MongoException e) {
            LOGGER.error("Error on mongodb operation", (Throwable)e);
            LOGGER.error("Writing {} document(s) into collection [{}] failed -> remaining retries ({})", new Object[]{writeModels.size(), config.getNamespace().getFullName(), this.remainingRetriesTopicMap.get(config.getTopic()).get()});
            this.checkRetriableException(config, e);
        }
    }

    private void checkRetriableException(MongoSinkTopicConfig config, MongoException e) {
        if (this.remainingRetriesTopicMap.get(config.getTopic()).decrementAndGet() <= 0) {
            throw new DataException("Failed to write mongodb documents despite retrying", (Throwable)e);
        }
        Integer deferRetryMs = config.getInt("retries.defer.timeout");
        LOGGER.debug("Deferring retry operation for {}ms", (Object)deferRetryMs);
        this.context.timeout((long)deferRetryMs.intValue());
        throw new RetriableException(e.getMessage(), (Throwable)e);
    }

    Map<String, RecordBatches> createSinkRecordBatchesPerTopic(Collection<SinkRecord> records) {
        LOGGER.debug("Number of sink records to process: {}", (Object)records.size());
        HashMap<String, RecordBatches> batchMapping = new HashMap<String, RecordBatches>();
        LOGGER.debug("Buffering sink records into grouped topic batches");
        records.forEach(r -> {
            RecordBatches batches = (RecordBatches)batchMapping.get(r.topic());
            if (batches == null) {
                int maxBatchSize = this.sinkConfig.getMongoSinkTopicConfig(r.topic()).getInt("max.batch.size");
                LOGGER.debug("Batch size for collection {} is at most {} record(s)", (Object)this.sinkConfig.getMongoSinkTopicConfig(r.topic()).getNamespace().getCollectionName(), (Object)maxBatchSize);
                batches = new RecordBatches(maxBatchSize, records.size());
                batchMapping.put(r.topic(), batches);
            }
            batches.buffer((SinkRecord)r);
        });
        return batchMapping;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModel(MongoSinkTopicConfig config, Collection<SinkRecord> records) {
        ArrayList docsToWrite = new ArrayList(records.size());
        LOGGER.debug("building write model for {} record(s)", (Object)records.size());
        PostProcessors postProcessors = config.getPostProcessors();
        records.forEach(record -> {
            SinkDocument doc = this.sinkConverter.convert((SinkRecord)record);
            postProcessors.getPostProcessorList().forEach(pp -> pp.process(doc, (SinkRecord)record));
            if (doc.getValueDoc().isPresent()) {
                docsToWrite.add(config.getWriteModelStrategy().createWriteModel(doc));
            } else {
                Optional<WriteModelStrategy> deleteOneModelWriteStrategy = config.getDeleteOneWriteModelStrategy();
                if (doc.getKeyDoc().isPresent() && deleteOneModelWriteStrategy.isPresent()) {
                    docsToWrite.add(deleteOneModelWriteStrategy.get().createWriteModel(doc));
                } else {
                    LOGGER.error("skipping sink record {} for which neither key doc nor value doc were present", record);
                }
            }
        });
        return docsToWrite;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModelCDC(MongoSinkTopicConfig config, Collection<SinkRecord> records) {
        LOGGER.debug("Building CDC write model for {} record(s) for topic {}", (Object)records.size(), (Object)config.getTopic());
        return records.stream().map(this.sinkConverter::convert).map(sd -> config.getCdcHandler().flatMap(c -> c.handle((SinkDocument)sd))).flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)).collect(Collectors.toList());
    }
}

