/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.Session;
import java.io.File;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4323Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class);
    BrokerService broker = null;
    File kahaDbDir = null;
    private final Destination destination = new ActiveMQQueue("q");
    final String payload = new String(new byte[1024]);

    protected void startBroker(boolean delete) throws Exception {
        this.broker = new BrokerService();
        this.kahaDbDir = new File(this.broker.getBrokerDataDirectory(), "KahaDB");
        this.deleteDir(this.kahaDbDir);
        this.broker.setSchedulerSupport(false);
        this.broker.setDeleteAllMessagesOnStartup(delete);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(false);
        this.broker.addConnector("tcp://localhost:0");
        PolicyMap map = new PolicyMap();
        PolicyEntry entry = new PolicyEntry();
        entry.setUseCache(false);
        map.setDefaultEntry(entry);
        this.broker.setDestinationPolicy(map);
        this.configurePersistence(this.broker, delete);
        this.broker.start();
        LOG.info("Starting broker..");
    }

    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
        adapter.setJournalMaxFileLength(20480);
        adapter.setCheckpointInterval(500L);
        adapter.setCleanupInterval(500L);
        adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
        if (!deleteAllOnStart) {
            adapter.setForceRecoverIndex(true);
        }
    }

    private boolean deleteDir(File dir) {
        if (dir.isDirectory()) {
            String[] children = dir.list();
            for (int i = 0; i < children.length; ++i) {
                boolean success = this.deleteDir(new File(dir, children[i]));
                if (success) continue;
                return false;
            }
        }
        return dir.delete();
    }

    private int getFileCount(File dir) {
        if (dir.isDirectory()) {
            String[] children = dir.list();
            return children.length;
        }
        return 0;
    }

    @Test
    public void testCleanupOfFiles() throws Exception {
        int messageCount = 500;
        this.startBroker(true);
        int fileCount = this.getFileCount(this.kahaDbDir);
        Assert.assertEquals((long)4L, (long)fileCount);
        Connection connection = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri()).createConnection();
        connection.start();
        Session producerSess = connection.createSession(false, 1);
        Session consumerSess = connection.createSession(false, 1);
        ProducerThread producer = new ProducerThread(producerSess, this.destination){

            protected Message createMessage(int i) throws Exception {
                return this.session.createTextMessage(AMQ4323Test.this.payload + "::" + i);
            }
        };
        producer.setMessageCount(500);
        ConsumerThread consumer = new ConsumerThread(consumerSess, this.destination);
        consumer.setBreakOnNull(false);
        consumer.setMessageCount(500);
        producer.start();
        producer.join();
        consumer.start();
        consumer.join();
        Assert.assertEquals((String)"consumer got all produced messages", (long)producer.getMessageCount(), (long)consumer.getReceived());
        Assert.assertTrue((String)"gc worked", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                int fileCount = AMQ4323Test.this.getFileCount(AMQ4323Test.this.kahaDbDir);
                LOG.info("current filecount:" + fileCount);
                return 4 == fileCount;
            }
        }));
        this.broker.stop();
        this.broker.waitUntilStopped();
    }
}

