/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.utils;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.storm.utils.DisruptorQueue;
import org.junit.Assert;
import org.junit.Test;

public class DisruptorQueueTest
extends TestCase {
    private static final int TIMEOUT = 5000;
    private static final int PRODUCER_NUM = 4;

    @Test
    public void testFirstMessageFirst() throws InterruptedException {
        for (int i = 0; i < 100; ++i) {
            DisruptorQueue queue = DisruptorQueueTest.createQueue("firstMessageOrder", 16);
            queue.publish((Object)"FIRST");
            IncProducer producer = new IncProducer(queue, i + 100);
            final AtomicReference result = new AtomicReference();
            Consumer consumer = new Consumer(queue, (EventHandler)new EventHandler<Object>(){
                private boolean head = true;

                public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
                    if (this.head) {
                        this.head = false;
                        result.set(obj);
                    }
                }
            });
            this.run(producer, consumer, queue);
            Assert.assertEquals((String)("We expect to receive first published message first, but received " + result.get()), (Object)"FIRST", result.get());
        }
    }

    @Test
    public void testInOrder() throws InterruptedException {
        final AtomicBoolean allInOrder = new AtomicBoolean(true);
        DisruptorQueue queue = DisruptorQueueTest.createQueue("consumerHang", 1024);
        IncProducer producer = new IncProducer(queue, 0x100000L);
        Consumer consumer = new Consumer(queue, (EventHandler)new EventHandler<Object>(){
            long _expected = 0L;

            public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
                if (this._expected != ((Number)obj).longValue()) {
                    allInOrder.set(false);
                    System.out.println("Expected " + this._expected + " but got " + obj);
                }
                ++this._expected;
            }
        });
        this.run(producer, consumer, queue, 1000, 1);
        Assert.assertTrue((String)"Messages delivered out of order", (boolean)allInOrder.get());
    }

    @Test
    public void testInOrderBatch() throws InterruptedException {
        final AtomicBoolean allInOrder = new AtomicBoolean(true);
        DisruptorQueue queue = DisruptorQueueTest.createQueue("consumerHang", 10, 1024);
        IncProducer producer = new IncProducer(queue, 0x100000L);
        Consumer consumer = new Consumer(queue, (EventHandler)new EventHandler<Object>(){
            long _expected = 0L;

            public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
                if (this._expected != ((Number)obj).longValue()) {
                    allInOrder.set(false);
                    System.out.println("Expected " + this._expected + " but got " + obj);
                }
                ++this._expected;
            }
        });
        this.run(producer, consumer, queue, 1000, 1);
        Assert.assertTrue((String)"Messages delivered out of order", (boolean)allInOrder.get());
    }

    private void run(Runnable producer, Runnable consumer, DisruptorQueue queue) throws InterruptedException {
        this.run(producer, consumer, queue, 10, 4);
    }

    private void run(Runnable producer, Runnable consumer, DisruptorQueue queue, int sleepMs, int producerNum) throws InterruptedException {
        int i;
        Thread[] producerThreads = new Thread[producerNum];
        for (int i2 = 0; i2 < producerNum; ++i2) {
            producerThreads[i2] = new Thread(producer);
            producerThreads[i2].start();
        }
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        Thread.sleep(sleepMs);
        for (i = 0; i < producerNum; ++i) {
            producerThreads[i].interrupt();
        }
        for (i = 0; i < producerNum; ++i) {
            producerThreads[i].join(5000L);
            DisruptorQueueTest.assertFalse((String)("producer " + i + " is still alive"), (boolean)producerThreads[i].isAlive());
        }
        queue.haltWithInterrupt();
        consumerThread.join(5000L);
        DisruptorQueueTest.assertFalse((String)"consumer is still alive", (boolean)consumerThread.isAlive());
    }

    private static DisruptorQueue createQueue(String name, int queueSize) {
        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
    }

    private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L);
    }

    private static class Consumer
    implements Runnable {
        private EventHandler handler;
        private DisruptorQueue queue;

        Consumer(DisruptorQueue queue, EventHandler handler) {
            this.handler = handler;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    this.queue.consumeBatchWhenAvailable(this.handler);
                }
            }
            catch (RuntimeException runtimeException) {
                return;
            }
        }
    }

    private static class IncProducer
    implements Runnable {
        private DisruptorQueue queue;
        private long _max;

        IncProducer(DisruptorQueue queue, long max) {
            this.queue = queue;
            this._max = max;
        }

        @Override
        public void run() {
            for (long i = 0L; i < this._max && !Thread.currentThread().isInterrupted(); ++i) {
                this.queue.publish((Object)i);
            }
        }
    }
}

