/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCommitManager
implements CommitManager {
    public static final long START_OFFSET = -1L;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitManager.class);
    protected final KafkaConsumer kafkaConsumer;
    protected final String threadId;
    protected final String printableTopic;
    protected final KafkaConfiguration configuration;
    private final Consumer<?, ?> consumer;

    public AbstractCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
        this.consumer = consumer;
        this.kafkaConsumer = kafkaConsumer;
        this.threadId = threadId;
        this.printableTopic = printableTopic;
        this.configuration = kafkaConsumer.getEndpoint().getConfiguration();
    }

    protected KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record, Collection<KafkaAsyncManualCommit> asyncCommits, KafkaManualCommitFactory manualCommitFactory) {
        StateRepository<String, String> offsetRepository = this.configuration.getOffsetRepository();
        long commitTimeoutMs = this.configuration.getCommitTimeoutMs();
        KafkaManualCommitFactory.CamelExchangePayload camelExchangePayload = new KafkaManualCommitFactory.CamelExchangePayload(exchange, this.consumer, this.threadId, offsetRepository, asyncCommits);
        KafkaManualCommitFactory.KafkaRecordPayload kafkaRecordPayload = new KafkaManualCommitFactory.KafkaRecordPayload(partition, record.offset(), commitTimeoutMs);
        return manualCommitFactory.newInstance(camelExchangePayload, kafkaRecordPayload);
    }

    @Override
    public KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record) {
        KafkaManualCommitFactory manualCommitFactory = this.kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
        return this.getManualCommit(exchange, partition, record, null, manualCommitFactory);
    }

    @Override
    public void commitOffsetForce(TopicPartition partition, long partitionLastOffset) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", new Object[]{this.threadId, partition.topic(), partition.partition(), partitionLastOffset});
        }
        long timeout = this.configuration.getCommitTimeoutMs();
        this.consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1L)), Duration.ofMillis(timeout));
    }
}

