/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.advisory;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.ServiceStopper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class AdvisoryDuringStartTest {
    BrokerService brokerService;

    @After
    public void stopBroker() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
        }
    }

    @Test
    public void testConsumerAdvisoryDuringSlowStart() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setUseJmx(false);
        this.brokerService.addConnector("tcp://localhost:0");
        final CountDownLatch resumeStart = new CountDownLatch(1);
        this.brokerService.addNetworkConnector((NetworkConnector)new DiscoveryNetworkConnector(){

            protected void handleStart() throws Exception {
                resumeStart.await(5L, TimeUnit.SECONDS);
            }

            protected void handleStop(ServiceStopper s) throws Exception {
            }
        });
        Executors.newCachedThreadPool().submit(new Runnable(){

            @Override
            public void run() {
                try {
                    AdvisoryDuringStartTest.this.brokerService.start();
                }
                catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail((String)("error on start: " + e.toString()));
                }
            }
        });
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + this.brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + ")");
        Connection advisoryConnection = connectionFactory.createConnection();
        advisoryConnection.start();
        Session advisorySession = advisoryConnection.createSession(false, 1);
        MessageConsumer advisoryConsumer = advisorySession.createConsumer((Destination)advisorySession.createTopic("ActiveMQ.Advisory.Consumer.>"));
        Connection consumerConnection = connectionFactory.createConnection();
        Session consumerSession = consumerConnection.createSession(false, 1);
        consumerConnection.start();
        ActiveMQTopic dest = new ActiveMQTopic("SomeTopic");
        consumerSession.createConsumer((Destination)dest);
        resumeStart.countDown();
        ActiveMQMessage advisory = (ActiveMQMessage)advisoryConsumer.receive(4000L);
        Assert.assertNotNull((Object)advisory);
        Assert.assertTrue((boolean)(advisory.getDataStructure() instanceof ConsumerInfo));
        Assert.assertTrue((boolean)((ConsumerInfo)advisory.getDataStructure()).getDestination().equals((Object)dest));
        advisoryConnection.close();
        consumerConnection.close();
    }
}

