/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.inbound.endpoint.protocol.jms;

import java.util.Date;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSConstants;
import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSInjectHandler;
import org.wso2.carbon.inbound.endpoint.protocol.jms.JMSUtils;
import org.wso2.carbon.inbound.endpoint.protocol.jms.factory.CachedJMSConnectionFactory;

public class JMSPollingConsumer {
    // Logger named after this class.
    private static final Log logger = LogFactory.getLog((String)JMSPollingConsumer.class.getName());
    // Reconnection tuning values: start value of the attempt counter, initial retry wait (ms),
    // multiplier applied to the retry wait after each failed attempt, upper bound of the
    // retry wait (ms) and the divisor used to report the wait in seconds in log output.
    private static final int DEFAULT_RETRY_ITERATION = 0;
    private static final int DEFAULT_RETRY_DURATION = 1000;
    private static final double RECONNECTION_PROGRESSION_FACTOR = 2.0;
    private static final long MAX_RECONNECTION_DURATION = 60000L;
    private static final int SCALE_FACTOR = 1000;
    // Factory that supplies and closes the connection, session, destination and consumer.
    private CachedJMSConnectionFactory jmsConnectionFactory;
    // Handler each received message is passed to; set through registerHandler().
    private JMSInjectHandler injectHandler;
    // Minimum time in milliseconds between two polls started by execute().
    private long scanInterval;
    // Timestamp (ms) of the last poll started by execute(); null until the first one.
    private Long lastRanTime;
    // Credentials read from transport.jms.UserName / transport.jms.Password.
    private String strUserName;
    private String strPassword;
    // Receive timeout read from JMSConstants.RECEIVER_TIMEOUT; null when absent or not numeric.
    private Integer iReceiveTimeout;
    // Value of transport.jms.ReplyDestination; may be null.
    private String replyDestinationName;
    // Name of the inbound endpoint; used in log messages and passed to the inject handler.
    private String name;
    // The properties this consumer was created with.
    private Properties jmsProperties;
    // Set to false when poll() cannot obtain a connection, true once it obtains one.
    private boolean isConnected;
    // Fixed wait (ms) between reconnection attempts from transport.jms.retry.duration;
    // when null, poll() grows retryDuration progressively instead.
    private Long reconnectDuration;
    // Current wait (ms) before the next reconnection attempt and the number of attempts so far.
    private long retryDuration;
    private int retryIteration;
    // JMS resources obtained during poll() and closed by releaseResources() / destroy().
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private MessageConsumer messageConsumer = null;
    private Destination replyDestination = null;
    // Number of consecutive negative handler results; reset on a positive result or on suspension.
    private int currentNegativeCommitOrAckCount = 0;
    // True while polling is suspended.
    private boolean pollingSuspended = false;
    // Value of transport.jms.RetriesBeforeSuspension; -1 when not configured, 0 suspends polling permanently.
    private int pollingSuspensionLimit = -1;
    // Length of a suspension in milliseconds (transport.jms.PollingSuspensionPeriod).
    private int pollingSuspensionPeriod = 60000;
    // True when pollingSuspensionLimit is greater than -1.
    private boolean pollingSuspensionEnabled = false;
    // When true, the connection is reset at the moment polling gets suspended.
    private boolean resetConnectionAfterPollingSuspension = false;

