/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class NetworkFailoverTest
extends TestCase {
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(NetworkFailoverTest.class);
    protected AbstractApplicationContext context;
    protected Connection localConnection;
    protected Connection remoteConnection;
    protected BrokerService localBroker;
    protected BrokerService remoteBroker;
    protected Session localSession;
    protected Session remoteSession;
    protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo");
    private final AtomicInteger replyToNonExistDest = new AtomicInteger(0);
    private final AtomicInteger roundTripComplete = new AtomicInteger(0);
    private final AtomicInteger remoteDLQCount = new AtomicInteger(0);

    public void testRequestReply() throws Exception {
        final MessageProducer remoteProducer = this.remoteSession.createProducer(null);
        MessageConsumer remoteConsumer = this.remoteSession.createConsumer((Destination)this.included);
        remoteConsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                TextMessage textMsg = (TextMessage)msg;
                try {
                    String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
                    Destination replyTo = msg.getJMSReplyTo();
                    textMsg.clearBody();
                    textMsg.setText(payload);
                    LOG.info("*** Sending response: {}", (Object)textMsg.getText());
                    remoteProducer.send(replyTo, (Message)textMsg);
                    LOG.info("replied with: " + textMsg.getJMSMessageID());
                }
                catch (DestinationDoesNotExistException expected) {
                    NetworkFailoverTest.this.replyToNonExistDest.incrementAndGet();
                    try {
                        LOG.info("NED: " + textMsg.getJMSMessageID());
                    }
                    catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                catch (Exception e) {
                    LOG.warn("*** Responder listener caught exception: ", (Throwable)e);
                    e.printStackTrace();
                }
            }
        });
        TemporaryQueue tempQueue = this.localSession.createTemporaryQueue();
        MessageProducer requestProducer = this.localSession.createProducer((Destination)this.included);
        requestProducer.setDeliveryMode(1);
        MessageConsumer requestConsumer = this.localSession.createConsumer((Destination)tempQueue);
        MessageConsumer dlqconsumer = this.remoteSession.createConsumer((Destination)new ActiveMQQueue("ActiveMQ.DLQ"));
        dlqconsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                try {
                    LOG.info("dlq " + message.getJMSMessageID());
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
                NetworkFailoverTest.this.remoteDLQCount.incrementAndGet();
            }
        });
        Thread.sleep(2000L);
        long done = System.currentTimeMillis() + 60000L;
        int i = 0;
        while (10 > this.roundTripComplete.get() + this.remoteDLQCount.get() + this.replyToNonExistDest.get() && done > System.currentTimeMillis()) {
            TextMessage result;
            if (i < 10) {
                String payload = "test msg " + i;
                ++i;
                TextMessage msg = this.localSession.createTextMessage(payload);
                msg.setJMSReplyTo((Destination)tempQueue);
                requestProducer.send((Message)msg);
                LOG.info("Sent: " + msg.getJMSMessageID() + ", Failing over");
                ((FailoverTransport)((TransportFilter)((TransportFilter)((ActiveMQConnection)this.localConnection).getTransport()).getNext()).getNext()).handleTransportFailure(new IOException("Forcing failover from test"));
            }
            if ((result = (TextMessage)requestConsumer.receive(5000L)) == null) continue;
            LOG.info("Got reply: " + result.getJMSMessageID() + ", " + result.getText());
            this.roundTripComplete.incrementAndGet();
        }
        LOG.info("complete: " + this.roundTripComplete.get() + ", remoteDLQCount: " + this.remoteDLQCount.get() + ", replyToNonExistDest: " + this.replyToNonExistDest.get());
        NetworkFailoverTest.assertEquals((String)("complete:" + this.roundTripComplete.get() + ", remoteDLQCount: " + this.remoteDLQCount.get() + ", replyToNonExistDest: " + this.replyToNonExistDest.get()), (int)10, (int)(this.roundTripComplete.get() + this.remoteDLQCount.get() + this.replyToNonExistDest.get()));
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.doSetUp(true);
    }

    protected void tearDown() throws Exception {
        this.doTearDown();
        super.tearDown();
    }

    protected void doTearDown() throws Exception {
        try {
            this.localConnection.close();
            this.remoteConnection.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.localBroker.stop();
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.remoteBroker.stop();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    protected void doSetUp(boolean deleteAllMessages) throws Exception {
        this.remoteBroker = this.createRemoteBroker();
        this.remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.remoteBroker.setCacheTempDestinations(true);
        this.remoteBroker.start();
        this.localBroker = this.createLocalBroker();
        this.localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.localBroker.setCacheTempDestinations(true);
        this.localBroker.start();
        String localURI = "tcp://localhost:61616";
        String remoteURI = "tcp://localhost:61617";
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI + ")?randomize=false&backup=false&trackMessages=true");
        this.localConnection = fac.createConnection();
        this.localConnection.setClientID("local");
        this.localConnection.start();
        fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=false&trackMessages=true");
        fac.setWatchTopicAdvisories(false);
        this.remoteConnection = fac.createConnection();
        this.remoteConnection.setClientID("remote");
        this.remoteConnection.start();
        this.localSession = this.localConnection.createSession(false, 1);
        this.remoteSession = this.remoteConnection.createSession(false, 1);
    }

    protected String getRemoteBrokerURI() {
        return "org/apache/activemq/network/remoteBroker.xml";
    }

    protected String getLocalBrokerURI() {
        return "org/apache/activemq/network/localBroker.xml";
    }

    protected BrokerService createBroker(String uri) throws Exception {
        ClassPathResource resource = new ClassPathResource(uri);
        BrokerFactoryBean factory = new BrokerFactoryBean((Resource)resource);
        resource = new ClassPathResource(uri);
        factory = new BrokerFactoryBean((Resource)resource);
        factory.afterPropertiesSet();
        BrokerService result = factory.getBroker();
        return result;
    }

    protected BrokerService createLocalBroker() throws Exception {
        return this.createBroker(this.getLocalBrokerURI());
    }

    protected BrokerService createRemoteBroker() throws Exception {
        return this.createBroker(this.getRemoteBrokerURI());
    }
}

