/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ3732Test {
    private static Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class);
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private BrokerService broker;
    private String connectionUri;
    private final Random pause = new Random();
    private final long NUM_MESSAGES = 25000L;
    private final AtomicLong totalConsumed = new AtomicLong();

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        this.connectionFactory.getPrefetchPolicy().setAll(0);
    }

    @After
    public void stopBroker() throws Exception {
        try {
            this.connection.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test(timeout=1200000L)
    public void testInterruptionAffects() throws Exception {
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 4);
        Queue queue = this.session.createQueue("AMQ3732Test");
        final LinkedBlockingQueue workQueue = new LinkedBlockingQueue();
        final MessageConsumer consumer1 = this.session.createConsumer((Destination)queue);
        final MessageConsumer consumer2 = this.session.createConsumer((Destination)queue);
        final MessageProducer producer = this.session.createProducer((Destination)queue);
        Thread consumer1Thread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (AMQ3732Test.this.totalConsumed.get() < 25000L) {
                        Message message = consumer1.receiveNoWait();
                        if (message == null) continue;
                        workQueue.add(message);
                    }
                }
                catch (Exception e) {
                    LOG.error("Caught an unexpected error: ", (Throwable)e);
                }
            }
        });
        consumer1Thread.start();
        Thread consumer2Thread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (AMQ3732Test.this.totalConsumed.get() < 25000L) {
                        Message message = consumer2.receive(50L);
                        if (message == null) continue;
                        workQueue.add(message);
                    }
                }
                catch (Exception e) {
                    LOG.error("Caught an unexpected error: ", (Throwable)e);
                }
            }
        });
        consumer2Thread.start();
        Thread producerThread2 = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    int i = 0;
                    while ((long)i < 25000L) {
                        producer.send((Message)AMQ3732Test.this.session.createTextMessage("TEST"));
                        TimeUnit.MILLISECONDS.sleep(AMQ3732Test.this.pause.nextInt(10));
                        ++i;
                    }
                }
                catch (Exception e) {
                    LOG.error("Caught an unexpected error: ", (Throwable)e);
                }
            }
        });
        producerThread2.start();
        Thread ackingThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (AMQ3732Test.this.totalConsumed.get() < 25000L) {
                        Message message = (Message)workQueue.take();
                        message.acknowledge();
                        AMQ3732Test.this.totalConsumed.incrementAndGet();
                        if (AMQ3732Test.this.totalConsumed.get() % 100L != 0L) continue;
                        LOG.info("Consumed " + AMQ3732Test.this.totalConsumed.get() + " messages so far.");
                    }
                }
                catch (Exception e) {
                    LOG.error("Caught an unexpected error: ", (Throwable)e);
                }
            }
        });
        ackingThread.start();
        producerThread2.join();
        consumer1Thread.join();
        consumer2Thread.join();
        ackingThread.join();
        Assert.assertEquals((long)25000L, (long)this.totalConsumed.get());
    }
}

