/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ5426Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ5426Test.class);
    private BrokerService brokerService;
    private String connectionUri;
    private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false);
    private Thread producerThread;
    private AtomicBoolean hasErrorInLogger;

    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(this.connectionUri);
        conFactory.setWatchTopicAdvisories(false);
        conFactory.setOptimizeAcknowledge(true);
        return conFactory;
    }

    @Before
    public void setUp() throws Exception {
        this.hasFailureInProducer = new AtomicBoolean(false);
        this.hasErrorInLogger = new AtomicBoolean(false);
        this.brokerService = BrokerFactory.createBroker((URI)new URI("broker://()/localhost?persistent=false&useJmx=true"));
        PolicyEntry policy = new PolicyEntry();
        policy.setTopicPrefetch(100);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.start();
        this.connectionUri = this.brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger());
        AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().isMoreSpecificThan(Level.WARN)) {
                    AMQ5426Test.this.hasErrorInLogger.set(true);
                }
            }
        };
        appender.start();
        logger.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        logger.addAppender((Appender)appender);
        this.producerThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Connection connection = AMQ5426Test.this.createConnectionFactory().createConnection();
                    connection.start();
                    Session session = connection.createSession(false, 1);
                    Topic destination = session.createTopic("test.AMQ5426");
                    LOG.debug("Created topic: {}", (Object)destination);
                    MessageProducer producer = session.createProducer((Destination)destination);
                    producer.setDeliveryMode(1);
                    producer.setTimeToLive(1000L);
                    LOG.debug("Created producer: {}", (Object)producer);
                    int i = 1;
                    while (!Thread.interrupted()) {
                        try {
                            TextMessage msg = session.createTextMessage(" testMessage " + i);
                            producer.send((Message)msg);
                            try {
                                Thread.sleep(0L, 100);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            LOG.debug("message sent: {}", (Object)i);
                            ++i;
                        }
                        catch (JMSException e) {
                            if (e.getCause() != null && e.getCause() instanceof InterruptedIOException) continue;
                            throw e;
                        }
                    }
                    producer.close();
                    session.close();
                    connection.close();
                }
                catch (Exception e) {
                    LOG.error(e.getMessage(), (Throwable)e);
                    AMQ5426Test.this.hasFailureInProducer.set(true);
                }
            }
        });
        this.producerThread.start();
    }

    @Test(timeout=120000L)
    public void testConsumerProperlyClosedWithoutError() throws Exception {
        Random rn = new Random();
        int NUMBER_OF_RUNS = 1000;
        for (int run = 0; run < 1000; ++run) {
            final AtomicInteger numberOfMessagesReceived = new AtomicInteger(0);
            LOG.info("Starting run {} of {}", (Object)run, (Object)1000);
            Connection connection = this.createConnectionFactory().createConnection();
            connection.start();
            Session session = connection.createSession(false, 3);
            Topic destination = session.createTopic("test.AMQ5426");
            LOG.debug("Created topic: {}", (Object)destination);
            MessageConsumer consumer = session.createConsumer((Destination)destination);
            consumer.setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    LOG.debug("Received message");
                    numberOfMessagesReceived.getAndIncrement();
                }
            });
            LOG.debug("Created consumer: {}", (Object)consumer);
            try {
                Thread.sleep(rn.nextInt(5) + 1);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            LOG.debug("Closing consumer");
            consumer.close();
            session.close();
            connection.close();
            Assert.assertFalse((String)"Exception in Producer Thread", (boolean)this.hasFailureInProducer.get());
            Assert.assertFalse((String)"Error detected in Logger", (boolean)this.hasErrorInLogger.get());
            LOG.info("Run {} of {} completed, message received: {}", new Object[]{run, 1000, numberOfMessagesReceived.get()});
        }
    }

    @After
    public void tearDown() throws Exception {
        LOG.info("Shutdown producer thread");
        this.producerThread.interrupt();
        this.producerThread.join();
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertFalse((String)"Exception in Producer Thread", (boolean)this.hasFailureInProducer.get());
        Assert.assertFalse((String)"Error detected in Logger", (boolean)this.hasErrorInLogger.get());
    }
}

