/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ3324Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
    private static final String bindAddress = "tcp://0.0.0.0:0";
    private BrokerService broker;
    private ActiveMQConnectionFactory cf;
    private static final int MESSAGE_COUNT = 100;

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        String address = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.cf = new ActiveMQConnectionFactory(address);
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
        Connection connection = this.cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        TemporaryQueue queue = session.createTemporaryQueue();
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        ActiveMQTopic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination)((ActiveMQDestination)queue));
        MessageConsumer advisoryConsumer = session.createConsumer((Destination)advisoryTopic);
        MessageProducer producer = session.createProducer((Destination)queue);
        for (int i = 0; i < 100; ++i) {
            BytesMessage m = session.createBytesMessage();
            m.writeBytes(new byte[1024]);
            producer.send((Message)m);
        }
        Message msg = consumer.receive(5000L);
        Assert.assertNotNull((Object)msg);
        Message advCmsg = advisoryConsumer.receive(5000L);
        Assert.assertNotNull((Object)advCmsg);
        connection.close();
        LOG.debug("Connection closed, destinations should now become inactive.");
        Assert.assertTrue((String)("The destination " + (Topic)advisoryTopic + "was not removed. "), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3324Test.this.broker.getAdminView().getTopics().length == 0;
            }
        }));
        Assert.assertTrue((String)("The destination " + queue + " was not removed. "), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3324Test.this.broker.getAdminView().getTemporaryQueues().length == 0;
            }
        }));
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setUseMirroredQueues(true);
        answer.setPersistent(false);
        answer.setSchedulePeriodForDestinationPurge(1000);
        PolicyEntry entry = new PolicyEntry();
        entry.setGcInactiveDestinations(true);
        entry.setInactiveTimoutBeforeGC(2000L);
        entry.setProducerFlowControl(true);
        entry.setAdvisoryForConsumed(true);
        entry.setAdvisoryForFastProducers(true);
        entry.setAdvisoryForDelivery(true);
        PolicyMap map = new PolicyMap();
        map.setDefaultEntry(entry);
        MirroredQueue mirrorQ = new MirroredQueue();
        mirrorQ.setCopyMessage(true);
        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
        answer.setDestinationInterceptors(destinationInterceptors);
        answer.setDestinationPolicy(map);
        answer.addConnector(bindAddress);
        return answer;
    }
}

