/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.statistics;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

public abstract class AbstractInflightMessageSizeTest {
    protected BrokerService brokerService;
    protected Connection connection;
    protected String brokerUrlString;
    protected Session session;
    protected jakarta.jms.Destination dest;
    protected Destination amqDestination;
    protected MessageConsumer consumer;
    protected int prefetch = 100;
    protected boolean useTopicSubscriptionInflightStats;
    protected final int ackType;
    protected final boolean optimizeAcknowledge;
    protected final String destNamePrefix = "testDest";
    protected final String destName = "testDest.1";
    protected final String destName2 = "testDest.2";
    protected final long WAIT_DURATION = 10000L;
    protected final long SLEEP_DURATION = 500L;
    protected final AtomicBoolean failOnDispatch = new AtomicBoolean();

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({0, true, true}, {1, true, true}, {4, true, true}, {2, true, true}, {0, false, true}, {1, false, true}, {4, false, true}, {2, false, true}, {0, true, false}, {1, true, false}, {4, true, false}, {2, true, false}, {0, false, false}, {1, false, false}, {4, false, false}, {2, false, false});
    }

    public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
        this.ackType = ackType;
        this.optimizeAcknowledge = optimizeAcknowledge;
        this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
    }

    @Before
    public void setUp() throws Exception {
        this.failOnDispatch.set(false);
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        TransportConnector tcp = this.brokerService.addConnector("tcp://localhost:0");
        PolicyEntry policy = new PolicyEntry();
        policy.setUseTopicSubscriptionInflightStats(this.useTopicSubscriptionInflightStats);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        this.brokerService.setDestinationPolicy(pMap);
        this.brokerService.setPlugins(new BrokerPlugin[]{broker -> new BrokerFilter(broker){

            public void preProcessDispatch(MessageDispatch messageDispatch) {
                super.preProcessDispatch(messageDispatch);
                if (AbstractInflightMessageSizeTest.this.failOnDispatch.get()) {
                    throw new RuntimeException("fail dispatch");
                }
            }
        }});
        this.brokerService.start();
        String optAckString = this.optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
        this.brokerUrlString = tcp.getPublishableConnectString() + optAckString;
        this.connection = this.createConnectionFactory().createConnection();
        this.connection.setClientID("client1");
        this.connection.start();
        this.session = this.connection.createSession(this.ackType == 0, this.ackType);
        this.dest = this.getDestination();
        this.consumer = this.getMessageConsumer();
        this.amqDestination = TestSupport.getDestination(this.brokerService, this.getActiveMQDestination());
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.brokerUrlString);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setTopicPrefetch(this.prefetch);
        prefetchPolicy.setQueuePrefetch(this.prefetch);
        prefetchPolicy.setOptimizeDurableTopicPrefetch(this.prefetch);
        factory.setPrefetchPolicy(prefetchPolicy);
        return factory;
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        this.brokerService.stop();
    }

    @Test(timeout=60000L)
    public void testInflightMessageSize() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        long size = this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than the content length sent", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() > size, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 10, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        this.receiveMessages(10);
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizePrefetchFilled() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        long size = this.sendMessages(this.prefetch);
        Assert.assertTrue((String)"Inflight message size should be greater than content length", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() > size, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == this.prefetch, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == (long)this.prefetch, (long)10000L, (long)500L));
        long inFlightSize = this.getSubscription().getInFlightMessageSize();
        this.sendMessages(10);
        Assert.assertTrue((String)"Destination inflight message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == (long)this.prefetch, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == this.prefetch, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should not change", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == inFlightSize, (long)10000L, (long)500L));
        this.receiveMessages(this.prefetch + 10);
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizePrefetchNotFilled() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        long size = this.sendMessages(this.prefetch - 10);
        Assert.assertTrue((String)"Inflight message size should be greater than content length", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() > size, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == this.prefetch - 10, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == (long)(this.prefetch - 10), (long)10000L, (long)500L));
        long inFlightSize = this.getSubscription().getInFlightMessageSize();
        this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than previous inlight size", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() > inFlightSize, (long)10000L, (long)500L));
        this.receiveMessages(this.prefetch);
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizeRollback() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        Assume.assumeTrue((this.ackType == 0 ? 1 : 0) != 0);
        long size = this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than the content length sent", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() > size, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 10, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should equal number of messages sent", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        long inFlightSize = this.getSubscription().getInFlightMessageSize();
        for (int i = 0; i < 10; ++i) {
            this.consumer.receive();
        }
        this.session.rollback();
        Assert.assertTrue((String)"Destination inflight message count should not change on rollback", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should not change on rollback", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 10, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should not change on rollback", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == inFlightSize, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizeConsumerExpiration() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        int ttl = 500;
        int messageCount = 10;
        this.sendMessages(10, ttl);
        Thread.sleep(ttl * 2);
        Assert.assertNull((Object)this.consumer.receive(10L));
        Assert.assertTrue((String)"Expired count is wrong", (boolean)Wait.waitFor(() -> this.brokerService.getDestination(this.getActiveMQDestination()).getDestinationStatistics().getExpired().getCount() == (long)messageCount, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight sub dispatched message count should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizeDispatchFailure() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        this.failOnDispatch.set(true);
        try {
            this.sendMessages(1);
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertTrue((boolean)Wait.waitFor(() -> ((ActiveMQSession)this.session).isClosed(), (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub", (boolean)Wait.waitFor(() -> this.amqDestination.getConsumers().size() == 0 || this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub", (boolean)Wait.waitFor(() -> this.amqDestination.getConsumers().size() == 0 || this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizeConsumerClosed() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        this.sendMessages(10);
        Assert.assertTrue((String)"Should be 10 in flight messages", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        this.consumer.close();
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub", (boolean)Wait.waitFor(() -> this.amqDestination.getConsumers().size() == 0 || this.getSubscription().getDispatchedQueueSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub", (boolean)Wait.waitFor(() -> this.amqDestination.getConsumers().size() == 0 || this.getSubscription().getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    @Test(timeout=60000L)
    public void testInflightMessageSizeRemoveDestination() throws Exception {
        Assume.assumeTrue((boolean)this.useTopicSubscriptionInflightStats);
        this.consumer.close();
        this.consumer = this.getMessageConsumer("testDest.>");
        this.sendMessages(10);
        this.sendMessages(10, (jakarta.jms.Destination)this.getActiveMQDestination("testDest.2"));
        Destination amqDestination2 = TestSupport.getDestination(this.brokerService, this.getActiveMQDestination("testDest.2"));
        Subscription subscription = this.getSubscription();
        Assert.assertTrue((String)"Should be 10 in flight messages", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Should be 10 in flight messages", (boolean)Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 20", (boolean)Wait.waitFor(() -> subscription.getInFlightSize() == 20, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be greater than 0", (boolean)Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0L, (long)10000L, (long)500L));
        this.brokerService.getBroker().removeDestination(this.brokerService.getAdminConnectionContext(), this.getActiveMQDestination(), 1000L);
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> this.amqDestination.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Destination inflight message count should still be 10", (boolean)Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 10", (boolean)Wait.waitFor(() -> subscription.getInFlightSize() == 10, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be greater than 0", (boolean)Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0L, (long)10000L, (long)500L));
        this.brokerService.getBroker().removeDestination(this.brokerService.getAdminConnectionContext(), this.getActiveMQDestination("testDest.2"), 1000L);
        Assert.assertTrue((String)"Destination inflight message count should be 0", (boolean)Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 0L, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> subscription.getInFlightSize() == 0, (long)10000L, (long)500L));
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor(() -> subscription.getInFlightMessageSize() == 0L, (long)10000L, (long)500L));
    }

    protected long sendMessages(int count) throws JMSException {
        return this.sendMessages(count, null, this.dest);
    }

    protected long sendMessages(int count, jakarta.jms.Destination dest) throws JMSException {
        return this.sendMessages(count, null, dest);
    }

    protected long sendMessages(int count, Integer ttl) throws JMSException {
        return this.sendMessages(count, ttl, this.dest);
    }

    protected long sendMessages(int count, Integer ttl, jakarta.jms.Destination dest) throws JMSException {
        MessageProducer producer = this.session.createProducer(dest);
        if (ttl != null) {
            producer.setTimeToLive((long)ttl.intValue());
        }
        long totalSize = 0L;
        for (int i = 0; i < count; ++i) {
            Random r = new Random();
            int size = r.nextInt(150000);
            totalSize += (long)size;
            byte[] bytes = new byte[size > 0 ? size : 1];
            r.nextBytes(bytes);
            BytesMessage bytesMessage = this.session.createBytesMessage();
            bytesMessage.writeBytes(bytes);
            producer.send((Message)bytesMessage);
        }
        if (this.session.getTransacted()) {
            this.session.commit();
        }
        return totalSize;
    }

    protected void receiveMessages(int count) throws JMSException {
        for (int i = 0; i < count; ++i) {
            Message m = this.consumer.receive();
            if (this.ackType == 0) {
                this.session.commit();
                continue;
            }
            if (this.ackType == 1) continue;
            m.acknowledge();
        }
    }

    protected abstract Subscription getSubscription();

    protected ActiveMQDestination getActiveMQDestination() {
        return this.getActiveMQDestination("testDest.1");
    }

    protected abstract ActiveMQDestination getActiveMQDestination(String var1);

    protected MessageConsumer getMessageConsumer() throws JMSException {
        return this.getMessageConsumer("testDest.1");
    }

    protected abstract MessageConsumer getMessageConsumer(String var1) throws JMSException;

    protected jakarta.jms.Destination getDestination() throws JMSException {
        return this.getDestination("testDest.1");
    }

    protected abstract jakarta.jms.Destination getDestination(String var1) throws JMSException;
}

