/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.File;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inspects the in-flight transaction bookkeeping of the broker's KahaDB store
 * while a transacted producer has uncommitted sends outstanding.
 *
 * <p>The test starts a persistent {@link BrokerService} whose data directory is a
 * temporary folder under {@code target}, then reads the non-public
 * {@code inflightTransactions} field of {@link MessageDatabase} via reflection so
 * the pending operations can be examined directly.
 */
public class KahaDBInFlightTxMemoryUsageTest {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBInFlightTxMemoryUsageTest.class);

    /** Name of the queue the test produces to. */
    private static final String QUEUE_NAME = "test.queue";

    /** Number of messages sent inside the single uncommitted transaction. */
    private static final int MESSAGE_COUNT = 10;

    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    private BrokerService broker;
    private URI brokerConnectURI;
    private Map<TransactionId, List<MessageDatabase.Operation<?>>> inflightTransactions;

    /**
     * Starts a persistent broker with a TCP connector on an ephemeral port and
     * captures the store's {@code inflightTransactions} map via reflection.
     *
     * @throws Exception if the broker fails to start or the reflective lookup fails
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDataDirectoryFile(this.dataFileDir.getRoot());
        this.broker.setDeleteAllMessagesOnStartup(true);
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        // Port 0 lets the OS choose a free port; the real URI is read back after start.
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();

        // The cast fails fast if the broker is not backed by a KahaDB persistence adapter.
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter();
        Field inflightField = MessageDatabase.class.getDeclaredField("inflightTransactions");
        inflightField.setAccessible(true);
        // Reflection only yields Object; the element types cannot be checked at runtime,
        // so the unchecked warning is suppressed on this single declaration.
        @SuppressWarnings("unchecked")
        LinkedHashMap<TransactionId, List<MessageDatabase.Operation<?>>> inflight =
                (LinkedHashMap<TransactionId, List<MessageDatabase.Operation<?>>>)inflightField.get(adapter.getStore());
        this.inflightTransactions = inflight;
    }

    /**
     * Stops the broker started by {@link #startBroker()}.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        // If startBroker() threw before assigning the field, there is nothing to stop;
        // guarding here keeps the original setup failure from being masked by an NPE.
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages in one transacted session and checks that:
     * <ul>
     *   <li>exactly one in-flight transaction is tracked, holding one add operation per send;</li>
     *   <li>each pending operation has a location, and its command has a message id and a
     *       destination while {@code getMessage()} returns {@code null};</li>
     *   <li>the in-flight map is empty again once the session commits.</li>
     * </ul>
     *
     * @throws JMSException if any JMS call fails
     */
    @Test
    public void testKahaDBInFlightTxMessagesClearedFromMemory() throws JMSException {
        ActiveMQConnection connection = (ActiveMQConnection)new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            // Synchronous sends: each send() returns only after the broker has replied,
            // so the assertions below run after the broker has handled every send.
            connection.setAlwaysSyncSend(true);
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(QUEUE_NAME);

            Assert.assertTrue("no transaction should be in flight before sending",
                    this.inflightTransactions.isEmpty());

            MessageProducer prod = session.createProducer((Destination)queue);
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                prod.send((Message)session.createTextMessage("test"));
            }

            Assert.assertEquals("in-flight transaction count", 1L, (long)this.inflightTransactions.size());
            List<MessageDatabase.Operation<?>> pendingOps = this.inflightTransactions.values().stream().findFirst().orElseThrow();
            Assert.assertEquals("pending operation count", (long)MESSAGE_COUNT, (long)pendingOps.size());

            for (MessageDatabase.Operation<?> pendingOp : pendingOps) {
                Assert.assertTrue(pendingOp instanceof MessageDatabase.AddOperation);
                KahaAddMessageCommand command = (KahaAddMessageCommand)((MessageDatabase.AddOperation)pendingOp).getCommand();
                Assert.assertNotNull(pendingOp.getLocation());
                Assert.assertNotNull(command);
                Assert.assertNotNull(command.getMessageId());
                Assert.assertNotNull(command.getDestination());
                // The pending command is expected to carry no message body.
                Assert.assertNull(command.getMessage());
            }

            session.commit();
            Assert.assertTrue("commit should clear the in-flight transaction",
                    this.inflightTransactions.isEmpty());
        }
        finally {
            connection.close();
        }
    }
}

