/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MKahaDBStoreLimitTest {
    private static final Logger LOG = LoggerFactory.getLogger(MKahaDBStoreLimitTest.class);
    final ActiveMQQueue queueA = new ActiveMQQueue("Q.A");
    final ActiveMQQueue queueB = new ActiveMQQueue("Q.B");
    private BrokerService broker;

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    private BrokerService createBroker(MultiKahaDBPersistenceAdapter persistenceAdapter) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.setSchedulerSupport(false);
        this.broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        this.broker.setDeleteAllMessagesOnStartup(true);
        return this.broker;
    }

    @Test
    public void testPerDestUsage() throws Exception {
        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
        kahaStore.setJournalMaxFileLength(5120);
        kahaStore.setCleanupInterval(1000L);
        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        StoreUsage storeUsage = new StoreUsage();
        storeUsage.setPercentLimit(10);
        storeUsage.setTotal(0xA00000L);
        filtered.setUsage(storeUsage);
        filtered.setPersistenceAdapter((PersistenceAdapter)kahaStore);
        filtered.setPerDestination(true);
        ArrayList<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        stores.add(filtered);
        persistenceAdapter.setFilteredPersistenceAdapters(stores);
        this.createBroker(persistenceAdapter).start();
        this.produceMessages((Destination)this.queueA, 20);
        this.produceMessages((Destination)this.queueB, 0);
        LOG.info("Store global u: " + this.broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + this.broker.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((String)"some usage", (this.broker.getSystemUsage().getStoreUsage().getUsage() > 0L ? 1 : 0) != 0);
        BaseDestination baseDestinationA = (BaseDestination)this.broker.getRegionBroker().getDestinationMap().get(this.queueA);
        BaseDestination baseDestinationB = (BaseDestination)this.broker.getRegionBroker().getDestinationMap().get(this.queueB);
        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((baseDestinationA.getSystemUsage().getStoreUsage().getUsage() > 0L ? 1 : 0) != 0);
        this.produceMessages((Destination)this.queueB, 40);
        Assert.assertTrue((baseDestinationB.getSystemUsage().getStoreUsage().getUsage() > 0L ? 1 : 0) != 0);
        Assert.assertTrue((baseDestinationB.getSystemUsage().getStoreUsage().getUsage() > baseDestinationA.getSystemUsage().getStoreUsage().getUsage() ? 1 : 0) != 0);
        LOG.info("Store B u: " + baseDestinationB.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationB.getSystemUsage().getStoreUsage().getPercentUsage());
        LOG.info("Store global u: " + this.broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + this.broker.getSystemUsage().getStoreUsage().getPercentUsage());
        this.consume((Destination)this.queueA);
        this.consume((Destination)this.queueB);
        LOG.info("Store global u: " + this.broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + this.broker.getSystemUsage().getStoreUsage().getPercentUsage());
        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
        LOG.info("Store B u: " + baseDestinationB.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationB.getSystemUsage().getStoreUsage().getPercentUsage());
    }

    @Test
    public void testExplicitAdapter() throws Exception {
        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
        kahaStore.setJournalMaxFileLength(25600);
        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        StoreUsage storeUsage = new StoreUsage();
        storeUsage.setPercentLimit(50);
        storeUsage.setTotal(524288L);
        filtered.setUsage(storeUsage);
        filtered.setDestination((ActiveMQDestination)this.queueA);
        filtered.setPersistenceAdapter((PersistenceAdapter)kahaStore);
        ArrayList<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        stores.add(filtered);
        persistenceAdapter.setFilteredPersistenceAdapters(stores);
        BrokerService brokerService = this.createBroker(persistenceAdapter);
        brokerService.getSystemUsage().getStoreUsage().setTotal(0x100000L);
        brokerService.start();
        this.produceMessages((Destination)this.queueA, 20);
        LOG.info("Store global u: " + this.broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + this.broker.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((String)"some usage", (this.broker.getSystemUsage().getStoreUsage().getUsage() > 0L ? 1 : 0) != 0);
        BaseDestination baseDestinationA = (BaseDestination)this.broker.getRegionBroker().getDestinationMap().get(this.queueA);
        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((String)"limited store has more % usage than parent", (baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage() > this.broker.getSystemUsage().getStoreUsage().getPercentUsage() ? 1 : 0) != 0);
    }

    @Test
    public void testExplicitAdapterBlockingProducer() throws Exception {
        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
        kahaStore.setJournalMaxFileLength(8192);
        kahaStore.setIndexDirectory(new File(IOHelper.getDefaultDataDirectory()));
        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
        StoreUsage storeUsage = new StoreUsage();
        storeUsage.setLimit(45056L);
        filtered.setUsage(storeUsage);
        filtered.setDestination((ActiveMQDestination)this.queueA);
        filtered.setPersistenceAdapter((PersistenceAdapter)kahaStore);
        ArrayList<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<FilteredKahaDBPersistenceAdapter>();
        stores.add(filtered);
        persistenceAdapter.setFilteredPersistenceAdapters(stores);
        BrokerService brokerService = this.createBroker(persistenceAdapter);
        brokerService.start();
        final AtomicBoolean done = new AtomicBoolean();
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    MKahaDBStoreLimitTest.this.produceMessages((Destination)MKahaDBStoreLimitTest.this.queueA, 20);
                    done.set(true);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        });
        Assert.assertTrue((String)"some messages got to dest", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                BaseDestination baseDestinationA = (BaseDestination)MKahaDBStoreLimitTest.this.broker.getRegionBroker().getDestinationMap().get(MKahaDBStoreLimitTest.this.queueA);
                return baseDestinationA != null && baseDestinationA.getDestinationStatistics().getMessages().getCount() > 4L;
            }
        }));
        BaseDestination baseDestinationA = (BaseDestination)this.broker.getRegionBroker().getDestinationMap().get(this.queueA);
        long enqueues = 0L;
        do {
            enqueues = baseDestinationA.getDestinationStatistics().getEnqueues().getCount();
            LOG.info("Dest Enqueues: " + enqueues);
            TimeUnit.MILLISECONDS.sleep(500L);
        } while (enqueues != baseDestinationA.getDestinationStatistics().getEnqueues().getCount());
        Assert.assertFalse((String)"expect producer to block", (boolean)done.get());
        LOG.info("Store global u: " + this.broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + this.broker.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((String)"some usage", (this.broker.getSystemUsage().getStoreUsage().getUsage() > 0L ? 1 : 0) != 0);
        LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
        Assert.assertTrue((String)"limited store has more % usage than parent", (baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage() > this.broker.getSystemUsage().getStoreUsage().getPercentUsage() ? 1 : 0) != 0);
        executor.shutdownNow();
    }

    private void consume(Destination queue) throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer(queue);
        for (int i = 0; i < 5; ++i) {
            Assert.assertNotNull((String)("message[" + i + "]"), (Object)consumer.receive(4000L));
        }
        connection.close();
    }

    private void produceMessages(Destination queue, int count) throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(queue);
        BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes(new byte[1024]);
        for (int i = 0; i < count; ++i) {
            producer.send((Message)bytesMessage);
        }
        connection.close();
    }
}

