/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCConcurrentDLQTest {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCConcurrentDLQTest.class);
    BrokerService broker;
    JDBCPersistenceAdapter jdbcPersistenceAdapter;
    Appender appender = null;
    final AtomicBoolean gotError = new AtomicBoolean(false);

    @Before
    public void setUp() throws Exception {
        this.gotError.set(false);
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
        org.apache.logging.log4j.core.Logger jdbcLogger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(JDBCPersistenceAdapter.class));
        org.apache.logging.log4j.core.Logger regionLogger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        this.appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().isMoreSpecificThan(Level.WARN)) {
                    LOG.error("Got error from log:" + event.getMessage().getFormattedMessage());
                    JDBCConcurrentDLQTest.this.gotError.set(true);
                }
            }
        };
        this.appender.start();
        jdbcLogger.get().addAppender(this.appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        jdbcLogger.addAppender(this.appender);
        regionLogger.get().addAppender(this.appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        regionLogger.addAppender(this.appender);
    }

    @After
    public void tearDown() throws Exception {
        ((org.apache.logging.log4j.core.Logger)LogManager.getLogger(RegionBroker.class)).removeAppender(this.appender);
        ((org.apache.logging.log4j.core.Logger)LogManager.getLogger(JDBCPersistenceAdapter.class)).removeAppender(this.appender);
        this.broker.stop();
    }

    protected BrokerService createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(true);
        this.broker.setAdvisorySupport(false);
        this.jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
        this.jdbcPersistenceAdapter.setUseLock(false);
        this.broker.setPersistenceAdapter((PersistenceAdapter)this.jdbcPersistenceAdapter);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector("tcp://0.0.0.0:0");
        return this.broker;
    }

    @Test
    public void testConcurrentDlqOk() throws Exception {
        ActiveMQQueue dest = new ActiveMQQueue("DD");
        final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        amq.setWatchTopicAdvisories(false);
        this.broker.setIoExceptionHandler((IOExceptionHandler)new DefaultIOExceptionHandler(){

            public void handle(IOException exception) {
                LOG.error("handle IOException from store", (Throwable)exception);
                JDBCConcurrentDLQTest.this.gotError.set(true);
            }
        });
        org.apache.logging.log4j.core.Logger loggerRB = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        org.apache.logging.log4j.core.Logger loggerJDBC = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(JDBCPersistenceAdapter.class));
        loggerRB.get().addAppender(this.appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        loggerRB.addAppender(this.appender);
        loggerJDBC.get().addAppender(this.appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        loggerJDBC.addAppender(this.appender);
        int numMessages = 100;
        AtomicInteger consumed = new AtomicInteger(100);
        this.produceMessages(amq, (Destination)dest, 100);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 50; ++i) {
            executorService.execute(new Runnable(){
                final /* synthetic */ Destination val$dest;
                final /* synthetic */ AtomicInteger val$consumed;
                {
                    this.val$dest = destination;
                    this.val$consumed = atomicInteger;
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Connection connection = null;
                    Session session = null;
                    MessageConsumer consumer = null;
                    try {
                        connection = amq.createConnection();
                        connection.setExceptionListener(new ExceptionListener(){

                            public void onException(JMSException e) {
                                e.printStackTrace();
                            }
                        });
                        RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
                        queuePolicy.setMaximumRedeliveries(0);
                        ((ActiveMQConnection)connection).setRedeliveryPolicy(queuePolicy);
                        connection.start();
                        session = connection.createSession(true, 0);
                        consumer = session.createConsumer(this.val$dest);
                        while (this.val$consumed.get() > 0 && !JDBCConcurrentDLQTest.this.gotError.get()) {
                            Message message = consumer.receive(4000L);
                            if (message == null) continue;
                            this.val$consumed.decrementAndGet();
                            session.rollback();
                        }
                    }
                    catch (Exception e) {
                        LOG.error("Error on consumption", (Throwable)e);
                        JDBCConcurrentDLQTest.this.gotError.set(true);
                    }
                    finally {
                        try {
                            if (connection != null) {
                                connection.close();
                            }
                        }
                        catch (Exception exception) {}
                    }
                }
            });
        }
        executorService.shutdown();
        boolean allComplete = executorService.awaitTermination(60L, TimeUnit.SECONDS);
        executorService.shutdownNow();
        LOG.info("Total messages: " + this.broker.getAdminView().getTotalMessageCount());
        LOG.info("Total enqueues: " + this.broker.getAdminView().getTotalEnqueueCount());
        LOG.info("Total deueues: " + this.broker.getAdminView().getTotalDequeueCount());
        Assert.assertTrue((boolean)allComplete);
        Assert.assertEquals((String)"all consumed", (long)0L, (long)consumed.get());
        Assert.assertEquals((String)"all messages get to the dlq", (long)200L, (long)this.broker.getAdminView().getTotalEnqueueCount());
        Assert.assertEquals((String)"all messages acked", (long)100L, (long)this.broker.getAdminView().getTotalDequeueCount());
        Assert.assertFalse((String)"no error", (boolean)this.gotError.get());
    }

    private void produceMessages(ActiveMQConnectionFactory amq, Destination dest, int numMessages) throws JMSException {
        Connection connection = amq.createConnection();
        connection.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException e) {
                e.printStackTrace();
            }
        });
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(dest);
        long counter = 0L;
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < numMessages; ++i) {
            producer.send((Message)message);
            if (++counter % 50L != 0L) continue;
            LOG.info("sent " + counter + " messages");
        }
        if (connection != null) {
            connection.close();
        }
    }
}

