/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.Response;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckDuplicateMessagesOnDuplexTest {
    private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
    private BrokerService localBroker;
    private BrokerService remoteBroker;
    private ActiveMQConnectionFactory localFactory;
    private ActiveMQConnectionFactory remoteFactory;
    private Session localSession;
    private MessageConsumer consumer;
    private Session remoteSession;
    private MessageProducer producer;
    private Connection remoteConnection;
    private Connection localConnection;
    private DebugTransportFilter debugTransportFilter;

    @BeforeClass
    public static void setUpClass() {
    }

    @AfterClass
    public static void tearDownClass() {
    }

    @Before
    public void setUp() {
    }

    @After
    public void tearDown() {
    }

    @Test
    public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
        this.createBrokers();
        this.localBroker.deleteAllMessages();
        this.remoteBroker.deleteAllMessages();
        this.startBrokers();
        this.openConnections();
        Thread.sleep(1000L);
        log.info("\n\n==============================================\nsend hello1\n");
        this.debugTransportFilter.closeOnResponse = true;
        this.producer.send((Message)this.remoteSession.createTextMessage("hello1"));
        Message msg = this.consumer.receive(30000L);
        Assert.assertNotNull((String)"expected hello1", (Object)msg);
        Assert.assertEquals((Object)"hello1", (Object)((TextMessage)msg).getText());
        Thread.sleep(1000L);
        log.info("\n\n------------------------------------------\nsend hello2\n");
        this.producer.send((Message)this.remoteSession.createTextMessage("hello2"));
        msg = this.consumer.receive(30000L);
        Assert.assertNotNull((String)"expected hello2", (Object)msg);
        Assert.assertEquals((Object)"hello2", (Object)((TextMessage)msg).getText());
        this.closeLocalConnection();
        Thread.sleep(1000L);
        log.info("\n\n------------------------------------------\nsend hello3\n");
        this.openLocalConnection();
        Thread.sleep(1000L);
        this.producer.send((Message)this.remoteSession.createTextMessage("hello3"));
        msg = this.consumer.receive(30000L);
        Assert.assertNotNull((String)"expected hello3", (Object)msg);
        Assert.assertEquals((Object)"hello3", (Object)((TextMessage)msg).getText());
        Thread.sleep(1000L);
        log.info("\n\n==============================================\n\n");
        this.closeConnections();
        this.stopBrokers();
        Thread.sleep(1000L);
        log.info("\n\n##############################################\n\n");
        this.createLocalBroker();
        this.startLocalBroker();
        this.openLocalConnection();
        msg = this.consumer.receive(1000L);
        this.closeLocalConnection();
        this.stopLocalBroker();
        Assert.assertNull((Object)msg);
    }

    private void createBrokers() throws Exception {
        this.createLocalBroker();
        this.createRemoteBroker();
    }

    private void createLocalBroker() throws Exception {
        this.localBroker = new BrokerService();
        this.localBroker.setBrokerName("LOCAL");
        this.localBroker.setUseJmx(true);
        this.localBroker.setSchedulePeriodForDestinationPurge(5000);
        ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector(false);
        this.localBroker.setManagementContext(managementContext);
        PersistenceAdapter persistenceAdapter = this.persistanceAdapterFactory("target/local");
        this.localBroker.setPersistenceAdapter(persistenceAdapter);
        ArrayList<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
        DebugTransportFactory tf = new DebugTransportFactory();
        TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539"));
        TransportConnector transportConnector = new TransportConnector(transport);
        transportConnector.setName("tc");
        transportConnector.setAuditNetworkProducers(true);
        transportConnectors.add(transportConnector);
        this.localBroker.setTransportConnectors(transportConnectors);
    }

    private void createRemoteBroker() throws Exception {
        this.remoteBroker = new BrokerService();
        this.remoteBroker.setBrokerName("REMOTE");
        this.remoteBroker.setUseJmx(true);
        this.remoteBroker.setSchedulePeriodForDestinationPurge(5000);
        ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector(false);
        this.remoteBroker.setManagementContext(managementContext);
        PersistenceAdapter persistenceAdapter = this.persistanceAdapterFactory("target/remote");
        this.remoteBroker.setPersistenceAdapter(persistenceAdapter);
        ArrayList<DiscoveryNetworkConnector> networkConnectors = new ArrayList<DiscoveryNetworkConnector>();
        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
        networkConnector.setName("to local");
        networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
        networkConnector.setDuplex(true);
        networkConnector.setAlwaysSyncSend(true);
        networkConnector.setDecreaseNetworkConsumerPriority(false);
        networkConnector.setPrefetchSize(1);
        networkConnector.setCheckDuplicateMessagesOnDuplex(true);
        networkConnectors.add(networkConnector);
        this.remoteBroker.setNetworkConnectors(networkConnectors);
    }

    private void startBrokers() throws Exception {
        this.startLocalBroker();
        this.startRemoteBroker();
    }

    private void startLocalBroker() throws Exception {
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
    }

    private void startRemoteBroker() throws Exception {
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
    }

    private void openConnections() throws JMSException {
        this.openLocalConnection();
        this.openRemoteConnection();
    }

    private void openLocalConnection() throws JMSException {
        this.localFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        this.localConnection = this.localFactory.createConnection();
        this.localConnection.start();
        this.localSession = this.localConnection.createSession(false, 1);
        this.consumer = this.localSession.createConsumer((Destination)this.localSession.createQueue("testqueue"));
    }

    private void openRemoteConnection() throws JMSException {
        this.remoteFactory = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI());
        this.remoteConnection = this.remoteFactory.createConnection();
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, 1);
        this.producer = this.remoteSession.createProducer((Destination)this.remoteSession.createQueue("testqueue"));
    }

    private void closeConnections() throws JMSException {
        this.closeLocalConnection();
        this.closeRemoteConnection();
    }

    private void closeLocalConnection() throws JMSException {
        this.localConnection.close();
    }

    private void closeRemoteConnection() throws JMSException {
        this.remoteConnection.close();
    }

    private void stopBrokers() throws Exception {
        this.stopRemoteBroker();
        this.stopLocalBroker();
    }

    private void stopLocalBroker() throws Exception {
        this.localBroker.stop();
        this.localBroker.waitUntilStopped();
    }

    private void stopRemoteBroker() throws Exception {
        this.remoteBroker.stop();
        this.remoteBroker.waitUntilStopped();
    }

    private PersistenceAdapter persistanceAdapterFactory(String path) {
        return this.persistanceAdapterFactory_KahaDB(path);
    }

    private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) {
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(new File(path));
        kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true);
        kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true);
        kahaDBPersistenceAdapter.setChecksumJournalFiles(true);
        return kahaDBPersistenceAdapter;
    }

    private class DebugTransportFilter
    extends TransportFilter {
        boolean closeOnResponse;

        public DebugTransportFilter(Transport next) {
            super(next);
            this.closeOnResponse = false;
        }

        public void oneway(Object command) throws IOException {
            if (this.closeOnResponse && command instanceof Response) {
                this.closeOnResponse = false;
                log.warn("\n\nclosing connection before response is sent\n\n");
                try {
                    ((NIOTransport)this.next).stop();
                }
                catch (Exception ex) {
                    log.error("couldn't stop niotransport", (Throwable)ex);
                }
                return;
            }
            super.oneway(command);
        }
    }

    private class DebugTransportFactory
    extends NIOTransportFactory {
        private DebugTransportFactory() {
        }

        protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
            return new DebugTransportServer((TcpTransportFactory)this, location, serverSocketFactory);
        }
    }

    private class DebugTransportServer
    extends TcpTransportServer {
        public DebugTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
            super(transportFactory, location, serverSocketFactory);
        }

        protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
            NIOTransport transport = new NIOTransport(format, socket);
            CheckDuplicateMessagesOnDuplexTest.this.debugTransportFilter = new DebugTransportFilter((Transport)transport);
            return CheckDuplicateMessagesOnDuplexTest.this.debugTransportFilter;
        }
    }
}

