/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplicateFromStoreTest {
    static final Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class);

    /** Connect URL of the embedded broker's TCP connector; assigned in {@code startBroker()}. */
    String activemqURL;
    /** Embedded broker under test; created in {@code startBroker()}, stopped in {@code stopBroker()}. */
    BrokerService broker;

    /** Name of the destination shared by all producers and consumers. */
    protected static final String DESTNAME = "TEST";
    /** Number of concurrent producer tasks. */
    protected static final int NUM_PRODUCERS = 100;
    /** Number of concurrent consumer tasks. */
    protected static final int NUM_CONSUMERS = 20;
    /** Total number of messages sent across all producers. */
    protected static final int NUM_MSGS = 20000;
    /** Pause in milliseconds after each received message. */
    protected static final int CONSUMER_SLEEP = 0;
    /** Pause in milliseconds after each sent message. */
    protected static final int PRODUCER_SLEEP = 10;

    // One count per producer/consumer task; each task counts down exactly once when it exits.
    // NOTE: these latches are static and one-shot, so they are shared by every instance of this class.
    public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS);
    public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS);

    /** Remaining send budget; producers claim one unit per message via {@code decrementAndGet()}. */
    public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
    /** Number of messages actually sent; starts at zero and is incremented after every successful send. */
    public AtomicInteger totalMessagesSent = new AtomicInteger(0);
    /** Number of messages received across all consumers. */
    public AtomicInteger totalReceived = new AtomicInteger(0);
    /** Not read anywhere in this class; kept for compatibility. */
    public int messageSize = 16000;

    /**
     * Creates, configures and starts the embedded broker used by the test, then records the
     * publishable connect string of its TCP connector in {@code activemqURL}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        final BrokerService service = new BrokerService();
        // Publish the instance immediately so stopBroker() can see it even if setup fails later.
        this.broker = service;
        service.setDeleteAllMessagesOnStartup(true);
        service.addConnector("tcp://0.0.0.0:0");
        service.setDestinationPolicy(this.newQueuePolicies());
        service.setSystemUsage(this.newSystemUsage());
        service.setPersistenceAdapter(this.newKahaDbStore());
        service.start();
        service.waitUntilStarted();
        this.activemqURL = service.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    /** Builds the policy map holding a single entry registered for the queue destination ">". */
    private PolicyMap newQueuePolicies() {
        final ActiveMQQueue queuePattern = new ActiveMQQueue(">");
        final PolicyEntry entry = new PolicyEntry();
        entry.setDestination(queuePattern);
        entry.setMemoryLimit(10L * 1024L * 1024L); // 10 MiB
        entry.setExpireMessagesPeriod(0L);
        entry.setEnableAudit(false);
        entry.setQueuePrefetch(100);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.put(queuePattern, entry);
        return policyMap;
    }

    /** Builds the system usage: memory set to 50 percent of the JVM heap, store limit set to 0. */
    private SystemUsage newSystemUsage() {
        final MemoryUsage memory = new MemoryUsage();
        memory.setPercentOfJvmHeap(50);
        final StoreUsage store = new StoreUsage();
        store.setLimit(0L);
        final SystemUsage usage = new SystemUsage();
        usage.setMemoryUsage(memory);
        usage.setStoreUsage(store);
        return usage;
    }

    /** Builds the KahaDB persistence adapter with concurrent store-and-dispatch enabled for queues. */
    private PersistenceAdapter newKahaDbStore() {
        final KahaDBStore store = new KahaDBStore();
        store.setConcurrentStoreAndDispatchQueues(true);
        return store;
    }

    /**
     * Stops the embedded broker after each test, if one was created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running == null) {
            // startBroker() never got as far as creating the broker; nothing to do.
            return;
        }
        running.stop();
    }

    /**
     * Runs {@code NUM_PRODUCERS} producers against {@code NUM_CONSUMERS} consumers and, once both
     * groups have finished, asserts that the broker reports no pending messages.
     *
     * @throws Exception if the test thread is interrupted while waiting or the broker statistics cannot be read
     */
    @Test
    public void testDuplicateMessage() throws Exception {
        LOG.info("Testing for duplicate messages.");
        // The latches are static and one-shot; re-arm them so a repeated run in the same JVM
        // actually waits for this run's clients instead of returning immediately.
        producersFinished = new CountDownLatch(NUM_PRODUCERS);
        consumersFinished = new CountDownLatch(NUM_CONSUMERS);
        ExecutorService producers = Executors.newFixedThreadPool(NUM_PRODUCERS);
        ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS);
        try {
            this.createOpenwireClients(producers, consumers);
            LOG.info("All producers and consumers got started. Awaiting their termination");
            // await() returns false on timeout; fail loudly rather than asserting on a half-finished run.
            Assert.assertTrue("producers finished within the timeout", producersFinished.await(100L, TimeUnit.MINUTES));
            LOG.info("All producers have terminated. remaining to send: {}, sent:{}", this.totalMessagesToSend.get(), this.totalMessagesSent.get());
            Assert.assertTrue("consumers finished within the timeout", consumersFinished.await(100L, TimeUnit.MINUTES));
            LOG.info("All consumers have terminated.");
        } finally {
            // Always release the worker threads, including when an await is interrupted or an assertion fails.
            producers.shutdownNow();
            consumers.shutdownNow();
        }
        Assert.assertEquals("no messages pending, i.e. dlq empty", 0L, ((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Starts {@code NUM_CONSUMERS} consumers one at a time, waiting for each to signal that its
     * JMS consumer exists, and then starts {@code NUM_PRODUCERS} producers.
     *
     * <p>The consumer's {@code init} monitor is acquired <em>before</em> the task is submitted, so
     * the consumer cannot call {@code notifyAll()} until this thread is parked in {@code wait()};
     * the notification therefore cannot be missed. The wait is bounded so that a consumer which
     * never signals cannot block the test forever. If this thread is interrupted, the interrupt
     * status is restored and no further clients are created.
     *
     * @param producers executor that runs the producer tasks
     * @param consumers executor that runs the consumer tasks
     */
    protected void createOpenwireClients(ExecutorService producers, ExecutorService consumers) {
        // Upper bound on how long to wait for a single consumer to report that it is ready.
        final long initTimeoutMillis = TimeUnit.SECONDS.toMillis(30L);
        for (int i = 0; i < NUM_CONSUMERS; ++i) {
            LOG.trace("Creating consumer for destination {}", DESTNAME);
            Consumer consumer = new Consumer(DESTNAME, false);
            synchronized (consumer.init) {
                consumers.submit(consumer);
                try {
                    // A timeout or spurious wakeup only means producers may start slightly early.
                    consumer.init.wait(initTimeoutMillis);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error(e.toString(), e);
                    return;
                }
            }
        }
        for (int i = 0; i < NUM_PRODUCERS; ++i) {
            LOG.trace("Creating producer for destination {}", DESTNAME);
            Producer producer = new Producer(DESTNAME, false, 0);
            producers.submit(producer);
        }
    }

    class Consumer
    implements Runnable {
        public Object init = new Object();
        protected String queueName = "TEST";
        boolean isTopic = false;
        Logger log = LOG;

        public Consumer(String destName, boolean topic) {
            this.isTopic = topic;
            this.queueName = destName;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Connection connection = null;
            Session session = null;
            MessageConsumer consumer = null;
            try {
                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(DuplicateFromStoreTest.this.activemqURL);
                connection = amq.createConnection();
                connection.setExceptionListener(new ExceptionListener(){

                    public void onException(JMSException e) {
                        e.printStackTrace();
                    }
                });
                connection.start();
                session = connection.createSession(false, 1);
                Object destination = null;
                destination = this.isTopic ? session.createTopic(this.queueName) : session.createQueue(this.queueName);
                consumer = session.createConsumer((Destination)destination);
                Object object = this.init;
                synchronized (object) {
                    this.init.notifyAll();
                }
                long counter = 0L;
                while (DuplicateFromStoreTest.this.totalReceived.get() < 20000) {
                    Message message2 = consumer.receive(5000L);
                    if (message2 instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage)message2;
                        String text = textMessage.getText();
                        this.log.debug("Received: " + text.substring(0, 50));
                    } else {
                        if (DuplicateFromStoreTest.this.totalReceived.get() >= 20000) break;
                        this.log.error("Received message of unsupported type. Expecting TextMessage. count: " + DuplicateFromStoreTest.this.totalReceived.get());
                    }
                    if (message2 == null) continue;
                    DuplicateFromStoreTest.this.totalReceived.incrementAndGet();
                    if (++counter % 10000L == 0L) {
                        this.log.info("received " + counter + " messages");
                    }
                    Thread.sleep(0L);
                }
            }
            catch (Exception e) {
                this.log.error("Error in Consumer: " + e.getMessage());
                return;
            }
            finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                }
                catch (Exception exception) {
                }
                finally {
                    consumersFinished.countDown();
                }
            }
        }
    }

    class Producer
    implements Runnable {
        Logger log = LOG;
        protected String destName = "TEST";
        protected boolean isTopicDest = false;

        public Producer(String dest, boolean isTopic, int ttl) {
            this.destName = dest;
            this.isTopicDest = isTopic;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Connection connection = null;
            Session session = null;
            MessageProducer producer = null;
            try {
                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(DuplicateFromStoreTest.this.activemqURL);
                connection = amq.createConnection();
                connection.setExceptionListener(new ExceptionListener(){

                    public void onException(JMSException e) {
                        e.printStackTrace();
                    }
                });
                connection.start();
                session = connection.createSession(false, 1);
                Object destination = this.isTopicDest ? session.createTopic(this.destName) : session.createQueue(this.destName);
                producer = session.createProducer((Destination)destination);
                long counter = 0L;
                int msgSize = 16384;
                StringBuilder stringBuilder = new StringBuilder();
                stringBuilder.setLength(msgSize + 15);
                stringBuilder.append("Message: ");
                stringBuilder.append(counter);
                for (int j = 0; j < msgSize / 10; ++j) {
                    stringBuilder.append("XXXXXXXXXX");
                }
                String text = stringBuilder.toString();
                TextMessage message = session.createTextMessage(text);
                while (DuplicateFromStoreTest.this.totalMessagesToSend.decrementAndGet() >= 0) {
                    producer.send((Message)message);
                    DuplicateFromStoreTest.this.totalMessagesSent.incrementAndGet();
                    this.log.debug("Sent message: " + counter);
                    if (++counter % 10000L == 0L) {
                        this.log.info("sent " + counter + " messages");
                    }
                    Thread.sleep(10L);
                }
            }
            catch (Exception ex) {
                this.log.error(ex.toString());
                return;
            }
            finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                }
                catch (Exception exception) {
                }
                finally {
                    producersFinished.countDown();
                }
            }
            this.log.debug("Closing producer for " + this.destName);
        }
    }
}

