/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQQueue;

public class AMQ2489Test
extends TestSupport {
    private static final String SEQ_NUM_PROPERTY = "seqNum";
    private static final int TOTAL_MESSAGES_CNT = 2;
    private static final int CONSUMERS_CNT = 2;
    private final CountDownLatch LATCH = new CountDownLatch(2);
    private Connection connection;

    protected void setUp() throws Exception {
        super.setUp();
        this.connection = this.createConnection();
    }

    protected void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
        super.tearDown();
    }

    public void testUnorderedClientAcknowledge() throws Exception {
        this.doUnorderedAck(2);
    }

    public void testUnorderedIndividualAcknowledge() throws Exception {
        this.doUnorderedAck(4);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doUnorderedAck(int acknowledgmentMode) throws Exception {
        ArrayList<Consumer> consumers = null;
        Session producerSession = null;
        this.connection.start();
        TestExceptionListener exceptionListener = new TestExceptionListener();
        this.connection.setExceptionListener((ExceptionListener)exceptionListener);
        try {
            consumers = new ArrayList<Consumer>();
            for (int i = 0; i < 2; ++i) {
                consumers.add(new Consumer(acknowledgmentMode));
            }
            producerSession = this.connection.createSession(false, 1);
            MessageProducer producer = producerSession.createProducer((Destination)new ActiveMQQueue(this.getQueueName()));
            producer.setDeliveryMode(2);
            for (int i = 0; i < 2; ++i) {
                TextMessage message = producerSession.createTextMessage("test");
                message.setIntProperty(SEQ_NUM_PROPERTY, i);
                producer.send((Message)message);
            }
            this.LATCH.await();
            TimeUnit.SECONDS.sleep(1L);
            AMQ2489Test.assertFalse((String)exceptionListener.getStatusText(), (boolean)exceptionListener.hasExceptions());
        }
        finally {
            if (producerSession != null) {
                producerSession.close();
            }
            if (consumers != null) {
                for (Consumer c : consumers) {
                    c.close();
                }
            }
        }
    }

    protected String getQueueName() {
        return ((Object)((Object)this)).getClass().getName() + "." + this.getName();
    }

    public final class TestExceptionListener
    implements ExceptionListener {
        private final Queue<Exception> exceptions = new ConcurrentLinkedQueue<Exception>();

        public void onException(JMSException e) {
            this.exceptions.add((Exception)((Object)e));
        }

        public boolean hasExceptions() {
            return !this.exceptions.isEmpty();
        }

        public String getStatusText() {
            StringBuilder str = new StringBuilder();
            str.append("Exceptions count on broker side: " + this.exceptions.size() + ".\nMessages:\n");
            for (Exception e : this.exceptions) {
                str.append(e.getMessage() + "\n\n");
            }
            return str.toString();
        }
    }

    public final class Consumer
    implements MessageListener {
        final Session session;

        private Consumer(int acknowledgmentMode) {
            try {
                this.session = AMQ2489Test.this.connection.createSession(false, acknowledgmentMode);
                jakarta.jms.Queue queue = this.session.createQueue(AMQ2489Test.this.getQueueName() + "?consumer.prefetchSize=1");
                MessageConsumer consumer = this.session.createConsumer((Destination)queue);
                consumer.setMessageListener((MessageListener)this);
            }
            catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                int seqNum = message.getIntProperty(AMQ2489Test.SEQ_NUM_PROPERTY);
                if (seqNum % 2 == 0) {
                    System.out.println("Delayed message sequence numeber: " + seqNum);
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                message.acknowledge();
            }
            catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
            finally {
                AMQ2489Test.this.LATCH.countDown();
            }
        }

        private void close() {
            if (this.session != null) {
                try {
                    this.session.close();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

