/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.qos.FinalRateAsyncTokenBucketBuilder;
import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;

public class PublishRateLimiterImpl
implements PublishRateLimiter {
    private volatile AsyncTokenBucket tokenBucketOnMessage;
    private volatile AsyncTokenBucket tokenBucketOnByte;
    private final MonotonicSnapshotClock monotonicSnapshotClock;
    private final MessagePassingQueue<Producer> unthrottlingQueue = new MpscUnboundedArrayQueue(1024);
    private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
    private final AtomicBoolean processingQueuedProducers = new AtomicBoolean(false);

    public PublishRateLimiterImpl(MonotonicSnapshotClock monotonicSnapshotClock) {
        this.monotonicSnapshotClock = monotonicSnapshotClock;
    }

    @Override
    public void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {
        AsyncTokenBucket currentTokenBucketOnByte;
        boolean shouldThrottle = false;
        AsyncTokenBucket currentTokenBucketOnMessage = this.tokenBucketOnMessage;
        if (currentTokenBucketOnMessage != null) {
            boolean bl = shouldThrottle = !currentTokenBucketOnMessage.consumeTokensAndCheckIfContainsTokens(numOfMessages);
        }
        if ((currentTokenBucketOnByte = this.tokenBucketOnByte) != null) {
            shouldThrottle |= !currentTokenBucketOnByte.consumeTokensAndCheckIfContainsTokens(msgSizeInBytes);
        }
        if (shouldThrottle) {
            producer.incrementThrottleCount();
            this.scheduleDecrementThrottleCount(producer);
        }
    }

    private void scheduleDecrementThrottleCount(Producer producer) {
        this.unthrottlingQueue.offer((Object)producer);
        if (this.throttledProducersCount.incrementAndGet() == 1) {
            EventLoopGroup executor = producer.getCnx().getBrokerService().executor();
            this.scheduleUnthrottling((ScheduledExecutorService)executor, this.calculateThrottlingDurationNanos());
        }
    }

    private void scheduleUnthrottling(ScheduledExecutorService executor, long delayNanos) {
        executor.schedule(() -> this.unthrottleQueuedProducers(executor), delayNanos, TimeUnit.NANOSECONDS);
    }

    private long calculateThrottlingDurationNanos() {
        AsyncTokenBucket currentTokenBucketOnByte;
        AsyncTokenBucket currentTokenBucketOnMessage = this.tokenBucketOnMessage;
        long throttlingDurationNanos = 0L;
        if (currentTokenBucketOnMessage != null) {
            throttlingDurationNanos = currentTokenBucketOnMessage.calculateThrottlingDuration();
        }
        if ((currentTokenBucketOnByte = this.tokenBucketOnByte) != null) {
            throttlingDurationNanos = Math.max(throttlingDurationNanos, currentTokenBucketOnByte.calculateThrottlingDuration());
        }
        return throttlingDurationNanos;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unthrottleQueuedProducers(ScheduledExecutorService executor) {
        if (!this.processingQueuedProducers.compareAndSet(false, true)) {
            return;
        }
        try {
            Producer producer;
            long throttlingDuration = 0L;
            while ((throttlingDuration = this.calculateThrottlingDurationNanos()) == 0L && (producer = (Producer)this.unthrottlingQueue.poll()) != null) {
                producer.decrementThrottleCount();
                this.throttledProducersCount.decrementAndGet();
            }
            if (this.throttledProducersCount.get() > 0) {
                this.scheduleUnthrottling(executor, throttlingDuration);
            }
        }
        finally {
            this.processingQueuedProducers.set(false);
        }
    }

    @Override
    public void update(Policies policies, String clusterName) {
        PublishRate maxPublishRate = policies.publishMaxMessageRate != null ? (PublishRate)policies.publishMaxMessageRate.get(clusterName) : null;
        this.update(maxPublishRate);
    }

    @Override
    public void update(PublishRate maxPublishRate) {
        if (maxPublishRate != null) {
            this.updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg, maxPublishRate.publishThrottlingRateInByte);
        } else {
            this.tokenBucketOnMessage = null;
            this.tokenBucketOnByte = null;
        }
    }

    protected void updateTokenBuckets(long publishThrottlingRateInMsg, long publishThrottlingRateInByte) {
        this.tokenBucketOnMessage = publishThrottlingRateInMsg > 0L ? ((FinalRateAsyncTokenBucketBuilder)AsyncTokenBucket.builder().rate(publishThrottlingRateInMsg).clock(this.monotonicSnapshotClock)).build() : null;
        this.tokenBucketOnByte = publishThrottlingRateInByte > 0L ? ((FinalRateAsyncTokenBucketBuilder)AsyncTokenBucket.builder().rate(publishThrottlingRateInByte).clock(this.monotonicSnapshotClock)).build() : null;
    }

    @VisibleForTesting
    public AsyncTokenBucket getTokenBucketOnMessage() {
        return this.tokenBucketOnMessage;
    }

    @VisibleForTesting
    public AsyncTokenBucket getTokenBucketOnByte() {
        return this.tokenBucketOnByte;
    }
}