    /**
     * Creates a polling consumer configured from the given JMS properties.
     *
     * <p>Besides the credentials and the reply destination, the following optional settings are read:
     * <ul>
     *   <li>{@code transport.jms.RetriesBeforeSuspension} - enables polling suspension when greater
     *       than -1; a value of 0 suspends polling from the start.</li>
     *   <li>{@code transport.jms.PollingSuspensionPeriod} - suspension length in milliseconds
     *       (default 60000); only read when suspension is enabled.</li>
     *   <li>{@code transport.jms.ResetConnectionOnPollingSuspension} - {@code "true"} resets the
     *       connection when polling gets suspended; only read when suspension is enabled.</li>
     *   <li>{@link JMSConstants#RECEIVER_TIMEOUT} - receive timeout; ignored with a warning when
     *       not numeric.</li>
     *   <li>{@code transport.jms.retry.duration} - fixed wait in milliseconds between reconnection
     *       attempts; ignored with a warning when not numeric or negative.</li>
     * </ul>
     *
     * @param jmsProperties properties of the inbound endpoint; must not be {@code null}
     * @param scanInterval  minimum time in milliseconds between two polls started by {@link #execute()}
     * @param name          name of the inbound endpoint, used in log and error messages
     * @throws SynapseException if {@code transport.jms.RetriesBeforeSuspension} is not a valid integer
     */
    public JMSPollingConsumer(Properties jmsProperties, long scanInterval, String name) {
        this.jmsConnectionFactory = new CachedJMSConnectionFactory(jmsProperties);
        this.strUserName = jmsProperties.getProperty("transport.jms.UserName");
        this.strPassword = jmsProperties.getProperty("transport.jms.Password");
        this.name = name;
        this.retryIteration = DEFAULT_RETRY_ITERATION;
        this.retryDuration = DEFAULT_RETRY_DURATION;

        String pollingSuspensionLimitValue = jmsProperties.getProperty("transport.jms.RetriesBeforeSuspension");
        if (pollingSuspensionLimitValue != null) {
            try {
                // Trimmed like the other numeric properties so surrounding whitespace does not fail deployment.
                this.pollingSuspensionLimit = Integer.parseInt(pollingSuspensionLimitValue.trim());
            }
            catch (NumberFormatException e) {
                // Keep the cause so the offending value shows up in the stack trace.
                throw new SynapseException("Invalid numeric value for transport.jms.RetriesBeforeSuspension. Inbound Endpoint " + name + " deployment failed.", e);
            }
            if (this.pollingSuspensionLimit > -1) {
                this.pollingSuspensionEnabled = true;
                if (logger.isDebugEnabled()) {
                    logger.debug("Polling suspension is enabled for Inbound Endpoint " + name);
                }
                if (this.pollingSuspensionLimit == 0) {
                    this.pollingSuspended = true;
                }
            }
            if (this.pollingSuspensionEnabled) {
                String pollingSuspensionPeriodValue = jmsProperties.getProperty("transport.jms.PollingSuspensionPeriod");
                if (pollingSuspensionPeriodValue != null) {
                    try {
                        this.pollingSuspensionPeriod = Integer.parseInt(pollingSuspensionPeriodValue.trim());
                    }
                    catch (NumberFormatException e) {
                        logger.warn("Invalid numeric value for transport.jms.PollingSuspensionPeriod . Default value of 60000 milliseconds will be accounted.");
                    }
                } else {
                    logger.warn("No value specified for pollingSuspensionPeriod, hence default value of 60000 milliseconds will be accounted.");
                }
                String connectionResetAfterPollingSuspension = jmsProperties.getProperty("transport.jms.ResetConnectionOnPollingSuspension");
                if (connectionResetAfterPollingSuspension != null && "true".equals(connectionResetAfterPollingSuspension.trim())) {
                    this.resetConnectionAfterPollingSuspension = true;
                }
            }
        }

        String strReceiveTimeout = jmsProperties.getProperty(JMSConstants.RECEIVER_TIMEOUT);
        if (strReceiveTimeout != null) {
            try {
                this.iReceiveTimeout = Integer.parseInt(strReceiveTimeout.trim());
            }
            catch (NumberFormatException e) {
                logger.warn("Invalid value for transport.jms.ReceiveTimeout : " + strReceiveTimeout);
                this.iReceiveTimeout = null;
            }
        }

        String strReconnectDuration = jmsProperties.getProperty("transport.jms.retry.duration");
        if (strReconnectDuration != null) {
            try {
                long parsedReconnectDuration = Long.parseLong(strReconnectDuration.trim());
                if (parsedReconnectDuration < 0L) {
                    // A negative wait would make Thread.sleep throw inside poll()'s finally block
                    // and skip the resource release, so it is rejected here.
                    logger.warn("Invalid value for transport.jms.retry.duration : " + strReconnectDuration);
                    this.reconnectDuration = null;
                } else {
                    this.reconnectDuration = parsedReconnectDuration;
                }
            }
            catch (NumberFormatException e) {
                logger.warn("Invalid value for transport.jms.retry.duration : " + strReconnectDuration);
                this.reconnectDuration = null;
            }
        }

        this.replyDestinationName = jmsProperties.getProperty("transport.jms.ReplyDestination");
        this.scanInterval = scanInterval;
        this.lastRanTime = null;
        this.jmsProperties = jmsProperties;
    }

