/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.utils;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.storm.utils.DisruptorBackpressureCallback;
import org.apache.storm.utils.DisruptorQueue;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorQueueBackpressureTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class);
    private static final int MESSAGES = 100;
    private static final int CAPACITY = 128;
    private static final double HIGH_WATERMARK = 0.6;
    private static final double LOW_WATERMARK = 0.2;

    @Test
    public void testBackPressureCallback() throws Exception {
        DisruptorQueue queue = DisruptorQueueBackpressureTest.createQueue("testBackPressure", 128);
        queue.setEnableBackpressure(true);
        queue.setHighWaterMark(0.6);
        queue.setLowWaterMark(0.2);
        AtomicBoolean throttleOn = new AtomicBoolean(false);
        final AtomicLong consumerCursor = new AtomicLong(-1L);
        DisruptorBackpressureCallbackImpl cb = new DisruptorBackpressureCallbackImpl(queue, throttleOn, consumerCursor);
        queue.registerBackpressureCallback((DisruptorBackpressureCallback)cb);
        for (int i = 0; i < 100; ++i) {
            queue.publish((Object)String.valueOf(i));
        }
        queue.consumeBatchWhenAvailable((EventHandler)new EventHandler<Object>(){

            public void onEvent(Object o, long l, boolean b) throws Exception {
                consumerCursor.set(l);
            }
        });
        Assert.assertEquals((String)"Check the calling time of throttle on. ", (long)queue.getHighWaterMark(), (long)cb.highWaterMarkCalledPopulation);
        Assert.assertEquals((String)"Checking the calling time of throttle off. ", (long)queue.getLowWaterMark(), (long)cb.lowWaterMarkCalledPopulation);
    }

    private static DisruptorQueue createQueue(String name, int queueSize) {
        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
    }

    class DisruptorBackpressureCallbackImpl
    implements DisruptorBackpressureCallback {
        public long highWaterMarkCalledPopulation = -1L;
        public long lowWaterMarkCalledPopulation = -1L;
        DisruptorQueue queue;
        AtomicBoolean throttleOn;
        AtomicLong consumerCursor;

        public DisruptorBackpressureCallbackImpl(DisruptorQueue queue, AtomicBoolean throttleOn, AtomicLong consumerCursor) {
            this.queue = queue;
            this.throttleOn = throttleOn;
            this.consumerCursor = consumerCursor;
        }

        public void highWaterMark() throws Exception {
            if (!this.throttleOn.get()) {
                this.highWaterMarkCalledPopulation = this.queue.getMetrics().population() + this.queue.getMetrics().overflow();
                this.throttleOn.set(true);
            }
        }

        public void lowWaterMark() throws Exception {
            if (this.throttleOn.get()) {
                this.lowWaterMarkCalledPopulation = this.queue.getMetrics().writePos() - this.consumerCursor.get() + this.queue.getMetrics().overflow();
                this.throttleOn.set(false);
            }
        }
    }
}

