/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ClientTestSupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LargeMessageTestSupport
extends ClientTestSupport
implements MessageListener {
    protected static final int LARGE_MESSAGE_SIZE = 131072;
    protected static final int MESSAGE_COUNT = 100;
    private static final Logger LOG = LoggerFactory.getLogger(LargeMessageTestSupport.class);
    protected Connection producerConnection;
    protected Connection consumerConnection;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Session producerSession;
    protected Session consumerSession;
    protected byte[] largeMessageData;
    protected Destination destination;
    protected boolean isTopic = true;
    protected boolean isDurable = true;
    protected int deliveryMode = 2;
    protected IdGenerator idGen = new IdGenerator();
    protected boolean validMessageConsumption = true;
    protected AtomicInteger messageCount = new AtomicInteger(0);
    protected int prefetchValue = 10000000;

    protected Destination createDestination() {
        String subject = ((Object)((Object)this)).getClass().getName();
        if (this.isTopic) {
            return new ActiveMQTopic(subject);
        }
        return new ActiveMQQueue(subject);
    }

    protected MessageConsumer createConsumer() throws JMSException {
        if (this.isTopic && this.isDurable) {
            return this.consumerSession.createDurableSubscriber((Topic)this.destination, this.idGen.generateId());
        }
        return this.consumerSession.createConsumer(this.destination);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        ClientTestSupport.removeMessageStore();
        LOG.info("Setting up . . . . . ");
        this.messageCount.set(0);
        this.destination = this.createDestination();
        this.largeMessageData = new byte[131072];
        for (int i = 0; i < 131072; ++i) {
            this.largeMessageData[i] = i % 2 == 0 ? 97 : 122;
        }
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            throw new JMSException(e.getMessage());
        }
        ActiveMQConnectionFactory fac = this.getConnectionFactory();
        this.producerConnection = fac.createConnection();
        this.setPrefetchPolicy((ActiveMQConnection)this.producerConnection);
        this.producerConnection.start();
        this.consumerConnection = fac.createConnection();
        this.setPrefetchPolicy((ActiveMQConnection)this.consumerConnection);
        this.consumerConnection.setClientID(this.idGen.generateId());
        this.consumerConnection.start();
        this.producerSession = this.producerConnection.createSession(false, 1);
        this.producer = this.producerSession.createProducer(this.createDestination());
        this.producer.setDeliveryMode(this.deliveryMode);
        this.consumerSession = this.consumerConnection.createSession(false, 1);
        this.consumer = this.createConsumer();
        this.consumer.setMessageListener((MessageListener)this);
        LOG.info("Setup complete");
    }

    protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
        activeMQConnection.getPrefetchPolicy().setTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueuePrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(this.prefetchValue);
        activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(this.prefetchValue);
    }

    @Override
    public void tearDown() throws Exception {
        Thread.sleep(1000L);
        this.producerConnection.close();
        this.consumerConnection.close();
        super.tearDown();
        this.largeMessageData = null;
    }

    protected boolean isSame(BytesMessage msg1) throws Exception {
        boolean result = false;
        ((ActiveMQMessage)msg1).setReadOnlyBody(true);
        for (int i = 0; i < 131072; ++i) {
            boolean bl = result = msg1.readByte() == this.largeMessageData[i];
            if (!result) break;
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(Message msg) {
        try {
            BytesMessage ba = (BytesMessage)msg;
            this.validMessageConsumption &= this.isSame(ba);
            LargeMessageTestSupport.assertTrue((ba.getBodyLength() == 131072L ? 1 : 0) != 0);
            if (this.messageCount.incrementAndGet() >= 100) {
                AtomicInteger atomicInteger = this.messageCount;
                synchronized (atomicInteger) {
                    this.messageCount.notify();
                }
            }
            LOG.info("got message = " + this.messageCount);
            if (this.messageCount.get() % 50 == 0) {
                LOG.info("count = " + this.messageCount);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testLargeMessages() throws Exception {
        for (int i = 0; i < 100; ++i) {
            LOG.info("Sending message: " + i);
            BytesMessage msg = this.producerSession.createBytesMessage();
            msg.writeBytes(this.largeMessageData);
            this.producer.send((Message)msg);
        }
        long now = System.currentTimeMillis();
        while (now + 60000L > System.currentTimeMillis() && this.messageCount.get() < 100) {
            LOG.info("message count = " + this.messageCount);
            AtomicInteger atomicInteger = this.messageCount;
            synchronized (atomicInteger) {
                this.messageCount.wait(1000L);
            }
        }
        LOG.info("Finished count = " + this.messageCount);
        LargeMessageTestSupport.assertTrue((String)("Not enough messages - expected 100 but got " + this.messageCount), (this.messageCount.get() == 100 ? 1 : 0) != 0);
        LargeMessageTestSupport.assertTrue((String)"received messages are not valid", (boolean)this.validMessageConsumption);
        Thread.sleep(1000L);
        LOG.info("FINAL count = " + this.messageCount);
    }
}

