/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ3436Test {
    protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
    private BrokerService broker;
    private PersistenceAdapter adapter;
    private boolean useCache = true;
    private boolean prioritizeMessages = true;

    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setConcurrentStoreAndDispatchQueues(false);
        adapter.setConcurrentStoreAndDispatchTopics(false);
        adapter.deleteAllMessages();
        return adapter;
    }

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setBrokerName("priorityTest");
        this.broker.setAdvisorySupport(false);
        this.broker.setUseJmx(false);
        this.adapter = this.createPersistenceAdapter(true);
        this.broker.setPersistenceAdapter(this.adapter);
        PolicyEntry policy = new PolicyEntry();
        policy.setPrioritizedMessages(this.prioritizeMessages);
        policy.setUseCache(this.useCache);
        policy.setProducerFlowControl(false);
        PolicyMap policyMap = new PolicyMap();
        policyMap.put((ActiveMQDestination)new ActiveMQQueue("TEST"), (Object)policy);
        PolicyEntry ignoreExpired = new PolicyEntry();
        SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
        ignoreExpiredStrategy.setProcessExpired(false);
        ignoreExpired.setDeadLetterStrategy((DeadLetterStrategy)ignoreExpiredStrategy);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test
    public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
        int messageCount = 200;
        URI failoverUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
        ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
        cf.setDispatchAsync(false);
        ActiveMQConnection producerConnection = (ActiveMQConnection)cf.createConnection();
        producerConnection.setMessagePrioritySupported(true);
        producerConnection.start();
        Session producerSession = producerConnection.createSession(true, 0);
        MessageProducer producer = producerSession.createProducer((Destination)dest);
        ActiveMQConnection consumerConnection = (ActiveMQConnection)cf.createConnection();
        consumerConnection.setMessagePrioritySupported(true);
        consumerConnection.start();
        ActiveMQSession consumerSession = (ActiveMQSession)consumerConnection.createSession(true, 0);
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)consumerSession.createConsumer((Destination)dest);
        Random random = new Random();
        for (int i = 0; i < messageCount; ++i) {
            TextMessage message = producerSession.createTextMessage("Test message #" + i);
            producer.send((Message)message, 2, random.nextInt(10), 45000L);
            producerSession.commit();
        }
        producer.close();
        final CountDownLatch latch = new CountDownLatch(messageCount);
        final StringBuffer failureMessage = new StringBuffer();
        consumer.setMessageListener(new MessageListener(){
            int lowestPrioritySeen = 10;
            boolean firstMessage = true;

            public void onMessage(Message msg) {
                try {
                    int currentPriority = msg.getJMSPriority();
                    LOG.debug(currentPriority + "<=" + this.lowestPrioritySeen);
                    if (this.firstMessage) {
                        this.firstMessage = false;
                        LOG.debug("Ignoring first message since it was prefetched");
                    } else {
                        if (this.lowestPrioritySeen > currentPriority) {
                            this.lowestPrioritySeen = currentPriority;
                        }
                        if (this.lowestPrioritySeen < currentPriority) {
                            failureMessage.append("Incorrect priority seen (Lowest Priority = " + this.lowestPrioritySeen + " Current Priority = " + currentPriority + ")" + System.getProperty("line.separator"));
                        }
                    }
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
                finally {
                    latch.countDown();
                    LOG.debug("Messages remaining = " + latch.getCount());
                }
            }
        });
        latch.await();
        consumer.close();
        producerSession.close();
        producerConnection.stop();
        producerConnection.close();
        consumerSession.close();
        consumerConnection.stop();
        consumerConnection.close();
        if (failureMessage.length() > 0) {
            Assert.fail((String)failureMessage.toString());
        }
    }
}

