/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;

public class AMQ1917Test
extends TestCase {
    private static final int NUM_MESSAGES = 4000;
    private static final int NUM_THREADS = 10;
    private static final String REQUEST_QUEUE = "mock.in.queue";
    private static final String REPLY_QUEUE = "mock.out.queue";
    private Destination requestDestination = ActiveMQDestination.createDestination((String)"mock.in.queue", (byte)1);
    private Destination replyDestination = ActiveMQDestination.createDestination((String)"mock.out.queue", (byte)1);
    private CountDownLatch roundTripLatch = new CountDownLatch(4000);
    private CountDownLatch errorLatch = new CountDownLatch(1);
    private ThreadPoolExecutor tpe;
    private final String BROKER_URL = "tcp://localhost:0";
    private String connectionUri;
    private BrokerService broker = null;
    private boolean working = true;
    final Session[] sessions = new Session[10];
    final MessageProducer[] producers = new MessageProducer[10];

    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.addConnector("tcp://localhost:0");
        this.broker.start();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
        this.tpe = new ThreadPoolExecutor(10, 10, 60000L, TimeUnit.MILLISECONDS, queue);
        LimitedThreadFactory limitedthreadFactory = new LimitedThreadFactory(this.tpe.getThreadFactory());
        this.tpe.setThreadFactory(limitedthreadFactory);
    }

    public void tearDown() throws Exception {
        this.broker.stop();
        this.tpe.shutdown();
    }

    public void testLoadedSendRecieveWithCorrelationId() throws Exception {
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.connectionUri);
        Connection connection = connectionFactory.createConnection();
        this.setupReceiver(connection);
        connection = connectionFactory.createConnection();
        connection.start();
        for (i = 0; i < 10; ++i) {
            this.sessions[i] = connection.createSession(false, 1);
            this.producers[i] = this.sessions[i].createProducer(this.requestDestination);
        }
        for (i = 0; i < 4000; ++i) {
            MessageSenderReceiver msr = new MessageSenderReceiver(this.requestDestination, this.replyDestination, "Test Message : " + i);
            this.tpe.execute(msr);
        }
        while (!this.roundTripLatch.await(4000L, TimeUnit.MILLISECONDS)) {
            if (!this.errorLatch.await(1000L, TimeUnit.MILLISECONDS)) continue;
            AMQ1917Test.fail((String)"there was an error, check the console for thread or thread allocation failure");
            break;
        }
        this.working = false;
    }

    private void setupReceiver(Connection connection) throws Exception {
        final Session session = connection.createSession(false, 1);
        final MessageConsumer consumer = session.createConsumer(this.requestDestination);
        final MessageProducer sender = session.createProducer(this.replyDestination);
        connection.start();
        new Thread(){

            @Override
            public void run() {
                while (AMQ1917Test.this.working) {
                    try {
                        TextMessage msg = (TextMessage)consumer.receive(20000L);
                        if (msg == null) {
                            AMQ1917Test.this.errorLatch.countDown();
                            TestCase.fail((String)("Response timed out. latchCount=" + AMQ1917Test.this.roundTripLatch.getCount()));
                            continue;
                        }
                        String result = msg.getText();
                        TextMessage response = session.createTextMessage();
                        response.setJMSCorrelationID(msg.getJMSMessageID());
                        response.setText(result);
                        sender.send((Message)response);
                    }
                    catch (JMSException e) {
                        if (!AMQ1917Test.this.working) continue;
                        AMQ1917Test.this.errorLatch.countDown();
                        TestCase.fail((String)("Unexpected exception:" + e));
                    }
                }
            }
        }.start();
    }

    public class LimitedThreadFactory
    implements ThreadFactory {
        int threadCount;
        private ThreadFactory factory;

        public LimitedThreadFactory(ThreadFactory threadFactory) {
            this.factory = threadFactory;
        }

        @Override
        public Thread newThread(Runnable arg0) {
            if (++this.threadCount > 10) {
                AMQ1917Test.this.errorLatch.countDown();
                TestCase.fail((String)"too many threads requested");
            }
            return this.factory.newThread(arg0);
        }
    }

    class MessageSenderReceiver
    implements Runnable {
        Destination reqDest;
        Destination replyDest;
        String origMsg;

        public MessageSenderReceiver(Destination reqDest, Destination replyDest, String msg) throws Exception {
            this.replyDest = replyDest;
            this.reqDest = reqDest;
            this.origMsg = msg;
        }

        private int getIndexFromCurrentThread() {
            String name = Thread.currentThread().getName();
            String num = name.substring(name.lastIndexOf(45) + 1);
            int idx = Integer.parseInt(num) - 1;
            TestCase.assertTrue((String)("idx is in range: idx=" + idx), (idx < 10 ? 1 : 0) != 0);
            return idx;
        }

        @Override
        public void run() {
            try {
                int threadIndex = this.getIndexFromCurrentThread();
                Session session = AMQ1917Test.this.sessions[threadIndex];
                MessageProducer producer = AMQ1917Test.this.producers[threadIndex];
                TextMessage sendJmsMsg = session.createTextMessage(this.origMsg);
                producer.setDeliveryMode(1);
                producer.send((Message)sendJmsMsg);
                String jmsId = sendJmsMsg.getJMSMessageID();
                String selector = "JMSCorrelationID='" + jmsId + "'";
                MessageConsumer consumer = session.createConsumer(this.replyDest, selector);
                Message receiveJmsMsg = consumer.receive(2000L);
                consumer.close();
                if (receiveJmsMsg == null) {
                    AMQ1917Test.this.errorLatch.countDown();
                    TestCase.fail((String)("Unable to receive response for:" + this.origMsg + ", with selector=" + selector));
                } else {
                    AMQ1917Test.this.roundTripLatch.countDown();
                }
            }
            catch (JMSException e) {
                TestCase.fail((String)("unexpected exception:" + e));
            }
        }
    }
}

