/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.transport.vm.VMTransportServer;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class BrokerQueueNetworkWithDisconnectTest
extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class);
    private static final int NETWORK_DOWN_TIME = 5000;
    protected static final int MESSAGE_COUNT = 200;
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    private SocketProxy socketProxy;
    private long networkDownTimeStart;
    public boolean useDuplexNetworkBridge = true;
    public boolean simulateStalledNetwork;
    private long inactiveDuration = 1000L;
    private boolean useSocketProxy = true;

    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
        this.addCombinationValues("useDuplexNetworkBridge", new Object[]{Boolean.TRUE, Boolean.FALSE});
        this.addCombinationValues("simulateStalledNetwork", new Object[]{Boolean.TRUE});
    }

    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        this.bridgeBrokers(SPOKE, HUB);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client = this.createConsumer(HUB, (Destination)dest);
        this.sleep(600);
        this.sendMessages(SPOKE, (Destination)dest, 200);
        MessageIdList msgs = this.getConsumerMessages(HUB, client);
        msgs.waitForMessagesToArrive(200);
        BrokerQueueNetworkWithDisconnectTest.assertTrue((String)("At least message 200 must be recieved, duplicates are expected, count=" + msgs.getMessageCount()), (200 <= msgs.getMessageCount() ? 1 : 0) != 0);
    }

    public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
        this.inactiveDuration = 60000L;
        this.useDuplexNetworkBridge = true;
        this.bridgeBrokers(SPOKE, HUB);
        final JmsMultipleBrokersTestSupport.BrokerItem hub = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(HUB);
        hub.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){
            int sleepCount = 2;

            public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
                try {
                    while (--this.sleepCount >= 0) {
                        LOG.info((Object)"sleeping for a bit in close impl to simulate load where reconnect fails due to a pending close");
                        TimeUnit.SECONDS.sleep(2L);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
                super.removeConnection(context, info, error);
            }
        }});
        this.startAllBrokers();
        this.waitForBridgeFormation();
        for (int i = 0; i < 3; ++i) {
            this.socketProxy.halfClose();
            this.sleep(10000);
        }
        boolean allGood = Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long numConnections = ((TransportConnector)hub.broker.getTransportConnectors().get(0)).getConnections().size();
                LOG.info((Object)("Num connetions:" + numConnections));
                return numConnections == 1L;
            }
        });
        if (!allGood) {
            BrokerQueueNetworkWithDisconnectTest.dumpAllThreads((String)"ExtraHubConnection");
        }
        BrokerQueueNetworkWithDisconnectTest.assertTrue((String)"should be only one transport connection for the single duplex network connector", (boolean)allGood);
        allGood = Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                long numVmConnections = ((VMTransportServer)VMTransportFactory.SERVERS.get(BrokerQueueNetworkWithDisconnectTest.HUB)).getConnectionCount();
                LOG.info((Object)("Num VM connetions:" + numVmConnections));
                return numVmConnections == 2L;
            }
        });
        if (!allGood) {
            BrokerQueueNetworkWithDisconnectTest.dumpAllThreads((String)"ExtraHubVMConnection");
        }
        BrokerQueueNetworkWithDisconnectTest.assertTrue((String)"should be only 2 vm connections for the single network duplex network connector", (boolean)allGood);
    }

    public void testTwoDuplexNCsAreAllowed() throws Exception {
        this.useDuplexNetworkBridge = true;
        this.useSocketProxy = false;
        NetworkConnector connector = this.bridgeBrokers(SPOKE, HUB);
        connector.setName("FirstDuplex");
        connector = this.bridgeBrokers(SPOKE, HUB);
        connector.setName("SecondDuplex");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        JmsMultipleBrokersTestSupport.BrokerItem hub = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(HUB);
        BrokerQueueNetworkWithDisconnectTest.assertEquals((String)"Has two transport Connectors", (int)2, (int)((TransportConnector)hub.broker.getTransportConnectors().get(0)).getConnections().size());
    }

    @Override
    protected void startAllBrokers() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(HUB);
        brokerItem.broker.start();
        brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(SPOKE);
        brokerItem.broker.start();
        this.sleep(600);
    }

    @Override
    public void setUp() throws Exception {
        this.networkDownTimeStart = 0L;
        this.inactiveDuration = 1000L;
        this.useSocketProxy = true;
        super.setAutoFail(true);
        super.setUp();
        String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
        this.createBroker(new URI("broker:(tcp://localhost:61617)/HubBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
        this.createBroker(new URI("broker:(tcp://localhost:61616)/SpokeBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        if (this.socketProxy != null) {
            this.socketProxy.close();
        }
    }

    public static Test suite() {
        return BrokerQueueNetworkWithDisconnectTest.suite(BrokerQueueNetworkWithDisconnectTest.class);
    }

    @Override
    protected void onSend(int i, TextMessage msg) {
        this.sleep(50);
        if (i == 50 || i == 150) {
            if (this.simulateStalledNetwork) {
                this.socketProxy.pause();
            } else {
                this.socketProxy.close();
            }
            this.networkDownTimeStart = System.currentTimeMillis();
        } else if (this.networkDownTimeStart > 0L) {
            if (this.networkDownTimeStart + 5000L < System.currentTimeMillis()) {
                if (this.simulateStalledNetwork) {
                    this.socketProxy.goOn();
                } else {
                    this.socketProxy.reopen();
                }
                this.networkDownTimeStart = 0L;
            } else {
                this.sleep(500);
            }
        }
        super.onSend(i, msg);
    }

    private void sleep(int milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
        List transportConnectors = remoteBroker.getTransportConnectors();
        if (!transportConnectors.isEmpty()) {
            URI remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
            if (this.useSocketProxy) {
                this.socketProxy = new SocketProxy(remoteURI);
                remoteURI = this.socketProxy.getUrl();
            }
            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI + "?wireFormat.maxInactivityDuration=" + this.inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + this.inactiveDuration + ")?useExponentialBackOff=false"));
            connector.setDynamicOnly(dynamicOnly);
            connector.setNetworkTTL(networkTTL);
            localBroker.addNetworkConnector((NetworkConnector)connector);
            maxSetupTime = 2000;
            if (this.useDuplexNetworkBridge) {
                connector.setDuplex(true);
            }
            return connector;
        }
        throw new Exception("Remote broker has no registered connectors.");
    }
}

