/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.virtual;

import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VirtualTopicDLQTest
extends TestCase {
    // Broker under test; assigned in setUp() and stopped and cleared in tearDown().
    private static BrokerService broker;
    // Class logger; declared without an initializer and assigned in the static initializer block.
    private static final Logger LOG;
    // Connection URI used by the TestConsumer and TestProducer helper classes.
    static final String jmsConnectionURI = "failover:(vm://localhost)";
    // Name of the topic the test's producer publishes to.
    private static final String virtualTopicName = "VirtualTopic.Test";
    // Queue-name prefixes for consumers A, B and C: prefix + virtualTopicName is the queue
    // name each of the three test consumers listens on.
    private static final String consumer1Prefix = "Consumer.A.";
    private static final String consumer2Prefix = "Consumer.B.";
    private static final String consumer3Prefix = "Consumer.C.";
    // dlqPrefix + <consumer queue name> is the dead-letter queue name the test listens on
    // and purges in tearDown().
    private static final String dlqPrefix = "ActiveMQ.DLQ.Queue.";
    // Number of messages the test publishes and each consumer is expected to see.
    private static final int numberMessages = 6;

    /**
     * Creates the broker from the {@code virtual-individual-dlq.xml} XBean resource, starts it
     * and blocks until it reports that it has started.
     *
     * @throws Exception if the broker cannot be created or started; the failure is logged
     *                   through the class logger and then rethrown unchanged
     */
    @Before
    public void setUp() throws Exception {
        try {
            broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true);
            broker.start();
            broker.waitUntilStarted();
        }
        catch (Exception e) {
            // Route the failure (with stack trace) through SLF4J instead of dumping it on stderr.
            LOG.error("Failed to create or start the broker", e);
            throw e;
        }
    }

    /**
     * Purges the three per-consumer dead-letter queues and then stops the broker.
     *
     * <p>The broker shutdown lives in a {@code finally} block: {@code purgeDestination} makes a
     * JUnit assertion, and a failed assertion is an {@link Error}, which the
     * {@code catch (Exception)} below does not intercept. Without the {@code finally} such a
     * failure would skip the shutdown and leave the broker running.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName);
            purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName);
            purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName);
        }
        catch (Exception e) {
            // Purging is best-effort cleanup; report the problem but still shut the broker down.
            LOG.warn("Failed to purge dead-letter queues during tearDown", e);
        }
        finally {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
                broker = null;
            }
        }
    }

    /**
     * Publishes {@code numberMessages} messages to the virtual topic and checks what each
     * consumer queue and its dead-letter queue receives.
     *
     * <p>Consumers A and B roll back every message, consumer C commits. The test expects the
     * latches of consumers A, B and C and of the DLQ consumers for A and B to reach zero,
     * while the DLQ consumer for C must not have received anything (its latch stays at
     * {@code numberMessages}).
     *
     * @throws Exception if a wait is interrupted or any step fails unexpectedly
     */
    @Test
    public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception {
        final String queueA = consumer1Prefix + virtualTopicName;
        final String queueB = consumer2Prefix + virtualTopicName;
        final String queueC = consumer3Prefix + virtualTopicName;

        TestConsumer consumer1 = null;
        TestConsumer consumer2 = null;
        TestConsumer consumer3 = null;
        TestConsumer dlqConsumer1 = null;
        TestConsumer dlqConsumer2 = null;
        TestConsumer dlqConsumer3 = null;
        try {
            // Consumers A and B fake a failure (rollback); consumer C commits normally.
            consumer1 = new TestConsumer(queueA, false, numberMessages, true);
            thread(consumer1, false);
            consumer2 = new TestConsumer(queueB, false, numberMessages, true);
            thread(consumer2, false);
            consumer3 = new TestConsumer(queueC, false, numberMessages, false);
            thread(consumer3, false);

            // One committing consumer per dead-letter queue.
            dlqConsumer1 = new TestConsumer(dlqPrefix + queueA, false, numberMessages, false);
            thread(dlqConsumer1, false);
            dlqConsumer2 = new TestConsumer(dlqPrefix + queueB, false, numberMessages, false);
            thread(dlqConsumer2, false);
            dlqConsumer3 = new TestConsumer(dlqPrefix + queueC, false, numberMessages, false);
            thread(dlqConsumer3, false);

            // Give the consumer threads time to connect and subscribe before publishing.
            Thread.sleep(1000L);

            TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages);
            thread(producer, false);

            assertLatchCompletes("sent all producer messages in time, count is: ", producer.getLatch());
            LOG.info("producer successful, count = " + producer.getLatch().getCount());
            assertLatchCompletes("remaining consumer1 count should be zero, is: ", consumer1.getLatch());
            LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount());
            assertLatchCompletes("remaining consumer2 count should be zero, is: ", consumer2.getLatch());
            LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount());
            assertLatchCompletes("remaining consumer3 count should be zero, is: ", consumer3.getLatch());
            LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount());
            assertLatchCompletes("remaining dlqConsumer1 count should be zero, is: ", dlqConsumer1.getLatch());
            LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount());
            assertLatchCompletes("remaining dlqConsumer2 count should be zero, is: ", dlqConsumer2.getLatch());
            LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());

            // Consumer C never rolls back, so its DLQ consumer must not have counted down at all.
            long dlq3Remaining = dlqConsumer3.getLatch().getCount();
            assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlq3Remaining, dlq3Remaining == numberMessages);
            LOG.info("dlqConsumer3 successful, count = " + dlq3Remaining);
        }
        catch (Exception e) {
            LOG.error("testVirtualTopicSubscriberDeadLetterQueue failed", e);
            throw e;
        }
        finally {
            // Ask every consumer thread that was created to leave its polling loop.
            TestConsumer[] started = {consumer1, consumer2, consumer3, dlqConsumer1, dlqConsumer2, dlqConsumer3};
            for (TestConsumer testConsumer : started) {
                if (testConsumer != null) {
                    testConsumer.setStop(true);
                }
            }
        }
    }

    /**
     * Waits up to ten seconds for {@code latch} to reach zero and fails the test otherwise.
     * The remaining count is read after the wait, so the failure message shows the count at
     * the moment of the timeout rather than the count before waiting.
     *
     * @param messagePrefix failure-message text; the remaining latch count is appended to it
     * @param latch         latch expected to reach zero
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void assertLatchCompletes(String messagePrefix, CountDownLatch latch) throws InterruptedException {
        boolean completed = latch.await(10L, TimeUnit.SECONDS);
        assertTrue(messagePrefix + latch.getCount(), completed);
    }

    /**
     * Starts {@code runnable} on a new thread.
     *
     * @param runnable task to execute
     * @param daemon   daemon flag applied to the thread before it is started
     * @return the thread, already started
     */
    private static Thread thread(Runnable runnable, boolean daemon) {
        final Thread worker = new Thread(runnable);
        worker.setDaemon(daemon);
        worker.start();
        return worker;
    }

    /**
     * Purges the named queue on the broker and asserts that its message count is zero
     * afterwards.
     *
     * <p>If the broker's queue region has no destination with that name there is nothing to
     * purge and the method returns without failing (previously this case raised a
     * {@link NullPointerException}).
     *
     * @param destination physical name of the queue to purge
     * @throws Exception if the purge itself fails
     */
    private static void purgeDestination(String destination) throws Exception {
        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
        Queue dest = (Queue) regionBroker.getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination));
        if (dest == null) {
            LOG.info("Queue (" + destination + ") does not exist on the broker, nothing to purge");
            return;
        }
        dest.purge();
        assertEquals(0L, dest.getDestinationStatistics().getMessages().getCount());
    }

    // Assigns the class logger; the LOG field is declared without an initializer.
    static {
        LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class);
    }

    private class TestConsumer
    implements Runnable,
    ExceptionListener,
    MessageListener {
        private String destinationName = null;
        private boolean isTopic = true;
        private CountDownLatch latch = null;
        private int maxRedeliveries = 0;
        private int receivedMessageCounter = 0;
        private boolean bFakeFail = false;
        private boolean bStop = false;
        private ActiveMQConnectionFactory connectionFactory = null;
        private ActiveMQConnection connection = null;
        private Session session = null;
        private MessageConsumer consumer = null;

        public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) {
            this.destinationName = destinationName;
            this.isTopic = isTopic;
            this.latch = new CountDownLatch(expectedNumberMessages * (this.bFakeFail ? this.maxRedeliveries + 1 : 1));
            this.bFakeFail = bFakeFail;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        @Override
        public void run() {
            try {
                LOG.info("Started TestConsumer for destination (" + this.destinationName + ")");
                this.connectionFactory = new ActiveMQConnectionFactory(VirtualTopicDLQTest.jmsConnectionURI);
                this.connection = (ActiveMQConnection)this.connectionFactory.createConnection();
                this.connection.start();
                this.session = this.connection.createSession(true, 0);
                RedeliveryPolicy policy = this.connection.getRedeliveryPolicy();
                policy.setInitialRedeliveryDelay(1L);
                policy.setUseExponentialBackOff(false);
                policy.setMaximumRedeliveries(this.maxRedeliveries);
                this.connection.setExceptionListener((ExceptionListener)this);
                Object destination = null;
                destination = this.isTopic ? this.session.createTopic(this.destinationName) : this.session.createQueue(this.destinationName);
                this.consumer = this.session.createConsumer((Destination)destination);
                this.consumer.setMessageListener((MessageListener)this);
                while (!this.bStop) {
                    Thread.sleep(100L);
                }
                LOG.info("Finished TestConsumer for destination name (" + this.destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
            }
            catch (Exception e) {
                LOG.error("Consumer (" + this.destinationName + ") Caught: " + e);
            }
            finally {
                try {
                    if (this.connection != null) {
                        this.connection.close();
                    }
                }
                catch (Exception e) {
                    LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                }
            }
        }

        public synchronized void onException(JMSException ex) {
            ex.printStackTrace();
            LOG.error("Consumer for destination, (" + this.destinationName + "), JMS Exception occured.  Shutting down client.");
        }

        public synchronized void setStop(boolean bStop) {
            this.bStop = bStop;
        }

        public synchronized void onMessage(Message message) {
            ++this.receivedMessageCounter;
            this.latch.countDown();
            LOG.info("Consumer for destination (" + this.destinationName + ") latch countdown: " + this.latch.getCount() + " :: Number messages received " + this.receivedMessageCounter);
            try {
                LOG.info("Consumer for destination (" + this.destinationName + ") Received message id :: " + message.getJMSMessageID());
                if (!this.bFakeFail) {
                    LOG.info("Consumer on destination " + this.destinationName + " committing JMS Session for message: " + message.toString());
                    this.session.commit();
                } else {
                    LOG.info("Consumer on destination " + this.destinationName + " rolling back JMS Session for message: " + message.toString());
                    this.session.rollback();
                }
            }
            catch (JMSException ex) {
                LOG.error("Error reading JMS Message from destination " + this.destinationName + ".");
            }
        }
    }

    private class TestProducer
    implements Runnable {
        private String destinationName = null;
        private boolean isTopic = true;
        private int numberMessages = 0;
        private CountDownLatch latch = null;

        public TestProducer(String destinationName, boolean isTopic, int numberMessages) {
            this.destinationName = destinationName;
            this.isTopic = isTopic;
            this.numberMessages = numberMessages;
            this.latch = new CountDownLatch(numberMessages);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory = null;
            ActiveMQConnection connection = null;
            ActiveMQSession session = null;
            Object destination = null;
            try {
                LOG.info("Started TestProducer for destination (" + this.destinationName + ")");
                connectionFactory = new ActiveMQConnectionFactory(VirtualTopicDLQTest.jmsConnectionURI);
                connection = (ActiveMQConnection)connectionFactory.createConnection();
                connection.start();
                session = (ActiveMQSession)connection.createSession(false, 1);
                destination = this.isTopic ? session.createTopic(this.destinationName) : session.createQueue(this.destinationName);
                ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer((Destination)destination);
                producer.setDeliveryMode(1);
                for (int i = 0; i < this.numberMessages; ++i) {
                    TextMessage message = session.createTextMessage("I am a message :: " + String.valueOf(i));
                    try {
                        producer.send((Message)message);
                    }
                    catch (Exception deeperException) {
                        LOG.info("Producer for destination (" + this.destinationName + ") Caught: " + deeperException);
                    }
                    this.latch.countDown();
                    Thread.sleep(1000L);
                }
                LOG.info("Finished TestProducer for destination (" + this.destinationName + ")");
            }
            catch (Exception e) {
                LOG.error("Terminating TestProducer(" + this.destinationName + ")Caught: " + e);
            }
            finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                }
                catch (Exception e) {
                    LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                }
            }
        }
    }
}

