/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageProperty;
import com.microsoft.azure.sdk.iot.device.exceptions.ProtocolException;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsLinkStateCallback;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSendResult;
import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.apache.qpid.proton.reactor.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AmqpsSenderLinkHandler
extends BaseHandler {
    private static final Logger log = LoggerFactory.getLogger(AmqpsSenderLinkHandler.class);
    static final String VERSION_IDENTIFIER_KEY = "com.microsoft:client-version";
    private static final String API_VERSION_KEY = "com.microsoft:api-version";
    final Map<Integer, Message> inProgressMessages = new ConcurrentHashMap<Integer, Message>();
    Map<Symbol, Object> amqpProperties = new HashMap<Symbol, Object>();
    String senderLinkTag;
    String linkCorrelationId;
    String senderLinkAddress;
    Sender senderLink;
    private long nextTag = 0L;
    private AmqpsLinkStateCallback amqpsLinkStateCallback;

    AmqpsSenderLinkHandler(Sender sender, AmqpsLinkStateCallback amqpsLinkStateCallback, String linkCorrelationId) {
        this.amqpProperties.put(Symbol.getSymbol((String)API_VERSION_KEY), TransportUtils.IOTHUB_API_VERSION);
        this.linkCorrelationId = linkCorrelationId;
        this.senderLink = sender;
        this.amqpsLinkStateCallback = amqpsLinkStateCallback;
        BaseHandler.setHandler((Extendable)sender, (Handler)this);
        this.add((Handler)new FlowController());
    }

    protected abstract String getLinkInstanceType();

    public void onLinkRemoteOpen(Event event) {
        log.debug("{} sender link with link correlation id {} was successfully opened", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        this.amqpsLinkStateCallback.onLinkOpened(this);
    }

    public void onLinkLocalOpen(Event event) {
        log.trace("{} sender link with link correlation id {} opened locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        int deliveryTag = Integer.valueOf(new String(event.getDelivery().getTag()));
        Message acknowledgedIotHubMessage = this.inProgressMessages.remove(deliveryTag);
        if (acknowledgedIotHubMessage == null) {
            log.warn("Received acknowledgement for a message that this sender did not send", (Object)deliveryTag);
        } else {
            this.amqpsLinkStateCallback.onMessageAcknowledged(acknowledgedIotHubMessage, deliveryTag);
        }
        delivery.free();
    }

    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Target target = new Target();
        target.setAddress(this.senderLinkAddress);
        link.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        link.setProperties(this.amqpProperties);
        link.open();
        log.trace("Opening {} sender link with correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        if (link.getLocalState() == EndpointState.ACTIVE) {
            log.debug("{} sender link with link correlation id {} was closed remotely unexpectedly", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            this.amqpsLinkStateCallback.onLinkClosedUnexpectedly(link.getRemoteCondition());
            link.close();
        } else {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        }
    }

    public void onLinkLocalClose(Event event) {
        Link link = event.getLink();
        if (link.getRemoteState() == EndpointState.CLOSED) {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        } else {
            log.trace("{} sender link with correlation id {} was closed locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        }
    }

    void close() {
        if (this.senderLink.getLocalState() != EndpointState.CLOSED) {
            log.debug("Closing {} sender link with link correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            this.senderLink.close();
        }
    }

    AmqpsSendResult sendMessageAndGetDeliveryTag(Message iotHubMessage) {
        MessageImpl protonMessage = this.iotHubMessageToProtonMessage(iotHubMessage);
        AmqpsSendResult sendResult = this.sendMessageAndGetDeliveryTag(protonMessage);
        this.inProgressMessages.put(sendResult.getDeliveryTag(), iotHubMessage);
        return sendResult;
    }

    AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage) {
        int length;
        this.nextTag = this.nextTag == Integer.MAX_VALUE || this.nextTag < 0L ? 0L : ++this.nextTag;
        byte[] msgData = new byte[1024];
        while (true) {
            try {
                length = protonMessage.encode(msgData, 0, msgData.length);
            }
            catch (BufferOverflowException e) {
                msgData = new byte[msgData.length * 2];
                continue;
            }
            break;
        }
        byte[] deliveryTag = String.valueOf(this.nextTag).getBytes();
        Delivery delivery = this.senderLink.delivery(deliveryTag);
        try {
            log.trace("Sending {} bytes over the amqp {} sender link with link correlation id {}", new Object[]{length, this.getLinkInstanceType(), this.linkCorrelationId});
            int bytesSent = this.senderLink.send(msgData, 0, length);
            log.trace("{} bytes sent over the amqp {} sender link with link correlation id {}", new Object[]{bytesSent, this.getLinkInstanceType(), this.linkCorrelationId});
            if (bytesSent != length) {
                throw new ProtocolException(String.format("Amqp send operation did not send all of the expected bytes for %s sender link with link correlation id %s, retrying to send the message", this.getLinkInstanceType(), this.linkCorrelationId));
            }
            boolean canAdvance = this.senderLink.advance();
            if (!canAdvance) {
                throw new ProtocolException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", this.getLinkInstanceType(), this.linkCorrelationId));
            }
            log.trace("Message was sent over {} sender link with delivery tag {} and hash {}", new Object[]{this.getLinkInstanceType(), new String(deliveryTag), delivery.hashCode()});
            return new AmqpsSendResult(true, deliveryTag);
        }
        catch (Exception e) {
            log.warn("Encountered a problem while sending a message on {} sender link with link correlation id {}", new Object[]{this.getLinkInstanceType(), this.linkCorrelationId, e});
            this.senderLink.advance();
            delivery.free();
            return new AmqpsSendResult(false);
        }
    }

    MessageImpl iotHubMessageToProtonMessage(Message message) {
        log.trace("Converting IoT Hub message to proton message for {} sender link with link correlation id {}. IoT Hub message correlationId {}", new Object[]{this.getLinkInstanceType(), this.linkCorrelationId, message.getCorrelationId()});
        MessageImpl outgoingMessage = (MessageImpl)Proton.message();
        Properties properties = new Properties();
        if (message.getMessageId() != null) {
            properties.setMessageId((Object)message.getMessageId());
        }
        if (message.getCorrelationId() != null) {
            properties.setCorrelationId((Object)message.getCorrelationId());
        }
        if (message.getContentType() != null) {
            properties.setContentType(Symbol.valueOf((String)message.getContentType()));
        }
        if (message.getContentEncoding() != null) {
            properties.setContentEncoding(Symbol.valueOf((String)message.getContentEncoding()));
        }
        outgoingMessage.setProperties(properties);
        HashMap<String, String> userProperties = new HashMap<String, String>();
        if (message.getProperties().length > 0) {
            for (MessageProperty messageProperty : message.getProperties()) {
                if (MessageProperty.RESERVED_PROPERTY_NAMES.contains(messageProperty.getName())) continue;
                userProperties.put(messageProperty.getName(), messageProperty.getValue());
            }
        }
        if (message.getConnectionDeviceId() != null) {
            userProperties.put("iothub-connection-device-id", message.getConnectionDeviceId());
        }
        if (message.getConnectionModuleId() != null) {
            userProperties.put("iothub-connection-module-id", message.getConnectionModuleId());
        }
        if (message.getCreationTimeUTC() != null) {
            userProperties.put("iothub-creation-time-utc", message.getCreationTimeUTCString());
        }
        if (message.isSecurityMessage()) {
            userProperties.put("iothub-interface-id", "urn:azureiot:Security:SecurityAgent:1");
        }
        ApplicationProperties applicationProperties = new ApplicationProperties(userProperties);
        outgoingMessage.setApplicationProperties(applicationProperties);
        Binary binary = new Binary(message.getBytes());
        Data section = new Data(binary);
        outgoingMessage.setBody((Section)section);
        return outgoingMessage;
    }
}

