/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpiredMessagesTest
extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesTest.class);
    BrokerService broker;
    Connection connection;
    Session session;
    MessageProducer producer;
    MessageConsumer consumer;
    private ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
    private boolean useTextMessage = true;
    private boolean useVMCursor = true;
    private boolean deleteAllMessages = true;
    private boolean usePrefetchExtension = true;
    private String brokerUri;

    public static Test suite() {
        return ExpiredMessagesTest.suite(ExpiredMessagesTest.class);
    }

    public static void main(String[] args) {
        TestRunner.run((Test)ExpiredMessagesTest.suite());
    }

    protected void setUp() throws Exception {
    }

    protected void tearDown() throws Exception {
        if (null != this.producer) {
            this.producer.close();
        }
        if (null != this.consumer) {
            this.consumer.close();
        }
        this.session.close();
        this.connection.stop();
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public void testExpiredMessages() throws Exception {
        ActiveMQQueue destination = new ActiveMQQueue("test");
        int numMessagesToSend = 10000;
        this.buildBroker((ActiveMQDestination)destination);
        DestinationStatistics view = this.verifyMessageExpirationOnDestination((ActiveMQDestination)destination, 10000);
        this.verifyDestinationDlq((ActiveMQDestination)destination, 10000, view);
    }

    public void testClientAckInflight_onTopic_withPrefetchExtension() throws Exception {
        this.usePrefetchExtension = true;
        this.doTestClientAckInflight_onTopic_checkPrefetchExtension();
    }

    public void testClientAckInflight_onTopic_withOutPrefetchExtension() throws Exception {
        this.usePrefetchExtension = false;
        this.doTestClientAckInflight_onTopic_checkPrefetchExtension();
    }

    public void doTestClientAckInflight_onTopic_checkPrefetchExtension() throws Exception {
        ActiveMQTopic destination = new ActiveMQTopic("test");
        this.buildBroker((ActiveMQDestination)destination);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover://" + this.brokerUri);
        ActiveMQPrefetchPolicy prefetchTwo = new ActiveMQPrefetchPolicy();
        prefetchTwo.setAll(6);
        factory.setPrefetchPolicy(prefetchTwo);
        this.connection = factory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 2);
        MessageConsumer consumer = this.session.createConsumer((Destination)destination);
        this.produce(10, (ActiveMQDestination)destination);
        Message m = null;
        for (int i = 0; i < 5; ++i) {
            m = consumer.receive(4000L);
        }
        ExpiredMessagesTest.assertNotNull(m);
        List<Subscription> subscriptions = TestSupport.getDestinationConsumers(this.broker, (ActiveMQDestination)destination);
        ExpiredMessagesTest.assertTrue((String)"prefetch extension was not incremented", (boolean)subscriptions.stream().filter(s -> s instanceof TopicSubscription).mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).allMatch(e -> this.usePrefetchExtension ? e > 1 : e == 0));
        m.acknowledge();
        ExpiredMessagesTest.assertTrue((String)"prefetch extension was not incremented", (boolean)subscriptions.stream().filter(s -> s instanceof TopicSubscription).mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).allMatch(e -> e == 0));
    }

    public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
        int i;
        ActiveMQQueue destination = new ActiveMQQueue("test");
        this.broker = new BrokerService();
        this.broker.setBrokerName("localhost");
        this.broker.setDestinations(new ActiveMQDestination[]{destination});
        this.broker.setPersistenceAdapter((PersistenceAdapter)new MemoryPersistenceAdapter());
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setExpireMessagesPeriod(1000L);
        defaultPolicy.setMaxExpirePageSize(2000);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setDeleteAllMessagesOnStartup(this.deleteAllMessages);
        this.broker.addConnector("tcp://localhost:0");
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
                try {
                    LOG.info("Sleeping before delegation on sendToDeadLetterQueue");
                    TimeUnit.SECONDS.sleep(1L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
            }
        }});
        this.broker.start();
        this.broker.waitUntilStarted();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(0);
        factory.setPrefetchPolicy(prefetchPolicy);
        this.connection = factory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
        this.producer = this.session.createProducer((Destination)destination);
        this.producer.setTimeToLive(1000L);
        this.producer.setDeliveryMode(2);
        for (i = 0; i < 10; ++i) {
            this.producer.send((Message)this.session.createTextMessage("RTR"), 2, 0, 2000L);
        }
        this.consumer = this.session.createConsumer((Destination)new ActiveMQQueue("another-test"));
        for (i = 0; i < 10; ++i) {
            long timeStamp = System.currentTimeMillis();
            this.consumer.receive(1000L);
            long duration = System.currentTimeMillis() - timeStamp;
            LOG.info("Duration: " + i + " : " + duration);
            ExpiredMessagesTest.assertTrue((String)("Delay about 500: " + i + ", actual: " + duration), (duration < 1500L ? 1 : 0) != 0);
        }
    }

    private void produce(int num, ActiveMQDestination destination) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover://" + this.brokerUri);
        Connection connection = factory.createConnection();
        connection.start();
        this.session = connection.createSession(false, 1);
        this.producer = this.session.createProducer((Destination)destination);
        this.producer.setDeliveryMode(2);
        int i = 0;
        while (i++ < num) {
            TextMessage message = this.useTextMessage ? this.session.createTextMessage("test") : this.session.createObjectMessage((Serializable)((Object)"test"));
            this.producer.send((Message)message);
        }
        connection.close();
    }

    private void buildBroker(ActiveMQDestination destination) throws Exception {
        this.broker = this.createBroker(this.deleteAllMessages, this.usePrefetchExtension, 100L, destination);
        this.brokerUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    public void testRecoverExpiredMessages() throws Exception {
        ActiveMQQueue destination = new ActiveMQQueue("test");
        this.buildBroker((ActiveMQDestination)destination);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover://" + this.brokerUri);
        this.connection = factory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
        this.producer = this.session.createProducer((Destination)destination);
        this.producer.setTimeToLive(2000L);
        this.producer.setDeliveryMode(2);
        Thread producingThread = new Thread("Producing Thread"){

            @Override
            public void run() {
                try {
                    int i = 0;
                    while (i++ < 1000) {
                        TextMessage message = ExpiredMessagesTest.this.useTextMessage ? ExpiredMessagesTest.this.session.createTextMessage("test") : ExpiredMessagesTest.this.session.createObjectMessage((Serializable)((Object)"test"));
                        ExpiredMessagesTest.this.producer.send((Message)message);
                    }
                }
                catch (Throwable ex) {
                    ex.printStackTrace();
                }
            }
        };
        producingThread.start();
        producingThread.join();
        DestinationStatistics view = TestSupport.getDestinationStatistics(this.broker, (ActiveMQDestination)destination);
        LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
        LOG.info("stopping broker");
        this.broker.stop();
        this.broker.waitUntilStopped();
        Thread.sleep(5000L);
        LOG.info("recovering broker");
        boolean deleteAllMessages = false;
        boolean usePrefetchExtension = true;
        this.broker = this.createBroker(false, true, 5000L, (ActiveMQDestination)destination);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){
            final /* synthetic */ ActiveMQDestination val$destination;
            {
                this.val$destination = activeMQDestination;
            }

            public boolean isSatisified() throws Exception {
                DestinationStatistics view = TestSupport.getDestinationStatistics(ExpiredMessagesTest.this.broker, this.val$destination);
                LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
                return view.getMessages().getCount() == 0L;
            }
        });
        view = TestSupport.getDestinationStatistics(this.broker, (ActiveMQDestination)destination);
        ExpiredMessagesTest.assertEquals((String)"Expect empty queue, QueueSize: ", (long)0L, (long)view.getMessages().getCount());
        ExpiredMessagesTest.assertEquals((String)"all dequeues were expired", (long)view.getDequeues().getCount(), (long)view.getExpired().getCount());
    }

    private DestinationStatistics verifyMessageExpirationOnDestination(ActiveMQDestination destination, final int numMessagesToSend) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.brokerUri);
        this.connection = factory.createConnection();
        this.session = this.connection.createSession(false, 1);
        this.producer = this.session.createProducer((Destination)destination);
        this.producer.setTimeToLive(100L);
        this.consumer = this.session.createConsumer((Destination)destination);
        this.connection.start();
        final AtomicLong received = new AtomicLong();
        Thread consumerThread2 = new Thread("Consumer Thread"){

            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    long end = System.currentTimeMillis();
                    while (end - start < 3000L) {
                        if (ExpiredMessagesTest.this.consumer.receive(1000L) != null) {
                            received.incrementAndGet();
                        }
                        Thread.sleep(100L);
                        end = System.currentTimeMillis();
                    }
                }
                catch (Throwable ex) {
                    ex.printStackTrace();
                }
            }
        };
        consumerThread2.start();
        Thread producingThread = new Thread("Producing Thread"){

            @Override
            public void run() {
                try {
                    int i = 0;
                    while (i++ < numMessagesToSend) {
                        ExpiredMessagesTest.this.producer.send((Message)ExpiredMessagesTest.this.session.createTextMessage("test"));
                    }
                }
                catch (Throwable ex) {
                    ex.printStackTrace();
                }
            }
        };
        producingThread.start();
        consumerThread2.join();
        producingThread.join();
        final DestinationStatistics view = TestSupport.getDestinationStatistics(this.broker, destination);
        ExpiredMessagesTest.assertTrue((String)"all inflight messages expired ", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return view.getInflight().getCount() == 0L;
            }
        }));
        ExpiredMessagesTest.assertEquals((String)"Wrong inFlightCount: ", (long)0L, (long)view.getInflight().getCount());
        LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
        ExpiredMessagesTest.assertTrue((String)"all sent messages expired ", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long oldEnqueues = view.getEnqueues().getCount();
                Thread.sleep(200L);
                LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
                return oldEnqueues == view.getEnqueues().getCount();
            }
        }, (long)60000L));
        LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
        ExpiredMessagesTest.assertTrue((String)"got at least what did not expire", (received.get() >= view.getDequeues().getCount() - view.getExpired().getCount() ? 1 : 0) != 0);
        ExpiredMessagesTest.assertTrue((String)("all messages expired - queue size gone to zero " + view.getMessages().getCount()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
                return view.getMessages().getCount() == 0L;
            }
        }));
        return view;
    }

    private void verifyDestinationDlq(ActiveMQDestination destination, int numMessagesToSend, DestinationStatistics view) throws Exception {
        long expiredBeforeEnqueue = (long)numMessagesToSend - view.getEnqueues().getCount();
        final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
        final DestinationStatistics dlqView = TestSupport.getDestinationStatistics(this.broker, this.dlqDestination);
        LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount() + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return totalExpiredCount == dlqView.getMessages().getCount();
            }
        });
        ExpiredMessagesTest.assertEquals((String)"dlq contains all expired", (long)totalExpiredCount, (long)dlqView.getMessages().getCount());
        ExpiredMessagesTest.assertEquals((String)"memory usage is back to duck egg", (int)0, (int)TestSupport.getDestination(this.broker, destination).getMemoryUsage().getPercentUsage());
        ExpiredMessagesTest.assertTrue((String)"memory usage is increased ", (0L < TestSupport.getDestination(this.broker, this.dlqDestination).getMemoryUsage().getUsage() ? 1 : 0) != 0);
        MessageConsumer dlqConsumer = this.createDlqConsumer(this.connection);
        final DLQListener dlqListener = new DLQListener();
        dlqConsumer.setMessageListener((MessageListener)dlqListener);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return totalExpiredCount == (long)dlqListener.count;
            }
        }, (long)60000L);
        ExpiredMessagesTest.assertEquals((String)"dlq returned all expired", (long)dlqListener.count, (long)totalExpiredCount);
    }

    private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
        return connection.createSession(false, 1).createConsumer((Destination)this.dlqDestination);
    }

    public void initCombosForTestRecoverExpiredMessages() {
        this.addCombinationValues("useVMCursor", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    private BrokerService createBroker(boolean deleteAllMessages, boolean usePrefetchExtension, long expireMessagesPeriod, ActiveMQDestination destination) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("localhost");
        broker.setDestinations(new ActiveMQDestination[]{destination});
        broker.setPersistenceAdapter((PersistenceAdapter)new MemoryPersistenceAdapter());
        PolicyEntry defaultPolicy = new PolicyEntry();
        if (this.useVMCursor) {
            defaultPolicy.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new VMPendingQueueMessageStoragePolicy());
        }
        defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
        defaultPolicy.setMaxExpirePageSize(1200);
        defaultPolicy.setUsePrefetchExtension(usePrefetchExtension);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        broker.addConnector("tcp://localhost:0");
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }

    class DLQListener
    implements MessageListener {
        int count = 0;

        DLQListener() {
        }

        public void onMessage(Message message) {
            ++this.count;
        }
    }
}

