/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.mqtt;

import com.microsoft.azure.sdk.iot.device.DeviceTwin.DeviceOperations;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.ReconnectionNotifier;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttConnection;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttMessageListener;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.exceptions.PahoExceptionTranslator;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the MQTT operations used by this transport: connecting, disconnecting,
 * publishing, subscribing and receiving on top of a shared {@link MqttConnection}.
 *
 * <p>The class also implements the Paho {@link MqttCallback} interface. Arrived messages are
 * queued on the connection's received-message queue, delivery acknowledgements are matched
 * against the map of unacknowledged sent messages, and connection loss is reported through
 * {@link ReconnectionNotifier}.
 */
public abstract class Mqtt
implements MqttCallback {
    private static final Logger log = LoggerFactory.getLogger(Mqtt.class);

    /** Maximum time, in milliseconds, to wait for the CONNECT packet to be acknowledged. */
    private static final int CONNECTION_TIMEOUT = 60000;
    /** Maximum time, in milliseconds, to wait for the DISCONNECT to complete. */
    private static final int DISCONNECTION_TIMEOUT = 60000;
    /** Maximum time, in milliseconds, to wait for a SUBSCRIBE packet to be acknowledged. */
    private static final int MAX_SUBSCRIBE_ACK_WAIT_TIME = 15000;
    /** Number of pending delivery tokens at which {@link #publish} waits before sending more. */
    private static final int MAX_IN_FLIGHT_COUNT = 10;
    /** Pause, in milliseconds, between checks of the pending delivery token count. */
    private static final long IN_FLIGHT_POLL_INTERVAL_MILLISECONDS = 10L;
    /** MQTT quality of service used for both publishing and subscribing. */
    private static final int QOS = 1;

    private MqttConnection mqttConnection;
    private MqttMessageListener messageListener;
    /** Queue of (topic, payload) pairs; the same queue instance that the connection exposes. */
    ConcurrentLinkedQueue<Pair<String, byte[]>> allReceivedMessages;
    /** Lock obtained from the connection; held while connecting and subscribing. */
    private final Object stateLock;
    /** Held while taking a message off the received-message queue. */
    protected final Object incomingLock;
    /** Held while publishing and while matching acknowledgements against sent messages. */
    private final Object publishLock;
    private Map<Integer, Message> unacknowledgedSentMessages;
    private boolean userSpecifiedSASTokenExpiredOnRetry = false;

    static final char MESSAGE_PROPERTY_SEPARATOR = '&';
    private static final String MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED = "%24";
    private static final char MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_DECODED = '$';
    static final char MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR = '=';
    private static final int PROPERTY_KEY_INDEX = 0;
    private static final int PROPERTY_VALUE_INDEX = 1;
    private static final String ABSOLUTE_EXPIRY_TIME = "$.exp";
    static final String CORRELATION_ID = "$.cid";
    static final String MESSAGE_ID = "$.mid";
    static final String TO = "$.to";
    static final String USER_ID = "$.uid";
    static final String OUTPUT_NAME = "$.on";
    static final String CONNECTION_DEVICE_ID = "$.cdid";
    static final String CONNECTION_MODULE_ID = "$.cmid";
    static final String CONTENT_TYPE = "$.ct";
    static final String CONTENT_ENCODING = "$.ce";
    static final String CREATION_TIME_UTC = "$.ctime";
    static final String MQTT_SECURITY_INTERFACE_ID = "$.ifid";
    private static final String IOTHUB_ACK = "iothub-ack";
    private static final String INPUTS_PATH_STRING = "inputs";
    private static final String MODULES_PATH_STRING = "modules";

    private IotHubListener listener;
    private String connectionId;

    /**
     * Creates the MQTT layer on top of an existing connection object.
     *
     * @param mqttConnection the connection to operate on; supplies the received-message queue
     *                       and the state lock
     * @param listener the listener notified about sent messages and connection loss; may be null
     * @param messageListener the listener notified when a message arrives; may be null
     * @param connectionId the identifier passed along with connection-loss notifications
     * @param unacknowledgedSentMessages map from MQTT message id to the message awaiting
     *                                   acknowledgement
     * @throws IllegalArgumentException if {@code mqttConnection} is null
     */
    public Mqtt(MqttConnection mqttConnection, IotHubListener listener, MqttMessageListener messageListener, String connectionId, Map<Integer, Message> unacknowledgedSentMessages) throws IllegalArgumentException {
        if (mqttConnection == null) {
            throw new IllegalArgumentException("Mqtt connection info cannot be null");
        }
        this.mqttConnection = mqttConnection;
        this.allReceivedMessages = mqttConnection.getAllReceivedMessages();
        this.stateLock = mqttConnection.getMqttLock();
        this.incomingLock = new Object();
        this.publishLock = new Object();
        this.listener = listener;
        this.messageListener = messageListener;
        this.connectionId = connectionId;
        this.unacknowledgedSentMessages = unacknowledgedSentMessages;
    }

    /**
     * Sends the MQTT CONNECT packet and waits for its acknowledgement, unless the client is
     * already connected.
     *
     * @throws TransportException if connecting fails; in that case {@link #disconnect()} is
     *                            attempted first, and a failure of that attempt is the exception
     *                            that propagates
     */
    protected void connect() throws TransportException {
        synchronized (this.stateLock) {
            try {
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    log.debug("Sending MQTT CONNECT packet...");
                    IMqttToken connectToken = this.mqttConnection.getMqttAsyncClient().connect(this.mqttConnection.getConnectionOptions());
                    connectToken.waitForCompletion(CONNECTION_TIMEOUT);
                    log.debug("Sent MQTT CONNECT packet was acknowledged");
                }
            }
            catch (MqttException e) {
                log.warn("Exception encountered while sending MQTT CONNECT packet", e);
                this.disconnect();
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to establish MQTT connection");
            }
        }
    }

    /**
     * Sends the MQTT DISCONNECT packet if the connection is open, then closes the connection
     * and clears its async client.
     *
     * @throws TransportException if the underlying client reports an error
     */
    protected void disconnect() throws TransportException {
        try {
            if (this.mqttConnection.isConnected()) {
                log.debug("Sending MQTT DISCONNECT packet");
                IMqttToken disconnectToken = this.mqttConnection.disconnect();
                if (disconnectToken != null) {
                    disconnectToken.waitForCompletion(DISCONNECTION_TIMEOUT);
                }
                log.debug("Sent MQTT DISCONNECT packet was acknowledged");
            }
            this.mqttConnection.close();
            this.mqttConnection.setMqttAsyncClient(null);
        }
        catch (MqttException e) {
            log.warn("Exception encountered while sending MQTT DISCONNECT packet", e);
            throw PahoExceptionTranslator.convertToMqttException(e, "Unable to disconnect");
        }
    }

    /**
     * Publishes a message to the given topic with QoS 1 and records it as awaiting
     * acknowledgement.
     *
     * <p>While the client holds {@value #MAX_IN_FLIGHT_COUNT} or more pending delivery tokens,
     * this method sleeps and re-checks until the count drops or the connection goes away.
     *
     * @param publishTopic the topic to publish to; must not be null or empty
     * @param message the message to publish; must not be null and must have non-null bytes
     * @throws TransportException if the client is missing or disconnected (retryable), the
     *                            user supplied SAS token has expired, the client reports an
     *                            error, or the calling thread is interrupted while waiting
     * @throws IllegalArgumentException if the topic or message is invalid
     */
    protected void publish(String publishTopic, Message message) throws TransportException {
        try {
            if (this.mqttConnection.getMqttAsyncClient() == null) {
                throw retryableException("Need to open first!");
            }
            if (this.userSpecifiedSASTokenExpiredOnRetry) {
                throw new TransportException("Cannot publish when user supplied SAS token has expired");
            }
            if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                throw retryableException("Cannot publish when mqtt client is disconnected");
            }
            if (message == null || publishTopic == null || publishTopic.length() == 0 || message.getBytes() == null) {
                throw new IllegalArgumentException("Cannot publish on null or empty publish topic");
            }

            byte[] payload = message.getBytes();

            // Hold back while too many publishes are still waiting for their acknowledgement.
            while (this.mqttConnection.getMqttAsyncClient().getPendingDeliveryTokens().length >= MAX_IN_FLIGHT_COUNT) {
                Thread.sleep(IN_FLIGHT_POLL_INTERVAL_MILLISECONDS);
                if (this.mqttConnection.getMqttAsyncClient() == null) {
                    throw retryableException("Connection was lost while waiting for mqtt deliveries to finish");
                }
                if (!this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    throw retryableException("Cannot publish when mqtt client is holding " + MAX_IN_FLIGHT_COUNT + " tokens and is disconnected");
                }
            }

            MqttMessage mqttMessage = payload.length == 0 ? new MqttMessage() : new MqttMessage(payload);
            mqttMessage.setQos(QOS);

            // Publishing and registering the message id happen under the same lock that
            // deliveryComplete takes, so an acknowledgement is not matched before registration.
            synchronized (this.publishLock) {
                log.trace("Publishing message ({}) to MQTT topic {}", message, publishTopic);
                IMqttDeliveryToken publishToken = this.mqttConnection.getMqttAsyncClient().publish(publishTopic, mqttMessage);
                this.unacknowledgedSentMessages.put(publishToken.getMessageId(), message);
                log.trace("Message published to MQTT topic {}. Mqtt message id {} added to list of messages to wait for acknowledgement ({})", publishTopic, publishToken.getMessageId(), message);
            }
        }
        catch (MqttException e) {
            log.warn("Message could not be published to MQTT topic {} ({})", publishTopic, message, e);
            throw PahoExceptionTranslator.convertToMqttException(e, "Unable to publish message on topic : " + publishTopic);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new TransportException("Interrupted, Unable to publish message on topic : " + publishTopic, e);
        }
    }

    /**
     * Subscribes to the given topic with QoS 1 and waits for the acknowledgement.
     *
     * @param topic the topic to subscribe to; must not be null
     * @throws TransportException if the user supplied SAS token has expired, the client is
     *                            missing or disconnected (retryable), or the client reports an
     *                            error
     * @throws IllegalArgumentException if {@code topic} is null
     */
    protected void subscribe(String topic) throws TransportException {
        synchronized (this.stateLock) {
            try {
                if (topic == null) {
                    throw new IllegalArgumentException("Topic cannot be null");
                }
                if (this.userSpecifiedSASTokenExpiredOnRetry) {
                    throw new TransportException("Cannot subscribe when user supplied SAS token has expired");
                }
                // disconnect() clears the async client, so a missing client is treated the
                // same way as a disconnected one instead of failing with a NullPointerException.
                if (this.mqttConnection.getMqttAsyncClient() == null || !this.mqttConnection.getMqttAsyncClient().isConnected()) {
                    throw retryableException("Cannot subscribe when mqtt client is disconnected");
                }
                log.debug("Sending MQTT SUBSCRIBE packet for topic {}", topic);
                IMqttToken subToken = this.mqttConnection.getMqttAsyncClient().subscribe(topic, QOS);
                subToken.waitForCompletion(MAX_SUBSCRIBE_ACK_WAIT_TIME);
                log.debug("Sent MQTT SUBSCRIBE packet for topic {} was acknowledged", topic);
            }
            catch (MqttException e) {
                log.warn("Encountered exception while sending MQTT SUBSCRIBE packet for topic {}", topic, e);
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to subscribe to topic :" + topic);
            }
        }
    }

    /**
     * Takes the next received message off the queue and converts it to a transport message.
     *
     * @return the converted message, or null if the queue is empty or the head entry has a
     *         null topic (such an entry is left on the queue)
     * @throws TransportException if the connection is null, or the head entry has a topic but
     *                            null data
     */
    public IotHubTransportMessage receive() throws TransportException {
        synchronized (this.incomingLock) {
            if (this.mqttConnection == null) {
                throw new TransportException(new IllegalArgumentException("Mqtt client should be initialised at least once before using it"));
            }

            Pair<String, byte[]> messagePair = this.peekMessage();
            if (messagePair == null) {
                return null;
            }

            String topic = messagePair.getKey();
            if (topic == null) {
                return null;
            }

            byte[] data = messagePair.getValue();
            if (data == null) {
                throw new TransportException("Data cannot be null when topic is non-null");
            }

            // Only remove the entry once it is known to be convertible.
            this.allReceivedMessages.poll();
            return this.constructMessage(data, topic);
        }
    }

    /**
     * Paho callback for a lost connection. Disconnects the connection and, if a listener is
     * set, reports the loss through {@link ReconnectionNotifier}.
     *
     * <p>The reported cause is the exception thrown by the disconnect attempt if there was
     * one; otherwise it is the original cause converted to a transport exception.
     *
     * @param throwable the cause reported by the MQTT client
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("Mqtt connection lost", throwable);

        TransportException disconnectFailure = null;
        try {
            if (this.mqttConnection != null) {
                this.disconnect();
            }
        }
        catch (TransportException e) {
            disconnectFailure = e;
        }

        if (this.listener == null) {
            return;
        }

        Throwable reportedCause;
        if (disconnectFailure != null) {
            reportedCause = disconnectFailure;
        } else if (throwable instanceof MqttException) {
            reportedCause = PahoExceptionTranslator.convertToMqttException((MqttException) throwable, "Mqtt connection lost");
            log.trace("Mqtt connection loss interpreted into transport exception", reportedCause);
        } else {
            reportedCause = new TransportException(throwable);
        }
        ReconnectionNotifier.notifyDisconnectAsync(reportedCause, this.listener, this.connectionId);
    }

    /**
     * Paho callback for an arrived message. Queues the topic and payload on the connection's
     * received-message queue and notifies the message listener, if one is set.
     *
     * @param topic the topic the message arrived on
     * @param mqttMessage the arrived message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        log.trace("Mqtt message arrived on topic {} with mqtt message id {}", topic, mqttMessage.getId());
        this.mqttConnection.getAllReceivedMessages().add(new MutablePair<String, byte[]>(topic, mqttMessage.getPayload()));
        if (this.messageListener != null) {
            this.messageListener.onMessageArrived(mqttMessage.getId());
        }
    }

    /**
     * Paho callback for an acknowledged publish. Removes the matching entry from the map of
     * unacknowledged messages and reports the message to the listener as sent.
     *
     * <p>Acknowledgements of twin desired-properties subscribe/unsubscribe requests and of
     * method subscribe requests are not reported. If no matching message is found, the
     * listener is still called, with a null message.
     *
     * @param iMqttDeliveryToken the delivery token of the acknowledged message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        Message deliveredMessage = null;
        log.trace("Mqtt message with message id {} was acknowledge by service", iMqttDeliveryToken.getMessageId());
        synchronized (this.publishLock) {
            if (this.listener != null && this.unacknowledgedSentMessages.containsKey(iMqttDeliveryToken.getMessageId())) {
                log.trace("Mqtt message with message id {} that was acknowledge by service was sent by this client", iMqttDeliveryToken.getMessageId());
                deliveredMessage = this.unacknowledgedSentMessages.remove(iMqttDeliveryToken.getMessageId());
            } else {
                log.warn("Mqtt message with message id {} that was acknowledge by service was not sent by this client, will be ignored", iMqttDeliveryToken.getMessageId());
            }
        }

        if (deliveredMessage instanceof IotHubTransportMessage) {
            DeviceOperations deviceOperation = ((IotHubTransportMessage) deliveredMessage).getDeviceOperationType();
            if (deviceOperation == DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST
                    || deviceOperation == DeviceOperations.DEVICE_OPERATION_METHOD_SUBSCRIBE_REQUEST
                    || deviceOperation == DeviceOperations.DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_REQUEST) {
                return;
            }
        }

        // The listener is optional everywhere else in this class; without this guard a
        // missing listener would raise a NullPointerException on the MQTT callback thread.
        if (this.listener != null) {
            this.listener.onMessageSent(deliveredMessage, null);
        }
    }

    /**
     * Returns the head of the received-message queue without removing it.
     *
     * @return the oldest queued (topic, payload) pair, or null if the queue is empty
     */
    public Pair<String, byte[]> peekMessage() {
        return this.allReceivedMessages.peek();
    }

    /**
     * Asks the connection to acknowledge a received message.
     *
     * @param messageId the MQTT message id of the received message
     * @return the value returned by the connection's acknowledgement call
     * @throws TransportException if the connection fails to send the acknowledgement
     */
    protected boolean sendMessageAcknowledgement(int messageId) throws TransportException {
        log.trace("Sending mqtt ack for received message with mqtt message id {}", messageId);
        return this.mqttConnection.sendMessageAcknowledgement(messageId);
    }

    /**
     * Builds a retryable {@link TransportException} with the given message.
     */
    private static TransportException retryableException(String errorMessage) {
        TransportException transportException = new TransportException(errorMessage);
        transportException.setRetryable(true);
        return transportException;
    }

    /**
     * Converts a received payload and topic into a transport message.
     *
     * <p>Everything in the topic from the first encoded system-property marker onwards is
     * parsed as message properties. The part before it is split on {@code /}: a "modules"
     * segment at index 2 supplies the connection module id from index 3, and an "inputs"
     * segment at index 4 supplies the input name from index 5.
     *
     * @param data the message payload
     * @param topic the topic the message arrived on
     * @return the constructed message
     */
    private IotHubTransportMessage constructMessage(byte[] data, String topic) {
        IotHubTransportMessage message = new IotHubTransportMessage(data, MessageType.DEVICE_TELEMETRY);
        int propertiesStringStartingIndex = topic.indexOf(MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED);
        if (propertiesStringStartingIndex != -1) {
            String propertiesString = topic.substring(propertiesStringStartingIndex);
            this.assignPropertiesToMessage(message, propertiesString);

            String routeString = topic.substring(0, propertiesStringStartingIndex);
            String[] routeComponents = routeString.split("/");
            // The length checks cover the index that is read, not just the marker segment,
            // so a route that ends right after "modules" or "inputs" is skipped.
            if (routeComponents.length > 3 && routeComponents[2].equals(MODULES_PATH_STRING)) {
                message.setConnectionModuleId(routeComponents[3]);
            }
            if (routeComponents.length > 5 && routeComponents[4].equals(INPUTS_PATH_STRING)) {
                message.setInputName(routeComponents[5]);
            }
        }
        return message;
    }

    /**
     * Parses an {@code &}-separated list of URL-encoded {@code key=value} pairs and applies
     * them to the message.
     *
     * <p>Message id, correlation id, output name, content encoding and content type are set
     * through their dedicated setters. The "to", "iothub-ack", user id and expiry keys are
     * recognised and skipped. Every other key is stored as a custom property.
     *
     * @param message the message to populate
     * @param propertiesString the raw properties part of the topic
     * @throws IllegalArgumentException if a pair does not contain {@code =}
     * @throws IllegalStateException if UTF-8 is reported as unsupported
     */
    private void assignPropertiesToMessage(Message message, String propertiesString) throws IllegalStateException, IllegalArgumentException {
        String keyValueSeparator = String.valueOf(MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR);
        for (String propertyString : propertiesString.split(String.valueOf(MESSAGE_PROPERTY_SEPARATOR))) {
            if (!propertyString.contains(keyValueSeparator)) {
                throw new IllegalArgumentException("Unexpected property string provided. Expected '=' symbol between key and value of the property in string: " + propertyString);
            }

            // A limit of 2 always yields exactly two elements here, so an empty value
            // ("key=") is kept as "" instead of causing an ArrayIndexOutOfBoundsException.
            String[] keyValue = propertyString.split(keyValueSeparator, 2);
            String key;
            String value;
            try {
                key = URLDecoder.decode(keyValue[PROPERTY_KEY_INDEX], StandardCharsets.UTF_8.name());
                value = URLDecoder.decode(keyValue[PROPERTY_VALUE_INDEX], StandardCharsets.UTF_8.name());
            }
            catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e);
            }

            switch (key) {
                case MESSAGE_ID: {
                    message.setMessageId(value);
                    break;
                }
                case CORRELATION_ID: {
                    message.setCorrelationId(value);
                    break;
                }
                case OUTPUT_NAME: {
                    message.setOutputName(value);
                    break;
                }
                case CONTENT_ENCODING: {
                    message.setContentEncoding(value);
                    break;
                }
                case CONTENT_TYPE: {
                    message.setContentType(value);
                    break;
                }
                case TO:
                case IOTHUB_ACK:
                case USER_ID:
                case ABSOLUTE_EXPIRY_TIME: {
                    // Recognised keys that are not copied onto the message.
                    break;
                }
                default: {
                    message.setProperty(key, value);
                    break;
                }
            }
        }
    }
}

