/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ6133PersistJMSRedeliveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6133PersistJMSRedeliveryTest.class);
    private static final String QUEUE_NAME = "test.queue";
    private BrokerService broker;

    @Test
    public void testPersistJMSRedeliveredMessageLossOnIndexRebuild() throws Exception {
        this.sendMessages();
        LOG.info("#### Finished sending messages, test starting. ####");
        long msgCount = this.getProxyToQueue(QUEUE_NAME).getQueueSize();
        int ITERATIONS = 3;
        for (int i = 0; i < 3; ++i) {
            LOG.info("Consumer and Rollback iteration: {}", (Object)i);
            this.consumerAndRollback(i);
        }
        TimeUnit.SECONDS.sleep(20L);
        this.restart();
        Assert.assertEquals((long)msgCount, (long)this.getProxyToQueue(QUEUE_NAME).getQueueSize());
        this.restartWithRecovery(this.getPersistentDir());
        Assert.assertEquals((long)msgCount, (long)this.getProxyToQueue(QUEUE_NAME).getQueueSize());
    }

    @Before
    public void setup() throws Exception {
        ((org.apache.logging.log4j.core.Logger)LogManager.getLogger(MessageDatabase.class)).setLevel(Level.TRACE);
        this.createBroker(true);
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
    }

    private void restart() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker(false);
    }

    private void restartWithRecovery(File persistenceDir) throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        for (File index : FileUtils.listFiles((File)persistenceDir, (IOFileFilter)new WildcardFileFilter("db.*"), (IOFileFilter)TrueFileFilter.INSTANCE)) {
            FileUtils.deleteQuietly((File)index);
        }
        this.createBroker(false);
    }

    private void sendMessages() throws Exception {
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        Queue queue = session.createQueue(QUEUE_NAME);
        Queue retainQueue = session.createQueue("test.queue-retain");
        MessageProducer producer = session.createProducer(null);
        byte[] payload = new byte[1000];
        producer.setDeliveryMode(2);
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        while (this.getLogFileCount() < 3) {
            producer.send((Destination)queue, (Message)message);
        }
        while (this.getLogFileCount() < 6) {
            producer.send((Destination)retainQueue, (Message)message);
        }
        connection.close();
    }

    private void consumerAndRollback(int iteration) throws Exception {
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 2);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        long msgCount = this.getProxyToQueue(queue.getQueueName()).getQueueSize();
        int i = 0;
        while ((long)i < msgCount) {
            Message message = consumer.receive(50000L);
            Assert.assertNotNull((Object)message);
            if (iteration > 0) {
                Assert.assertTrue((boolean)message.getJMSRedelivered());
            }
            ++i;
        }
        connection.close();
    }

    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=0");
        factory.setAlwaysSyncSend(true);
        Connection connection = factory.createConnection();
        connection.start();
        return connection;
    }

    private void createBroker(boolean deleteAllMessages) throws Exception {
        PolicyEntry entry = new PolicyEntry();
        entry.setPersistJMSRedelivered(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(entry);
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        kahaDB.setJournalMaxFileLength(131072);
        kahaDB.setCleanupInterval(8000L);
        this.broker.setPersistenceAdapter((PersistenceAdapter)kahaDB);
        this.broker.getSystemUsage().getStoreUsage().setLimit(0x700000L);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private int getLogFileCount() throws Exception {
        return new ArrayList(FileUtils.listFiles((File)this.getPersistentDir(), (IOFileFilter)new WildcardFileFilter("*.log"), (IOFileFilter)TrueFileFilter.INSTANCE)).size();
    }

    private File getPersistentDir() throws IOException {
        return this.broker.getPersistenceAdapter().getDirectory();
    }

    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name);
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
        return proxy;
    }
}

