/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MemoryLimitController;
import org.apache.pulsar.client.impl.MessageIdAdvUtils;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.UnAckedTopicMessageRedeliveryTracker;
import org.apache.pulsar.client.impl.UnAckedTopicMessageTracker;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NoOpLock;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Queues;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumerBase<T>
extends HandlerState
implements Consumer<T> {
    // Receiver-queue sizing constants. expectMoreIncomingMessages() only grows the queue while
    // memory usage is below 0.75.
    // NOTE(review): INITIAL_RECEIVER_QUEUE_SIZE is not referenced in the visible part of this class.
    protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
    protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
    // Identity, configuration and collaborators; all assigned once in the constructor.
    protected final String subscription;
    protected final ConsumerConfigurationData<T> conf;
    // Taken from the configuration, or randomly generated when the configuration has none.
    protected final String consumerName;
    protected final CompletableFuture<Consumer<T>> subscribeFuture;
    // When non-null, the blocking/async receive() entry points refuse to run.
    protected final MessageListener<T> listener;
    protected final ConsumerEventListener consumerEventListener;
    protected final ExecutorProvider executorProvider;
    protected final ExecutorService externalPinnedExecutor;
    protected final ExecutorService internalPinnedExecutor;
    // Either a real tracker or UNACKED_MESSAGE_TRACKER_DISABLED when the ack timeout is 0.
    protected UnAckedMessageTracker unAckedMessageTracker;
    final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
    protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
    // Futures handed out to callers that are still waiting for a message.
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
    // Upper bound for the receiver queue; currentReceiverQueueSize moves between
    // minReceiverQueueSize() and this value when auto-scaling is enabled.
    protected final int maxReceiverQueueSize;
    private volatile int currentReceiverQueueSize;
    // Field updaters address the volatile fields below by name; the string must match the field name.
    protected static final AtomicIntegerFieldUpdater<ConsumerBase> MESSAGE_LISTENER_QUEUE_SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class, "messageListenerQueueSize");
    protected volatile int messageListenerQueueSize = 0;
    protected static final AtomicIntegerFieldUpdater<ConsumerBase> CURRENT_RECEIVER_QUEUE_SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class, "currentReceiverQueueSize");
    protected final Schema<T> schema;
    // May be null; every interceptor hook in this class checks for null before delegating.
    protected final ConsumerInterceptors<T> interceptors;
    // Effective policy after the constructor has clamped or defaulted the user-supplied one.
    protected final BatchReceivePolicy batchReceivePolicy;
    protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
    private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "incomingMessagesSize");
    protected volatile long incomingMessagesSize = 0L;
    // Timer handle created by triggerBatchReceiveTimeoutTask(); null until one is scheduled.
    protected volatile Timeout batchReceiveTimeout = null;
    // ReentrantLock for Failover/Exclusive subscriptions, NoOpLock otherwise (see constructor).
    protected final Lock incomingQueueLock;
    protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH = AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
    // Error text used by the reconsumeLater* methods when retry is disabled in the configuration.
    private static final String RECONSUME_LATER_ERROR_MSG = "reconsumeLater method not supported because retryEnabled is set to false. You can enable it via ConsumerBuilder.";
    protected volatile long consumerEpoch;
    // expectMoreIncomingMessages() consumes this flag (true -> false) before doubling the queue size.
    protected final AtomicBoolean scaleReceiverQueueHint = new AtomicBoolean(false);
    // UTF-8 bytes of the literal "NONE_KEY".
    static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
    private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);

    /**
     * Initializes the state shared by consumer implementations from the supplied configuration.
     *
     * @param client client handed to {@code HandlerState}; also supplies the internal executor
     * @param topic topic handed to {@code HandlerState}
     * @param conf consumer configuration; subscription name, consumer name, listeners, batch-receive
     *             policy and ack-timeout settings are read from it
     * @param receiverQueueSize stored as {@code maxReceiverQueueSize}
     * @param executorProvider source of the external pinned executor
     * @param subscribeFuture future later returned by {@code subscribeFuture()}
     * @param schema message schema kept for subclasses
     * @param interceptors interceptors to notify; the hooks in this class tolerate {@code null}
     */
    protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, int receiverQueueSize, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
        super(client, topic);
        this.maxReceiverQueueSize = receiverQueueSize;
        this.subscription = conf.getSubscriptionName();
        this.conf = conf;
        // Fall back to a generated name when the configuration does not provide one.
        this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
        this.subscribeFuture = subscribeFuture;
        this.listener = conf.getMessageListener();
        this.consumerEventListener = conf.getConsumerEventListener();
        this.incomingMessages = new GrowableArrayBlockingQueue();
        this.unAckedChunkedMessageIdSequenceMap = ConcurrentOpenHashMap.newBuilder().build();
        this.executorProvider = executorProvider;
        this.externalPinnedExecutor = executorProvider.getExecutor();
        this.internalPinnedExecutor = client.getInternalExecutorService();
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
        this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
        this.schema = schema;
        this.interceptors = interceptors;
        // Resolve the effective batch-receive policy: clamp, default, or take the user's policy as is.
        if (conf.getBatchReceivePolicy() != null) {
            BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
            if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
                // A batch can never hold more messages than the receiver queue: cap maxNumMessages.
                this.batchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(this.maxReceiverQueueSize).maxNumBytes(userBatchReceivePolicy.getMaxNumBytes()).messagesFromMultiTopicsEnabled(userBatchReceivePolicy.isMessagesFromMultiTopicsEnabled()).timeout((int)userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS).build();
                log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, reset to maxReceiverQueueSize. batchReceivePolicy: {}", new Object[]{userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize, this.batchReceivePolicy.toString()});
            } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) {
                // Neither limit is positive: use the default limits but keep the user's
                // multi-topic flag and timeout.
                this.batchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages()).maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes()).messagesFromMultiTopicsEnabled(userBatchReceivePolicy.isMessagesFromMultiTopicsEnabled()).timeout((int)userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS).build();
                log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. Reset to DEFAULT_POLICY. batchReceivePolicy: {}", new Object[]{userBatchReceivePolicy.getMaxNumMessages(), userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString()});
            } else {
                this.batchReceivePolicy = conf.getBatchReceivePolicy();
            }
        } else {
            this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
        }
        // Only Failover and Exclusive subscriptions get a real lock; the others use a no-op lock.
        this.incomingQueueLock = this.getSubType() == CommandSubscribe.SubType.Failover || this.getSubType() == CommandSubscribe.SubType.Exclusive ? new ReentrantLock() : new NoOpLock();
        // Ack-timeout tracking is disabled when the timeout is 0; otherwise the redelivery-backoff
        // variant is used when a backoff is configured.
        this.unAckedMessageTracker = conf.getAckTimeoutMillis() != 0L ? (conf.getAckTimeoutRedeliveryBackoff() != null ? new UnAckedTopicMessageRedeliveryTracker(client, this, conf) : new UnAckedTopicMessageTracker(client, this, conf)) : UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        // NOTE(review): this may invoke the abstract minReceiverQueueSize() before the subclass
        // constructor has run.
        this.initReceiverQueueSize();
    }

    /** Returns the tracker currently held in {@code unAckedMessageTracker}. */
    protected UnAckedMessageTracker getUnAckedMessageTracker() {
        return unAckedMessageTracker;
    }

    /**
     * Schedules {@code pendingBatchReceiveTask} on the client timer after the policy timeout.
     * Does nothing when a batch-receive timeout already exists or the policy timeout is not positive.
     */
    protected void triggerBatchReceiveTimeoutTask() {
        if (hasBatchReceiveTimeout()) {
            return;
        }
        long timeoutMs = batchReceivePolicy.getTimeoutMs();
        if (timeoutMs <= 0L) {
            return;
        }
        batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Resets the current receiver queue size: the minimum size when auto-scaling is enabled,
     * otherwise the configured maximum.
     */
    public void initReceiverQueueSize() {
        int initialSize = conf.isAutoScaledReceiverQueueSizeEnabled()
                ? minReceiverQueueSize()
                : maxReceiverQueueSize;
        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, initialSize);
    }

    /**
     * Lower bound for the receiver queue size; used as the starting size and as the shrink floor
     * when auto-scaling is enabled.
     */
    public abstract int minReceiverQueueSize();

    /**
     * Doubles the current receiver queue size (capped at {@code maxReceiverQueueSize}) when
     * auto-scaling is enabled, memory usage is below the expansion threshold, and the
     * scale hint was set. The hint is cleared as part of the check.
     */
    protected void expectMoreIncomingMessages() {
        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
            // With no memory limit controller the usage is treated as 0.
            double memoryUsage = getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0.0);
            boolean belowThreshold = memoryUsage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION;
            // The hint is only consumed when memory usage allows growth (short-circuit).
            if (belowThreshold && scaleReceiverQueueHint.compareAndSet(true, false)) {
                int current = getCurrentReceiverQueueSize();
                setCurrentReceiverQueueSize(Math.min(maxReceiverQueueSize, current * 2));
            }
        }
    }

    /**
     * Registers the message with the un-acked tracker, but only when no message listener is set.
     *
     * @param messageId id of the delivered message
     * @param redeliveryCount redelivery count forwarded to the tracker
     */
    protected void trackUnAckedMsgIfNoListener(MessageId messageId, int redeliveryCount) {
        if (listener != null) {
            return;
        }
        unAckedMessageTracker.add(messageId, redeliveryCount);
    }

    /**
     * Halves the current receiver queue size, never going below {@link #minReceiverQueueSize()}.
     * Does nothing when auto-scaling is disabled or the size would not shrink.
     */
    protected void reduceCurrentReceiverQueueSize() {
        if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
            return;
        }
        int current = getCurrentReceiverQueueSize();
        int halved = Math.max(minReceiverQueueSize(), current / 2);
        if (halved < current) {
            setCurrentReceiverQueueSize(halved);
        }
    }

    /**
     * Receives a single message via {@code internalReceive()}.
     *
     * @throws PulsarClientException if a message listener is configured, or if state verification
     *         or the underlying receive fails
     */
    public Message<T> receive() throws PulsarClientException {
        if (listener == null) {
            verifyConsumerState();
            return internalReceive();
        }
        throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
    }

    /**
     * Asynchronous receive. Returns a failed future (instead of throwing) when a listener is
     * configured or the consumer state check fails.
     */
    public CompletableFuture<Message<T>> receiveAsync() {
        if (listener != null) {
            PulsarClientException listenerConfigured =
                    new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
            return FutureUtil.failedFuture(listenerConfigured);
        }
        try {
            verifyConsumerState();
        } catch (PulsarClientException invalidState) {
            return FutureUtil.failedFuture(invalidState);
        }
        return internalReceiveAsync();
    }

    /** Subclass hook behind {@code receive()}; invoked after the listener and state checks pass. */
    protected abstract Message<T> internalReceive() throws PulsarClientException;

    /** Subclass hook behind {@code receiveAsync()}; invoked after the listener and state checks pass. */
    protected abstract CompletableFuture<Message<T>> internalReceiveAsync();

    /**
     * Receives a message via {@code internalReceive(timeout, unit)}.
     *
     * @param timeout timeout value forwarded to the subclass implementation
     * @param unit unit of {@code timeout}
     * @throws PulsarClientException if the current receiver queue size is 0, a listener is
     *         configured, or state verification / the underlying receive fails
     */
    public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        boolean zeroQueue = getCurrentReceiverQueueSize() == 0;
        if (zeroQueue) {
            throw new PulsarClientException.InvalidConfigurationException("Can't use receive with timeout, if the queue size is 0");
        }
        if (listener == null) {
            verifyConsumerState();
            return internalReceive(timeout, unit);
        }
        throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
    }

    /**
     * Subclass hook behind {@code receive(int, TimeUnit)}; the first argument is the timeout value
     * and the second its unit, as passed by that caller.
     */
    protected abstract Message<T> internalReceive(long var1, TimeUnit var3) throws PulsarClientException;

    /**
     * Receives a batch via {@code internalBatchReceive()} after the batch-receive and
     * consumer-state checks pass.
     *
     * @throws PulsarClientException if either check or the underlying batch receive fails
     */
    public Messages<T> batchReceive() throws PulsarClientException {
        verifyBatchReceive();
        verifyConsumerState();
        Messages<T> batch = internalBatchReceive();
        return batch;
    }

    /**
     * Asynchronous batch receive. A failed batch-receive or consumer-state check is reported
     * through a failed future instead of a thrown exception.
     */
    public CompletableFuture<Messages<T>> batchReceiveAsync() {
        try {
            verifyBatchReceive();
            verifyConsumerState();
        } catch (PulsarClientException checkFailed) {
            return FutureUtil.failedFuture(checkFailed);
        }
        return internalBatchReceiveAsync();
    }

    /** Returns {@code true} while at least one receive future is queued in {@code pendingReceives}. */
    protected boolean hasNextPendingReceive() {
        boolean nonePending = pendingReceives.isEmpty();
        return !nonePending;
    }

    /**
     * Polls {@code pendingReceives} until it finds a future that is not yet done.
     * Already-completed futures are discarded along the way.
     *
     * @return the next incomplete future, or {@code null} when the queue is exhausted
     */
    protected CompletableFuture<Message<T>> nextPendingReceive() {
        CompletableFuture<Message<T>> candidate = pendingReceives.poll();
        while (candidate != null && candidate.isDone()) {
            candidate = pendingReceives.poll();
        }
        return candidate;
    }

    /**
     * Completes {@code receivedFuture} with {@code message} on the executor returned by
     * {@code getInternalExecutor(message)}. If the future was already completed, the message is
     * not delivered and a warning is logged.
     */
    protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
        getInternalExecutor(message).execute(() -> {
            boolean delivered = receivedFuture.complete(message);
            if (delivered) {
                return;
            }
            log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}", (Object) receivedFuture.isCancelled(), (Object) message);
        });
    }

    /**
     * Fails every pending receive and pending batch receive.
     * The work runs on the internal pinned executor; if that executor is already shut down it
     * runs inline on the calling thread.
     *
     * @return a future completed once the pending operations have been processed
     */
    protected CompletableFuture<Void> failPendingReceive() {
        if (!internalPinnedExecutor.isShutdown()) {
            CompletableFuture<Void> done = new CompletableFuture<>();
            internalPinnedExecutor.execute(() -> {
                try {
                    failPendingReceives();
                    failPendingBatchReceives();
                } finally {
                    // Always release the caller, even if failing the receives threw.
                    done.complete(null);
                }
            });
            return done;
        }
        failPendingReceives();
        failPendingBatchReceives();
        return CompletableFuture.completedFuture(null);
    }

    /** Drains {@code pendingReceives}, failing each not-yet-done future with an AlreadyClosedException. */
    private void failPendingReceives() {
        while (!pendingReceives.isEmpty()) {
            CompletableFuture<Message<T>> pending = pendingReceives.poll();
            if (pending == null) {
                break;
            }
            if (!pending.isDone()) {
                pending.completeExceptionally(new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s was already closed when cleaning and closing the consumers", topic, subscription)));
            }
        }
    }

    /**
     * Drains the pending batch receives, failing each not-yet-done future with an
     * AlreadyClosedException. Stops early if an entry or its future is {@code null}.
     */
    private void failPendingBatchReceives() {
        while (hasNextBatchReceive()) {
            OpBatchReceive<T> op = nextBatchReceive();
            if (op == null || op.future == null) {
                break;
            }
            if (!op.future.isDone()) {
                op.future.completeExceptionally(new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s was already closed when cleaning and closing the consumers", topic, subscription)));
            }
        }
    }

    /** Subclass hook behind {@code batchReceive()}; invoked after both verification steps pass. */
    protected abstract Messages<T> internalBatchReceive() throws PulsarClientException;

    /** Subclass hook behind {@code batchReceiveAsync()}; invoked after both verification steps pass. */
    protected abstract CompletableFuture<Messages<T>> internalBatchReceiveAsync();

    /**
     * Ensures the message and its id are both present.
     *
     * @throws PulsarClientException (InvalidMessageException) if {@code message} or its id is {@code null}
     */
    private static void validateMessageId(Message<?> message) throws PulsarClientException {
        if (message == null) {
            throw new PulsarClientException.InvalidMessageException("Non-null message is required");
        }
        MessageId id = message.getMessageId();
        if (id == null) {
            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
        }
    }

    /**
     * Ensures the message id is present.
     *
     * @throws PulsarClientException (InvalidMessageException) if {@code messageId} is {@code null}
     */
    private static void validateMessageId(MessageId messageId) throws PulsarClientException {
        if (messageId != null) {
            return;
        }
        throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
    }

    /**
     * Validates the message, then acknowledges it by id (blocking).
     *
     * @throws PulsarClientException if the message or its id is {@code null}, or the acknowledgement fails
     */
    public void acknowledge(Message<?> message) throws PulsarClientException {
        validateMessageId(message);
        MessageId id = message.getMessageId();
        acknowledge(id);
    }

    /**
     * Blocking form of {@code acknowledgeAsync(MessageId)}.
     *
     * @throws PulsarClientException if the id is {@code null}, the wait is interrupted (the
     *         interrupt flag is restored first), or the asynchronous acknowledgement fails
     */
    public void acknowledge(MessageId messageId) throws PulsarClientException {
        validateMessageId(messageId);
        try {
            acknowledgeAsync(messageId).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking form of {@code acknowledgeAsync(List)}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous acknowledgement fails
     */
    public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
        try {
            acknowledgeAsync(messageIdList).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking form of {@code acknowledgeAsync(Messages)}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous acknowledgement fails
     */
    public void acknowledge(Messages<?> messages) throws PulsarClientException {
        try {
            acknowledgeAsync(messages).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /** Blocking reconsume-later without custom properties; delegates to the four-argument overload. */
    public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
        Map<String, String> noCustomProperties = null;
        reconsumeLater(message, noCustomProperties, delayTime, unit);
    }

    /**
     * Blocking form of {@code reconsumeLaterAsync(Message, Map, long, TimeUnit)}.
     *
     * @throws PulsarClientException if retry is not enabled in the configuration, the wait is
     *         interrupted (the interrupt flag is restored first), or the asynchronous call fails
     */
    public void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException {
        boolean retryEnabled = conf.isRetryEnable();
        if (!retryEnabled) {
            throw new PulsarClientException(RECONSUME_LATER_ERROR_MSG);
        }
        try {
            reconsumeLaterAsync(message, customProperties, delayTime, unit).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking form of {@code reconsumeLaterAsync(Messages, long, TimeUnit)}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the returned future completes exceptionally
     */
    public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
        try {
            reconsumeLaterAsync(messages, delayTime, unit).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Validates the message, then cumulatively acknowledges by its id (blocking).
     *
     * @throws PulsarClientException if the message or its id is {@code null}, or the acknowledgement fails
     */
    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
        validateMessageId(message);
        MessageId id = message.getMessageId();
        acknowledgeCumulative(id);
    }

    /**
     * Blocking form of {@code acknowledgeCumulativeAsync(MessageId)}.
     *
     * @throws PulsarClientException if the id is {@code null}, the wait is interrupted (the
     *         interrupt flag is restored first), or the asynchronous acknowledgement fails
     */
    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
        validateMessageId(messageId);
        try {
            acknowledgeCumulativeAsync(messageId).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Blocking form of {@code reconsumeLaterCumulativeAsync(Message, long, TimeUnit)}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous call fails
     */
    public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
        try {
            reconsumeLaterCumulativeAsync(message, delayTime, unit).get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /**
     * Acknowledges a single message by its id. A {@code null} message or id yields a failed
     * future rather than a thrown exception.
     */
    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
        try {
            validateMessageId(message);
        } catch (PulsarClientException invalid) {
            return FutureUtil.failedFuture(invalid);
        }
        MessageId id = message.getMessageId();
        return acknowledgeAsync(id);
    }

    /** Acknowledges all given messages outside of any transaction. */
    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
        Transaction noTransaction = null;
        return acknowledgeAsync(messages, noTransaction);
    }

    /**
     * Collects the ids of all given messages and acknowledges them as a list, inside
     * {@code txn} when one is supplied.
     *
     * @return the acknowledgement future, or a failed future as soon as a message or its id
     *         turns out to be {@code null}
     */
    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
        List<MessageId> ids = new ArrayList<>(messages.size());
        for (Message<?> current : messages) {
            try {
                validateMessageId(current);
            } catch (PulsarClientException invalid) {
                return FutureUtil.failedFuture(invalid);
            }
            ids.add(current.getMessageId());
        }
        if (txn == null) {
            return acknowledgeAsync(ids);
        }
        return acknowledgeAsync(ids, txn);
    }

    /** Individually acknowledges the listed ids, with no properties and no transaction. */
    public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
        Map<String, Long> noProperties = Collections.emptyMap();
        return doAcknowledgeWithTxn(messageIdList, CommandAck.AckType.Individual, noProperties, null);
    }

    /**
     * Individually acknowledges the listed ids within {@code txn}.
     * The transaction is cast to {@code TransactionImpl} without a prior type check.
     */
    public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
        TransactionImpl txnImpl = (TransactionImpl) txn;
        Map<String, Long> noProperties = Collections.emptyMap();
        return doAcknowledgeWithTxn(messageIdList, CommandAck.AckType.Individual, noProperties, txnImpl);
    }

    /** Reconsume-later without custom properties; delegates to the four-argument overload. */
    public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
        Map<String, String> noCustomProperties = null;
        return reconsumeLaterAsync(message, noCustomProperties, delayTime, unit);
    }

    /**
     * Requests redelivery of {@code message} after the given delay (individual ack type).
     *
     * @return the future from {@code doReconsumeLater}, or a failed future when retry is not
     *         enabled or the message / its id is {@code null}
     */
    public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
        boolean retryEnabled = conf.isRetryEnable();
        if (!retryEnabled) {
            return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
        }
        try {
            validateMessageId(message);
        } catch (PulsarClientException invalid) {
            return FutureUtil.failedFuture(invalid);
        }
        return doReconsumeLater(message, CommandAck.AckType.Individual, customProperties, delayTime, unit);
    }

    /**
     * Requests redelivery of every message in {@code messages} after the given delay.
     *
     * <p>All messages are validated before any request is issued. The returned future completes
     * only after every per-message request has completed, and completes exceptionally if any of
     * them fails (for example because retry is not enabled). Previously the per-message futures
     * were discarded and an already-completed future was returned, which hid every failure.
     *
     * @param messages messages to reconsume; each must be non-null with a non-null id
     * @param delayTime delay before redelivery
     * @param unit unit of {@code delayTime}
     * @return a future tracking the completion of all per-message requests
     */
    public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
        for (Message<?> message : messages) {
            try {
                ConsumerBase.validateMessageId(message);
            }
            catch (PulsarClientException e) {
                return FutureUtil.failedFuture(e);
            }
        }
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (Message<?> message : messages) {
            pending.add(this.reconsumeLaterAsync(message, delayTime, unit));
        }
        // allOf() over an empty array completes immediately, matching the old behavior for empty input.
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Cumulatively acknowledges up to the given message's id. A {@code null} message or id
     * yields a failed future rather than a thrown exception.
     */
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
        try {
            validateMessageId(message);
        } catch (PulsarClientException invalid) {
            return FutureUtil.failedFuture(invalid);
        }
        MessageId id = message.getMessageId();
        return acknowledgeCumulativeAsync(id);
    }

    /** Cumulative reconsume-later without custom properties; delegates to the four-argument overload. */
    public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
        Map<String, String> noCustomProperties = null;
        return reconsumeLaterCumulativeAsync(message, noCustomProperties, delayTime, unit);
    }

    /**
     * Requests redelivery of {@code message} after the given delay using the cumulative ack type.
     *
     * <p>The message is now validated, consistent with
     * {@code reconsumeLaterAsync(Message, Map, long, TimeUnit)}, so a {@code null} message or id
     * yields a failed future instead of reaching {@code doReconsumeLater}. The subscription-type
     * error text now matches {@code acknowledgeCumulativeAsync}: cumulative acks are rejected
     * only for Shared and Key_Shared subscriptions.
     *
     * @param message message to reconsume; must be non-null with a non-null id
     * @param customProperties properties forwarded to {@code doReconsumeLater}; may be {@code null}
     * @param delayTime delay before redelivery
     * @param unit unit of {@code delayTime}
     * @return the future from {@code doReconsumeLater}, or a failed future when retry is disabled,
     *         the subscription type does not allow cumulative acks, or the message is invalid
     */
    public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
        if (!this.conf.isRetryEnable()) {
            return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
        }
        if (!this.isCumulativeAcknowledgementAllowed(this.conf.getSubscriptionType())) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.InvalidConfigurationException("Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
        }
        // Validate after the configuration checks so existing error precedence is unchanged.
        try {
            ConsumerBase.validateMessageId(message);
        }
        catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
        return this.doReconsumeLater(message, CommandAck.AckType.Cumulative, customProperties, delayTime, unit);
    }

    /** Individually acknowledges {@code messageId} outside of any transaction. */
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
        Transaction noTransaction = null;
        return acknowledgeAsync(messageId, noTransaction);
    }

    /**
     * Individually acknowledges {@code messageId}, optionally inside a transaction.
     *
     * @param messageId id to acknowledge
     * @param txn transaction, or {@code null}; a non-null value must be a {@code TransactionImpl}
     * @return the acknowledgement future; when {@code checkIfOpen} reports the transaction is not
     *         open, the future handed to that check is returned instead
     */
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn) {
        if (txn == null) {
            return doAcknowledgeWithTxn(messageId, CommandAck.AckType.Individual, Collections.emptyMap(), (TransactionImpl) null);
        }
        Preconditions.checkArgument(txn instanceof TransactionImpl);
        TransactionImpl openTxn = (TransactionImpl) txn;
        CompletableFuture<Void> notOpenResult = new CompletableFuture<>();
        if (!openTxn.checkIfOpen(notOpenResult)) {
            return notOpenResult;
        }
        return doAcknowledgeWithTxn(messageId, CommandAck.AckType.Individual, Collections.emptyMap(), openTxn);
    }

    /** Cumulatively acknowledges up to {@code messageId} outside of any transaction. */
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
        Transaction noTransaction = null;
        return acknowledgeCumulativeAsync(messageId, noTransaction);
    }

    /**
     * Cumulatively acknowledges up to {@code messageId}, optionally inside a transaction.
     *
     * @param messageId id to acknowledge up to
     * @param txn transaction, or {@code null}; a non-null value must be a {@code TransactionImpl}
     * @return the acknowledgement future, or a failed future when the subscription type is
     *         Shared or Key_Shared
     */
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn) {
        boolean cumulativeAllowed = isCumulativeAcknowledgementAllowed(conf.getSubscriptionType());
        if (!cumulativeAllowed) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
        }
        TransactionImpl txnImpl;
        if (txn == null) {
            txnImpl = null;
        } else {
            Preconditions.checkArgument(txn instanceof TransactionImpl);
            txnImpl = (TransactionImpl) txn;
        }
        return doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, Collections.emptyMap(), txnImpl);
    }

    /** Negatively acknowledges the message by its id; the message is not null-checked here. */
    public void negativeAcknowledge(Message<?> message) {
        MessageId id = message.getMessageId();
        negativeAcknowledge(id);
    }

    /**
     * Acknowledges a list of message ids, registering the operation with the transaction when
     * one is supplied and this consumer is a {@code ConsumerImpl}.
     *
     * <p>In the transactional case the topic/subscription is registered with the transaction
     * first, the acknowledgement is chained after that registration, and the resulting future is
     * registered as an ack operation of the transaction. The local is typed as
     * {@code CompletableFuture<Void>} so it can be returned without an unchecked cast.
     *
     * @param messageIdList ids to acknowledge
     * @param ackType individual or cumulative
     * @param properties properties forwarded to {@code doAcknowledge}
     * @param txn transaction, or {@code null} for a non-transactional acknowledgement
     * @return future completed when the acknowledgement completes
     */
    protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        if (txn != null && this instanceof ConsumerImpl) {
            CompletableFuture<Void> ackFuture = txn.registerAckedTopic(this.getTopic(), this.subscription)
                    .thenCompose(ignored -> this.doAcknowledge(messageIdList, ackType, properties, txn));
            txn.registerAckOp(ackFuture);
            return ackFuture;
        }
        return this.doAcknowledge(messageIdList, ackType, properties, txn);
    }

    /**
     * Acknowledges a single message id, registering the operation with the transaction when one
     * is supplied and this consumer is a {@code ConsumerImpl}.
     *
     * <p>In the transactional case the topic/subscription is registered with the transaction
     * first, the acknowledgement is chained after that registration, and the resulting future is
     * registered as an ack operation of the transaction. The local is typed as
     * {@code CompletableFuture<Void>} instead of a raw {@code CompletionStage}, so it can be
     * returned without an unchecked cast.
     *
     * @param messageId id to acknowledge
     * @param ackType individual or cumulative
     * @param properties properties forwarded to {@code doAcknowledge}
     * @param txn transaction, or {@code null} for a non-transactional acknowledgement
     * @return future completed when the acknowledgement completes
     */
    protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        if (txn != null && this instanceof ConsumerImpl) {
            CompletableFuture<Void> ackFuture = txn.registerAckedTopic(this.getTopic(), this.subscription)
                    .thenCompose(ignored -> this.doAcknowledge(messageId, ackType, properties, txn));
            txn.registerAckOp(ackFuture);
            return ackFuture;
        }
        return this.doAcknowledge(messageId, ackType, properties, txn);
    }

    /**
     * Subclass hook that performs the acknowledgement of a single message id.
     * Arguments, in order: message id, ack type, properties, transaction (may be {@code null}).
     */
    protected abstract CompletableFuture<Void> doAcknowledge(MessageId var1, CommandAck.AckType var2, Map<String, Long> var3, TransactionImpl var4);

    /**
     * Subclass hook that performs the acknowledgement of a list of message ids.
     * Arguments, in order: message ids, ack type, properties, transaction (may be {@code null}).
     */
    protected abstract CompletableFuture<Void> doAcknowledge(List<MessageId> var1, CommandAck.AckType var2, Map<String, Long> var3, TransactionImpl var4);

    /**
     * Subclass hook behind the reconsumeLater* methods.
     * Arguments, in order: message, ack type, custom properties (may be {@code null}),
     * delay value, delay unit.
     */
    protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> var1, CommandAck.AckType var2, Map<String, String> var3, long var4, TimeUnit var6);

    /** Negatively acknowledges each message in {@code messages}, one at a time. */
    public void negativeAcknowledge(Messages<?> messages) {
        messages.forEach(message -> negativeAcknowledge(message));
    }

    /**
     * Blocking form of {@link #unsubscribeAsync()}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous unsubscribe fails
     */
    public void unsubscribe() throws PulsarClientException {
        try {
            unsubscribeAsync().get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /** Asynchronous unsubscribe, implemented by subclasses; {@code unsubscribe()} blocks on its result. */
    public abstract CompletableFuture<Void> unsubscribeAsync();

    /**
     * Blocking form of {@link #closeAsync()}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous close fails
     */
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /** Asynchronous close, implemented by subclasses; {@code close()} blocks on its result. */
    public abstract CompletableFuture<Void> closeAsync();

    /**
     * Blocking form of {@link #getLastMessageIdAsync()}.
     *
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous lookup fails
     */
    @Deprecated
    public MessageId getLastMessageId() throws PulsarClientException {
        try {
            return getLastMessageIdAsync().get();
        } catch (InterruptedException | ExecutionException failure) {
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw PulsarClientException.unwrap(failure);
        }
    }

    /** Deprecated asynchronous last-message-id lookup; {@code getLastMessageId()} blocks on its result. */
    @Deprecated
    public abstract CompletableFuture<MessageId> getLastMessageIdAsync();

    /**
     * Blocking form of {@code getLastMessageIdsAsync()}.
     *
     * <p>The raw {@code (List)} cast left by the decompiler was removed: it caused an unchecked
     * conversion and is unnecessary because the asynchronous method already yields a
     * {@code List<TopicMessageId>}.
     *
     * @return the last message ids reported by the asynchronous lookup
     * @throws PulsarClientException if the wait is interrupted (the interrupt flag is restored
     *         first) or the asynchronous lookup fails
     */
    public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
        try {
            return this.getLastMessageIdsAsync().get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap((Throwable)e);
        }
        catch (ExecutionException e) {
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    /** Cumulative acknowledgement is allowed for every subscription type except Shared and Key_Shared. */
    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
        boolean sharedLike = SubscriptionType.Shared == type || SubscriptionType.Key_Shared == type;
        return !sharedLike;
    }

    /**
     * Maps the configured {@code SubscriptionType} to the wire-protocol
     * {@code CommandSubscribe.SubType}.
     *
     * @return the matching sub type, or {@code null} when no case matches
     */
    protected CommandSubscribe.SubType getSubType() {
        switch (conf.getSubscriptionType()) {
            case Exclusive:
                return CommandSubscribe.SubType.Exclusive;
            case Shared:
                return CommandSubscribe.SubType.Shared;
            case Failover:
                return CommandSubscribe.SubType.Failover;
            case Key_Shared:
                return CommandSubscribe.SubType.Key_Shared;
            default:
                return null;
        }
    }

    /** Returns the consumer's available permits; NOTE(review): exact semantics are defined by subclasses. */
    public abstract int getAvailablePermits();

    /** Returns the number of messages currently queued; NOTE(review): exact semantics are defined by subclasses. */
    public abstract int numMessagesInQueue();

    /** Returns the subscribe future supplied to the constructor. */
    public CompletableFuture<Consumer<T>> subscribeFuture() {
        return subscribeFuture;
    }

    /** Returns the topic this consumer was created for. */
    public String getTopic() {
        return topic;
    }

    /** Returns the subscription name read from the configuration. */
    public String getSubscription() {
        return subscription;
    }

    /** Returns the consumer name (configured, or generated when none was configured). */
    public String getConsumerName() {
        return consumerName;
    }

    /** Subclass hook that requests redelivery of the given set of unacknowledged message ids. */
    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> var1);

    /**
     * Renders the subscription, consumer name and topic in the form
     * {@code ConsumerBase{subscription='..', consumerName='..', topic='..'}}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("ConsumerBase{subscription='");
        text.append(this.subscription).append('\'');
        text.append(", consumerName='").append(this.consumerName).append('\'');
        text.append(", topic='").append(this.topic).append('\'');
        text.append('}');
        return text.toString();
    }

    /**
     * Passes a message through the configured interceptors before it is handed out.
     *
     * @param message the message about to be consumed
     * @return the interceptors' result, or {@code message} itself when no interceptors are set
     */
    protected Message<T> beforeConsume(Message<T> message) {
        return interceptors == null ? message : interceptors.beforeConsume(this, message);
    }

    /**
     * Forwards an acknowledge event for one message to the interceptors, if any are set.
     *
     * @param messageId the message id being reported
     * @param exception the throwable passed through to the interceptors unchanged
     */
    protected void onAcknowledge(MessageId messageId, Throwable exception) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAcknowledge(this, messageId, exception);
    }

    /**
     * Forwards one acknowledge event per id in the list to the interceptors, if any are set.
     *
     * @param messageIds the message ids being reported
     * @param exception  the throwable passed through to the interceptors unchanged
     */
    protected void onAcknowledge(List<MessageId> messageIds, Throwable exception) {
        if (this.interceptors == null) {
            return;
        }
        messageIds.forEach(id -> this.interceptors.onAcknowledge(this, id, exception));
    }

    /**
     * Forwards a cumulative-acknowledge event for one message to the interceptors, if any are set.
     *
     * @param messageId the message id being reported
     * @param exception the throwable passed through to the interceptors unchanged
     */
    protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAcknowledgeCumulative(this, messageId, exception);
    }

    /**
     * Forwards one cumulative-acknowledge event per id in the list to the interceptors, if any
     * are set.
     *
     * @param messageIds the message ids being reported
     * @param exception  the throwable passed through to the interceptors unchanged
     */
    protected void onAcknowledgeCumulative(List<MessageId> messageIds, Throwable exception) {
        if (this.interceptors == null) {
            return;
        }
        messageIds.forEach(id -> this.interceptors.onAcknowledgeCumulative(this, id, exception));
    }

    /**
     * Forwards a negative-acks-sent event to the interceptors, if any are set.
     *
     * @param messageIds the message ids handed to the interceptors
     */
    protected void onNegativeAcksSend(Set<MessageId> messageIds) {
        if (interceptors == null) {
            return;
        }
        interceptors.onNegativeAcksSend(this, messageIds);
    }

    /**
     * Forwards an ack-timeout-sent event to the interceptors, if any are set.
     *
     * @param messageIds the message ids handed to the interceptors
     */
    protected void onAckTimeoutSend(Set<MessageId> messageIds) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAckTimeoutSend(this, messageIds);
    }

    /**
     * Forwards a partitions-change event to the interceptors, if any are set.
     *
     * @param topicName  the topic name handed to the interceptors
     * @param partitions the partition count handed to the interceptors
     */
    protected void onPartitionsChange(String topicName, int partitions) {
        if (interceptors == null) {
            return;
        }
        interceptors.onPartitionsChange(topicName, partitions);
    }

    /**
     * Admission check consulted by {@code enqueueMessageAndCheckBatchReceive} before a message
     * is offered to the incoming queue. This base implementation accepts every message; the
     * method is {@code protected} and non-final, so subclasses can override it.
     *
     * @param message the candidate message (unused here)
     * @return always {@code true} in this implementation
     */
    protected boolean canEnqueueMessage(Message<T> message) {
        return true;
    }

    /**
     * Offers a message to the incoming queue under {@code incomingQueueLock} and reports whether
     * enough messages are now buffered for a batch receive.
     *
     * <p>When the message is admitted ({@code canEnqueueMessage}) and the queue accepts it, the
     * incoming-size counter is increased, memory is force-reserved on the memory limiter (if one
     * is present), and {@code updateAutoScaleReceiverQueueHint()} is invoked.
     *
     * @param message the message to enqueue
     * @return the result of {@code hasEnoughMessagesForBatchReceive()}, evaluated after the lock
     *     is released
     */
    protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
        // Size is captured once, before taking the lock.
        final int enqueuedSize = message.size();
        incomingQueueLock.lock();
        try {
            final boolean accepted = canEnqueueMessage(message) && incomingMessages.offer(message);
            if (accepted) {
                INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, enqueuedSize);
                getMemoryLimitController().ifPresent(controller -> controller.forceReserveMemory(enqueuedSize));
                updateAutoScaleReceiverQueueHint();
            }
        } finally {
            incomingQueueLock.unlock();
        }
        return hasEnoughMessagesForBatchReceive();
    }

    /**
     * Subclass hook invoked by {@code enqueueMessageAndCheckBatchReceive} right after a message
     * has been accepted into the incoming queue, while {@code incomingQueueLock} is still held.
     */
    protected abstract void updateAutoScaleReceiverQueueHint();

    /**
     * Checks the buffered messages against the batch receive policy limits.
     *
     * @return {@code true} when a positive message limit is reached by the incoming queue size,
     *     or a positive byte limit is reached by the incoming message size; {@code false}
     *     otherwise (including when neither limit is positive)
     */
    protected boolean hasEnoughMessagesForBatchReceive() {
        final int maxMessages = batchReceivePolicy.getMaxNumMessages();
        final int maxBytes = batchReceivePolicy.getMaxNumBytes();
        if (maxMessages > 0 && incomingMessages.size() >= maxMessages) {
            return true;
        }
        return maxBytes > 0 && getIncomingMessageSize() >= (long) maxBytes;
    }

    /**
     * Throws when the handler state does not permit consumer operations.
     *
     * @throws PulsarClientException {@code AlreadyClosedException} for {@code Closing},
     *     {@code Closed} and {@code Terminated}; {@code NotConnectedException} for
     *     {@code Failed} and {@code Uninitialized}. Every other state passes silently.
     */
    private void verifyConsumerState() throws PulsarClientException {
        switch (getState()) {
            case Closing:
            case Closed:
                throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
            case Terminated:
                throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
            case Failed:
            case Uninitialized:
                throw new PulsarClientException.NotConnectedException();
            case Ready:
            case Connecting:
            default:
                // Nothing to reject.
                return;
        }
    }

    /**
     * Rejects batch receive when it conflicts with the consumer configuration.
     *
     * @throws PulsarClientException {@code InvalidConfigurationException} when a listener is set
     *     (checked first) or when the current receiver queue size is zero
     */
    private void verifyBatchReceive() throws PulsarClientException {
        final boolean listenerConfigured = this.listener != null;
        if (listenerConfigured) {
            throw new PulsarClientException.InvalidConfigurationException("Cannot use receive() when a listener has been set");
        }
        final boolean queueDisabled = getCurrentReceiverQueueSize() == 0;
        if (queueDisabled) {
            throw new PulsarClientException.InvalidConfigurationException("Can't use batch receive, if the queue size is 0");
        }
    }

    /**
     * Takes the next live pending batch receive (see {@code nextBatchReceive()}) and fills its
     * future from the incoming queue. Does nothing when no such operation is available.
     */
    protected void notifyPendingBatchReceivedCallBack() {
        final OpBatchReceive<T> pending = nextBatchReceive();
        if (pending != null) {
            notifyPendingBatchReceivedCallBack(pending.future);
        }
    }

    /**
     * @return {@code true} when {@code pendingBatchReceives} holds at least one entry
     */
    private boolean hasNextBatchReceive() {
        final boolean noneQueued = pendingBatchReceives.isEmpty();
        return !noneQueued;
    }

    /**
     * Polls {@code pendingBatchReceives} until it finds an operation whose future is non-null
     * and not yet done; entries that fail that test are discarded along the way.
     *
     * @return the first live operation, or {@code null} once the queue is drained
     */
    private OpBatchReceive<T> nextBatchReceive() {
        OpBatchReceive<T> candidate;
        while ((candidate = pendingBatchReceives.poll()) != null) {
            if (candidate.future != null && !candidate.future.isDone()) {
                return candidate;
            }
            // Future missing or already done: drop the entry and keep polling.
        }
        return null;
    }

    /**
     * Moves messages from the incoming queue into a fresh batch and completes the given future
     * with it.
     *
     * <p>Messages are taken while the queue head is non-null and the batch can still add it.
     * When the policy does not enable messages from multiple topics, the topic of the first
     * batched message is remembered and collection stops at the first head message from a
     * different topic. Each polled message goes through {@code messageProcessed} and then
     * {@code beforeConsume}; the intercepted result is what gets added to the batch.
     *
     * @param batchReceiveFuture the future to complete with the collected messages
     */
    protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
        final MessagesImpl<T> batch = getNewMessagesImpl();
        String firstTopic = null;
        for (Message<T> head = incomingMessages.peek();
                head != null && batch.canAdd(head);
                head = incomingMessages.peek()) {
            if (!batchReceivePolicy.isMessagesFromMultiTopicsEnabled()) {
                if (batch.size() == 1) {
                    firstTopic = batch.getMessageList().get(0).getTopicName();
                }
                if (firstTopic != null && !firstTopic.equals(head.getTopicName())) {
                    break;
                }
            }
            // The head may have been taken since the peek, so poll() can return null.
            final Message<T> taken = incomingMessages.poll();
            if (taken != null) {
                messageProcessed(taken);
                final Message<T> intercepted = beforeConsume(taken);
                batch.add(intercepted);
            }
        }
        completePendingBatchReceive(batchReceiveFuture, batch);
    }

    /**
     * Completes a batch receive future with the collected messages and logs a warning when the
     * future had already been completed (so {@code complete} returned {@code false}).
     *
     * @param future   the batch receive future to complete
     * @param messages the messages to deliver
     */
    protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
        final boolean delivered = future.complete(messages);
        if (delivered) {
            return;
        }
        log.warn("Race condition detected. batch receive future was already completed (cancelled={}) and messages were dropped. messages={}", (Object)future.isCancelled(), messages);
    }

    /**
     * Subclass hook invoked by {@code notifyPendingBatchReceivedCallBack} for every message
     * polled from the incoming queue, before the message is passed to {@code beforeConsume}.
     *
     * @param var1 the message just taken from the queue (parameter name lost in decompilation)
     */
    protected abstract void messageProcessed(Message<?> var1);

    /**
     * Timer callback: hands the real work ({@code doPendingBatchReceiveTask}) over to
     * {@code internalPinnedExecutor} instead of running it on the calling thread.
     *
     * @param timeout the timer handle that fired
     */
    private void pendingBatchReceiveTask(Timeout timeout) {
        final Runnable expiryCheck = () -> this.doPendingBatchReceiveTask(timeout);
        this.internalPinnedExecutor.execute(expiryCheck);
    }

    /**
     * Expires pending batch receives whose policy timeout has elapsed and re-arms the timer for
     * the oldest one that has not.
     *
     * <p>Returns immediately when the timer handle was cancelled, or (inside the monitor on
     * {@code this}) when the consumer is {@code Closing} or {@code Closed}. On a normal pass
     * {@code batchReceiveTimeout} is replaced with a new timer handle when a pending receive
     * remains, or set to {@code null} otherwise.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param timeout the timer handle that triggered this run
     */
    private void doPendingBatchReceiveTask(Timeout timeout) {
        if (timeout.isCancelled()) {
            return;
        }
        boolean hasPendingReceives = false;
        ConsumerBase consumerBase = this;
        synchronized (consumerBase) {
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                return;
            }
            // Default delay; only used if a still-pending receive overrides it below.
            long timeToWaitMs = this.batchReceivePolicy.getTimeoutMs();
            OpBatchReceive<T> opBatchReceive = this.pendingBatchReceives.peek();
            while (opBatchReceive != null) {
                // Remaining time for the head entry: policy timeout minus time since its creation.
                long diff = this.batchReceivePolicy.getTimeoutMs() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
                if (diff > 0L) {
                    // Head has not expired yet: schedule the next check for when it will.
                    timeToWaitMs = diff;
                    hasPendingReceives = true;
                    break;
                }
                // Head has expired: complete it, then remove it from the queue.
                this.completeOpBatchReceive(opBatchReceive);
                OpBatchReceive<T> removed = this.pendingBatchReceives.poll();
                if (removed != opBatchReceive) {
                    // The queue head changed between peek() and poll(); complete whatever was
                    // actually removed so its future is not left dangling.
                    log.error("Race condition in consumer {} (should not cause data loss).  Concurrent operations on pendingBatchReceives is not safe", (Object)this.consumerName);
                    if (removed != null && !removed.future.isDone()) {
                        this.completeOpBatchReceive(removed);
                    }
                }
                opBatchReceive = this.pendingBatchReceives.peek();
            }
            // Re-arm only when something is still waiting; otherwise clear the handle.
            this.batchReceiveTimeout = hasPendingReceives ? this.client.timer().newTimeout(this::pendingBatchReceiveTask, timeToWaitMs, TimeUnit.MILLISECONDS) : null;
        }
    }

    /**
     * Starts listener dispatch ({@code triggerListener()}) when a message listener is set;
     * otherwise does nothing.
     */
    protected void tryTriggerListener() {
        if (listener == null) {
            return;
        }
        triggerListener();
    }

    /**
     * Submits a task to {@code internalPinnedExecutor} that drains messages via
     * {@code internalReceive(0, MILLISECONDS)} and dispatches each one to
     * {@code callMessageListener} on another executor.
     *
     * <p>For every received message the listener-queue-size counter is incremented before
     * dispatch. With a {@code Key_Shared} subscription the executor is chosen from
     * {@code executorProvider} by the message key ({@code peekMessageKey}); otherwise
     * {@code getExternalExecutor(msg)} is used. The loop ends when {@code internalReceive}
     * returns {@code null}. A {@link PulsarClientException} from the receive is logged as a
     * warning and ends the task.
     */
    private void triggerListener() {
        this.internalPinnedExecutor.execute(() -> {
            try {
                Message<T> msg;
                do {
                    if ((msg = this.internalReceive(0L, TimeUnit.MILLISECONDS)) != null) {
                        // Lambdas need an effectively-final reference to the message.
                        Message<T> finalMsg = msg;
                        MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.incrementAndGet(this);
                        if (SubscriptionType.Key_Shared == this.conf.getSubscriptionType()) {
                            // Executor is selected by message key for Key_Shared subscriptions.
                            this.executorProvider.getExecutor(this.peekMessageKey(msg)).execute(() -> this.callMessageListener(finalMsg));
                            // 'continue' in a do/while jumps to the 'while (msg != null)' check.
                            continue;
                        }
                        this.getExternalExecutor(msg).execute(() -> this.callMessageListener(finalMsg));
                        continue;
                    }
                    // Reached only when no message was returned.
                    if (!log.isDebugEnabled()) continue;
                    log.debug("[{}] [{}] Message has been cleared from the queue", (Object)this.topic, (Object)this.subscription);
                } while (msg != null);
            }
            catch (PulsarClientException e) {
                log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, e});
            }
        });
    }

    /**
     * Delivers one message to the configured listener.
     *
     * <p>Before the listener runs, permits are increased on the consumer that received the
     * message (the {@code receivedByconsumer} of a {@link TopicMessageImpl}, otherwise this
     * instance cast to {@link ConsumerImpl}) and the message id is added to
     * {@code unAckedMessageTracker}. Any {@link Throwable} raised in the process, including by
     * the listener, is logged and swallowed. The listener-queue-size counter is always
     * decremented on exit.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param msg the message to hand to the listener
     */
    protected void callMessageListener(Message<T> msg) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Calling message listener for message {}", new Object[]{this.topic, this.subscription, msg.getMessageId()});
            }
            ConsumerImpl receivedConsumer = msg instanceof TopicMessageImpl ? ((TopicMessageImpl)msg).receivedByconsumer : (ConsumerImpl)this;
            // For a TopicMessageImpl the wrapped message is what gets counted for permits.
            receivedConsumer.increaseAvailablePermits((MessageImpl)(msg instanceof TopicMessageImpl ? ((TopicMessageImpl)msg).getMessage() : msg));
            // Only a ConsumerImpl tracks the id with its batch part discarded.
            MessageId id = this instanceof ConsumerImpl ? MessageIdAdvUtils.discardBatch(msg.getMessageId()) : msg.getMessageId();
            this.unAckedMessageTracker.add(id, msg.getRedeliveryCount());
            this.listener.received((Consumer)this, msg);
        }
        catch (Throwable t2) {
            log.error("[{}][{}] Message listener error in processing message: {}", new Object[]{this.topic, this.subscription, msg.getMessageId(), t2});
        }
        finally {
            // Balances the increment performed when the message was dispatched in triggerListener().
            MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.decrementAndGet(this);
        }
    }

    /**
     * Picks the key bytes for a message: the ordering key when present, otherwise the message
     * key when present, otherwise {@code NONE_KEY}.
     *
     * @param msg the message to inspect
     * @return the selected key bytes
     */
    protected byte[] peekMessageKey(Message<T> msg) {
        // The plain key is resolved first; the ordering key, when present, overrides it.
        final byte[] plainKey = msg.hasKey() ? msg.getKeyBytes() : NONE_KEY;
        return msg.hasOrderingKey() ? msg.getOrderingKey() : plainKey;
    }

    /**
     * Creates an empty batch container bounded by the batch receive policy's message and byte
     * limits.
     *
     * @return a new {@link MessagesImpl}
     */
    protected MessagesImpl<T> getNewMessagesImpl() {
        final int maxMessages = batchReceivePolicy.getMaxNumMessages();
        final int maxBytes = batchReceivePolicy.getMaxNumBytes();
        return new MessagesImpl(maxMessages, maxBytes);
    }

    /**
     * @return {@code true} when {@code pendingBatchReceives} is non-null and not empty
     */
    protected boolean hasPendingBatchReceive() {
        if (pendingBatchReceives == null) {
            return false;
        }
        return hasNextBatchReceive();
    }

    /**
     * Supplies the client's memory limit controller, but only when the auto-scaled receiver
     * queue size is enabled in the configuration.
     *
     * @return the controller wrapped in an {@link Optional}, or an empty Optional when the
     *     feature is disabled
     */
    Optional<MemoryLimitController> getMemoryLimitController() {
        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
            return Optional.of(client.getMemoryLimitController());
        }
        return Optional.empty();
    }

    /**
     * Atomically zeroes the incoming-size counter and releases the previously recorded amount
     * on the memory limiter, when one is present.
     */
    protected void resetIncomingMessageSize() {
        final long previouslyTracked = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0L);
        getMemoryLimitController().ifPresent(controller -> controller.releaseMemory(previouslyTracked));
    }

    /**
     * Subtracts a message's size from the incoming-size counter and releases the same amount on
     * the memory limiter, when one is present.
     *
     * <p>The size is read once so the counter and the limiter are always adjusted by the same
     * value, mirroring how {@code enqueueMessageAndCheckBatchReceive} snapshots the size when
     * it reserves.
     *
     * @param message the message leaving the incoming queue
     */
    protected void decreaseIncomingMessageSize(Message<?> message) {
        final int size = message.size();
        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -size);
        this.getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(size));
    }

    /**
     * @return the current value of the incoming-size counter for this consumer
     */
    public long getIncomingMessageSize() {
        final long tracked = INCOMING_MESSAGES_SIZE_UPDATER.get(this);
        return tracked;
    }

    /**
     * @return the number of messages currently held in the incoming queue
     */
    public int getTotalIncomingMessages() {
        return incomingMessages.size();
    }

    /**
     * Releases every message in the incoming queue, empties the queue, and resets the
     * incoming-size accounting.
     */
    protected void clearIncomingMessages() {
        // Release first, then drop the references.
        incomingMessages.forEach(queued -> queued.release());
        incomingMessages.clear();
        resetIncomingMessageSize();
    }

    /**
     * Subclass hook taking a new receiver queue size.
     * NOTE(review): presumably updates the value returned by
     * {@code getCurrentReceiverQueueSize()} - confirm against the implementations.
     *
     * @param var1 the new size (parameter name lost in decompilation)
     */
    protected abstract void setCurrentReceiverQueueSize(int var1);

    /**
     * @return the current receiver queue size, read through its atomic field updater
     */
    public int getCurrentReceiverQueueSize() {
        final int queueSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.get(this);
        return queueSize;
    }

    /**
     * Subclass hook that completes a pending batch receive. Within this class it is called by
     * {@code doPendingBatchReceiveTask} for operations whose policy timeout has elapsed.
     *
     * @param var1 the operation to complete (parameter name lost in decompilation)
     */
    protected abstract void completeOpBatchReceive(OpBatchReceive<T> var1);

    /**
     * Chooses the external executor for a message: the non-null {@code externalPinnedExecutor}
     * of the consumer that received a {@link TopicMessageImpl}, otherwise this consumer's own.
     *
     * @param msg the message about to be dispatched
     * @return the executor to run the dispatch on
     */
    private ExecutorService getExternalExecutor(Message<T> msg) {
        if (msg instanceof TopicMessageImpl) {
            final ConsumerImpl owner = ((TopicMessageImpl) msg).receivedByconsumer;
            if (owner != null && owner.externalPinnedExecutor != null) {
                return owner.externalPinnedExecutor;
            }
        }
        return this.externalPinnedExecutor;
    }

    /**
     * Chooses the internal executor for a message: the non-null {@code internalPinnedExecutor}
     * of the consumer that received a {@link TopicMessageImpl}, otherwise this consumer's own.
     *
     * @param msg the message being handled
     * @return the executor to use
     */
    private ExecutorService getInternalExecutor(Message<T> msg) {
        if (msg instanceof TopicMessageImpl) {
            final ConsumerImpl owner = ((TopicMessageImpl) msg).receivedByconsumer;
            if (owner != null && owner.internalPinnedExecutor != null) {
                return owner.internalPinnedExecutor;
            }
        }
        return this.internalPinnedExecutor;
    }

    /**
     * Filters out messages that carry an epoch older than this consumer's current epoch.
     *
     * <p>The check applies only to {@code Failover} and {@code Exclusive} sub types and only
     * when the message epoch is not {@code -1}. A rejected message is logged, released and
     * recycled before {@code false} is returned.
     *
     * @param message the message to validate
     * @return {@code false} when the message was rejected and disposed of, {@code true} otherwise
     */
    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
        final boolean epochChecked = getSubType() == CommandSubscribe.SubType.Failover
                || getSubType() == CommandSubscribe.SubType.Exclusive;
        if (!epochChecked) {
            return true;
        }
        final boolean stale = message.getConsumerEpoch() != -1L
                && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this);
        if (!stale) {
            return true;
        }
        log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], consumerEpoch : [{}]", new Object[]{this.topic, message.getMessageId(), message.getConsumerEpoch(), this.consumerEpoch});
        message.release();
        message.recycle();
        return false;
    }

    /**
     * @return {@code true} when a batch receive timer handle is currently stored
     */
    public boolean hasBatchReceiveTimeout() {
        final boolean timerAbsent = batchReceiveTimeout == null;
        return !timerAbsent;
    }

    /**
     * Stores a new consumer epoch.
     *
     * @param consumerEpoch the epoch value to store
     */
    public void setConsumerEpoch(long consumerEpoch) {
        final long updatedEpoch = consumerEpoch;
        this.consumerEpoch = updatedEpoch;
    }

    /**
     * @return the stored consumer epoch
     */
    public long getConsumerEpoch() {
        return consumerEpoch;
    }

    /**
     * A pending batch receive: the future to complete plus the {@link System#nanoTime()} reading
     * taken when the operation was created.
     *
     * @param <T> the message payload type
     */
    protected static final class OpBatchReceive<T> {
        /** Future to be completed with the received batch. */
        final CompletableFuture<Messages<T>> future;
        /** {@code System.nanoTime()} value captured at construction. */
        final long createdAt;

        /**
         * Factory for a pending operation wrapping the given future.
         *
         * @param future the future to complete later
         * @return a new operation stamped with the current nano time
         */
        static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) {
            return new OpBatchReceive<>(future);
        }

        private OpBatchReceive(CompletableFuture<Messages<T>> future) {
            this.createdAt = System.nanoTime();
            this.future = future;
        }
    }
}

