/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.spark.sql.connector.read;

import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.read.MongoInputPartitionHelper;
import com.mongodb.spark.sql.connector.read.MongoMicroBatchInputPartition;
import com.mongodb.spark.sql.connector.read.MongoMicroBatchPartitionReaderFactory;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import java.time.Instant;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class MongoMicroBatchStream
implements MicroBatchStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoMicroBatchStream.class);
    private final StructType schema;
    private final ReadConfig readConfig;
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;
    private volatile Long lastTime = Instant.now().getEpochSecond();
    private int partitionId;

    MongoMicroBatchStream(StructType schema, ReadConfig readConfig) {
        Assertions.validateConfig(schema, s -> !s.isEmpty(), () -> "Mongo micro batch streams require a schema to be defined");
        this.schema = schema;
        this.readConfig = readConfig;
        this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(schema, readConfig.outputExtendedJson());
    }

    public Offset latestOffset() {
        long now = Instant.now().getEpochSecond();
        if (this.lastTime < now) {
            this.lastTime = now;
        }
        return new LongOffset(this.lastTime.longValue());
    }

    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        return new InputPartition[]{new MongoMicroBatchInputPartition(this.partitionId++, MongoInputPartitionHelper.generatePipeline(this.schema, this.readConfig), (LongOffset)start, (LongOffset)end)};
    }

    public PartitionReaderFactory createReaderFactory() {
        return new MongoMicroBatchPartitionReaderFactory(this.bsonDocumentToRowConverter, this.readConfig);
    }

    public Offset initialOffset() {
        return new LongOffset(-1L);
    }

    public Offset deserializeOffset(String json) {
        return new LongOffset(Long.parseLong(json));
    }

    public void commit(Offset end) {
        LOGGER.info("MicroBatchStream commit: {}", (Object)end);
    }

    public void stop() {
        LOGGER.info("MicroBatchStream stopped.");
    }
}

