/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionConsumer;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.ServerSession;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpiredAckAsyncConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiredAckAsyncConsumerTest.class);
    private BrokerService broker;
    private Connection connection;
    private ConnectionConsumer connectionConsumer;
    private Queue queue;
    private AtomicBoolean finished = new AtomicBoolean();
    private AtomicBoolean failed = new AtomicBoolean();

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.addConnector("tcp://localhost:0");
        this.broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        policyMap.setDefaultEntry(defaultEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri().toString());
        factory.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException exception) {
                ExpiredAckAsyncConsumerTest.this.failed.set(true);
            }
        });
        this.connection = factory.createConnection();
        this.queue = this.createQueue();
        this.connection.start();
    }

    @After
    public void tearDown() throws Exception {
        try {
            this.connectionConsumer.close();
            this.connection.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test(timeout=60000L)
    public void testAsyncMessageExpiration() throws Exception {
        ExecutorService executors = Executors.newFixedThreadPool(1);
        final Session session = this.connection.createSession(false, 1);
        final MessageProducer producer = session.createProducer((Destination)this.queue);
        producer.setTimeToLive(10L);
        executors.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(100L);
                    for (int count = 0; !ExpiredAckAsyncConsumerTest.this.failed.get() && count < 30; ++count) {
                        producer.send((Message)session.createTextMessage("Hello World: " + count));
                        LOG.info("sending: " + count);
                        Thread.sleep(100L);
                    }
                    ExpiredAckAsyncConsumerTest.this.finished.set(true);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        this.connectionConsumer = this.connection.createConnectionConsumer((Destination)this.queue, null, (ServerSessionPool)new TestServerSessionPool(this.connection), 1000);
        Assert.assertTrue((String)"received messages", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return ExpiredAckAsyncConsumerTest.this.finished.get();
            }
        }));
        Assert.assertFalse((String)"An exception was received on receive", (boolean)this.failed.get());
    }

    protected Queue createQueue() {
        return new ActiveMQQueue("TEST");
    }

    private class TestServerSessionPool
    implements ServerSessionPool {
        Connection connection;
        LinkedBlockingQueue<TestServerSession> serverSessions = new LinkedBlockingQueue(10);

        public TestServerSessionPool(Connection connection) throws JMSException {
            this.connection = connection;
            for (int i = 0; i < 15; ++i) {
                this.addSession();
            }
        }

        public ServerSession getServerSession() throws JMSException {
            try {
                return this.serverSessions.take();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("could not get session");
            }
        }

        public void addSession() {
            try {
                this.serverSessions.add(new TestServerSession(this));
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    private class TestMessageListener
    implements MessageListener {
        private TestMessageListener() {
        }

        public void onMessage(Message message) {
            try {
                Thread.sleep(1000L);
                String text = ((TextMessage)message).getText();
                LOG.info("got message: " + text);
            }
            catch (Exception e) {
                LOG.error("in onMessage", (Throwable)e);
            }
        }
    }

    private class TestServerSession
    implements ServerSession {
        TestServerSessionPool pool;
        Session session;

        public TestServerSession(TestServerSessionPool pool) throws JMSException {
            this.pool = pool;
            this.session = pool.connection.createSession(false, 1);
            this.session.setMessageListener((MessageListener)new TestMessageListener());
        }

        public Session getSession() throws JMSException {
            return this.session;
        }

        public void start() throws JMSException {
            new Thread(){

                @Override
                public void run() {
                    if (!ExpiredAckAsyncConsumerTest.this.finished.get()) {
                        try {
                            TestServerSession.this.session.run();
                            TestServerSession.this.pool.addSession();
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                }
            }.start();
        }
    }
}

