/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageProperty;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsLinkStateCallback;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsMessage;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.reactor.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AmqpsReceiverLinkHandler
extends BaseHandler {
    private static final Logger log = LoggerFactory.getLogger(AmqpsReceiverLinkHandler.class);
    static final String VERSION_IDENTIFIER_KEY = "com.microsoft:client-version";
    private static final String API_VERSION_KEY = "com.microsoft:api-version";
    private static final String TO_KEY = "to";
    private static final String USER_ID_KEY = "userId";
    private static final String AMQPS_APP_PROPERTY_PREFIX = "iothub-app-";
    private final Map<Message, AmqpsMessage> receivedMessagesMap = new ConcurrentHashMap<Message, AmqpsMessage>();
    Map<Symbol, Object> amqpProperties = new HashMap<Symbol, Object>();
    String receiverLinkTag;
    String linkCorrelationId;
    String receiverLinkAddress;
    Receiver receiverLink;
    private AmqpsLinkStateCallback amqpsLinkStateCallback;

    AmqpsReceiverLinkHandler(Receiver receiver, AmqpsLinkStateCallback amqpsLinkStateCallback, String linkCorrelationId) {
        this.amqpProperties.put(Symbol.getSymbol((String)API_VERSION_KEY), TransportUtils.IOTHUB_API_VERSION);
        this.receiverLink = receiver;
        this.linkCorrelationId = linkCorrelationId;
        this.amqpsLinkStateCallback = amqpsLinkStateCallback;
        BaseHandler.setHandler((Extendable)receiver, (Handler)this);
        this.add((Handler)new FlowController());
    }

    public void onLinkRemoteOpen(Event event) {
        log.debug("{} receiver link with link correlation id {} was successfully opened", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        this.amqpsLinkStateCallback.onLinkOpened(this);
    }

    public void onLinkLocalOpen(Event event) {
        log.trace("{} receiver link with link correlation id {} opened locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onDelivery(Event event) {
        Receiver receiverLink = (Receiver)event.getLink();
        AmqpsMessage amqpsMessage = this.getMessageFromReceiverLink(receiverLink);
        IotHubTransportMessage iotHubMessage = this.protonMessageToIoTHubMessage(amqpsMessage);
        this.receivedMessagesMap.put(iotHubMessage, amqpsMessage);
        this.amqpsLinkStateCallback.onMessageReceived(iotHubMessage);
    }

    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Source source = new Source();
        source.setAddress(this.receiverLinkAddress);
        link.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        link.setProperties(this.amqpProperties);
        link.open();
        log.trace("Opening {} receiver link with correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
    }

    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        if (link.getLocalState() == EndpointState.ACTIVE) {
            log.debug("{} receiver link with link correlation id {} was closed remotely unexpectedly", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            this.amqpsLinkStateCallback.onLinkClosedUnexpectedly(link.getRemoteCondition());
            link.close();
        } else {
            log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        }
    }

    public void onLinkLocalClose(Event event) {
        Link link = event.getLink();
        if (link.getRemoteState() == EndpointState.CLOSED) {
            log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            event.getSession().close();
        } else {
            log.trace("{} receiver link with correlation id {} was closed locally", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
        }
    }

    public boolean acknowledgeReceivedMessage(IotHubTransportMessage message, DeliveryState ackType) {
        if (this.receivedMessagesMap.containsKey(message)) {
            this.receivedMessagesMap.remove(message).acknowledge(ackType);
            return true;
        }
        return false;
    }

    abstract String getLinkInstanceType();

    AmqpsMessage getMessageFromReceiverLink(Receiver receiver) {
        Delivery delivery = receiver.current();
        if (delivery != null && delivery.isReadable() && !delivery.isPartial()) {
            int size = delivery.pending();
            byte[] buffer = new byte[size];
            int bytesRead = receiver.recv(buffer, 0, buffer.length);
            log.trace("read {} bytes from receiver link {}", (Object)bytesRead, (Object)receiver.getName());
            boolean receiverLinkAdvanced = receiver.advance();
            if (!receiverLinkAdvanced) {
                log.warn("{} receiver link with link correlation id {} did not advance after bytes were read from it", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            }
            if (size != bytesRead) {
                log.warn("Amqp read from {} receiver link with link correlation id {} did not read the expected amount of bytes. Read {} but expected {}", new Object[]{this.getLinkInstanceType(), this.linkCorrelationId, bytesRead, size});
            }
            AmqpsMessage amqpsMessage = new AmqpsMessage();
            amqpsMessage.setDelivery(delivery);
            amqpsMessage.decode(buffer, 0, bytesRead);
            return amqpsMessage;
        }
        return null;
    }

    IotHubTransportMessage protonMessageToIoTHubMessage(AmqpsMessage protonMsg) {
        byte[] msgBody;
        log.trace("Converting proton message to iot hub message for {} receiver link with link correlation id {}. Proton message correlation id {}", new Object[]{this.getLinkInstanceType(), this.linkCorrelationId, protonMsg.getCorrelationId()});
        Data d = (Data)protonMsg.getBody();
        if (d != null) {
            Binary b = d.getValue();
            msgBody = new byte[b.getLength()];
            ByteBuffer buffer = b.asByteBuffer();
            buffer.get(msgBody);
        } else {
            msgBody = new byte[]{};
        }
        IotHubTransportMessage iotHubTransportMessage = new IotHubTransportMessage(msgBody, MessageType.UNKNOWN);
        Properties properties = protonMsg.getProperties();
        if (properties != null) {
            if (properties.getCorrelationId() != null) {
                iotHubTransportMessage.setCorrelationId(properties.getCorrelationId().toString());
            }
            if (properties.getMessageId() != null) {
                iotHubTransportMessage.setMessageId(properties.getMessageId().toString());
            }
            if (properties.getTo() != null) {
                iotHubTransportMessage.setProperty("iothub-app-to", properties.getTo());
            }
            if (properties.getUserId() != null) {
                iotHubTransportMessage.setProperty("iothub-app-userId", properties.getUserId().toString());
            }
            if (properties.getContentEncoding() != null) {
                iotHubTransportMessage.setContentEncoding(properties.getContentEncoding().toString());
            }
            if (properties.getContentType() != null) {
                iotHubTransportMessage.setContentType(properties.getContentType().toString());
            }
        }
        if (protonMsg.getApplicationProperties() != null) {
            Map applicationProperties = protonMsg.getApplicationProperties().getValue();
            for (Map.Entry entry : applicationProperties.entrySet()) {
                String propertyKey = (String)entry.getKey();
                if (propertyKey.equalsIgnoreCase("iothub-connection-device-id")) {
                    iotHubTransportMessage.setConnectionDeviceId(entry.getValue().toString());
                    continue;
                }
                if (propertyKey.equalsIgnoreCase("iothub-connection-module-id")) {
                    iotHubTransportMessage.setConnectionModuleId(entry.getValue().toString());
                    continue;
                }
                if (MessageProperty.RESERVED_PROPERTY_NAMES.contains(propertyKey)) continue;
                iotHubTransportMessage.setProperty((String)entry.getKey(), entry.getValue().toString());
            }
        }
        return iotHubTransportMessage;
    }

    void close() {
        if (this.receiverLink.getLocalState() != EndpointState.CLOSED) {
            log.debug("Closing {} receiver link with link correlation id {}", (Object)this.getLinkInstanceType(), (Object)this.linkCorrelationId);
            this.receiverLink.close();
        }
    }
}

