/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import jakarta.jms.XASession;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Enumeration;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.AuthorizationMap;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class XACompletionTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(XACompletionTest.class);
    protected ActiveMQXAConnectionFactory factory;
    protected static final int messagesExpected = 1;
    protected BrokerService broker;
    protected String connectionUri;
    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    @Parameterized.Parameters(name="store={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList({TestSupport.PersistenceAdapterChoice.KahaDB}, {TestSupport.PersistenceAdapterChoice.JDBC});
    }

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
    }

    @After
    public void stopAll() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = null;
        }
    }

    @Test
    public void testStatsAndRedispatchAfterAckPreparedClosed() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=0");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(1);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        Message message = consumer.receive(2000L);
        LOG.info("Received : " + message);
        resource.end(tid, 0x4000000);
        activeMQXAConnection.close();
        this.dumpMessages();
        this.dumpMessages();
        LOG.info("Try jmx browse... after commit");
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"size", (long)1L, (long)proxy.getQueueSize());
        LOG.info("Try receive... after rollback");
        message = this.regularReceive("TEST");
        XACompletionTest.assertNotNull((String)"message gone", (Object)message);
    }

    @Test
    public void testStatsAndBrowseAfterAckPreparedCommitted() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(1);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        int messagesReceived = 0;
        for (int i = 0; i < 1; ++i) {
            Message message = null;
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        resource.prepare(tid);
        consumer.close();
        this.dumpMessages();
        resource.commit(tid, false);
        this.dumpMessages();
        LOG.info("Try jmx browse... after commit");
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertTrue((boolean)proxy.browseMessages().isEmpty());
        XACompletionTest.assertEquals((String)"prefetch 0", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)proxy.getQueueSize());
        LOG.info("Try browse... after commit");
        Message browsed = this.regularBrowseFirst();
        XACompletionTest.assertNull((String)"message gone", (Object)browsed);
        LOG.info("Try receive... after commit");
        Message message = this.regularReceive("TEST");
        XACompletionTest.assertNull((String)"message gone", (Object)message);
    }

    @Test
    public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(10);
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        final QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        XACompletionTest.assertEquals((String)"prefetch 0", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size 0", (long)10L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"size 0", (int)0, (int)proxy.cursorSize());
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        for (int i = 0; i < 5; ++i) {
            Message message = null;
            try {
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        resource.prepare(tid);
        consumer.close();
        this.dumpMessages();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return proxy.getInFlightCount() == 0L;
            }
        });
        XACompletionTest.assertEquals((String)"prefetch", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size", (long)10L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"cursor size", (int)0, (int)proxy.cursorSize());
        resource.rollback(tid);
        this.dumpMessages();
        LOG.info("Try jmx browse... after rollback");
        XACompletionTest.assertEquals((int)10, (int)proxy.browseMessages().size());
        XACompletionTest.assertEquals((String)"prefetch", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size", (long)10L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"cursor size", (int)0, (int)proxy.cursorSize());
        LOG.info("Try browse... after");
        Message browsed = this.regularBrowseFirst();
        XACompletionTest.assertNotNull((String)"message gone", (Object)browsed);
        LOG.info("Try receive... after");
        for (int i = 0; i < 10; ++i) {
            Message message = this.regularReceive("TEST");
            XACompletionTest.assertNotNull((String)"message gone", (Object)message);
        }
    }

    @Test
    public void testStatsAndConsumeAfterAckPreparedRolledback() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(10);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        this.dumpMessages();
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        int messagesReceived = 0;
        for (int i = 0; i < 5; ++i) {
            Message message = null;
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        resource.prepare(tid);
        consumer.close();
        LOG.info("after close");
        this.dumpMessages();
        XACompletionTest.assertEquals((String)"drain", (int)5, (int)this.drainUnack(5, "TEST"));
        this.dumpMessages();
        this.broker = this.restartBroker();
        XACompletionTest.assertEquals((String)"redrain", (int)5, (int)this.drainUnack(5, "TEST"));
        LOG.info("Try consume... after restart");
        this.dumpMessages();
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"prefetch", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size", (long)5L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"cursor size 0", (int)0, (int)proxy.cursorSize());
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        xaSession = activeMQXAConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Xid[] xids = xaResource.recover(0x1000000);
        xaResource.recover(0);
        LOG.info("Rollback outcome for ack");
        xaResource.rollback(xids[0]);
        LOG.info("Try receive... after rollback");
        for (int i = 0; i < 10; ++i) {
            Message message = this.regularReceive("TEST");
            XACompletionTest.assertNotNull((String)("message gone: " + i), (Object)message);
        }
        this.dumpMessages();
        XACompletionTest.assertNull((String)"none left", (Object)this.regularReceive("TEST"));
        XACompletionTest.assertEquals((String)"prefetch", (long)0L, (long)proxy.getInFlightCount());
        XACompletionTest.assertEquals((String)"size", (long)0L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"cursor size", (int)0, (int)proxy.cursorSize());
        XACompletionTest.assertEquals((String)"dq", (long)10L, (long)proxy.getDequeueCount());
    }

    @Test
    public void testConsumeAfterAckPreparedRolledbackTopic() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        ActiveMQTopic destination = new ActiveMQTopic("TEST");
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.setClientID("durable");
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        TopicSubscriber consumer = xaSession.createDurableSubscriber((Topic)destination, "sub1");
        consumer.close();
        consumer = xaSession.createDurableSubscriber((Topic)destination, "sub2");
        this.sendMessagesTo(10, (Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        this.dumpMessages();
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        int messagesReceived = 0;
        for (int i = 0; i < 5; ++i) {
            Message message = null;
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        resource.prepare(tid);
        consumer.close();
        activeMQXAConnection.close();
        LOG.info("after close");
        this.broker = this.restartBroker();
        LOG.info("Try consume... after restart");
        this.dumpMessages();
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        xaSession = activeMQXAConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Xid[] xids = xaResource.recover(0x1000000);
        xaResource.recover(0);
        LOG.info("Rollback outcome for ack");
        xaResource.rollback(xids[0]);
        XACompletionTest.assertTrue((String)"got expected", (boolean)this.consumeOnlyN(10, "durable", "sub1", destination));
        XACompletionTest.assertTrue((String)"got expected", (boolean)this.consumeOnlyN(10, "durable", "sub2", destination));
    }

    @Test
    public void testConsumeAfterAckPreparedCommitTopic() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        ActiveMQTopic destination = new ActiveMQTopic("TEST");
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.setClientID("durable");
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        TopicSubscriber consumer = xaSession.createDurableSubscriber((Topic)destination, "sub1");
        consumer.close();
        consumer = xaSession.createDurableSubscriber((Topic)destination, "sub2");
        this.sendMessagesTo(10, (Destination)destination);
        XAResource resource = xaSession.getXAResource();
        resource.recover(0x1000000);
        resource.recover(0);
        this.dumpMessages();
        Xid tid = TestUtils.createXid();
        resource.start(tid, 0);
        int messagesReceived = 0;
        for (int i = 0; i < 5; ++i) {
            Message message = null;
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        resource.prepare(tid);
        consumer.close();
        activeMQXAConnection.close();
        LOG.info("after close");
        this.broker = this.restartBroker();
        LOG.info("Try consume... after restart");
        this.dumpMessages();
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        xaSession = activeMQXAConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Xid[] xids = xaResource.recover(0x1000000);
        xaResource.recover(0);
        LOG.info("Rollback outcome for ack");
        xaResource.commit(xids[0], false);
        XACompletionTest.assertTrue((String)"got expected", (boolean)this.consumeOnlyN(10, "durable", "sub1", destination));
        XACompletionTest.assertTrue((String)"got expected", (boolean)this.consumeOnlyN(5, "durable", "sub2", destination));
        LOG.info("at end...");
        this.dumpMessages();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean consumeOnlyN(int expected, String clientId, String subName, ActiveMQTopic destination) throws Exception {
        int drained = 0;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=" + expected);
        factory.setWatchTopicAdvisories(false);
        connection.setClientID(clientId);
        try (jakarta.jms.Connection connection = factory.createConnection();){
            connection.start();
            Session session = connection.createSession(false, 1);
            TopicSubscriber consumer = session.createDurableSubscriber((Topic)destination, subName);
            Message message = null;
            while ((message = consumer.receive(2000L)) != null) {
                ++drained;
                LOG.info("Sub:" + subName + ", received: " + message.getJMSMessageID());
            }
            consumer.close();
        }
        return drained == expected;
    }

    @Test
    public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(20);
        for (int i = 0; i < 10; ++i) {
            ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
            activeMQXAConnection.start();
            XASession xaSession = activeMQXAConnection.createXASession();
            Queue destination = xaSession.createQueue("TEST");
            MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
            XAResource resource = xaSession.getXAResource();
            Xid tid = TestUtils.createXid();
            resource.start(tid, 0);
            Message message = null;
            try {
                message = consumer.receive(2000L);
                LOG.info("Received (" + i + ") : ," + message);
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
            resource.end(tid, 0x4000000);
            resource.prepare(tid);
        }
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Xid[] xids = xaResource.recover(0x1000000);
        xaResource.recover(0);
        xaResource.rollback(xids[0]);
        xaResource.rollback(xids[1]);
        activeMQXAConnection.close();
        LOG.info("RESTART");
        this.broker = this.restartBroker();
        this.dumpMessages();
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=1");
        factory.setWatchTopicAdvisories(false);
        jakarta.jms.Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 2);
        Queue destination = session.createQueue("TEST");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        consumer.close();
        ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        receiveFactory.setWatchTopicAdvisories(false);
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        activeMQXAConnectionFactory.setWatchTopicAdvisories(false);
        activeMQXAConnection = (ActiveMQXAConnection)activeMQXAConnectionFactory.createXAConnection();
        activeMQXAConnection.start();
        xaSession = activeMQXAConnection.createXASession();
        xaResource = xaSession.getXAResource();
        xids = xaResource.recover(0x1000000);
        xaResource.recover(0);
        for (int i = 0; i < xids.length; ++i) {
            xaResource.rollback(xids[i]);
        }
        MessageConsumer consumer2 = session.createConsumer((Destination)new ActiveMQQueue("TEST?consumer.prefetchSize=2"));
        LOG.info("Try receive... after rollback");
        Message message = this.regularReceiveWith(receiveFactory, "TEST");
        XACompletionTest.assertNotNull((String)"message 1: ", (Object)message);
        LOG.info("Received : " + message);
        this.dumpMessages();
        message = this.regularReceiveWith(receiveFactory, "TEST");
        XACompletionTest.assertNotNull((String)"last message", (Object)message);
        LOG.info("Received : " + message);
    }

    @Test
    public void testMoveInTwoBranches() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(1);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        final Xid tid = TestUtils.createXid();
        byte[] branch = tid.getBranchQualifier();
        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
        branch2[0] = 33;
        Xid branchTid = new Xid(){

            @Override
            public int getFormatId() {
                return tid.getFormatId();
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return tid.getGlobalTransactionId();
            }

            @Override
            public byte[] getBranchQualifier() {
                return branch2;
            }
        };
        resource.start(tid, 0);
        int messagesReceived = 0;
        Message message = null;
        for (int i = 0; i < 1; ++i) {
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnectionSend.start();
        XASession xaSessionSend = activeMQXAConnection.createXASession();
        Queue destinationSend = xaSessionSend.createQueue("TEST_MOVE");
        MessageProducer producer = xaSessionSend.createProducer((Destination)destinationSend);
        XAResource resourceSend = xaSessionSend.getXAResource();
        resourceSend.start(branchTid, 0);
        ActiveMQMessage toSend = (ActiveMQMessage)xaSessionSend.createTextMessage();
        toSend.setTransactionId((TransactionId)new XATransactionId(branchTid));
        producer.send((Message)toSend);
        resourceSend.end(branchTid, 0x4000000);
        resourceSend.prepare(branchTid);
        resource.prepare(tid);
        consumer.close();
        LOG.info("Prepared");
        this.dumpMessages();
        LOG.info("Commit Ack");
        resource.commit(tid, false);
        this.dumpMessages();
        LOG.info("Commit Send");
        resourceSend.commit(branchTid, false);
        this.dumpMessages();
        LOG.info("Try jmx browse... after commit");
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertTrue((boolean)proxy.browseMessages().isEmpty());
        XACompletionTest.assertEquals((String)"dq ", (long)1L, (long)proxy.getDequeueCount());
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)proxy.getQueueSize());
        ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
        QueueViewMBean moveProxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"enq", (long)1L, (long)moveProxy.getEnqueueCount());
        XACompletionTest.assertEquals((String)"size 1", (long)1L, (long)moveProxy.getQueueSize());
        XACompletionTest.assertNotNull((Object)this.regularReceive("TEST_MOVE"));
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)moveProxy.getQueueSize());
    }

    @Test
    public void testMoveInTwoBranchesPreparedAckRecoveryRestartRollback() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(1);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        final Xid tid = TestUtils.createXid();
        byte[] branch = tid.getBranchQualifier();
        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
        branch2[0] = 33;
        Xid branchTid = new Xid(){

            @Override
            public int getFormatId() {
                return tid.getFormatId();
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return tid.getGlobalTransactionId();
            }

            @Override
            public byte[] getBranchQualifier() {
                return branch2;
            }
        };
        resource.start(tid, 0);
        int messagesReceived = 0;
        Message message = null;
        for (int i = 0; i < 1; ++i) {
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnectionSend.start();
        XASession xaSessionSend = activeMQXAConnection.createXASession();
        Queue destinationSend = xaSessionSend.createQueue("TEST_MOVE");
        MessageProducer producer = xaSessionSend.createProducer((Destination)destinationSend);
        XAResource resourceSend = xaSessionSend.getXAResource();
        resourceSend.start(branchTid, 0);
        ActiveMQMessage toSend = (ActiveMQMessage)xaSessionSend.createTextMessage();
        toSend.setTransactionId((TransactionId)new XATransactionId(branchTid));
        producer.send((Message)toSend);
        resourceSend.end(branchTid, 0x4000000);
        resourceSend.prepare(branchTid);
        resource.prepare(tid);
        resourceSend.rollback(branchTid);
        consumer.close();
        LOG.info("Prepared");
        this.dumpMessages();
        this.broker = this.restartBroker();
        LOG.info("New broker");
        this.dumpMessages();
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"size", (long)0L, (long)proxy.getQueueSize());
        XACompletionTest.assertNull((Object)this.regularReceive("TEST_MOVE"));
        ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
        QueueViewMBean moveProxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"enq", (long)0L, (long)moveProxy.getDequeueCount());
        XACompletionTest.assertEquals((String)"size", (long)0L, (long)moveProxy.getQueueSize());
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)moveProxy.getQueueSize());
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        xaSession = activeMQXAConnection.createXASession();
        resource = xaSession.getXAResource();
        resource.rollback(tid);
        XACompletionTest.assertEquals((String)"size", (long)1L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"c size", (int)1, (int)proxy.cursorSize());
        XACompletionTest.assertNotNull((Object)this.regularReceive("TEST"));
        XACompletionTest.assertEquals((String)"size", (long)0L, (long)proxy.getQueueSize());
        XACompletionTest.assertEquals((String)"c size", (int)0, (int)proxy.cursorSize());
        XACompletionTest.assertEquals((String)"dq", (long)1L, (long)proxy.getDequeueCount());
    }

    @Test
    public void testMoveInTwoBranchesTwoBrokers() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        this.sendMessages(1);
        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnection.start();
        XASession xaSession = activeMQXAConnection.createXASession();
        Queue destination = xaSession.createQueue("TEST");
        MessageConsumer consumer = xaSession.createConsumer((Destination)destination);
        XAResource resource = xaSession.getXAResource();
        final Xid tid = TestUtils.createXid();
        byte[] branch = tid.getBranchQualifier();
        final byte[] branch2 = Arrays.copyOf(branch, branch.length);
        branch2[0] = 33;
        Xid branchTid = new Xid(){

            @Override
            public int getFormatId() {
                return tid.getFormatId();
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return tid.getGlobalTransactionId();
            }

            @Override
            public byte[] getBranchQualifier() {
                return branch2;
            }
        };
        resource.start(tid, 0);
        int messagesReceived = 0;
        Message message = null;
        for (int i = 0; i < 1; ++i) {
            try {
                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 1");
                message = consumer.receive(2000L);
                LOG.info("Received : " + message);
                ++messagesReceived;
                continue;
            }
            catch (Exception e) {
                LOG.debug("Caught exception:", (Throwable)e);
            }
        }
        resource.end(tid, 0x4000000);
        ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection)this.factory.createXAConnection();
        activeMQXAConnectionSend.start();
        XASession xaSessionSend = activeMQXAConnection.createXASession();
        Queue destinationSend = xaSessionSend.createQueue("TEST_MOVE");
        MessageProducer producer = xaSessionSend.createProducer((Destination)destinationSend);
        XAResource resourceSend = xaSessionSend.getXAResource();
        resourceSend.start(branchTid, 0);
        ActiveMQMessage toSend = (ActiveMQMessage)xaSessionSend.createTextMessage();
        toSend.setTransactionId((TransactionId)new XATransactionId(branchTid));
        producer.send((Message)toSend);
        resourceSend.end(branchTid, 0x4000000);
        resourceSend.prepare(branchTid);
        resource.prepare(tid);
        consumer.close();
        LOG.info("Prepared");
        this.dumpMessages();
        LOG.info("Commit Ack");
        resource.commit(tid, false);
        this.dumpMessages();
        LOG.info("Commit Send");
        resourceSend.commit(branchTid, false);
        this.dumpMessages();
        LOG.info("Try jmx browse... after commit");
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertTrue((boolean)proxy.browseMessages().isEmpty());
        XACompletionTest.assertEquals((String)"dq ", (long)1L, (long)proxy.getDequeueCount());
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)proxy.getQueueSize());
        ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
        QueueViewMBean moveProxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
        XACompletionTest.assertEquals((String)"enq", (long)1L, (long)moveProxy.getEnqueueCount());
        XACompletionTest.assertEquals((String)"size 1", (long)1L, (long)moveProxy.getQueueSize());
        XACompletionTest.assertNotNull((Object)this.regularReceive("TEST_MOVE"));
        XACompletionTest.assertEquals((String)"size 0", (long)0L, (long)moveProxy.getQueueSize());
    }

    private Message regularReceive(String qName) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri);
        factory.setWatchTopicAdvisories(false);
        return this.regularReceiveWith(factory, qName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Message regularReceiveWith(ActiveMQConnectionFactory factory, String qName) throws Exception {
        try (jakarta.jms.Connection connection = factory.createConnection();){
            connection.start();
            Session session = connection.createSession(false, 1);
            Queue destination = session.createQueue(qName);
            MessageConsumer consumer = session.createConsumer((Destination)destination);
            Message message = consumer.receive(2000L);
            return message;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int drainUnack(int limit, String qName) throws Exception {
        int drained;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=" + limit);
        factory.setWatchTopicAdvisories(false);
        try (jakarta.jms.Connection connection = factory.createConnection();){
            connection.start();
            Session session = connection.createSession(false, 2);
            Queue destination = session.createQueue(qName);
            MessageConsumer consumer = session.createConsumer((Destination)destination);
            for (drained = 0; drained < limit && consumer.receive(2000L) != null; ++drained) {
            }
            consumer.close();
        }
        return drained;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Message regularBrowseFirst() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        try (jakarta.jms.Connection connection = activeMQConnectionFactory.createConnection();){
            connection.start();
            Session session = connection.createSession(false, 1);
            Queue destination = session.createQueue("TEST");
            QueueBrowser browser = session.createBrowser(destination);
            Enumeration e = browser.getEnumeration();
            if (e.hasMoreElements()) {
                Message message = (Message)e.nextElement();
                return message;
            }
            Message message = null;
            return message;
        }
    }

    protected void sendMessages(int messagesExpected) throws Exception {
        this.sendMessagesTo(messagesExpected, (Destination)new ActiveMQQueue("TEST"));
    }

    protected void sendMessagesTo(int messagesExpected, Destination destination) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        this.sendMessagesWithTo((ConnectionFactory)activeMQConnectionFactory, messagesExpected, destination);
    }

    protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception {
        this.sendMessagesWithTo(factory, messagesExpected, (Destination)new ActiveMQQueue("TEST"));
    }

    protected void sendMessagesWithTo(ConnectionFactory factory, int messagesExpected, Destination destination) throws Exception {
        jakarta.jms.Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(2);
        for (int i = 0; i < messagesExpected; ++i) {
            LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
            producer.send((Message)session.createTextMessage("test message " + (i + 1)));
        }
        connection.close();
    }

    protected void dumpMessages() throws Exception {
        if (this.persistenceAdapterChoice.compareTo(TestSupport.PersistenceAdapterChoice.JDBC) != 0) {
            return;
        }
        OpenWireFormat wireFormat = new OpenWireFormat();
        Connection conn = ((JDBCPersistenceAdapter)this.broker.getPersistenceAdapter()).getDataSource().getConnection();
        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
        ResultSet result = statement.executeQuery();
        LOG.info("Messages in broker db...");
        while (result.next()) {
            long id = result.getLong(1);
            org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
            String xid = result.getString(3);
            LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
        }
        statement.close();
        statement = conn.prepareStatement("SELECT LAST_ACKED_ID, CLIENT_ID, SUB_NAME, PRIORITY, XID FROM ACTIVEMQ_ACKS");
        result = statement.executeQuery();
        LOG.info("Messages in ACKS table db...");
        while (result.next()) {
            LOG.info("lastAcked: {}, clientId: {}, SUB_NAME: {}, PRIORITY: {}, XID {}", new Object[]{result.getLong(1), result.getString(2), result.getString(3), result.getInt(4), result.getString(5)});
        }
        statement.close();
        conn.close();
    }

    protected BrokerService createBroker() throws Exception {
        return this.createBroker(true);
    }

    protected BrokerService restartBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        return this.createBroker(false);
    }

    protected BrokerService createBroker(boolean del) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setAdvisorySupport(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        policyMap.setDefaultEntry(policyEntry);
        broker.setDestinationPolicy(policyMap);
        broker.setDeleteAllMessagesOnStartup(del);
        XACompletionTest.setPersistenceAdapter(broker, this.persistenceAdapterChoice);
        broker.setPersistent(true);
        this.connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
        String id = "a";
        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
        SimpleAuthorizationMap map = new SimpleAuthorizationMap();
        DestinationMap destinationMap = new DestinationMap();
        GroupPrincipal anaGroup = new GroupPrincipal("a");
        destinationMap.put((ActiveMQDestination)new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), (Object)anaGroup);
        destinationMap.put((ActiveMQDestination)new AnyDestination(new ActiveMQDestination[]{new ActiveMQTopic(">")}), (Object)anaGroup);
        map.setWriteACLs(destinationMap);
        map.setAdminACLs(destinationMap);
        map.setReadACLs(destinationMap);
        authorizationPlugin.setMap((AuthorizationMap)map);
        SimpleAuthenticationPlugin simpleAuthenticationPlugin = new SimpleAuthenticationPlugin();
        simpleAuthenticationPlugin.setAnonymousAccessAllowed(true);
        simpleAuthenticationPlugin.setAnonymousGroup("a");
        simpleAuthenticationPlugin.setAnonymousUser("a");
        broker.setPlugins(new BrokerPlugin[]{simpleAuthenticationPlugin, authorizationPlugin});
        broker.start();
        return broker;
    }
}

