/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverReadInactivityBlockWriteTimeoutClientTest
extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverReadInactivityBlockWriteTimeoutClientTest.class);

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setConcurrentStoreAndDispatchQueues(false);
        broker.setPersistenceAdapter((PersistenceAdapter)adapter);
        broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
        return broker;
    }

    public void testBlockedFailoverSendWillReactToReadInactivityTimeout() throws Exception {
        final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
        this.messageTextPrefix = this.initMessagePrefix(81920);
        URI tcpBrokerUri = URISupport.removeQuery((URI)((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri());
        LOG.info("consuming using uri: " + tcpBrokerUri);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
        Connection c = factory.createConnection();
        c.start();
        Session session = c.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)dest);
        SocketProxy proxy = new SocketProxy();
        proxy.setTarget(tcpBrokerUri);
        proxy.open();
        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?wireFormat.maxInactivityDuration=5000&ignoreRemoteWireFormat=true)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
        final ActiveMQConnection pc = (ActiveMQConnection)pFactory.createConnection();
        final AtomicInteger interruptCounter = new AtomicInteger(0);
        pc.addTransportListener(new TransportListener(){

            public void onCommand(Object command) {
            }

            public void onException(IOException error) {
                LOG.info("Got: " + error);
            }

            public void transportInterupted() {
                interruptCounter.incrementAndGet();
            }

            public void transportResumed() {
            }
        });
        pc.start();
        int messageCount = 200;
        final CountDownLatch sentOne = new CountDownLatch(1);
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    Session session = pc.createSession(false, 1);
                    MessageProducer producer = session.createProducer((Destination)dest);
                    for (int i = 0; i < 200; ++i) {
                        producer.send((Message)session.createTextMessage(FailoverReadInactivityBlockWriteTimeoutClientTest.this.messageTextPrefix + i));
                        sentOne.countDown();
                    }
                    producer.close();
                    session.close();
                    LOG.info("Done with send of: 200");
                }
                catch (Exception ignored) {
                    ignored.printStackTrace();
                }
            }
        });
        sentOne.await(5L, TimeUnit.SECONDS);
        proxy.pause();
        FailoverReadInactivityBlockWriteTimeoutClientTest.assertTrue((String)"Got interrupted", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return interruptCounter.get() > 0;
            }
        }));
        proxy.goOn();
        for (int i = 0; i < 200; ++i) {
            FailoverReadInactivityBlockWriteTimeoutClientTest.assertNotNull((String)("Got message " + i + " after reconnect"), (Object)consumer.receive(5000L));
        }
        FailoverReadInactivityBlockWriteTimeoutClientTest.assertTrue((String)"no pending messages when done", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("current total message count: " + FailoverReadInactivityBlockWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount());
                return FailoverReadInactivityBlockWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount() == 0L;
            }
        }));
    }

    private String initMessagePrefix(int i) {
        byte[] content = new byte[i];
        return new String(content);
    }
}

