/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.source;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.kafka.connect.source.MongoCopyDataManager;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.util.ConfigHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.BsonString;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoSourceTask
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
    private static final String INVALIDATE = "invalidate";
    private final Time time;
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private MongoSourceConfig sourceConfig;
    private MongoClient mongoClient;
    private final AtomicBoolean isCopying = new AtomicBoolean();
    private MongoCopyDataManager copyDataManager;
    private BsonDocument cachedResult;
    private BsonDocument cachedResumeAfter;
    private MongoCursor<BsonDocument> cursor;

    public MongoSourceTask() {
        this((Time)new SystemTime());
    }

    private MongoSourceTask(Time time) {
        this.time = time;
    }

    public String version() {
        return "1.0.0";
    }

    public void start(Map<String, String> props) {
        LOGGER.debug("Starting MongoDB source task");
        try {
            this.sourceConfig = new MongoSourceConfig(props);
        }
        catch (Exception e) {
            throw new ConnectException("Failed to start new task", (Throwable)e);
        }
        this.mongoClient = MongoClients.create(this.sourceConfig.getConnectionString(), ConfigHelper.getMongoDriverInformation());
        if (this.shouldCopyData()) {
            this.setCachedResultAndResumeToken();
            this.copyDataManager = new MongoCopyDataManager(this.sourceConfig, this.mongoClient);
            this.isCopying.set(true);
        } else {
            this.cursor = this.createCursor(this.sourceConfig, this.mongoClient);
        }
        this.isRunning.set(true);
        LOGGER.debug("Started MongoDB source task");
    }

    public List<SourceRecord> poll() {
        long startPoll = this.time.milliseconds();
        LOGGER.debug("Polling Start: {}", (Object)this.time.milliseconds());
        ArrayList<SourceRecord> sourceRecords = new ArrayList<SourceRecord>();
        boolean publishFullDocumentOnly = this.sourceConfig.getBoolean("publish.full.document.only");
        int maxBatchSize = this.sourceConfig.getInt("poll.max.batch.size");
        long nextUpdate = startPoll + this.sourceConfig.getLong("poll.await.time.ms");
        String prefix = this.sourceConfig.getString("topic.prefix");
        Map<String, Object> partition = this.createPartitionMap(this.sourceConfig);
        while (this.isRunning.get()) {
            Optional<BsonDocument> next = this.getNextDocument();
            long untilNext = nextUpdate - this.time.milliseconds();
            if (!next.isPresent()) {
                if (untilNext > 0L) {
                    LOGGER.debug("Waiting {} ms to poll", (Object)untilNext);
                    this.time.sleep(untilNext);
                    continue;
                }
                LOGGER.debug("Poll await time passed before reaching max batch size returning {} records", (Object)sourceRecords.size());
                return sourceRecords.isEmpty() ? null : sourceRecords;
            }
            BsonDocument changeStreamDocument = next.get();
            HashMap<String, String> sourceOffset = new HashMap<String, String>();
            sourceOffset.put("_id", changeStreamDocument.getDocument("_id").toJson());
            if (this.isCopying.get()) {
                sourceOffset.put("copy", "true");
            }
            String topicName = this.getTopicNameFromNamespace(prefix, changeStreamDocument.getDocument("ns", new BsonDocument()));
            Optional<Object> jsonDocument = Optional.empty();
            if (publishFullDocumentOnly) {
                if (changeStreamDocument.containsKey("fullDocument")) {
                    jsonDocument = Optional.of(changeStreamDocument.getDocument("fullDocument").toJson());
                }
            } else {
                jsonDocument = Optional.of(changeStreamDocument.toJson());
            }
            jsonDocument.ifPresent(json -> {
                LOGGER.trace("Adding {} to {}", json, (Object)topicName);
                String keyJson = new BsonDocument("_id", changeStreamDocument.get("_id")).toJson();
                sourceRecords.add(new SourceRecord(partition, sourceOffset, topicName, Schema.STRING_SCHEMA, (Object)keyJson, Schema.STRING_SCHEMA, json));
            });
            if (changeStreamDocument.getString("operationType", new BsonString("")).getValue().equalsIgnoreCase(INVALIDATE)) {
                LOGGER.info("Cursor has been invalidated.");
                this.cursor = null;
                return sourceRecords;
            }
            if (sourceRecords.size() != maxBatchSize) continue;
            LOGGER.debug("Reached max batch size: {}, returning records", (Object)maxBatchSize);
            return sourceRecords;
        }
        return null;
    }

    public synchronized void stop() {
        LOGGER.debug("Stopping MongoDB source task");
        this.isRunning.set(false);
        this.isCopying.set(false);
        if (this.copyDataManager != null) {
            this.copyDataManager.close();
            this.copyDataManager = null;
        }
        if (this.cursor != null) {
            this.cursor.close();
            this.cursor = null;
        }
        if (this.mongoClient != null) {
            this.mongoClient.close();
            this.mongoClient = null;
        }
    }

    MongoCursor<BsonDocument> createCursor(MongoSourceConfig cfg, MongoClient mongoClient) {
        LOGGER.debug("Creating a MongoCursor");
        ChangeStreamIterable<Document> changeStream = this.getChangeStreamIterable(cfg, mongoClient);
        BsonDocument resumeAfter = this.getResumeAfter(cfg);
        if (resumeAfter != null) {
            LOGGER.info("Resuming the change stream at the previous offset");
            changeStream.resumeAfter(resumeAfter);
            this.cachedResumeAfter = null;
        }
        LOGGER.debug("Cursor created");
        return changeStream.withDocumentClass(BsonDocument.class).iterator();
    }

    String getTopicNameFromNamespace(String prefix, BsonDocument namespaceDocument) {
        String topicName = "";
        if (namespaceDocument.containsKey("db")) {
            topicName = namespaceDocument.getString("db").getValue();
            if (namespaceDocument.containsKey("coll")) {
                topicName = String.format("%s.%s", topicName, namespaceDocument.getString("coll").getValue());
            }
        }
        return prefix.isEmpty() ? topicName : String.format("%s.%s", prefix, topicName);
    }

    Map<String, Object> createPartitionMap(MongoSourceConfig cfg) {
        return Collections.singletonMap("ns", String.format("%s/%s.%s", cfg.getString("connection.uri"), cfg.getString("database"), cfg.getString("collection")));
    }

    private boolean shouldCopyData() {
        Map<String, Object> offset = this.getOffset(this.sourceConfig);
        return this.sourceConfig.getBoolean("copy.existing") != false && (offset == null || offset.containsKey("copy"));
    }

    private void setCachedResultAndResumeToken() {
        MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor = this.getChangeStreamIterable(this.sourceConfig, this.mongoClient).cursor();
        ChangeStreamDocument firstResult = (ChangeStreamDocument)changeStreamCursor.tryNext();
        if (firstResult != null) {
            this.cachedResult = new BsonDocumentWrapper<ChangeStreamDocument<Document>>(firstResult, ChangeStreamDocument.createCodec(Document.class, MongoClientSettings.getDefaultCodecRegistry()));
        }
        this.cachedResumeAfter = firstResult != null ? firstResult.getResumeToken() : changeStreamCursor.getResumeToken();
        changeStreamCursor.close();
    }

    private Optional<BsonDocument> getNextDocument() {
        if (this.isCopying.get()) {
            Optional<BsonDocument> result = this.copyDataManager.poll();
            if (result.isPresent() || this.copyDataManager.isCopying()) {
                return result;
            }
            this.isCopying.set(false);
            if (this.cachedResult != null) {
                result = Optional.of(this.cachedResult);
                this.cachedResult = null;
                return result;
            }
        }
        if (this.cursor == null) {
            this.cursor = this.createCursor(this.sourceConfig, this.mongoClient);
        }
        try {
            return Optional.ofNullable(this.cursor.tryNext());
        }
        catch (Exception e) {
            this.cursor.close();
            this.cursor = null;
            return Optional.empty();
        }
    }

    private ChangeStreamIterable<Document> getChangeStreamIterable(MongoSourceConfig cfg, MongoClient mongoClient) {
        ChangeStreamIterable<Document> changeStream;
        String database = cfg.getString("database");
        String collection = cfg.getString("collection");
        Optional<List<Document>> pipeline = cfg.getPipeline();
        if (database.isEmpty()) {
            LOGGER.info("Watching all changes on the cluster");
            changeStream = pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
        } else if (collection.isEmpty()) {
            LOGGER.info("Watching for database changes on '{}'", (Object)database);
            MongoDatabase db = mongoClient.getDatabase(database);
            changeStream = pipeline.map(db::watch).orElse(db.watch());
        } else {
            LOGGER.info("Watching for collection changes on '{}.{}'", (Object)database, (Object)collection);
            MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
            changeStream = pipeline.map(coll::watch).orElse(coll.watch());
        }
        int batchSize = cfg.getInt("batch.size");
        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }
        cfg.getFullDocument().ifPresent(changeStream::fullDocument);
        cfg.getCollation().ifPresent(changeStream::collation);
        return changeStream;
    }

    private Map<String, Object> getOffset(MongoSourceConfig sourceConfig) {
        return this.context != null ? this.context.offsetStorageReader().offset(this.createPartitionMap(sourceConfig)) : null;
    }

    private BsonDocument getResumeAfter(MongoSourceConfig cfg) {
        return this.getResumeAfter(this.getOffset(cfg));
    }

    private BsonDocument getResumeAfter(Map<String, Object> offset) {
        if (this.cachedResumeAfter != null) {
            return this.cachedResumeAfter;
        }
        if (offset != null && !offset.containsKey("initialSync")) {
            return BsonDocument.parse((String)offset.get("_id"));
        }
        return null;
    }
}

