/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;
import java.io.File;
import java.util.ArrayList;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ2801Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2801Test.class);
    private static final String TOPICNAME = "InvalidPendingQueueTest";
    private static final String SELECTOR1 = "JMS_ID = 'TEST'";
    private static final String SELECTOR2 = "JMS_ID = 'TEST2'";
    private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1";
    private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2";
    private static final int MSG_COUNT = 2500;
    private Session session1;
    private Connection conn1;
    private Topic topic1;
    private MessageConsumer consumer1;
    private Session session2;
    private Connection conn2;
    private Topic topic2;
    private MessageConsumer consumer2;
    private BrokerService broker;
    private String connectionUri;

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDataDirectory("target" + File.separator + "activemq-data");
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector("tcp://localhost:0").setName("Default");
        this.applyMemoryLimitPolicy(this.broker);
        this.broker.start();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    private void applyMemoryLimitPolicy(BrokerService broker) {
        SystemUsage memoryManager = new SystemUsage();
        memoryManager.getMemoryUsage().setLimit(5818230784L);
        memoryManager.getStoreUsage().setLimit(0x180000000L);
        memoryManager.getTempUsage().setLimit(0xC0000000L);
        broker.setSystemUsage(memoryManager);
        ArrayList<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
        PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        entry.setProducerFlowControl(false);
        entry.setMemoryLimit(504857608L);
        entry.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new FilePendingQueueMessageStoragePolicy());
        policyEntries.add(entry);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        broker.setDestinationPolicy(policyMap);
    }

    @After
    public void tearDown() throws Exception {
        try {
            this.conn1.close();
            this.conn2.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    private void produceMessages() throws Exception {
        TopicConnection connection = this.createConnection();
        TopicSession session = connection.createTopicSession(false, 1);
        Topic topic = session.createTopic(TOPICNAME);
        TopicPublisher producer = session.createPublisher(topic);
        connection.start();
        producer.setDeliveryMode(2);
        long tStamp = System.currentTimeMillis();
        BytesMessage message = this.session2.createBytesMessage();
        for (int i = 1; i <= 2500; ++i) {
            message.setStringProperty("JMS_ID", "TEST");
            message.setIntProperty("Type", i);
            producer.publish((Message)message);
            if (i % 100 != 0) continue;
            LOG.info("sent: " + i + " @ " + (System.currentTimeMillis() - tStamp) / 100L + "m/ms");
            tStamp = System.currentTimeMillis();
        }
    }

    private void activeateSubscribers() throws Exception {
        this.conn1 = this.createConnection();
        this.conn1.setClientID(SUBSCRIPTION1);
        this.session1 = this.conn1.createSession(true, 0);
        this.topic1 = this.session1.createTopic(TOPICNAME);
        this.consumer1 = this.session1.createDurableSubscriber(this.topic1, SUBSCRIPTION1, SELECTOR1, false);
        this.conn1.start();
        this.conn2 = this.createConnection();
        this.conn2.setClientID(SUBSCRIPTION2);
        this.session2 = this.conn2.createSession(true, 0);
        this.topic2 = this.session2.createTopic(TOPICNAME);
        this.consumer2 = this.session2.createDurableSubscriber(this.topic2, SUBSCRIPTION2, SELECTOR2, false);
        this.conn2.start();
    }

    @Test
    public void testInvalidPendingQueue() throws Exception {
        this.activeateSubscribers();
        Assert.assertNotNull((Object)this.consumer1);
        Assert.assertNotNull((Object)this.consumer2);
        this.produceMessages();
        LOG.debug("Sent messages to a single subscriber");
        Thread.sleep(2000L);
        LOG.debug("Closing durable subscriber connections");
        this.conn1.close();
        this.conn2.close();
        LOG.debug("Closed durable subscriber connections");
        Thread.sleep(2000L);
        LOG.debug("Re-starting durable subscriber connections");
        this.activeateSubscribers();
        LOG.debug("Started up durable subscriber connections - now view activemq console to see pending queue size on the other subscriber");
        ObjectName[] subs = this.broker.getAdminView().getDurableTopicSubscribers();
        for (int i = 0; i < subs.length; ++i) {
            ObjectName subName = subs[i];
            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
            LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize());
            if (sub.getSubscriptionName().equals(SUBSCRIPTION1)) {
                Assert.assertEquals((String)"Incorrect number of pending messages", (long)2500L, (long)(sub.getPendingQueueSize() + sub.getDispatchedQueueSize()));
                continue;
            }
            Assert.assertEquals((String)"Incorrect number of pending messages", (long)0L, (long)sub.getPendingQueueSize());
        }
    }

    private TopicConnection createConnection() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.connectionUri);
        TopicConnection conn = connectionFactory.createTopicConnection();
        return conn;
    }
}

