/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCPersistenceAdapterExpiredMessageTest {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class);
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    protected BrokerService brokerService;
    private AtomicBoolean hasSpaceCalled = new AtomicBoolean();
    private int expireSize = 5;

    @Before
    public void setUp() throws Exception {
        this.hasSpaceCalled.set(false);
        this.brokerService = new BrokerService();
        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(){

            public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
                ProxyTopicMessageStore proxy = new ProxyTopicMessageStore(super.createTopicMessageStore(destination)){

                    public void recover(final MessageRecoveryListener listener) throws Exception {
                        MessageRecoveryListener delegate = new MessageRecoveryListener(){

                            public boolean recoverMessageReference(MessageId ref) throws Exception {
                                return listener.recoverMessageReference(ref);
                            }

                            public boolean recoverMessage(org.apache.activemq.command.Message message) throws Exception {
                                return listener.recoverMessage(message);
                            }

                            public boolean isDuplicate(MessageId ref) {
                                return listener.isDuplicate(ref);
                            }

                            public boolean hasSpace() {
                                JDBCPersistenceAdapterExpiredMessageTest.this.hasSpaceCalled.set(true);
                                return listener.hasSpace();
                            }
                        };
                        super.recover(delegate);
                    }
                };
                return proxy;
            }
        };
        this.brokerService.setSchedulerSupport(false);
        this.brokerService.setDataDirectoryFile(this.dataFileDir.getRoot());
        this.brokerService.setPersistenceAdapter((PersistenceAdapter)jdbc);
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(5000L);
        defaultEntry.setMaxExpirePageSize(this.expireSize);
        defaultEntry.setMemoryLimit(0x190000L);
        policyMap.setDefaultEntry(defaultEntry);
        this.brokerService.setDestinationPolicy(policyMap);
        this.brokerService.start();
    }

    @After
    public void stop() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    @Test
    public void testMaxExpirePageSize() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setClientID("clientId");
        Connection conn = factory.createConnection();
        conn.start();
        Session sess = conn.createSession(false, 1);
        TopicSubscriber sub = sess.createDurableSubscriber((Topic)topic, "sub1");
        sub.close();
        MessageProducer producer = sess.createProducer((Destination)topic);
        producer.setTimeToLive(1000L);
        for (int i = 0; i < 50; ++i) {
            producer.send((Message)sess.createTextMessage("test message: " + i));
        }
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long expired = JDBCPersistenceAdapterExpiredMessageTest.this.brokerService.getDestination((ActiveMQDestination)topic).getDestinationStatistics().getExpired().getCount();
                return expired == (long)JDBCPersistenceAdapterExpiredMessageTest.this.expireSize && JDBCPersistenceAdapterExpiredMessageTest.this.hasSpaceCalled.get();
            }
        }, (long)15000L, (long)1000L));
    }

    @Test
    public void testExpiredAfterCacheExhausted() throws Exception {
        final ActiveMQQueue queue = new ActiveMQQueue("test.q");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setWatchTopicAdvisories(false);
        Connection conn = factory.createConnection();
        conn.start();
        Session sess = conn.createSession(false, 1);
        MessageProducer producer = sess.createProducer((Destination)queue);
        producer.setTimeToLive(1000L);
        String payLoad = new String(new byte[16384]);
        int numMessages = 500;
        for (int i = 0; i < 500; ++i) {
            producer.send((Message)sess.createTextMessage("test message: " + payLoad));
        }
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long expired = JDBCPersistenceAdapterExpiredMessageTest.this.brokerService.getDestination((ActiveMQDestination)queue).getDestinationStatistics().getExpired().getCount();
                LOG.info("Expired: " + expired);
                return expired == 500L;
            }
        }, (long)15000L, (long)1000L));
    }
}

