/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.camel.component.kafka.PollOnError;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaFetchRecords
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
    private final KafkaConsumer kafkaConsumer;
    private Consumer consumer;
    private final String topicName;
    private final Pattern topicPattern;
    private final String threadId;
    private final Properties kafkaProps;
    private final Map<String, Long> lastProcessedOffset = new HashMap<String, Long>();
    private final PollExceptionStrategy pollExceptionStrategy;
    private final BridgeExceptionHandlerToErrorHandler bridge;
    private final ReentrantLock lock = new ReentrantLock();
    private CommitManager commitManager;
    private boolean retry = true;
    private boolean reconnect;
    private boolean connected;

    KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, Properties kafkaProps) {
        this.kafkaConsumer = kafkaConsumer;
        this.pollExceptionStrategy = pollExceptionStrategy;
        this.bridge = bridge;
        this.topicName = topicName;
        this.topicPattern = topicPattern;
        this.threadId = topicName + "-Thread " + id;
        this.kafkaProps = kafkaProps;
    }

    @Override
    public void run() {
        if (!this.isKafkaConsumerRunnable()) {
            return;
        }
        do {
            block5: {
                try {
                    if (this.isConnected()) break block5;
                    this.createConsumer();
                    this.commitManager = CommitManagers.createCommitManager(this.consumer, this.kafkaConsumer, this.threadId, this.getPrintableTopic());
                    this.initializeConsumer();
                    this.setConnected(true);
                }
                catch (Exception e) {
                    this.setConnected(false);
                    LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", (Object)e.getMessage(), (Object)e);
                    continue;
                }
            }
            this.startPolling();
        } while ((this.isRetrying() || this.isReconnect()) && this.isKafkaConsumerRunnable());
        if (LOG.isInfoEnabled()) {
            LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", (Object)this.threadId, (Object)this.getPrintableTopic());
        }
        this.safeUnsubscribe();
        IOHelper.close((Closeable)this.consumer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void createConsumer() {
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            long delay = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            String prefix = this.consumer == null ? "Connecting" : "Reconnecting";
            LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", new Object[]{prefix, this.threadId, delay});
            this.consumer = this.kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(this.kafkaProps);
        }
        finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
    }

    private void initializeConsumer() {
        this.subscribe();
        this.setConnected(false);
        this.setRetry(true);
    }

    private void subscribe() {
        PartitionAssignmentListener listener = new PartitionAssignmentListener(this.threadId, this.kafkaConsumer.getEndpoint().getConfiguration(), this.consumer, this.lastProcessedOffset, this::isRunnable, this.commitManager);
        if (LOG.isInfoEnabled()) {
            LOG.info("Subscribing {} to {}", (Object)this.threadId, (Object)this.getPrintableTopic());
        }
        if (this.topicPattern != null) {
            this.consumer.subscribe(this.topicPattern, (ConsumerRebalanceListener)listener);
        } else {
            this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), (ConsumerRebalanceListener)listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startPolling() {
        long partitionLastOffset = -1L;
        try {
            this.lock.lock();
            long pollTimeoutMs = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Polling {} from {} with timeout: {}", new Object[]{this.threadId, this.getPrintableTopic(), pollTimeoutMs});
            }
            KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(this.kafkaConsumer, this.lastProcessedOffset, this.threadId, this.commitManager);
            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
            while (this.isKafkaConsumerRunnable() && this.isRetrying() && this.isConnected()) {
                ConsumerRecords allRecords = this.consumer.poll(pollDuration);
                this.commitManager.processAsyncCommits();
                ProcessingResult result = recordProcessorFacade.processPolledRecords((ConsumerRecords<Object, Object>)allRecords);
                if (!result.isBreakOnErrorHit()) continue;
                LOG.debug("We hit an error ... setting flags to force reconnect");
                this.setReconnect(true);
                this.setConnected(false);
                this.setRetry(false);
            }
            if (!this.isConnected()) {
                LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                this.commitManager.commit();
            }
            this.safeUnsubscribe();
        }
        catch (InterruptException e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", (Throwable)e);
            this.commitManager.commit();
            LOG.info("Unsubscribing {} from {}", (Object)this.threadId, (Object)this.getPrintableTopic());
            this.safeUnsubscribe();
            Thread.currentThread().interrupt();
        }
        catch (WakeupException e) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", (Object)this.threadId, (Object)this.getPrintableTopic());
            }
            this.safeUnsubscribe();
        }
        catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", new Object[]{e.getClass().getName(), this.threadId, this.getPrintableTopic(), this.lastProcessedOffset, e.getMessage(), e});
            } else {
                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", new Object[]{e.getClass().getName(), this.threadId, this.getPrintableTopic(), this.lastProcessedOffset, e.getMessage()});
            }
            this.handleAccordingToStrategy(partitionLastOffset, e);
        }
        finally {
            this.lock.unlock();
            if (!this.isRetrying()) {
                LOG.debug("Closing consumer {}", (Object)this.threadId);
                this.safeUnsubscribe();
                IOHelper.close((Closeable)this.consumer);
            }
        }
    }

    private void handleAccordingToStrategy(long partitionLastOffset, Exception e) {
        PollOnError onError = this.pollExceptionStrategy.handleException(e);
        if (PollOnError.RETRY == onError) {
            this.handlePollRetry();
        } else if (PollOnError.RECONNECT == onError) {
            this.handlePollReconnect();
        } else if (PollOnError.ERROR_HANDLER == onError) {
            this.handlePollErrorHandler(partitionLastOffset, e);
        } else if (PollOnError.DISCARD == onError) {
            this.handlePollDiscard(partitionLastOffset);
        } else if (PollOnError.STOP == onError) {
            this.handlePollStop();
        }
    }

    private void safeUnsubscribe() {
        String printableTopic = this.getPrintableTopic();
        try {
            this.consumer.unsubscribe();
        }
        catch (IllegalStateException e) {
            LOG.warn("The consumer is likely already closed. Skipping the unsubscription from {}", (Object)printableTopic);
        }
        catch (Exception e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Error unsubscribing thread " + this.threadId + " from kafka " + printableTopic, (Throwable)e);
        }
    }

    private String getPrintableTopic() {
        if (this.topicPattern != null) {
            return "topic pattern " + this.topicPattern;
        }
        return "topic " + this.topicName;
    }

    private void handlePollStop() {
        LOG.warn("Requesting the consumer to stop based on polling exception strategy");
        this.setRetry(false);
        this.setConnected(false);
    }

    private void handlePollDiscard(long partitionLastOffset) {
        LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
        this.seekToNextOffset(partitionLastOffset);
    }

    private void handlePollErrorHandler(long partitionLastOffset, Exception e) {
        LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
        this.bridge.handleException((Throwable)e);
        this.seekToNextOffset(partitionLastOffset);
    }

    private void handlePollReconnect() {
        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
        this.setReconnect(true);
        this.setConnected(false);
        this.setRetry(false);
    }

    private void handlePollRetry() {
        LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
        this.setRetry(true);
    }

    private boolean isKafkaConsumerRunnable() {
        return this.kafkaConsumer.isRunAllowed() && !this.kafkaConsumer.isStoppingOrStopped() && !this.kafkaConsumer.isSuspendingOrSuspended();
    }

    private boolean isRunnable() {
        return this.kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !this.kafkaConsumer.isRunAllowed();
    }

    private void seekToNextOffset(long partitionLastOffset) {
        block5: {
            Set tps;
            boolean logged;
            block4: {
                logged = false;
                tps = this.consumer.assignment();
                if (tps == null || partitionLastOffset == -1L) break block4;
                long next = partitionLastOffset + 1L;
                if (LOG.isInfoEnabled()) {
                    LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", (Object)next, (Object)this.getPrintableTopic());
                }
                for (TopicPartition tp : tps) {
                    this.consumer.seek(tp, next);
                }
                break block5;
            }
            if (tps == null) break block5;
            for (TopicPartition tp : tps) {
                long next = this.consumer.position(tp) + 1L;
                if (!logged) {
                    LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", (Object)next, (Object)this.getPrintableTopic());
                    logged = true;
                }
                this.consumer.seek(tp, next);
            }
        }
    }

    private boolean isRetrying() {
        return this.retry;
    }

    private void setRetry(boolean value) {
        this.retry = value;
    }

    private boolean isReconnect() {
        return this.reconnect;
    }

    private void setReconnect(boolean value) {
        this.reconnect = value;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void safeStop() {
        long timeout = this.kafkaConsumer.getEndpoint().getConfiguration().getShutdownTimeout();
        try {
            LOG.info("Waiting up to {} milliseconds for the processing to finish", (Object)timeout);
            if (!this.lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
                LOG.warn("The processing of the current record did not finish within {} seconds", (Object)timeout);
            }
            this.consumer.wakeup();
        }
        catch (InterruptedException e) {
            this.consumer.wakeup();
            Thread.currentThread().interrupt();
        }
        finally {
            this.lock.unlock();
        }
    }

    void stop() {
        this.safeStop();
    }

    public boolean isConnected() {
        return this.connected;
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
    }
}

