/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Regression test for AMQ-2584 exercising concurrent redelivery and dead-letter-queue consumption
 * against a KahaDB store.
 *
 * <p>The test publishes {@link #numMessages} messages to a topic that has
 * {@link #numDurableSubs} durable subscribers. Every subscriber calls
 * {@link Session#recover()} for each message it receives, and the connection factory is
 * configured with {@code maximumRedeliveries=1}; a separate consumer drains
 * {@code ActiveMQ.DLQ} at the same time. Once both consumers have seen the expected number of
 * messages and have been closed, the test asserts that the KahaDB directory holds exactly one
 * {@code .log} journal file.
 *
 * <p>Thread-safety: the message listeners run on JMS provider threads while the assertions run
 * on the test thread; state shared between them is held in latches, an {@link AtomicLong} and a
 * {@code volatile} timestamp.
 */
public class AMQ2584ConcurrentDlqTest extends TestSupport {
    static final Logger LOG = LoggerFactory.getLogger(AMQ2584ConcurrentDlqTest.class);

    /** Number of characters in the string property attached to every message. */
    private static final int PAYLOAD_SIZE = 5000;
    /** Name of the queue the DLQ consumer listens on. */
    private static final String DLQ_NAME = "ActiveMQ.DLQ";
    /** Prefix of the durable subscription names; the 1-based subscriber index is appended. */
    private static final String SUBSCRIPTION_PREFIX = "subName";
    /** How long the DLQ must have been silent before its consumer is closed. */
    private static final long DLQ_QUIET_PERIOD_MILLIS = 5000L;
    /** Upper bound on the total time spent waiting for the DLQ to go silent. */
    private static final long DLQ_DRAIN_LIMIT_MILLIS = 30000L;
    /** How long each latch is awaited before the test fails. */
    private static final long LATCH_TIMEOUT_SECONDS = 60L;

    BrokerService broker = null;
    // NOTE(review): this name may hide a same-named field in TestSupport; left untouched on purpose.
    ActiveMQTopic topic;
    ActiveMQConnection consumerConnection = null;
    ActiveMQConnection producerConnection = null;
    ActiveMQConnection dlqConnection = null;
    Session consumerSession;
    Session producerSession;
    MessageProducer producer;
    // Spelling kept as-is: the field is package-visible and may be referenced elsewhere.
    Vector<TopicSubscriber> duralbeSubs = new Vector<>();
    final int numMessages = 1000;
    final int numDurableSubs = 2;
    String data;

    // Written by the DLQ listener thread, read by the test thread: must be volatile.
    private volatile long dlqConsumerLastReceivedTimeStamp;
    private final AtomicLong dlqReceivedCount = new AtomicLong(0L);

    // Two deliveries (original + one redelivery) of every message to every durable subscriber.
    // NOTE(review): the "- 1" slack is inherited from the original counts (3999); reason not visible here.
    CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1);
    // Inherited count of 999, i.e. one less than the number of messages sent.
    CountDownLatch dlqConsumerLatch = new CountDownLatch(numMessages - 1);

    /**
     * Sends all messages, waits for the redelivery and DLQ consumers to receive their share,
     * closes both consumers and then verifies that only one journal file is left on disk.
     *
     * @throws Exception on any JMS/broker failure or if the test thread is interrupted
     */
    public void testSize() throws Exception {
        openConsumer(redeliveryConsumerLatch);
        openDlqConsumer(dlqConsumerLatch);

        assertEquals(0, broker.getAdminView().getStorePercentUsage());

        for (int sent = 0; sent < numMessages; sent++) {
            sendMessage(false);
        }

        BrokerView brokerView = broker.getAdminView();
        // Result intentionally unused; call retained from the original test.
        // NOTE(review): presumably this refreshes the store usage figures - confirm.
        broker.getSystemUsage().getStoreUsage().isFull();
        LOG.info("store percent usage: {}", brokerView.getStorePercentUsage());

        // Await first so the failure message reports what is still outstanding at timeout.
        boolean redeliveryDone = redeliveryConsumerLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertTrue("redelivery consumer got all it needs, remaining: " + redeliveryConsumerLatch.getCount(),
                redeliveryDone);
        assertTrue("dql  consumer got all it needs",
                dlqConsumerLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));

        closeConsumer();
        LOG.info("Giving dlq a chance to clear down once topic consumer is closed");
        closeDlqConsumer();

        // Longer than the 2000 ms cleanup/checkpoint intervals configured on the adapter.
        Thread.sleep(5000L);

        FilenameFilter justLogFiles = (File dir, String name) -> name.endsWith(".log");
        File storeDirectory = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory();
        // File.list returns null when the directory is missing or unreadable; fail clearly then.
        String[] logFiles = storeDirectory.list(justLogFiles);
        assertNotNull("unable to list kahaDB dir: " + storeDirectory, logFiles);

        int numFiles = logFiles.length;
        if (numFiles > 1) {
            // List the offenders whenever the assertion below is going to fail.
            LOG.info(Arrays.toString(logFiles));
        }
        LOG.info("num files: {}", numFiles);
        assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles);
    }

    /**
     * Opens the consumer connection and registers {@link #numDurableSubs} durable subscribers
     * that share one listener. The listener counts the delivery and then recovers the session.
     *
     * @param latch counted down once per delivery (including redeliveries)
     * @throws Exception if the connection, session or subscribers cannot be created
     */
    private void openConsumer(final CountDownLatch latch) throws Exception {
        consumerConnection = (ActiveMQConnection) createConnection();
        consumerConnection.setClientID("cliID");
        consumerConnection.start();
        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageListener listener = message -> {
            latch.countDown();
            try {
                consumerSession.recover();
            } catch (Exception e) {
                // Best effort: a failed recover must not kill the listener thread.
                LOG.error("failed to recover consumer session", e);
            }
        };

        for (int i = 1; i <= numDurableSubs; i++) {
            TopicSubscriber sub = consumerSession.createDurableSubscriber((Topic) topic, SUBSCRIPTION_PREFIX + i);
            sub.setMessageListener(listener);
            duralbeSubs.add(sub);
        }
    }

    /**
     * Opens a dedicated connection with a listener on {@code ActiveMQ.DLQ}. The listener counts
     * every message and records the time of the most recent receipt.
     *
     * @param received counted down once per message taken from the DLQ
     * @throws Exception if the connection, session or consumer cannot be created
     */
    private void openDlqConsumer(final CountDownLatch received) throws Exception {
        dlqConnection = (ActiveMQConnection) createConnection();
        Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer dlqConsumer = dlqSession.createConsumer((Destination) new ActiveMQQueue(DLQ_NAME));
        dlqConsumer.setMessageListener(message -> {
            long remaining = received.getCount();
            if (remaining > 0L && remaining % 200L == 0L) {
                LOG.info("remaining on DLQ: {}", remaining);
            }
            received.countDown();
            dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis();
            dlqReceivedCount.incrementAndGet();
        });
        dlqConnection.start();
    }

    /**
     * Closes every durable subscriber, removes the durable subscriptions and closes the
     * consumer connection. The connection is closed even if an earlier step fails.
     *
     * @throws JMSException if closing or unsubscribing fails
     */
    private void closeConsumer() throws JMSException {
        try {
            for (TopicSubscriber sub : duralbeSubs) {
                sub.close();
            }
            duralbeSubs.clear();
            if (consumerSession != null) {
                for (int i = 1; i <= numDurableSubs; i++) {
                    consumerSession.unsubscribe(SUBSCRIPTION_PREFIX + i);
                }
            }
        } finally {
            if (consumerConnection != null) {
                ActiveMQConnection closing = consumerConnection;
                consumerConnection = null;
                closing.close();
            }
        }
    }

    /**
     * Waits until the DLQ consumer has been idle for {@value #DLQ_QUIET_PERIOD_MILLIS} ms
     * (bounded by {@value #DLQ_DRAIN_LIMIT_MILLIS} ms overall) and then closes the DLQ
     * connection. No waiting happens if the DLQ consumer never received anything.
     *
     * @throws JMSException if closing the connection fails
     * @throws InterruptedException if interrupted while waiting
     */
    private void closeDlqConsumer() throws JMSException, InterruptedException {
        final long limit = System.currentTimeMillis() + DLQ_DRAIN_LIMIT_MILLIS;
        if (dlqConsumerLastReceivedTimeStamp > 0L) {
            while (true) {
                long now = System.currentTimeMillis();
                boolean stillReceiving = now < dlqConsumerLastReceivedTimeStamp + DLQ_QUIET_PERIOD_MILLIS;
                if (!stillReceiving || now >= limit) {
                    break;
                }
                LOG.info("waiting for DLQ do drain, receivedCount: {}", dlqReceivedCount);
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        if (dlqConnection != null) {
            ActiveMQConnection closing = dlqConnection;
            dlqConnection = null;
            closing.close();
        }
    }

    /**
     * Sends one message carrying the payload in its {@code data} string property, lazily
     * creating the producer connection, session and producer on first use.
     *
     * @param filter unused; retained so existing call sites keep compiling
     * @throws Exception if the producer cannot be created or the send fails
     */
    private void sendMessage(boolean filter) throws Exception {
        if (producerConnection == null) {
            producerConnection = (ActiveMQConnection) createConnection();
            producerConnection.start();
            producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = producerSession.createProducer((Destination) topic);
        }
        Message message = producerSession.createMessage();
        message.setStringProperty("data", data);
        producer.send(message);
    }

    /**
     * Creates, configures and starts the embedded broker named {@code testStoreSize}.
     *
     * @param deleteMessages when {@code true} the broker wipes its store on startup
     * @throws Exception if the broker fails to start
     */
    private void startBroker(boolean deleteMessages) throws Exception {
        broker = new BrokerService();
        broker.setAdvisorySupport(false);
        broker.setBrokerName("testStoreSize");

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setEnableAudit(false);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);

        if (deleteMessages) {
            broker.setDeleteAllMessagesOnStartup(true);
        }
        configurePersistenceAdapter(broker.getPersistenceAdapter());
        broker.getSystemUsage().getStoreUsage().setLimit(200000000L);
        broker.start();
    }

    /**
     * Applies the journal settings used by this test (2 MiB journal files, 2 s cleanup and
     * checkpoint intervals, whole-journal preallocation) to the adapter via bean introspection.
     *
     * @param persistenceAdapter the adapter to configure
     */
    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        Properties properties = new Properties();
        String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
        properties.put("journalMaxFileLength", maxFileLengthVal);
        properties.put("maxFileLength", maxFileLengthVal);
        properties.put("cleanupInterval", "2000");
        properties.put("checkpointInterval", "2000");
        properties.put("preallocationScope", Journal.PreallocationScope.ENTIRE_JOURNAL.name());
        // Raw cast retained from the original call: the generic signature of setProperties is
        // not visible from this file, and the raw form compiles against any variant of it.
        IntrospectionSupport.setProperties((Object) persistenceAdapter, (Map) properties);
    }

    /**
     * Stops the broker if one is running and clears the reference even when stopping fails.
     *
     * @throws Exception if the broker fails to stop
     */
    private void stopBroker() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
            }
        } finally {
            broker = null;
        }
    }

    /**
     * Closes a connection during cleanup, logging rather than propagating any failure so the
     * remaining cleanup steps still run.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(ActiveMQConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.warn("failed to close connection during tear down", e);
        }
    }

    /**
     * Builds a factory for the in-VM broker with topic advisories disabled, a single redelivery
     * with no initial delay, and no implicit broker creation.
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://testStoreSize"
                + "?jms.watchTopicAdvisories=false"
                + "&jms.redeliveryPolicy.maximumRedeliveries=1"
                + "&jms.redeliveryPolicy.initialRedeliveryDelay=0"
                + "&waitForStart=5000"
                + "&create=false");
    }

    /** Builds the message payload, starts a clean broker and creates the test topic. */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        char[] payload = new char[PAYLOAD_SIZE];
        Arrays.fill(payload, 'a');
        data = new String(payload);
        startBroker(true);
        topic = (ActiveMQTopic) createDestination();
    }

    /**
     * Closes any connection the test left open (including the producer connection, which no
     * other method closes), stops the broker and always delegates to the superclass.
     */
    @Override
    protected void tearDown() throws Exception {
        closeQuietly(producerConnection);
        producerConnection = null;
        producerSession = null;
        producer = null;

        closeQuietly(consumerConnection);
        consumerConnection = null;

        closeQuietly(dlqConnection);
        dlqConnection = null;

        try {
            stopBroker();
        } finally {
            super.tearDown();
        }
    }
}

