/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.server.federation;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer;
import org.apache.activemq.artemis.core.server.federation.Federation;
import org.apache.activemq.artemis.core.server.federation.FederationUpstream;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;

public class FederatedQueueConsumerImpl
implements FederatedQueueConsumer,
SessionFailureListener {
    private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
    private final ActiveMQServer server;
    private final Federation federation;
    private final FederatedConsumerKey key;
    private final Transformer transformer;
    private final FederationUpstream upstream;
    private final AtomicInteger count = new AtomicInteger();
    private final ScheduledExecutorService scheduledExecutorService;
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;
    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSession clientSession;
    private ClientConsumer clientConsumer;

    public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
        this.federation = federation;
        this.server = server;
        this.key = key;
        this.transformer = transformer;
        this.upstream = upstream;
        this.scheduledExecutorService = server.getScheduledPool();
        this.clientSessionCallback = clientSessionCallback;
    }

    @Override
    public FederationUpstream getFederationUpstream() {
        return this.upstream;
    }

    @Override
    public Federation getFederation() {
        return this.federation;
    }

    @Override
    public FederatedConsumerKey getKey() {
        return this.key;
    }

    @Override
    public ClientSession getClientSession() {
        return this.clientSession;
    }

    @Override
    public int incrementCount() {
        return this.count.incrementAndGet();
    }

    @Override
    public int decrementCount() {
        return this.count.decrementAndGet();
    }

    @Override
    public void start() {
        this.scheduleConnect(0);
    }

    private void scheduleConnect(int delay) {
        this.scheduledExecutorService.schedule(() -> {
            try {
                this.connect();
            }
            catch (Exception e) {
                this.scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, 2, 30));
            }
        }, (long)delay, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void connect() throws Exception {
        block10: {
            try {
                if (this.clientConsumer != null) break block10;
                FederatedQueueConsumerImpl federatedQueueConsumerImpl = this;
                synchronized (federatedQueueConsumerImpl) {
                    this.clientSessionFactory = (ClientSessionFactoryInternal)this.upstream.getConnection().clientSessionFactory();
                    this.clientSession = this.clientSessionFactory.createSession(this.upstream.getUser(), this.upstream.getPassword(), false, true, true, this.clientSessionFactory.getServerLocator().isPreAcknowledge(), this.clientSessionFactory.getServerLocator().getAckBatchSize());
                    this.clientSession.addFailureListener((SessionFailureListener)this);
                    this.clientSession.addMetaData("federation-name", this.federation.getName().toString());
                    this.clientSession.addMetaData("federation-upstream-name", this.upstream.getName().toString());
                    this.clientSession.start();
                    if (this.clientSessionCallback != null) {
                        this.clientSessionCallback.callback(this.clientSession);
                    }
                    if (!this.clientSession.queueQuery(this.key.getQueueName()).isExists()) {
                        throw new ActiveMQNonExistentQueueException("Queue " + this.key.getQueueName() + " does not exist on remote");
                    }
                    this.clientConsumer = this.clientSession.createConsumer(this.key.getQueueName(), this.key.getFilterString(), this.key.getPriority(), false);
                    this.clientConsumer.setMessageHandler((MessageHandler)this);
                }
            }
            catch (Exception e) {
                try {
                    if (this.clientSessionFactory != null) {
                        this.clientSessionFactory.cleanup();
                    }
                    this.disconnect();
                }
                catch (ActiveMQException activeMQException) {
                    // empty catch block
                }
                throw e;
            }
        }
    }

    @Override
    public void close() {
        this.scheduleDisconnect(0);
    }

    private void scheduleDisconnect(int delay) {
        this.scheduledExecutorService.schedule(() -> {
            try {
                this.disconnect();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }, (long)delay, TimeUnit.SECONDS);
    }

    private void disconnect() throws ActiveMQException {
        if (this.clientConsumer != null) {
            this.clientConsumer.close();
        }
        if (this.clientSession != null) {
            this.clientSession.close();
        }
        this.clientConsumer = null;
        this.clientSession = null;
        if (!(this.clientSessionFactory == null || this.upstream.getConnection().isSharedConnection() && this.clientSessionFactory.numSessions() != 0)) {
            this.clientSessionFactory.close();
            this.clientSessionFactory = null;
        }
    }

    public void onMessage(ClientMessage clientMessage) {
        block12: {
            try {
                ClientMessage message = clientMessage;
                if (message instanceof ClientLargeMessageInternal) {
                    StorageManager storageManager = this.server.getStorageManager();
                    LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), (Message)message);
                    LargeMessageControllerImpl.LargeData largeData = null;
                    do {
                        largeData = ((ClientLargeMessageInternal)clientMessage).getLargeMessageController().take();
                        lsm.addBytes(largeData.getChunk());
                    } while (largeData.isContinues());
                    message = lsm.toMessage();
                    lsm.releaseResources(true, true);
                }
                if (this.server.hasBrokerFederationPlugins()) {
                    try {
                        this.server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, (Message)clientMessage));
                    }
                    catch (ActiveMQException t) {
                        ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");
                        throw new IllegalStateException(t.getMessage(), t.getCause());
                    }
                }
                Object object = message = this.transformer == null ? message : this.transformer.transform((Message)message);
                if (message != null) {
                    this.server.getPostOffice().route((Message)message, true);
                }
                clientMessage.acknowledge();
                if (!this.server.hasBrokerFederationPlugins()) break block12;
                try {
                    this.server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, (Message)clientMessage));
                }
                catch (ActiveMQException t) {
                    ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");
                    throw new IllegalStateException(t.getMessage(), t.getCause());
                }
            }
            catch (Exception e) {
                ActiveMQServerLogger.LOGGER.federationDispatchError(e, clientMessage.toString());
                try {
                    this.clientSession.rollback();
                }
                catch (ActiveMQException activeMQException) {
                    // empty catch block
                }
            }
        }
    }

    public void connectionFailed(ActiveMQException exception, boolean failedOver) {
        this.connectionFailed(exception, failedOver, null);
    }

    public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
        try {
            this.clientSessionFactory.cleanup();
            this.clientSessionFactory.close();
            this.clientConsumer = null;
            this.clientSession = null;
            this.clientSessionFactory = null;
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.start();
    }

    public void beforeReconnect(ActiveMQException exception) {
    }

    public static interface ClientSessionCallback {
        public void callback(ClientSession var1) throws ActiveMQException;
    }
}

