/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import io.opentelemetry.api.common.Attributes;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.Counter;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.Timeout;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.TimerTask;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnAckedMessageTracker
implements Closeable {
    /** Logger shared by all tracker instances. */
    private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
    /**
     * Maps each tracked message id to the time-partition set that currently holds it.
     * Left {@code null} by the private no-arg constructor and when an ack-timeout redelivery backoff is configured.
     */
    protected final HashMap<MessageId, HashSet<MessageId>> messageIdPartitionMap;
    /**
     * Queue of time partitions. New ids are added to the last partition; on every tick the timer task
     * removes the first partition, redelivers its ids, clears it and appends it again at the end.
     * {@code null} under the same conditions as {@link #messageIdPartitionMap}.
     */
    protected final ArrayDeque<HashSet<MessageId>> timePartitions;
    /** Read side of the tracker's {@link ReentrantReadWriteLock}; taken by {@code isEmpty()} and {@code size()}. */
    protected final Lock readLock;
    /** Write side of the same lock; taken by every mutating operation and by the timer task. */
    protected final Lock writeLock;
    /** Shared instance of the no-op subclass {@code UnAckedMessageTrackerDisabled}. */
    public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
    /** Ack timeout read from the consumer configuration, in milliseconds. */
    protected final long ackTimeoutMillis;
    /** Interval between timer ticks in milliseconds: the smaller of the configured tick duration and the ack timeout. */
    protected final long tickDurationInMs;
    /** Counter increased by the size of each non-empty partition that expires. */
    private final Counter consumerAckTimeoutsCounter;
    /** Handle of the most recently scheduled tick; cancelled and reset to {@code null} by {@code stop()}. */
    protected Timeout timeout;
    /**
     * Per-thread scratch set. The timer task clears it and then fills it with the ids to redeliver
     * on each tick instead of allocating a new set.
     */
    protected static final FastThreadLocal<HashSet<MessageId>> TL_MESSAGE_IDS_SET = new FastThreadLocal<HashSet<MessageId>>(){

        @Override
        protected HashSet<MessageId> initialValue() throws Exception {
            return new HashSet<MessageId>();
        }
    };

    /**
     * Builds an inert instance: no locks, no tracking collections, no counter and zeroed timings.
     * No timer is scheduled, so {@code timeout} keeps its default {@code null} value.
     * Invoked implicitly by the constructor of the nested {@code UnAckedMessageTrackerDisabled} class.
     */
    private UnAckedMessageTracker() {
        // Timing configuration: none.
        this.ackTimeoutMillis = 0L;
        this.tickDurationInMs = 0L;

        // Tracking state and its guards: none.
        this.messageIdPartitionMap = null;
        this.timePartitions = null;
        this.readLock = null;
        this.writeLock = null;

        // Metrics: none.
        this.consumerAckTimeoutsCounter = null;
    }

    /**
     * Builds a tracker for {@code consumerBase} and, unless an ack-timeout redelivery backoff is configured,
     * starts the periodic tick that expires unacknowledged messages.
     *
     * <p>The tick interval is the smaller of the configured tick duration and the ack timeout.
     * {@code ceil(ackTimeout / tick) + 1} empty partitions are created up front. On every tick the oldest
     * partition is removed, its ids are handed to the consumer for redelivery, and the emptied partition
     * is appended again as the newest one.
     *
     * <p>When {@code conf.getAckTimeoutRedeliveryBackoff()} is non-null, both tracking collections are left
     * {@code null} and no timer is scheduled by this constructor.
     *
     * @param client       supplies the timer and the instrument provider
     * @param consumerBase consumer whose messages are tracked and which receives the redelivery requests
     * @param conf         consumer configuration providing the ack timeout, tick duration and backoff setting
     * @throws IllegalArgumentException if the effective tick duration is not positive
     */
    public UnAckedMessageTracker(final PulsarClientImpl client, final ConsumerBase<?> consumerBase, ConsumerConfigurationData<?> conf) {
        this.ackTimeoutMillis = conf.getAckTimeoutMillis();
        this.tickDurationInMs = Math.min(conf.getTickDurationMillis(), conf.getAckTimeoutMillis());
        Preconditions.checkArgument(this.tickDurationInMs > 0L && this.ackTimeoutMillis >= this.tickDurationInMs);
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        InstrumentProvider ip = client.instrumentProvider();
        this.consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages, "The number of messages that were not acknowledged in the configured timeout period, hence, were requested by the client to be redelivered", consumerBase.getTopic(), Attributes.builder().put("pulsar.subscription", consumerBase.getSubscription()).build());
        if (conf.getAckTimeoutRedeliveryBackoff() == null) {
            this.messageIdPartitionMap = new HashMap<>();
            this.timePartitions = new ArrayDeque<>();
            // One partition per tick within the ack timeout, plus one extra that receives new ids.
            int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / (double)this.tickDurationInMs);
            for (int i = 0; i < blankPartitions + 1; ++i) {
                this.timePartitions.add(new HashSet<>(16, 1.0f));
            }
            this.timeout = client.timer().newTimeout(new TimerTask(){

                /**
                 * Expires the oldest partition, reschedules this task, then asks the consumer to redeliver
                 * the expired ids.
                 */
                @Override
                public void run(Timeout t) throws Exception {
                    if (t.isCancelled()) {
                        return;
                    }
                    // Reuse the per-thread scratch set instead of allocating one per tick.
                    Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
                    messageIds.clear();
                    // NOTE(review): if stop() runs after the isCancelled() check above but before this lock
                    // is acquired, the finally block below still schedules another tick — confirm whether
                    // close() can leave a live timer behind.
                    UnAckedMessageTracker.this.writeLock.lock();
                    try {
                        HashSet<MessageId> headPartition = UnAckedMessageTracker.this.timePartitions.removeFirst();
                        if (!headPartition.isEmpty()) {
                            UnAckedMessageTracker.this.consumerAckTimeoutsCounter.add(headPartition.size());
                            log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size());
                            headPartition.forEach(messageId -> {
                                if (messageId instanceof ChunkMessageIdImpl) {
                                    // A chunked message is redelivered through the ids of its individual chunks.
                                    UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
                                } else {
                                    messageIds.add(messageId);
                                }
                                UnAckedMessageTracker.this.messageIdPartitionMap.remove(messageId);
                            });
                        }
                        // Recycle the emptied set as the newest partition.
                        headPartition.clear();
                        UnAckedMessageTracker.this.timePartitions.addLast(headPartition);
                    }
                    finally {
                        try {
                            // Reschedule while still holding the write lock, even if the block above threw.
                            UnAckedMessageTracker.this.timeout = client.timer().newTimeout(this, UnAckedMessageTracker.this.tickDurationInMs, TimeUnit.MILLISECONDS);
                        }
                        finally {
                            UnAckedMessageTracker.this.writeLock.unlock();
                            // The consumer callbacks are invoked only after the lock has been released.
                            if (!messageIds.isEmpty()) {
                                consumerBase.onAckTimeoutSend(messageIds);
                                consumerBase.redeliverUnacknowledgedMessages(messageIds);
                            }
                        }
                    }
                }
            }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
        } else {
            // Backoff configured: this constructor sets up no partition-based tracking.
            this.messageIdPartitionMap = null;
            this.timePartitions = null;
        }
    }

    /**
     * Copies the chunk ids registered for {@code messageId} into {@code messageIds} and drops the
     * registration from the consumer's chunk sequence map.
     *
     * <p>Does nothing when {@code messageId} is not a {@link MessageIdImpl}. The map entry is removed
     * even when no chunk ids were registered for the key.
     *
     * @param messageId    id used as key into {@code consumerBase.unAckedChunkedMessageIdSequenceMap}
     * @param messageIds   set that receives the chunk ids
     * @param consumerBase consumer that owns the chunk sequence map
     */
    public static void addChunkedMessageIdsAndRemoveFromSequenceMap(MessageId messageId, Set<MessageId> messageIds, ConsumerBase<?> consumerBase) {
        if (!(messageId instanceof MessageIdImpl)) {
            return;
        }
        MessageIdImpl chunkKey = (MessageIdImpl)messageId;
        MessageIdImpl[] chunkIds = consumerBase.unAckedChunkedMessageIdSequenceMap.get(chunkKey);
        if (chunkIds != null) {
            for (MessageIdImpl chunkId : chunkIds) {
                messageIds.add(chunkId);
            }
        }
        consumerBase.unAckedChunkedMessageIdSequenceMap.remove(chunkKey);
    }

    /**
     * Forgets every tracked message id: empties the id-to-partition map and every time partition.
     * The partitions themselves stay in the queue, so the tick schedule is unaffected.
     * Runs under the write lock.
     */
    public void clear() {
        this.writeLock.lock();
        try {
            this.messageIdPartitionMap.clear();
            // Method reference replaces the raw-typed double cast left by the decompiler.
            this.timePartitions.forEach(HashSet::clear);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts tracking {@code messageId} by placing it in the newest time partition.
     * Runs under the write lock.
     *
     * @param messageId id to track; {@code null} is ignored
     * @return {@code false} when {@code messageId} is {@code null} or already tracked; otherwise the
     *         result of adding the id to the newest partition
     */
    public boolean add(MessageId messageId) {
        if (messageId == null) {
            return false;
        }
        writeLock.lock();
        try {
            HashSet<MessageId> newest = timePartitions.peekLast();
            HashSet<MessageId> alreadyIn = messageIdPartitionMap.putIfAbsent(messageId, newest);
            // Only a first-time registration is also added to the partition itself.
            return alreadyIn == null && newest.add(messageId);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Tracks {@code messageId}; this implementation ignores {@code redeliveryCount} and simply
     * delegates to {@link #add(MessageId)}.
     *
     * @param messageId       id to track
     * @param redeliveryCount unused here
     * @return whatever {@link #add(MessageId)} returns
     */
    public boolean add(MessageId messageId, int redeliveryCount) {
        return add(messageId);
    }

    /**
     * Reports whether no message id is currently tracked. Runs under the read lock.
     *
     * @return {@code true} when the id-to-partition map is empty
     */
    boolean isEmpty() {
        readLock.lock();
        try {
            return messageIdPartitionMap.isEmpty();
        } finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops tracking {@code messageId}: removes it from the id-to-partition map and from the partition
     * the map pointed to. Runs under the write lock.
     *
     * @param messageId id to forget; {@code null} is ignored
     * @return {@code true} only when the id was mapped to a partition and that partition contained it
     */
    public boolean remove(MessageId messageId) {
        if (messageId == null) {
            return false;
        }
        writeLock.lock();
        try {
            HashSet<MessageId> owner = messageIdPartitionMap.remove(messageId);
            return owner != null && owner.remove(messageId);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the number of message ids currently tracked. Runs under the read lock.
     *
     * @return size of the id-to-partition map, widened to {@code long}
     */
    long size() {
        readLock.lock();
        try {
            return messageIdPartitionMap.size();
        } finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops tracking every id that compares less than or equal to {@code msgId}.
     * Each matching id is removed from both its partition and the id-to-partition map.
     * Runs under the write lock.
     *
     * @param msgId upper bound (inclusive) of the ids to forget
     * @return number of ids removed from the map
     */
    public int removeMessagesTill(MessageId msgId) {
        writeLock.lock();
        try {
            int dropped = 0;
            for (Iterator<Map.Entry<MessageId, HashSet<MessageId>>> it = messageIdPartitionMap.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<MessageId, HashSet<MessageId>> tracked = it.next();
                MessageId candidate = tracked.getKey();
                if (candidate.compareTo(msgId) <= 0) {
                    tracked.getValue().remove(candidate);
                    // Remove through the iterator so the map can be modified while iterating.
                    it.remove();
                    dropped++;
                }
            }
            return dropped;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Cancels the pending tick, if one exists and is not already cancelled, and then clears all
     * tracked ids via {@link #clear()}. Runs under the write lock.
     */
    private void stop() {
        writeLock.lock();
        try {
            Timeout pending = timeout;
            boolean stillScheduled = pending != null && !pending.isCancelled();
            if (stillScheduled) {
                pending.cancel();
                timeout = null;
            }
            clear();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Closes the tracker by delegating to {@code stop()}, which cancels the pending tick and clears
     * the tracked ids.
     */
    @Override
    public void close() {
        stop();
    }

    /**
     * Tracker variant that tracks nothing. It is built through the enclosing class's private no-arg
     * constructor, so its locks and collections are all {@code null}; every operation that would touch
     * them is therefore overridden with a constant result.
     */
    private static class UnAckedMessageTrackerDisabled
    extends UnAckedMessageTracker {
        private UnAckedMessageTrackerDisabled() {
        }

        /** Nothing is tracked, so there is nothing to clear. */
        @Override
        public void clear() {
        }

        /** @return always {@code 0} */
        @Override
        long size() {
            return 0L;
        }

        /**
         * Overridden because the inherited implementation dereferences the read lock, which is
         * {@code null} for this class and would throw a {@link NullPointerException}.
         *
         * @return always {@code true}, consistent with {@link #size()} returning {@code 0}
         */
        @Override
        boolean isEmpty() {
            return true;
        }

        /** @return always {@code true}; the id is not stored */
        @Override
        public boolean add(MessageId m) {
            return true;
        }

        /** @return always {@code true}; the id is not stored */
        @Override
        public boolean add(MessageId messageId, int redeliveryCount) {
            return true;
        }

        /** @return always {@code true}; there is no state to update */
        @Override
        public boolean remove(MessageId m) {
            return true;
        }

        /** @return always {@code 0} */
        @Override
        public int removeMessagesTill(MessageId msgId) {
            return 0;
        }

        /** No timer or state exists, so closing is a no-op. */
        @Override
        public void close() {
        }
    }
}

