/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport;

import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionState;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStateCallback;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStatusChangeCallback;
import com.microsoft.azure.sdk.iot.device.IotHubConnectionStatusChangeReason;
import com.microsoft.azure.sdk.iot.device.IotHubEventCallback;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.exceptions.DeviceClientException;
import com.microsoft.azure.sdk.iot.device.exceptions.DeviceOperationTimeoutException;
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubServiceException;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.exceptions.UnauthorizedException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportPacket;
import com.microsoft.azure.sdk.iot.device.transport.RetryDecision;
import com.microsoft.azure.sdk.iot.device.transport.RetryPolicy;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpConnectionThrottledException;
import com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpUnauthorizedAccessException;
import com.microsoft.azure.sdk.iot.device.transport.https.HttpsIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttIotHubConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.exceptions.MqttUnauthorizedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IotHubTransport
implements IotHubListener {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(IotHubTransport.class);
    // Same value as the per-call send budget (10) used by sendMessages().
    private static final int MAX_MESSAGES_TO_SEND_PER_THREAD = 10;
    // Current transport state; volatile, and read by several methods without holding a lock.
    private volatile IotHubConnectionStatus connectionStatus;
    // Protocol-specific connection; created lazily by openConnection().
    private IotHubTransportConnection iotHubTransportConnection;
    // Packets waiting to be sent; drained by sendMessages().
    private final Queue<IotHubTransportPacket> waitingPacketsQueue = new ConcurrentLinkedQueue<IotHubTransportPacket>();
    // Packets handed to the connection and awaiting acknowledgement, keyed by message id.
    private final Map<String, IotHubTransportPacket> inProgressPackets = new ConcurrentHashMap<String, IotHubTransportPacket>();
    // Messages received from the service; polled one at a time by handleMessage().
    private final Queue<IotHubTransportMessage> receivedMessagesQueue = new ConcurrentLinkedQueue<IotHubTransportMessage>();
    // Packets whose final status is set and whose callback is ready to run; drained by invokeCallbacks().
    private final Queue<IotHubTransportPacket> callbackPacketsQueue = new ConcurrentLinkedQueue<IotHubTransportPacket>();
    // Callback/context pair set by registerConnectionStateCallback().
    private IotHubConnectionStateCallback stateCallback;
    private Object stateCallbackContext;
    // Callback/context pair set by registerConnectionStatusChangeCallback().
    private IotHubConnectionStatusChangeCallback connectionStatusChangeCallback;
    private Object connectionStatusChangeCallbackContext;
    // Supplied to the constructor; invoked by updateStatus() on every status change.
    private IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback;
    // Held around compound operations on inProgressPackets.
    private final Object inProgressMessagesLock = new Object();
    // Set by the constructor; replaced in open() with the first element of deviceClientConfigs.
    private DeviceClientConfig defaultConfig;
    // Populated in open() and passed to the connection when it is opened.
    private Queue<DeviceClientConfig> deviceClientConfigs;
    // Reconnection bookkeeping; both values are reset to 0 when the status becomes CONNECTED.
    private int currentReconnectionAttempt;
    private long reconnectionAttemptStartTimeMillis;
    // Created in open(); used to schedule message retries; shut down in close().
    private ScheduledExecutorService taskScheduler;
    // Held by onConnectionLost() for the duration of disconnection handling.
    private final Object reconnectionLock = new Object();
    // Created in openConnection() and passed to the connection's open(); shut down in close().
    private ScheduledExecutorService scheduledExecutorService;
    // Same value as the pool-size literal (1) passed to Executors in open() and openConnection().
    private static final int POOL_SIZE = 1;
    // Monitors exposed through getSendThreadLock()/getReceiveThreadLock(); close() calls notifyAll() on both.
    private final Object sendThreadLock = new Object();
    private final Object receiveThreadLock = new Object();

    /**
     * Creates a transport that starts out in the {@code DISCONNECTED} state with no reconnection attempts recorded.
     *
     * @param defaultConfig the configuration to use until {@code open} replaces it; must not be null
     * @param deviceIOConnectionStatusChangeCallback callback stored for later status-change notifications
     * @throws IllegalArgumentException if {@code defaultConfig} is null
     */
    public IotHubTransport(DeviceClientConfig defaultConfig, IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback) throws IllegalArgumentException {
        if (null == defaultConfig) {
            throw new IllegalArgumentException("Config cannot be null");
        }

        this.deviceIOConnectionStatusChangeCallback = deviceIOConnectionStatusChangeCallback;
        this.defaultConfig = defaultConfig;
        this.currentReconnectionAttempt = 0;
        this.connectionStatus = IotHubConnectionStatus.DISCONNECTED;
    }

    /**
     * Exposes the monitor object that {@code close} signals with {@code notifyAll()} and that
     * {@code hasMessagesToSend} and {@code hasCallbacksToExecute} synchronize on.
     *
     * @return the send-side lock object
     */
    public Object getSendThreadLock() {
        final Object lock = this.sendThreadLock;
        return lock;
    }

    /**
     * Exposes the monitor object that {@code close} signals with {@code notifyAll()} and that
     * {@code hasReceivedMessagesToHandle} synchronizes on.
     *
     * @return the receive-side lock object
     */
    public Object getReceiveThreadLock() {
        final Object lock = this.receiveThreadLock;
        return lock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether at least one packet is waiting to be sent.
     *
     * <p>Uses {@code isEmpty()} instead of {@code size() > 0}: the queue is a
     * {@code ConcurrentLinkedQueue}, whose {@code size()} traverses every element.
     *
     * @return {@code true} if the waiting queue is not empty
     */
    public boolean hasMessagesToSend() {
        synchronized (this.sendThreadLock) {
            return !this.waitingPacketsQueue.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether at least one received message is waiting to be handled.
     *
     * <p>Uses {@code isEmpty()} instead of {@code size() > 0}: the queue is a
     * {@code ConcurrentLinkedQueue}, whose {@code size()} traverses every element.
     *
     * @return {@code true} if the received-messages queue is not empty
     */
    public boolean hasReceivedMessagesToHandle() {
        synchronized (this.receiveThreadLock) {
            return !this.receivedMessagesQueue.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether at least one packet is waiting to have its callback invoked.
     *
     * <p>Uses {@code isEmpty()} instead of {@code size() > 0}: the queue is a
     * {@code ConcurrentLinkedQueue}, whose {@code size()} traverses every element.
     *
     * @return {@code true} if the callback queue is not empty
     */
    public boolean hasCallbacksToExecute() {
        synchronized (this.sendThreadLock) {
            return !this.callbackPacketsQueue.isEmpty();
        }
    }

    /**
     * Reports whether the transport is currently in the {@code DISCONNECTED} state.
     *
     * @return {@code true} only for {@code DISCONNECTED}; any other status yields {@code false}
     */
    public boolean isClosed() {
        final IotHubConnectionStatus current = this.connectionStatus;
        return IotHubConnectionStatus.DISCONNECTED == current;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an acknowledgement (or failure notification) for a previously sent message.
     *
     * <p>The matching in-progress packet is removed; on success it is queued for its callback with
     * {@code OK_EMPTY}, otherwise it is passed to {@code handleMessageException}.
     *
     * @param message the message that was acknowledged; a null message is logged and ignored
     * @param e the failure reported for the message, or null if it was sent successfully
     */
    @Override
    public void onMessageSent(Message message, Throwable e) {
        if (message == null) {
            log.warn("onMessageSent called with null message");
            return;
        }

        log.debug("IotHub message was acknowledged. Checking if there is record of sending this message ({})", message);

        final IotHubTransportPacket acknowledgedPacket;
        synchronized (this.inProgressMessagesLock) {
            acknowledgedPacket = this.inProgressPackets.remove(message.getMessageId());
        }

        if (acknowledgedPacket == null) {
            log.warn("A message was acknowledged by IoT Hub, but this client has no record of sending it ({})", message);
            return;
        }

        if (e == null) {
            log.trace("Message was sent by this client, adding it to callbacks queue with OK_EMPTY ({})", message);
            acknowledgedPacket.setStatus(IotHubStatusCode.OK_EMPTY);
            this.addToCallbackQueue(acknowledgedPacket);
            return;
        }

        // Non-transport failures are wrapped so that one handler deals with both kinds.
        final TransportException failure = e instanceof TransportException
                ? (TransportException) e
                : new TransportException(e);
        this.handleMessageException(acknowledgedPacket, failure);
    }

    /**
     * Handles a message (or a receive failure) reported by the connection.
     *
     * <p>A message is queued for handling only when it arrives without an accompanying exception;
     * every other combination is logged as an error.
     *
     * @param message the received message, possibly null
     * @param e the exception encountered while receiving, possibly null
     */
    @Override
    public void onMessageReceived(IotHubTransportMessage message, Throwable e) {
        if (message == null) {
            log.error("Exception encountered while receiving messages from service", e);
            return;
        }

        if (e != null) {
            log.error("Exception encountered while receiving a message from service {}", message, e);
            return;
        }

        log.info("Message was received from IotHub ({})", message);
        this.addToReceivedMessagesQueue(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to a lost connection by starting disconnection handling, unless the event refers to an
     * outdated connection id or the transport is not currently {@code CONNECTED}.
     *
     * <p>The whole method runs while holding the reconnection lock.
     *
     * @param e the cause of the connection loss; wrapped in a {@code TransportException} if it is not one
     * @param connectionId id of the connection that reported the loss
     */
    @Override
    public void onConnectionLost(Throwable e, String connectionId) {
        synchronized (this.reconnectionLock) {
            final boolean isCurrentConnection = connectionId.equals(this.iotHubTransportConnection.getConnectionId());
            if (!isCurrentConnection) {
                log.trace("OnConnectionLost was fired, but for an outdated connection. Ignoring...");
                return;
            }

            if (this.connectionStatus != IotHubConnectionStatus.CONNECTED) {
                log.trace("OnConnectionLost was fired, but connection is already disocnnected. Ignoring...");
                return;
            }

            final TransportException cause = e instanceof TransportException
                    ? (TransportException) e
                    : new TransportException(e);
            this.handleDisconnection(cause);
        }
    }

    /**
     * Marks the transport as {@code CONNECTED} when the current connection reports that it is established.
     * Notifications carrying any other connection id are ignored.
     *
     * @param connectionId id of the connection that was established
     */
    @Override
    public void onConnectionEstablished(String connectionId) {
        final String currentConnectionId = this.iotHubTransportConnection.getConnectionId();
        if (!connectionId.equals(currentConnectionId)) {
            return;
        }

        log.info("The connection to the IoT Hub has been established");
        this.updateStatus(IotHubConnectionStatus.CONNECTED, IotHubConnectionStatusChangeReason.CONNECTION_OK, null);
    }

    /**
     * Opens the transport for the given device configurations.
     *
     * <p>Does nothing if the transport is already {@code CONNECTED}. Otherwise the configurations are
     * stored, the first one becomes the default configuration, a retry scheduler is created and the
     * underlying connection is opened.
     *
     * @param deviceClientConfigs the configurations to open with; must contain at least one element
     * @throws IllegalArgumentException if {@code deviceClientConfigs} is null or empty
     * @throws TransportException if the transport is currently reconnecting
     * @throws SecurityException if the saved SAS token has expired
     * @throws DeviceClientException if opening the connection fails
     */
    public void open(Collection<DeviceClientConfig> deviceClientConfigs) throws DeviceClientException {
        final boolean noConfigsGiven = deviceClientConfigs == null || deviceClientConfigs.isEmpty();
        if (noConfigsGiven) {
            throw new IllegalArgumentException("deviceClientConfigs cannot be null or empty");
        }

        if (this.connectionStatus == IotHubConnectionStatus.CONNECTED) {
            // Already open; nothing to do.
            return;
        }

        if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING) {
            throw new TransportException("Open cannot be called while transport is reconnecting");
        }

        if (this.isSasTokenExpired()) {
            throw new SecurityException("Your sas token has expired");
        }

        this.deviceClientConfigs = new LinkedBlockingQueue<DeviceClientConfig>(deviceClientConfigs);
        this.defaultConfig = this.deviceClientConfigs.peek();
        this.taskScheduler = Executors.newScheduledThreadPool(1);

        this.openConnection();
        log.info("Client connection opened successfully");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the transport.
     *
     * <p>In order: cancels all pending packets and runs their callbacks, shuts down both executors,
     * closes the underlying connection, moves the status to {@code DISCONNECTED}, and finally wakes
     * every thread waiting on the send and receive locks.
     *
     * @param reason the reason reported with the status change; must not be null
     * @param cause the throwable reported with the status change, may be null
     * @throws IllegalArgumentException if {@code reason} is null
     * @throws DeviceClientException if closing the underlying connection fails
     */
    public void close(IotHubConnectionStatusChangeReason reason, Throwable cause) throws DeviceClientException {
        if (reason == null) {
            throw new IllegalArgumentException("reason cannot be null");
        }

        // Give every outstanding message its callback before tearing anything down.
        this.cancelPendingPackets();
        this.invokeCallbacks();

        if (this.taskScheduler != null) {
            this.taskScheduler.shutdown();
        }

        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
            this.scheduledExecutorService = null;
        }

        if (this.iotHubTransportConnection != null) {
            this.iotHubTransportConnection.close();
        }

        this.updateStatus(IotHubConnectionStatus.DISCONNECTED, reason, cause);

        // Wake any threads waiting on the send/receive monitors.
        synchronized (this.sendThreadLock) {
            this.sendThreadLock.notifyAll();
        }
        synchronized (this.receiveThreadLock) {
            this.receiveThreadLock.notifyAll();
        }

        log.info("Client connection closed successfully");
    }

    /**
     * Wraps a message in a transport packet, stamped with the current time, and queues it to be sent later.
     *
     * @param message the message to send
     * @param callback the callback stored on the packet
     * @param callbackContext the context stored on the packet alongside the callback
     * @throws IllegalStateException if the transport is in the {@code DISCONNECTED} state
     */
    public void addMessage(Message message, IotHubEventCallback callback, Object callbackContext) {
        if (IotHubConnectionStatus.DISCONNECTED == this.connectionStatus) {
            throw new IllegalStateException("Cannot add a message when the transport is closed.");
        }

        final long queuedAtMillis = System.currentTimeMillis();
        this.addToWaitingQueue(new IotHubTransportPacket(message, callback, callbackContext, null, queuedAtMillis));
        log.info("Message was queued to be sent later ({})", message);
    }

    /**
     * Returns the protocol configured on the current default configuration.
     *
     * @return the protocol reported by the default config
     */
    public IotHubClientProtocol getProtocol() {
        final DeviceClientConfig config = this.defaultConfig;
        return config.getProtocol();
    }

    /**
     * Expires stale messages, then sends up to ten waiting packets while the transport stays
     * {@code CONNECTED}.
     *
     * <p>Each loop iteration consumes one unit of the budget even when the waiting queue is empty or
     * the dequeued packet is skipped.
     */
    public void sendMessages() {
        this.checkForExpiredMessages();

        if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED || this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING) {
            return;
        }

        // Budget of 10 matches MAX_MESSAGES_TO_SEND_PER_THREAD.
        for (int remaining = 10; this.connectionStatus == IotHubConnectionStatus.CONNECTED && remaining > 0; remaining--) {
            final IotHubTransportPacket next = this.waitingPacketsQueue.poll();
            if (next == null) {
                continue;
            }

            final Message payload = next.getMessage();
            log.trace("Dequeued a message from waiting queue to be sent ({})", payload);

            if (payload != null && this.isMessageValid(next)) {
                this.sendPacket(next);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves every expired packet, whether waiting or in progress, to the callback queue with status
     * {@code MESSAGE_EXPIRED}.
     *
     * <p>The waiting queue is drained completely; packets that have not expired are put back in their
     * original order. In-progress packets are inspected while holding the in-progress lock.
     */
    private void checkForExpiredMessages() {
        final ArrayList<IotHubTransportPacket> stillValid = new ArrayList<IotHubTransportPacket>();
        IotHubTransportPacket waiting;
        while ((waiting = this.waitingPacketsQueue.poll()) != null) {
            if (!waiting.getMessage().isExpired()) {
                stillValid.add(waiting);
                continue;
            }
            waiting.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
            this.addToCallbackQueue(waiting);
        }
        this.waitingPacketsQueue.addAll(stillValid);

        synchronized (this.inProgressMessagesLock) {
            // Collect ids first, then remove, so the key set is not modified while it is being walked.
            final ArrayList<String> expiredIds = new ArrayList<String>();
            for (String id : this.inProgressPackets.keySet()) {
                if (this.inProgressPackets.get(id).getMessage().isExpired()) {
                    expiredIds.add(id);
                }
            }

            for (String id : expiredIds) {
                final IotHubTransportPacket expired = this.inProgressPackets.remove(id);
                expired.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
                this.addToCallbackQueue(expired);
            }
        }
    }

    /**
     * Drains the callback queue, invoking each packet's callback with the packet's status and context.
     *
     * <p>Packets without a callback are skipped: {@code addMessage} accepts a callback without
     * validating it, and calling {@code execute} on null would abort the drain (and, through
     * {@code close}, the shutdown sequence) with a {@code NullPointerException}.
     */
    public void invokeCallbacks() {
        IotHubTransportPacket packet;
        while ((packet = this.callbackPacketsQueue.poll()) != null) {
            IotHubEventCallback callback = packet.getCallback();
            if (callback == null) {
                // Nothing to notify for this packet; keep draining the rest of the queue.
                continue;
            }

            IotHubStatusCode status = packet.getStatus();
            Object context = packet.getContext();
            log.info("Invoking the callback function for sent message, IoT Hub responded to message ({}) with status {}", packet.getMessage(), status);
            callback.execute(status, context);
        }
    }

    /**
     * Handles at most one received message, and only while the transport is {@code CONNECTED}.
     *
     * <p>For an HTTPS connection the service is polled first, because messages are fetched on request
     * over that connection type. The next queued message, if any, is then passed to its callback and
     * acknowledged.
     *
     * @throws DeviceClientException if polling over HTTPS or acknowledging the message fails
     */
    public void handleMessage() throws DeviceClientException {
        if (this.connectionStatus != IotHubConnectionStatus.CONNECTED) {
            return;
        }

        if (this.iotHubTransportConnection instanceof HttpsIotHubConnection) {
            log.trace("Sending http request to check for any cloud to device messages...");
            this.addReceivedMessagesOverHttpToReceivedQueue();
        }

        final IotHubTransportMessage next = this.receivedMessagesQueue.poll();
        if (next != null) {
            this.acknowledgeReceivedMessage(next);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the transport holds no outgoing work at all: nothing waiting to be sent,
     * nothing in progress and no callbacks pending. Evaluated while holding the in-progress lock.
     *
     * @return {@code true} if all three collections are empty
     */
    public boolean isEmpty() {
        synchronized (this.inProgressMessagesLock) {
            if (!this.waitingPacketsQueue.isEmpty()) {
                return false;
            }
            if (this.inProgressPackets.size() != 0) {
                return false;
            }
            return this.callbackPacketsQueue.isEmpty();
        }
    }

    /**
     * Registers the connection-state callback and its context, replacing any previous registration.
     *
     * @param callback the callback to invoke on connection state changes; must not be null
     * @param callbackContext the context handed back to the callback, may be null
     * @throws IllegalArgumentException if {@code callback} is null
     */
    public void registerConnectionStateCallback(IotHubConnectionStateCallback callback, Object callbackContext) {
        if (null == callback) {
            throw new IllegalArgumentException("Callback cannot be null");
        }

        this.stateCallbackContext = callbackContext;
        this.stateCallback = callback;
    }

    /**
     * Registers (or, when both arguments are null, clears) the connection-status-change callback and
     * its context.
     *
     * @param callback the callback to invoke on status changes; may be null only if
     *                 {@code callbackContext} is also null
     * @param callbackContext the context handed back to the callback, may be null
     * @throws IllegalArgumentException if a context is supplied without a callback
     */
    public void registerConnectionStatusChangeCallback(IotHubConnectionStatusChangeCallback callback, Object callbackContext) {
        if (callbackContext != null && callback == null) {
            // The message now states the condition that is actually checked (context present, callback missing).
            throw new IllegalArgumentException("Callback cannot be null if callback context is not null");
        }
        this.connectionStatusChangeCallback = callback;
        this.connectionStatusChangeCallbackContext = callbackContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks every waiting and in-progress packet as {@code MESSAGE_CANCELLED_ONCLOSE} and moves it to
     * the callback queue, leaving both collections empty.
     */
    private void cancelPendingPackets() {
        IotHubTransportPacket waiting;
        while ((waiting = this.waitingPacketsQueue.poll()) != null) {
            waiting.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
            this.addToCallbackQueue(waiting);
        }

        synchronized (this.inProgressMessagesLock) {
            for (IotHubTransportPacket unacknowledged : this.inProgressPackets.values()) {
                unacknowledged.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
                this.addToCallbackQueue(unacknowledged);
            }
            this.inProgressPackets.clear();
        }
    }

    /**
     * Runs the message's callback and sends the callback's result to the service as the acknowledgement.
     *
     * <p>A message without a callback is neither processed nor acknowledged. If sending the
     * acknowledgement fails, the message is put back on the received queue and the exception is rethrown.
     *
     * @param receivedMessage the message to process
     * @throws TransportException if the acknowledgement cannot be sent
     */
    private void acknowledgeReceivedMessage(IotHubTransportMessage receivedMessage) throws TransportException {
        final MessageCallback userCallback = receivedMessage.getMessageCallback();
        final Object userContext = receivedMessage.getMessageCallbackContext();
        if (userCallback == null) {
            return;
        }

        log.debug("Executing callback for received message ({})", receivedMessage);
        final IotHubMessageResult outcome = userCallback.execute(receivedMessage, userContext);

        try {
            log.debug("Sending acknowledgement for received cloud to device message ({})", receivedMessage);
            this.iotHubTransportConnection.sendMessageResult(receivedMessage, outcome);
        } catch (TransportException ackFailure) {
            log.warn("Sending acknowledgement for received cloud to device message failed, adding it back to the queue ({})", receivedMessage, ackFailure);
            this.addToReceivedMessagesQueue(receivedMessage);
            throw ackFailure;
        }
    }

    /**
     * Asks the HTTPS connection for a message and, if one is returned, adds it to the received queue.
     * The caller must already have checked that the connection is an {@code HttpsIotHubConnection}.
     *
     * @throws TransportException if the receive request fails
     */
    private void addReceivedMessagesOverHttpToReceivedQueue() throws TransportException {
        final HttpsIotHubConnection httpsConnection = (HttpsIotHubConnection) this.iotHubTransportConnection;
        final IotHubTransportMessage polled = httpsConnection.receiveMessage();
        if (polled == null) {
            return;
        }

        log.info("Message was received from IotHub ({})", polled);
        this.addToReceivedMessagesQueue(polled);
    }

    /**
     * Maps a throwable to the status-change reason reported to callbacks.
     *
     * <p>For a {@code TransportException}, in priority order: retryable gives {@code NO_NETWORK};
     * an expired SAS token gives {@code EXPIRED_SAS_TOKEN}; one of the unauthorized exception types
     * gives {@code BAD_CREDENTIAL}. Everything else gives {@code COMMUNICATION_ERROR}.
     *
     * @param e the throwable to classify
     * @return the matching status-change reason
     */
    private IotHubConnectionStatusChangeReason exceptionToStatusChangeReason(Throwable e) {
        if (e instanceof TransportException) {
            if (((TransportException) e).isRetryable()) {
                log.debug("Mapping throwable to NO_NETWORK because it was a retryable exception", e);
                return IotHubConnectionStatusChangeReason.NO_NETWORK;
            }

            if (this.isSasTokenExpired()) {
                log.debug("Mapping throwable to EXPIRED_SAS_TOKEN because it was a non-retryable exception and the saved sas token has expired", e);
                return IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN;
            }

            final boolean isUnauthorized = e instanceof UnauthorizedException
                    || e instanceof MqttUnauthorizedException
                    || e instanceof AmqpUnauthorizedAccessException;
            if (isUnauthorized) {
                log.debug("Mapping throwable to BAD_CREDENTIAL because it was a non-retryable exception authorization exception but the saved sas token has not expired yet", e);
                return IotHubConnectionStatusChangeReason.BAD_CREDENTIAL;
            }
        }

        log.debug("Mapping exception throwable to COMMUNICATION_ERROR because the sdk was unable to classify the thrown exception to anything other category", e);
        return IotHubConnectionStatusChangeReason.COMMUNICATION_ERROR;
    }

    /**
     * Creates a fresh executor, lazily builds the protocol-specific connection on first use, opens it
     * and marks the transport as {@code CONNECTED}.
     *
     * @throws TransportException if the configured protocol is not supported or the connection fails to open
     */
    private void openConnection() throws TransportException {
        // Pool size of 1 matches POOL_SIZE.
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);

        if (this.iotHubTransportConnection == null) {
            switch (this.defaultConfig.getProtocol()) {
                case HTTPS:
                    this.iotHubTransportConnection = new HttpsIotHubConnection(this.defaultConfig);
                    break;
                case MQTT:
                case MQTT_WS:
                    this.iotHubTransportConnection = new MqttIotHubConnection(this.defaultConfig);
                    break;
                case AMQPS:
                case AMQPS_WS:
                    this.iotHubTransportConnection = new AmqpsIotHubConnection(this.defaultConfig);
                    break;
                default:
                    throw new TransportException("Protocol not supported");
            }
        }

        final IotHubTransportConnection connection = this.iotHubTransportConnection;
        connection.setListener(this);
        connection.open(this.deviceClientConfigs, this.scheduledExecutorService);

        this.updateStatus(IotHubConnectionStatus.CONNECTED, IotHubConnectionStatusChangeReason.CONNECTION_OK, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a dropped connection: re-queues every in-progress packet for sending, switches the
     * status to {@code DISCONNECTED_RETRYING}, checks for an unauthorized exception and then starts
     * the reconnection logic.
     *
     * @param transportException the exception that caused the disconnection
     */
    private void handleDisconnection(TransportException transportException) {
        log.info("Handling a disconnection event", transportException);

        synchronized (this.inProgressMessagesLock) {
            log.trace("Due to disconnection event, clearing active queues, and re-queueing them to waiting queues to be re-processed later upon reconnection");
            for (IotHubTransportPacket unacknowledged : this.inProgressPackets.values()) {
                this.addToWaitingQueue(unacknowledged);
            }
            this.inProgressPackets.clear();
        }

        final IotHubConnectionStatusChangeReason reason = this.exceptionToStatusChangeReason(transportException);
        this.updateStatus(IotHubConnectionStatus.DISCONNECTED_RETRYING, reason, transportException);
        this.checkForUnauthorizedException(transportException);

        log.debug("Starting reconnection logic");
        this.reconnect(transportException);
    }

    /**
     * Repeatedly tries to re-establish the connection, then closes the transport if reconnection
     * had to be abandoned.
     *
     * <p>Attempts continue while the status is {@code DISCONNECTED_RETRYING}, the operation timeout
     * has not elapsed, and the most recent failure is retryable. Between attempts the thread sleeps
     * for the duration given by the retry policy.
     *
     * @param transportException the failure that triggered reconnection; replaced inside the loop by
     *                           the outcome of each attempt (null after a successful attempt)
     */
    private void reconnect(TransportException transportException) {
        // A start time of 0 means no reconnection is in progress yet, so record when this one began.
        if (this.reconnectionAttemptStartTimeMillis == 0L) {
            this.reconnectionAttemptStartTimeMillis = System.currentTimeMillis();
        }
        boolean hasReconnectOperationTimedOut = this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis);
        RetryDecision retryDecision = null;
        while (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING && !hasReconnectOperationTimedOut && transportException != null && transportException.isRetryable()) {
            log.trace("Attempting reconnect attempt {}", (Object)this.currentReconnectionAttempt);
            ++this.currentReconnectionAttempt;
            RetryPolicy retryPolicy = this.defaultConfig.getRetryPolicy();
            retryDecision = retryPolicy.getRetryDecision(this.currentReconnectionAttempt, transportException);
            // The policy has given up; the first branch after the loop closes the transport.
            if (!retryDecision.shouldRetry()) break;
            log.trace("Sleeping between reconnect attempts");
            IotHubTransport.sleepUninterruptibly(retryDecision.getDuration(), TimeUnit.MILLISECONDS);
            // The timeout flag is refreshed after sleeping, but the attempt below still runs once.
            hasReconnectOperationTimedOut = this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis);
            transportException = this.singleReconnectAttempt();
        }
        // After a successful attempt transportException is null and updateStatus(CONNECTED) has reset
        // the start time to 0, for which hasOperationTimedOut returns false, so no branch below fires.
        try {
            if (retryDecision != null && !retryDecision.shouldRetry()) {
                log.debug("Reconnection was abandoned due to the retry policy");
                this.close(IotHubConnectionStatusChangeReason.RETRY_EXPIRED, transportException);
            } else if (this.hasOperationTimedOut(this.reconnectionAttemptStartTimeMillis)) {
                log.debug("Reconnection was abandoned due to the operation timeout");
                this.close(IotHubConnectionStatusChangeReason.RETRY_EXPIRED, new DeviceOperationTimeoutException("Device operation for reconnection timed out"));
            } else if (transportException != null && !transportException.isRetryable()) {
                log.error("Reconnection was abandoned due to encountering a non-retryable exception", (Throwable)transportException);
                this.close(this.exceptionToStatusChangeReason(transportException), transportException);
            }
        }
        catch (DeviceClientException ex) {
            // close() itself failed; still force the status to DISCONNECTED so callbacks are notified.
            log.error("Encountered an exception while closing the client object, client instance should no longer be used as the state is unknown", (Throwable)ex);
            this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.COMMUNICATION_ERROR, transportException);
        }
    }

    /**
     * Closes the current connection and opens it again, once.
     *
     * @return null if the connection was re-opened, otherwise the exception that prevented it
     */
    private TransportException singleReconnectAttempt() {
        TransportException failure = null;
        try {
            log.trace("Attempting to close and re-open the iot hub transport connection...");
            this.iotHubTransportConnection.close();
            this.openConnection();
            log.trace("Successfully closed and re-opened the iot hub transport connection");
        } catch (TransportException newTransportException) {
            this.checkForUnauthorizedException(newTransportException);
            log.warn("Failed to close and re-open the iot hub transport connection, checking if another retry attempt should be made", newTransportException);
            failure = newTransportException;
        }
        return failure;
    }

    /**
     * Decides what happens to a packet whose send failed: either schedule a retry or abandon it and
     * queue its callback with an error status.
     *
     * <p>A retry is scheduled only when the packet's operation timeout has not elapsed, the exception
     * is retryable and the retry policy agrees. An abandoned packet gets the service-provided status
     * code if the exception carries one, {@code ERROR} otherwise; a throttling exception always
     * results in {@code THROTTLED}.
     *
     * @param packet the packet whose send failed
     * @param transportException the failure that occurred
     */
    private void handleMessageException(IotHubTransportPacket packet, TransportException transportException) {
        log.warn("Handling an exception from sending message: Attempt number {}", packet.getCurrentRetryAttempt(), transportException);
        packet.incrementRetryAttempt();

        if (this.hasOperationTimedOut(packet.getStartTimeMillis())) {
            log.warn("The device operation timeout has been exceeded for the message, so it has been abandoned ({})", packet.getMessage(), transportException);
        } else if (!transportException.isRetryable()) {
            log.warn("Encountering an non-retryable exception while sending a message, so it has been abandoned ({})", packet.getMessage(), transportException);
        } else {
            final RetryDecision decision = this.defaultConfig.getRetryPolicy().getRetryDecision(packet.getCurrentRetryAttempt(), transportException);
            if (decision.shouldRetry()) {
                this.taskScheduler.schedule(new MessageRetryRunnable(this.waitingPacketsQueue, packet, this), decision.getDuration(), TimeUnit.MILLISECONDS);
                return;
            }
            log.warn("Retry policy dictated that the message should be abandoned, so it has been abandoned ({})", packet.getMessage(), transportException);
        }

        // The packet is abandoned: derive the status that its callback will report.
        IotHubStatusCode errorCode;
        if (transportException instanceof IotHubServiceException) {
            errorCode = ((IotHubServiceException) transportException).getStatusCode();
        } else {
            errorCode = IotHubStatusCode.ERROR;
        }
        if (transportException instanceof AmqpConnectionThrottledException) {
            errorCode = IotHubStatusCode.THROTTLED;
        }

        packet.setStatus(errorCode);
        this.addToCallbackQueue(packet);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends one packet over the connection.
     *
     * <p>When an acknowledgement is expected, the packet is recorded in {@code inProgressPackets}
     * before sending so that {@code onMessageSent} can find it. When none is expected and the send
     * succeeds, the packet goes straight to the callback queue with the returned status.
     *
     * <p>A failure, whether a thrown {@code TransportException} or a returned status other than
     * {@code OK}/{@code OK_EMPTY}, is passed to {@code handleMessageException}. Both failure paths
     * share the same packet lookup:
     * <ul>
     *   <li>no acknowledgement expected: the packet was never stored in the map, so the packet
     *       itself is used (previously the non-OK path passed null here and threw an NPE);</li>
     *   <li>acknowledgement expected: the packet is removed from the map under the in-progress lock;
     *       if it is already gone, another code path has taken ownership of it (acknowledged,
     *       expired, re-queued or cancelled), so it is not handled a second time.</li>
     * </ul>
     *
     * @param packet the packet to send; its message must not be null
     */
    private void sendPacket(IotHubTransportPacket packet) {
        Message message = packet.getMessage();
        boolean messageAckExpected = !(message instanceof IotHubTransportMessage) || ((IotHubTransportMessage)message).isMessageAckNeeded(this.defaultConfig.getProtocol());
        TransportException sendFailure = null;
        try {
            if (messageAckExpected) {
                synchronized (this.inProgressMessagesLock) {
                    log.trace("Adding transport message to the inProgressPackets to wait for acknowledgement ({})", message);
                    this.inProgressPackets.put(message.getMessageId(), packet);
                }
            }
            log.info("Sending message ({})", message);
            IotHubStatusCode statusCode = this.iotHubTransportConnection.sendMessage(message);
            log.trace("Sent message ({}) to protocol level, returned status code was {}", message, statusCode);
            if (statusCode != IotHubStatusCode.OK_EMPTY && statusCode != IotHubStatusCode.OK) {
                sendFailure = IotHubStatusCode.getConnectionStatusException(statusCode, "");
            } else if (!messageAckExpected) {
                packet.setStatus(statusCode);
                this.addToCallbackQueue(packet);
            }
        }
        catch (TransportException transportException) {
            log.warn("Encountered exception while sending message with correlation id {}", message.getCorrelationId(), transportException);
            sendFailure = transportException;
        }

        if (sendFailure == null) {
            return;
        }

        IotHubTransportPacket failedPacket = packet;
        if (messageAckExpected) {
            synchronized (this.inProgressMessagesLock) {
                failedPacket = this.inProgressPackets.remove(message.getMessageId());
            }
            if (failedPacket == null) {
                // Already removed elsewhere; handling it again would report the message twice.
                log.trace("Message is no longer in progress, so its send failure will not be handled again ({})", message);
                return;
            }
        }
        this.handleMessageException(failedPacket, sendFailure);
    }

    /**
     * Checks whether a packet may still be sent.
     *
     * <p>An expired message is queued for its callback with {@code MESSAGE_EXPIRED}. If the SAS
     * token has expired, the packet is queued with {@code UNAUTHORIZED} and the transport status is
     * switched to {@code DISCONNECTED} with reason {@code EXPIRED_SAS_TOKEN}.
     *
     * @param packet the packet about to be sent
     * @return {@code true} if the packet should be sent, {@code false} if it was rejected
     */
    private boolean isMessageValid(IotHubTransportPacket packet) {
        final Message candidate = packet.getMessage();

        if (candidate.isExpired()) {
            log.warn("Message with has expired, adding to callbacks queue with MESSAGE_EXPIRED ({})", candidate);
            packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
            this.addToCallbackQueue(packet);
            return false;
        }

        if (!this.isSasTokenExpired()) {
            return true;
        }

        log.info("Creating a callback for the message with expired sas token with UNAUTHORIZED status");
        packet.setStatus(IotHubStatusCode.UNAUTHORIZED);
        this.addToCallbackQueue(packet);
        this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN, new SecurityException("Your sas token has expired"));
        return false;
    }

    /**
     * Switches the transport to a new connection status and notifies all status callbacks.
     * Does nothing when the status is unchanged.
     *
     * <p>Reaching {@code CONNECTED} also resets the reconnection attempt counter and start time.
     *
     * @param newConnectionStatus the status to switch to
     * @param reason the reason reported to the callbacks
     * @param throwable the cause reported to the callbacks, or null
     */
    private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConnectionStatusChangeReason reason, Throwable throwable) {
        if (this.connectionStatus == newConnectionStatus) {
            return;
        }

        if (throwable != null) {
            log.warn("Updating transport status to new status {} with reason {}", newConnectionStatus, reason, throwable);
        } else {
            log.info("Updating transport status to new status {} with reason {}", newConnectionStatus, reason);
        }

        this.connectionStatus = newConnectionStatus;

        log.debug("Invoking connection status callbacks with new status details");
        this.invokeConnectionStateCallback(newConnectionStatus, reason);
        this.invokeConnectionStatusChangeCallback(newConnectionStatus, reason, throwable);
        this.deviceIOConnectionStatusChangeCallback.execute(newConnectionStatus, reason, throwable, null);

        if (newConnectionStatus == IotHubConnectionStatus.CONNECTED) {
            // A live connection ends any reconnection cycle.
            this.currentReconnectionAttempt = 0;
            this.reconnectionAttemptStartTimeMillis = 0L;
        }
    }

    /**
     * Translates a status/reason pair into an {@code IotHubConnectionState} and reports it to
     * the registered state callback, if there is one.
     *
     * <p>Mapping, in priority order: CONNECTED status gives CONNECTION_SUCCESS; reason
     * EXPIRED_SAS_TOKEN gives SAS_TOKEN_EXPIRED; DISCONNECTED status gives CONNECTION_DROP.
     * Any other combination results in no callback invocation.
     *
     * @param status the new connection status
     * @param reason the reason for the status change
     */
    private void invokeConnectionStateCallback(IotHubConnectionStatus status, IotHubConnectionStatusChangeReason reason) {
        if (this.stateCallback == null) {
            return;
        }

        final IotHubConnectionState mappedState;
        if (status == IotHubConnectionStatus.CONNECTED) {
            mappedState = IotHubConnectionState.CONNECTION_SUCCESS;
        } else if (reason == IotHubConnectionStatusChangeReason.EXPIRED_SAS_TOKEN) {
            mappedState = IotHubConnectionState.SAS_TOKEN_EXPIRED;
        } else if (status == IotHubConnectionStatus.DISCONNECTED) {
            mappedState = IotHubConnectionState.CONNECTION_DROP;
        } else {
            // No state is reported for the remaining status/reason combinations.
            return;
        }

        this.stateCallback.execute(mappedState, this.stateCallbackContext);
    }

    /**
     * Forwards a status change to the registered connection status change callback, passing
     * along the stored callback context. Does nothing when no callback is registered.
     *
     * @param status the new connection status
     * @param reason the reason for the status change
     * @param e the throwable associated with the change, passed through unchanged
     */
    private void invokeConnectionStatusChangeCallback(IotHubConnectionStatus status, IotHubConnectionStatusChangeReason reason, Throwable e) {
        if (this.connectionStatusChangeCallback == null) {
            return;
        }
        this.connectionStatusChangeCallback.execute(status, reason, e, this.connectionStatusChangeCallbackContext);
    }

    /**
     * Reports whether the default config authenticates with a SAS token whose authentication
     * object says renewal is necessary.
     *
     * @return {@code true} only for SAS_TOKEN authentication with renewal necessary;
     *         {@code false} for every other authentication type
     */
    private boolean isSasTokenExpired() {
        if (this.defaultConfig.getAuthenticationType() != DeviceClientConfig.AuthType.SAS_TOKEN) {
            // The SAS token authentication object is only queried for SAS_TOKEN auth.
            return false;
        }
        return this.defaultConfig.getSasTokenAuthentication().isRenewalNecessary();
    }

    /**
     * Checks whether more time than the configured operation timeout has elapsed since
     * {@code startTime}.
     *
     * @param startTime a {@link System#currentTimeMillis()} timestamp, or {@code 0} to
     *                  indicate that no start time is set
     * @return {@code false} when {@code startTime} is {@code 0}; otherwise whether the elapsed
     *         time strictly exceeds the default config's operation timeout
     */
    private boolean hasOperationTimedOut(long startTime) {
        if (startTime != 0L) {
            long elapsedMillis = System.currentTimeMillis() - startTime;
            return elapsedMillis > this.defaultConfig.getOperationTimeout();
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a packet for callback delivery and wakes any thread waiting on the send lock.
     * Packets without a callback are silently ignored.
     *
     * @param packet the packet to enqueue
     */
    private void addToCallbackQueue(IotHubTransportPacket packet) {
        if (packet.getCallback() == null) {
            return;
        }
        synchronized (this.sendThreadLock) {
            this.callbackPacketsQueue.add(packet);
            this.sendThreadLock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends a packet to the waiting queue while holding the send lock, then wakes every
     * thread waiting on that lock.
     *
     * @param packet the packet to enqueue
     */
    private void addToWaitingQueue(IotHubTransportPacket packet) {
        synchronized (this.sendThreadLock) {
            this.waitingPacketsQueue.add(packet);
            this.sendThreadLock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends a received message to the received-messages queue while holding the receive
     * lock, then wakes every thread waiting on that lock.
     *
     * @param message the received message to enqueue
     */
    private void addToReceivedMessagesQueue(IotHubTransportMessage message) {
        synchronized (this.receiveThreadLock) {
            this.receivedMessagesQueue.add(message);
            this.receiveThreadLock.notifyAll();
        }
    }

    /**
     * Sleeps for the full requested duration even if the calling thread is interrupted.
     *
     * <p>When an interrupt arrives mid-sleep, the remaining time is recomputed against a fixed
     * deadline and the sleep is resumed. If at least one interrupt was observed, the thread's
     * interrupt flag is set again before this method returns so callers can still see it.
     *
     * @param sleepFor how long to sleep, expressed in {@code unit}
     * @param unit the time unit of {@code sleepFor}
     */
    private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
        boolean interrupted = false;
        try {
            long remainingNanos = unit.toNanos(sleepFor);
            // Fixed deadline so repeated interrupts never extend the total sleep time.
            final long end = System.nanoTime() + remainingNanos;
            while (true) {
                try {
                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
                    return;
                }
                catch (InterruptedException e) {
                    // Remember the interrupt and go around again for whatever time is left.
                    interrupted = true;
                    remainingNanos = end - System.nanoTime();
                }
            }
        }
        finally {
            if (interrupted) {
                // Restore the interrupt status that catching InterruptedException cleared.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Marks an unauthorized-type transport exception as retryable when the SAS token has not
     * expired.
     *
     * <p>The exception is left untouched when {@link #isSasTokenExpired()} is true, or when it
     * is not one of {@code MqttUnauthorizedException}, {@code UnauthorizedException} or
     * {@code AmqpUnauthorizedAccessException}.
     *
     * @param transportException the exception to inspect and possibly flag as retryable
     */
    private void checkForUnauthorizedException(TransportException transportException) {
        if (this.isSasTokenExpired()) {
            return;
        }

        boolean unauthorized = transportException instanceof MqttUnauthorizedException
                || transportException instanceof UnauthorizedException
                || transportException instanceof AmqpUnauthorizedAccessException;
        if (unauthorized) {
            transportException.setRetryable(true);
        }
    }

    /**
     * Task that puts a packet back on the waiting queue and wakes threads waiting on the send
     * lock.
     *
     * <p>The queue, packet and lock are all supplied through the constructor and stored as
     * final fields.
     */
    public class MessageRetryRunnable
    implements Runnable {
        final IotHubTransportPacket transportPacket;
        final Queue<IotHubTransportPacket> waitingPacketsQueue;
        final Object sendThreadLock;

        /**
         * @param waitingPacketsQueue the queue the packet is re-added to when the task runs
         * @param transportPacket the packet to re-queue
         * @param sendThreadLock the monitor that is notified once the packet has been queued
         */
        public MessageRetryRunnable(Queue<IotHubTransportPacket> waitingPacketsQueue, IotHubTransportPacket transportPacket, Object sendThreadLock) {
            this.waitingPacketsQueue = waitingPacketsQueue;
            this.transportPacket = transportPacket;
            this.sendThreadLock = sendThreadLock;
        }

        /**
         * Re-queues the packet and notifies all threads waiting on the send lock.
         */
        @Override
        public void run() {
            // notifyAll() requires ownership of the monitor; calling it outside a synchronized
            // block throws IllegalMonitorStateException. Enqueue under the same lock so a
            // waiter cannot miss the new packet between its queue check and its wait.
            synchronized (this.sendThreadLock) {
                this.waitingPacketsQueue.add(this.transportPacket);
                this.sendThreadLock.notifyAll();
            }
        }
    }
}