    /**
     * Sets the handler that {@link #poll()} passes each received message to.
     * While no handler is set, {@link #poll()} returns the first received message instead.
     *
     * @param injectHandler the handler to use for received messages
     */
    public void registerHandler(JMSInjectHandler injectHandler) {
        this.injectHandler = injectHandler;
    }

    /**
     * Runs one scheduling cycle of the inbound endpoint.
     *
     * <p>Nothing is polled when {@code transport.jms.RetriesBeforeSuspension} is 0, while a polling
     * suspension is still in effect, or when less than the scan interval has passed since the last
     * poll. Otherwise the current time is recorded and {@link #poll()} is called. Any exception is
     * logged and not propagated.
     */
    public void execute() {
        try {
            logger.debug("Executing : JMS Inbound EP : ");
            if (this.pollingSuspensionLimit == 0) {
                logger.info("Polling is suspended permanently since \"transport.jms.RetriesBeforeSuspension\" is Zero.");
                return;
            }
            long currentTime = System.currentTimeMillis();
            if (this.pollingSuspended) {
                if (this.lastRanTime == null) {
                    // poll() is public and can flag a suspension before this method ever recorded
                    // a run time. Start the suspension window now instead of failing on a null
                    // unboxing in every cycle.
                    this.lastRanTime = currentTime;
                }
                long elapsedSinceLastRun = currentTime - this.lastRanTime;
                if (elapsedSinceLastRun >= (long)this.pollingSuspensionPeriod) {
                    this.pollingSuspended = false;
                    logger.info("Polling re-started since the suspension period of " + this.pollingSuspensionPeriod + " milliseconds exceeded.");
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Polling is suspended. Polling will be re-activated in " + ((long)this.pollingSuspensionPeriod - elapsedSinceLastRun) + " milliseconds.");
                    }
                    return;
                }
            }
            if (this.lastRanTime == null || this.lastRanTime + this.scanInterval <= currentTime) {
                this.lastRanTime = currentTime;
                this.poll();
            } else if (logger.isDebugEnabled()) {
                logger.debug("Skip cycle since concurrent rate is higher than the scan interval : JMS Inbound EP ");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("End : JMS Inbound EP : ");
            }
        }
        catch (Exception e) {
            // Boundary of the scheduled task: log and swallow so one failed cycle does not stop the next.
            logger.error("Error while retrieving or injecting JMS message. " + e.getMessage(), e);
        }
    }

    /**
     * Forcefully closes the current consumer, session and connection via {@link #destroy()} and
     * then asks the connection factory to create a new connection with the configured credentials.
     */
    private void resetConnection() {
        logger.info("Resetting the JMS connection.");
        destroy();
        // The returned connection is not kept here.
        // NOTE(review): presumably the cached factory retains it for the next getConnection() call - confirm.
        jmsConnectionFactory.createConnection(strUserName, strPassword);
    }

    /**
     * Polls the destination once and processes every message that can be received.
     *
     * <p>Connection, session, destination and consumer are obtained from the connection factory.
     * With a registered inject handler each received message is passed to the handler and,
     * depending on the handler's result, acknowledged / committed or recovered / rolled back.
     * When polling suspension is enabled, consecutive negative results are counted and polling is
     * suspended once the configured limit is reached.
     *
     * <p>If no connection is available, the calling thread sleeps for the current retry wait
     * before the method returns. Resources are released through {@code releaseResources} on
     * every exit path.
     *
     * @return the received message when no inject handler is registered; {@code null} otherwise
     */
    public Message poll() {
        logger.debug("Polling JMS messages.");
        try {
            this.connection = this.jmsConnectionFactory.getConnection(this.strUserName, this.strPassword);
            if (this.connection == null) {
                logger.warn("Inbound JMS endpoint unable to get a connection.");
                this.isConnected = false;
                return null;
            }
            if (this.retryIteration != DEFAULT_RETRY_ITERATION) {
                logger.info("Reconnection attempt: " + this.retryIteration + " for the JMS Inbound: " + this.name + " was successful!");
                this.retryIteration = DEFAULT_RETRY_ITERATION;
                this.retryDuration = DEFAULT_RETRY_DURATION;
            }
            this.isConnected = true;
            this.session = this.jmsConnectionFactory.getSession(this.connection);
            if (this.session == null) {
                logger.warn("Inbound JMS endpoint unable to get a session.");
                this.jmsConnectionFactory.closeConnection();
                return null;
            }
            this.destination = this.jmsConnectionFactory.getDestination(this.session);
            if (this.replyDestinationName != null && !this.replyDestinationName.trim().equals("")) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Using the reply destination as " + this.replyDestinationName + " in inbound endpoint.");
                }
                this.replyDestination = this.jmsConnectionFactory.createDestination(this.session, this.replyDestinationName);
            }
            this.messageConsumer = this.jmsConnectionFactory.getMessageConsumer(this.session, this.destination);
            if (this.messageConsumer == null) {
                logger.debug("Inbound JMS Endpoint. No JMS consumer initialized. No JMS message received.");
                if (this.session != null) {
                    this.jmsConnectionFactory.closeSession(this.session, true);
                }
                if (this.connection != null) {
                    this.jmsConnectionFactory.closeConnection(this.connection, true);
                }
                return null;
            }
            Message msg = this.receiveMessage(this.messageConsumer);
            if (msg == null) {
                logger.debug("Inbound JMS Endpoint. No JMS message received.");
                return null;
            }
            while (msg != null) {
                if (JMSUtils.inferJMSMessageType(msg) == null) {
                    logger.error("Invalid JMS Message type.");
                    return null;
                }
                if (this.injectHandler == null) {
                    // Without a handler the message is handed back to the caller.
                    return msg;
                }
                if (this.replyDestination != null) {
                    this.injectHandler.setReplyDestination(this.replyDestination);
                }
                this.injectHandler.setConnection(this.connection);
                boolean commitOrAck = this.injectHandler.invoke(msg, this.name);
                if (this.jmsConnectionFactory.getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) {
                    if (commitOrAck) {
                        try {
                            msg.acknowledge();
                            if (logger.isDebugEnabled()) {
                                logger.debug("Message : " + msg.getJMSMessageID() + " acknowledged");
                            }
                        }
                        catch (JMSException e) {
                            logger.error("Error acknowledging message : " + msg.getJMSMessageID(), e);
                        }
                    } else {
                        // Negative result: recover the session and replace session and consumer.
                        if (!this.jmsConnectionFactory.isTransactedSession()) {
                            this.jmsConnectionFactory.recoverSession(this.session, false);
                        }
                        if (this.messageConsumer != null) {
                            this.jmsConnectionFactory.closeConsumer(this.messageConsumer);
                        }
                        if (this.session != null) {
                            this.jmsConnectionFactory.closeSession(this.session);
                        }
                        this.session = this.jmsConnectionFactory.getSession(this.connection);
                        // The factory may hand back null for a session or consumer (this method
                        // checks for that above), so do not build a consumer on a missing session.
                        this.messageConsumer = this.session == null
                                ? null
                                : this.jmsConnectionFactory.getMessageConsumer(this.session, this.destination);
                    }
                }
                if (this.jmsConnectionFactory.isTransactedSession()) {
                    try {
                        if (this.session != null && this.session.getTransacted()) {
                            if (commitOrAck) {
                                this.session.commit();
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Session for message : " + msg.getJMSMessageID() + " committed");
                                }
                            } else {
                                this.session.rollback();
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Session for message : " + msg.getJMSMessageID() + " rolled back");
                                }
                            }
                        }
                    }
                    catch (JMSException e) {
                        logger.error("Error " + (commitOrAck ? "committing" : "rolling back") + " local session txn for message : " + msg.getJMSMessageID(), e);
                    }
                }
                if (this.pollingSuspensionEnabled) {
                    if (!commitOrAck) {
                        ++this.currentNegativeCommitOrAckCount;
                        if (this.currentNegativeCommitOrAckCount >= this.pollingSuspensionLimit) {
                            this.pollingSuspended = true;
                            this.currentNegativeCommitOrAckCount = 0;
                            logger.info("Suspending polling as the pollingSuspensionLimit of " + this.pollingSuspensionLimit + " reached. Polling will be re-started after " + this.pollingSuspensionPeriod + " milliseconds");
                            if (this.resetConnectionAfterPollingSuspension) {
                                this.resetConnection();
                            }
                            break;
                        }
                    } else {
                        this.currentNegativeCommitOrAckCount = 0;
                    }
                }
                if (this.messageConsumer == null) {
                    // Session or consumer could not be re-created after a negative result; end this
                    // cycle cleanly instead of failing with a NullPointerException on receive.
                    logger.warn("Inbound JMS endpoint unable to re-create the session or consumer for " + this.name + ". Ending this poll cycle.");
                    break;
                }
                msg = this.receiveMessage(this.messageConsumer);
            }
        }
        catch (JMSException e) {
            logger.error("Error while receiving JMS message for " + this.name, e);
            this.releaseResources(true);
        }
        catch (Exception e) {
            logger.error("Error while receiving JMS message for " + this.name, e);
        }
        finally {
            if (!this.isConnected) {
                if (this.reconnectDuration != null) {
                    // Fixed wait configured through transport.jms.retry.duration.
                    this.retryDuration = this.reconnectDuration;
                    logger.error("Reconnection attempt : " + this.retryIteration++ + " for JMS Inbound : " + this.name + " failed. Next retry in " + this.retryDuration / SCALE_FACTOR + " seconds. (Fixed Interval)");
                } else {
                    // Progressive wait, capped at MAX_RECONNECTION_DURATION.
                    this.retryDuration = (long)((double)this.retryDuration * RECONNECTION_PROGRESSION_FACTOR);
                    if (this.retryDuration > MAX_RECONNECTION_DURATION) {
                        this.retryDuration = MAX_RECONNECTION_DURATION;
                        logger.info("InitialReconnectDuration reached to MaxReconnectDuration.");
                    }
                    logger.error("Reconnection attempt : " + this.retryIteration++ + " for JMS Inbound : " + this.name + " failed. Next retry in " + this.retryDuration / SCALE_FACTOR + " seconds");
                }
                try {
                    Thread.sleep(this.retryDuration);
                }
                catch (InterruptedException ignore) {
                    // Restore the interrupt flag for the owner of this thread.
                    Thread.currentThread().interrupt();
                }
            }
            this.releaseResources(false);
        }
        return null;
    }

    /**
     * Hands the current consumer, session and connection (each only if set) to the connection
     * factory for closing, in that order.
     *
     * @param forcefullyClose passed through to the factory's close methods
     */
    private void releaseResources(boolean forcefullyClose) {
        final CachedJMSConnectionFactory factory = jmsConnectionFactory;
        if (messageConsumer != null) {
            factory.closeConsumer(messageConsumer, forcefullyClose);
        }
        if (session != null) {
            factory.closeSession(session, forcefullyClose);
        }
        if (connection != null) {
            factory.closeConnection(connection, forcefullyClose);
        }
    }

    /**
     * Forcefully closes the current consumer, session and connection (each only if set) while
     * holding the monitor of the connection factory.
     */
    public void destroy() {
        synchronized (jmsConnectionFactory) {
            // Same close calls, in the same order, with the force flag set.
            releaseResources(true);
        }
    }

    /**
     * Receives one message from the given consumer using the configured receive timeout.
     *
     * <p>Without a configured timeout the consumer is asked with a timeout of 1; a positive
     * configured timeout is passed on as is; any other configured value results in a
     * {@code receive()} call without a timeout argument.
     *
     * @param messageConsumer consumer to receive from
     * @return the received message, or {@code null} if the consumer returned none
     * @throws JMSException if the consumer fails to receive
     */
    private Message receiveMessage(MessageConsumer messageConsumer) throws JMSException {
        if (iReceiveTimeout == null) {
            return messageConsumer.receive(1L);
        }
        if (iReceiveTimeout > 0) {
            return messageConsumer.receive(iReceiveTimeout.longValue());
        }
        return messageConsumer.receive();
    }

    /**
     * Returns the properties this consumer was created with.
     *
     * @return the same {@link Properties} instance that was passed to the constructor
     */
    protected Properties getInboundProperites() {
        return this.jmsProperties;
    }
}

