/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.proton.transport.proxy.ProxyAuthenticationType;
import com.microsoft.azure.proton.transport.proxy.ProxyConfiguration;
import com.microsoft.azure.proton.transport.proxy.ProxyHandler;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.ProxySettings;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSasTokenAuthenticationProvider;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.ReconnectionNotifier;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsCbsSessionHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsExceptionTranslator;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSasTokenRenewalHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback;
import com.microsoft.azure.sdk.iot.device.transport.amqps.IotHubReactor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AmqpsIotHubConnection
extends BaseHandler
implements IotHubTransportConnection,
AmqpsSessionStateCallback {
    private static final Logger log = LoggerFactory.getLogger(AmqpsIotHubConnection.class);

    // NOTE(review): this file is decompiler output, and the constants below appear to have been
    // inlined as literals at their use sites (e.g. 20000L in open()/close()). Units noted here
    // are taken from those literal uses.

    // Wait limits; close() and open() await their latches with these values in milliseconds.
    private static final int MAX_WAIT_TO_CLOSE_CONNECTION = 20000;
    private static final int MAX_WAIT_TO_OPEN_AUTHENTICATION_SESSION = 20000;
    private static final int MAX_WAIT_TO_OPEN_WORKER_SESSIONS = 60000;
    // executorServicesCleanup() waits 10 seconds per termination attempt.
    private static final int MAX_WAIT_TO_TERMINATE_EXECUTOR = 10;

    // Web socket settings handed to WebSocketImpl.configure(...) in addWebSocketLayer().
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    private static final String WEB_SOCKET_QUERY = "iothub-no-client-cert=true";
    private static final int MAX_MESSAGE_PAYLOAD_SIZE = 262144;

    private static final int WEB_SOCKET_PORT = 443;
    private static final int AMQP_PORT = 5671;
    private static final int REACTOR_COUNT = 1;
    private static final int CBS_SESSION_COUNT = 1;

    // The reactor timer is re-armed every 50 ms and each tick attempts at most 1000 sends.
    private static final int SEND_MESSAGES_PERIOD_MILLIS = 50;
    private static final int MAX_MESSAGES_TO_SEND_PER_CALLBACK = 1000;

    // Outgoing messages queued by sendMessage() and drained by the reactor timer task.
    private final Queue<Message> messagesToSend = new ConcurrentLinkedQueue<Message>();

    // Regenerated on every open() call and reported to the listener.
    private String connectionId;
    private IotHubConnectionStatus state;
    // Gateway hostname when configured, otherwise the IoT hub hostname (see constructor).
    private String hostName;
    private DeviceClientConfig deviceClientConfig;
    private IotHubListener listener;
    // Last failure recorded by a reactor callback; open() rethrows it and onReactorFinal()
    // uses it to decide whether a reconnection has to be scheduled.
    private TransportException savedException;
    private boolean reconnectionScheduled = false;
    private ExecutorService executorService;

    // Latches that open()/close() block on while the reactor callbacks make progress.
    private CountDownLatch authenticationSessionOpenedLatch;
    private CountDownLatch deviceSessionsOpenedLatch;
    private CountDownLatch closeReactorLatch;

    // Set in onConnectionInit(); null until the reactor has created the connection.
    private Connection connection;
    // Explicit type arguments instead of raw ArrayList to avoid unchecked conversions.
    private ArrayList<AmqpsSessionHandler> sessionHandlerList = new ArrayList<AmqpsSessionHandler>();
    private ArrayList<AmqpsSasTokenRenewalHandler> sasTokenRenwalHandlerList = new ArrayList<AmqpsSasTokenRenewalHandler>();
    // Only created for SAS token authentication (see onConnectionInit()).
    private AmqpsCbsSessionHandler amqpsCbsSessionHandler;

    /**
     * Creates a disconnected AMQPS connection handler for the given device configuration.
     *
     * <p>The target host is the gateway hostname when the config carries a non-empty one,
     * otherwise the IoT hub hostname. A proton {@link Handshaker} is added to this handler
     * and the initial state is {@link IotHubConnectionStatus#DISCONNECTED}.
     *
     * @param config the device client configuration this connection is built from
     */
    public AmqpsIotHubConnection(DeviceClientConfig config) {
        this.deviceClientConfig = config;

        String gatewayHostname = config.getGatewayHostname();
        boolean gatewayConfigured = gatewayHostname != null && !gatewayHostname.isEmpty();
        if (!gatewayConfigured) {
            log.trace("No gateway hostname was present in config, connecting directly to hub");
            this.hostName = config.getIotHubHostname();
        } else {
            log.debug("Gateway hostname was present in config, connecting to gateway rather than directly to hub");
            this.hostName = gatewayHostname;
        }

        add(new Handshaker());
        this.state = IotHubConnectionStatus.DISCONNECTED;

        // 443 when running over web sockets, 5671 otherwise.
        int port = config.isUseWebsocket() ? 443 : 5671;
        log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", port);
    }

    /**
     * Opens the AMQP connection and blocks until the authentication session and every device
     * session has opened, or until one of them fails or times out.
     *
     * <p>When the connection is currently {@code DISCONNECTED}, a session handler is registered
     * for each supplied config, the state latches are re-created and the reactor is started on
     * the executor. The authentication latch is awaited for up to 20 seconds and the device
     * session latch for up to 60 seconds. Any exception saved by a reactor callback in the
     * meantime is rethrown; a timeout closes the connection and throws a retryable exception.
     *
     * @param deviceClientConfigs configs of the devices that need a session on this connection
     * @param scheduledExecutorService not used by this implementation
     * @throws TransportException if a session fails to open, a wait times out, or the calling
     *         thread is interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override
    public void open(Queue<DeviceClientConfig> deviceClientConfigs, ScheduledExecutorService scheduledExecutorService) throws TransportException {
        log.debug("Opening amqp layer...");
        this.reconnectionScheduled = false;
        this.connectionId = UUID.randomUUID().toString();
        this.savedException = null;
        if (this.state == IotHubConnectionStatus.DISCONNECTED) {
            for (DeviceClientConfig clientConfig : deviceClientConfigs) {
                this.addDeviceSession(clientConfig, false);
            }
            this.initializeStateLatches();
            try {
                this.openAsync();

                log.trace("Waiting for authentication links to open...");
                boolean authenticationSessionOpenTimedOut = !this.authenticationSessionOpenedLatch.await(20000L, TimeUnit.MILLISECONDS);
                // A saved exception takes precedence over a timeout: it carries the real cause.
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (authenticationSessionOpenTimedOut) {
                    this.closeConnectionWithException("Timed out waiting for authentication session to open", true);
                }

                log.trace("Waiting for device sessions to open...");
                boolean deviceSessionsOpenTimedOut = !this.deviceSessionsOpenedLatch.await(60000L, TimeUnit.MILLISECONDS);
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (deviceSessionsOpenTimedOut) {
                    this.closeConnectionWithException("Timed out waiting for worker links to open", true);
                }
            }
            catch (InterruptedException e) {
                try {
                    // Clean up first: awaitTermination would fail immediately if the
                    // interrupt flag were already set again.
                    this.executorServicesCleanup();
                }
                finally {
                    // Restore the interrupt status so callers further up can observe it.
                    Thread.currentThread().interrupt();
                }
                throw new TransportException("Interrupted while waiting for links to open for AMQP connection", e);
            }
        }
        this.state = IotHubConnectionStatus.CONNECTED;
        this.listener.onConnectionEstablished(this.connectionId);
        log.debug("Amqp connection opened successfully");
    }

    /**
     * Closes the AMQP connection, waits up to 20 seconds for the reactor to finish, shuts down
     * the executor and marks this connection as {@code DISCONNECTED}.
     *
     * @throws TransportException if the calling thread is interrupted while waiting for the
     *         reactor to close (the interrupt flag is restored), or if the executor cleanup fails
     */
    @Override
    public void close() throws TransportException {
        log.debug("Shutting down amqp layer...");
        this.closeAsync();
        try {
            log.trace("Waiting for reactor to close...");
            boolean reactorClosed = this.closeReactorLatch.await(20000L, TimeUnit.MILLISECONDS);
            if (!reactorClosed) {
                // Previously the timeout was ignored silently; surface it for diagnosis.
                log.warn("Timed out waiting for the amqp reactor to close");
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new TransportException("Interrupted while closing proton reactor", e);
        }
        this.executorServicesCleanup();
        log.trace("Amqp connection closed successfully");
        this.state = IotHubConnectionStatus.DISCONNECTED;
    }

    /**
     * Reactor start-up callback: asks the reactor to connect to the target endpoint and arms
     * the first send timer (50 ms).
     *
     * <p>The endpoint is this connection's host on port 5671. When web sockets are enabled the
     * port becomes 443, or the proxy's host and port if proxy settings are configured.
     *
     * @param event the reactor-init event
     */
    public void onReactorInit(Event event) {
        Reactor reactor = event.getReactor();

        String targetHost = this.hostName;
        int targetPort = 5671;
        if (this.deviceClientConfig.isUseWebsocket()) {
            ProxySettings proxy = this.deviceClientConfig.getProxySettings();
            if (proxy == null) {
                targetPort = 443;
            } else {
                // The socket goes to the proxy, which then tunnels to the hub.
                targetHost = proxy.getHostname();
                targetPort = proxy.getPort();
            }
        }

        reactor.connectionToHost(targetHost, targetPort, this);
        reactor.schedule(50, this);
    }

    /**
     * Reactor shutdown callback: releases every latch a caller may be blocked on and, when a
     * failure was recorded, schedules a reconnection notification.
     *
     * @param event the reactor-final event (not inspected)
     */
    public void onReactorFinal(Event event) {
        log.trace("Amqps reactor finalized");

        releaseLatch(this.authenticationSessionOpenedLatch);
        releaseLatch(this.deviceSessionsOpenedLatch);
        releaseLatch(this.closeReactorLatch);

        if (this.savedException == null) {
            return;
        }
        scheduleReconnection(this.savedException);
    }

    /**
     * Connection-init callback: stores and opens the proton connection, then creates the
     * proton sessions for the registered handlers.
     *
     * <p>With SAS token authentication a CBS session handler is created and every device
     * session handler gets its own session plus a SAS token renewal handler. Otherwise only the
     * first registered session handler receives a session.
     *
     * @param event the connection-init event
     */
    public void onConnectionInit(Event event) {
        this.connection = event.getConnection();
        this.connection.setHostname(this.hostName);
        this.connection.open();

        boolean usesSasToken = this.deviceClientConfig.getAuthenticationType() == DeviceClientConfig.AuthType.SAS_TOKEN;
        if (!usesSasToken) {
            this.sessionHandlerList.get(0).setSession(this.connection.session());
            return;
        }

        this.amqpsCbsSessionHandler = new AmqpsCbsSessionHandler(this.connection.session(), this);
        // Renewal handlers are rebuilt on every (re)connection.
        this.sasTokenRenwalHandlerList.clear();
        for (AmqpsSessionHandler deviceSessionHandler : this.sessionHandlerList) {
            deviceSessionHandler.setSession(this.connection.session());
            AmqpsSasTokenRenewalHandler renewalHandler = new AmqpsSasTokenRenewalHandler(this.amqpsCbsSessionHandler, deviceSessionHandler);
            this.sasTokenRenwalHandlerList.add(renewalHandler);
        }
    }

    /**
     * Connection-bound callback: configures the transport layers in order - web socket (if
     * enabled), SASL ANONYMOUS (for SAS token auth), TLS, and finally the proxy layer (if proxy
     * settings are present).
     *
     * <p>An {@link IOException} raised while setting up TLS is stored as the saved exception
     * and logged rather than thrown.
     *
     * @param event the connection-bound event
     */
    public void onConnectionBound(Event event) {
        Transport boundTransport = event.getTransport();
        if (this.deviceClientConfig.isUseWebsocket()) {
            addWebSocketLayer(boundTransport);
        }

        try {
            SSLContext tlsContext = this.deviceClientConfig.getAuthenticationProvider().getSSLContext();

            boolean usesSasToken = this.deviceClientConfig.getAuthenticationType() == DeviceClientConfig.AuthType.SAS_TOKEN;
            if (usesSasToken) {
                boundTransport.sasl().setMechanisms(new String[]{"ANONYMOUS"});
            }

            SslDomain sslDomain = Proton.sslDomain();
            sslDomain.setSslContext(tlsContext);
            sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
            sslDomain.init(SslDomain.Mode.CLIENT);
            boundTransport.ssl(sslDomain);
        }
        catch (IOException e) {
            this.savedException = new TransportException(e);
            log.error("Encountered an exception while setting ssl domain for the amqp connection", this.savedException);
        }

        if (this.deviceClientConfig.getProxySettings() != null) {
            String proxiedTarget = event.getConnection().getHostname() + ":" + 443;
            addProxyLayer(boundTransport, proxiedTarget);
        }
    }

    /**
     * Connection-local-open callback; only logs that the connection was opened locally.
     *
     * @param event the local-open event (not inspected)
     */
    public void onConnectionLocalOpen(Event event) {
        log.trace("Amqp connection opened locally");
    }

    /**
     * Connection-remote-open callback; only logs that the peer opened the connection.
     *
     * @param event the remote-open event (not inspected)
     */
    public void onConnectionRemoteOpen(Event event) {
        log.trace("Amqp connection opened remotely");
    }

    /**
     * Connection-local-close callback: closes every device session, closes the CBS session if
     * one exists, and then stops the reactor.
     *
     * @param event the local-close event; its reactor is the one that gets stopped
     */
    public void onConnectionLocalClose(Event event) {
        log.debug("Amqp connection closed locally, shutting down all active sessions...");
        for (AmqpsSessionHandler deviceSessionHandler : this.sessionHandlerList) {
            deviceSessionHandler.closeSession();
        }

        AmqpsCbsSessionHandler cbsHandler = this.amqpsCbsSessionHandler;
        if (cbsHandler != null) {
            log.debug("Shutting down cbs session...");
            cbsHandler.close();
        }

        log.trace("Closing reactor since connection has closed");
        Reactor reactor = event.getReactor();
        reactor.stop();
    }

    /**
     * Connection-remote-close callback.
     *
     * <p>If the local endpoint is still active, the peer closed the connection on its own: the
     * remote error condition is translated and saved, and the local side is closed too.
     * Otherwise the close was initiated locally and the reactor is stopped.
     *
     * @param event the remote-close event
     */
    public void onConnectionRemoteClose(Event event) {
        Connection closedConnection = event.getConnection();
        boolean locallyActive = closedConnection.getLocalState() == EndpointState.ACTIVE;

        if (!locallyActive) {
            log.trace("Closing reactor since connection has closed");
            event.getReactor().stop();
            return;
        }

        ErrorCondition remoteCondition = closedConnection.getRemoteCondition();
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(remoteCondition);
        log.error("Amqp connection was closed remotely", this.savedException);
        this.connection.close();
    }

    /**
     * Transport-error callback: marks the connection as disconnected, saves the translated
     * error condition (remote condition first, the transport's own condition as fallback) and
     * closes the event's connection.
     *
     * @param event the transport-error event
     */
    public void onTransportError(Event event) {
        super.onTransportError(event);
        this.state = IotHubConnectionStatus.DISCONNECTED;

        ErrorCondition condition = event.getTransport().getRemoteCondition();
        if (condition == null) {
            condition = event.getTransport().getCondition();
        }

        TransportException translated = AmqpsExceptionTranslator.convertFromAmqpException(condition);
        this.savedException = translated;
        log.error("Amqp transport error occurred, closing the amqps connection", translated);
        event.getConnection().close();
    }

    /**
     * Timer callback: drains the outgoing queue and re-arms the timer for another 50 ms.
     *
     * @param event the timer event; its reactor is used to schedule the next tick
     */
    public void onTimerTask(Event event) {
        sendQueuedMessages();

        Reactor reactor = event.getReactor();
        reactor.schedule(50, this);
    }

    /**
     * Sets the listener that is notified about connection and message events.
     *
     * @param listener the listener to store; it is used as-is, without a null check
     */
    @Override
    public void setListener(IotHubListener listener) {
        this.listener = listener;
    }

    /**
     * Queues a message for sending; the actual send happens later on the reactor timer task.
     *
     * @param message the message to enqueue
     * @return always {@link IotHubStatusCode#OK}
     */
    @Override
    public IotHubStatusCode sendMessage(Message message) {
        log.trace("Adding message to amqp message queue to be sent later ({})", message);
        messagesToSend.add(message);
        return IotHubStatusCode.OK;
    }

    /**
     * Acknowledges a received message with the AMQP delivery state that corresponds to the
     * given result: ABANDON maps to Released, REJECT to Rejected and COMPLETE to Accepted.
     *
     * @param message the received message to acknowledge
     * @param result the outcome chosen for the message
     * @return {@code true} if one of the session handlers acknowledged the message;
     *         {@code false} if the result is unknown (or null) or no session could acknowledge it
     */
    @Override
    public boolean sendMessageResult(IotHubTransportMessage message, IotHubMessageResult result) {
        // Declared as the common DeliveryState type: the three outcomes are unrelated classes,
        // so the decompiler's narrower 'Released' declaration did not type-check.
        DeliveryState ackType;
        if (result == IotHubMessageResult.ABANDON) {
            ackType = Released.getInstance();
        } else if (result == IotHubMessageResult.REJECT) {
            ackType = new Rejected();
        } else if (result == IotHubMessageResult.COMPLETE) {
            ackType = Accepted.getInstance();
        } else {
            // A null result ends up here as well; avoid dereferencing it.
            log.warn("Invalid IoT Hub message result {}", result == null ? null : result.name());
            return false;
        }

        for (AmqpsSessionHandler sessionHandler : this.sessionHandlerList) {
            if (sessionHandler.acknowledgeReceivedMessage(message, ackType)) {
                return true;
            }
        }
        log.warn("No sessions could acknowledge the message ({})", message);
        return false;
    }

    /**
     * Returns the identifier of the current connection attempt.
     *
     * @return the id generated by the most recent {@code open} call, or {@code null} if
     *         {@code open} has not been called yet
     */
    @Override
    public String getConnectionId() {
        return this.connectionId;
    }

    /**
     * Session callback: a device session finished opening, so the device-session latch is
     * counted down once.
     *
     * @param deviceId id of the device whose session opened (not used here)
     */
    @Override
    public void onDeviceSessionOpened(String deviceId) {
        log.trace("Device session opened, counting down the device sessions opening latch");
        CountDownLatch sessionsLatch = this.deviceSessionsOpenedLatch;
        sessionsLatch.countDown();
    }

    /**
     * Session callback: the authentication session opened. Counts down the authentication
     * latch and, for SAS token authentication, asks every renewal handler to send its
     * authentication message. A failure to send is logged and stored as the saved exception;
     * the remaining handlers are still processed.
     */
    @Override
    public void onAuthenticationSessionOpened() {
        log.trace("Authentication session opened, counting down the authentication session opening latch");
        this.authenticationSessionOpenedLatch.countDown();

        if (this.deviceClientConfig.getAuthenticationType() != DeviceClientConfig.AuthType.SAS_TOKEN) {
            return;
        }

        for (AmqpsSasTokenRenewalHandler renewalHandler : this.sasTokenRenwalHandlerList) {
            try {
                renewalHandler.sendAuthenticationMessage(this.connection.getReactor());
            }
            catch (TransportException sendFailure) {
                log.error("Failed to send CBS authentication message", sendFailure);
                this.savedException = sendFailure;
            }
        }
    }

    /**
     * Session callback: forwards the acknowledgement of a sent message to the listener.
     *
     * @param message the message that was acknowledged
     */
    @Override
    public void onMessageAcknowledged(Message message) {
        // The second argument is passed as null on this path.
        this.listener.onMessageSent(message, null);
    }

    /**
     * Session callback: forwards a received message to the listener.
     *
     * @param message the message that was received
     */
    @Override
    public void onMessageReceived(IotHubTransportMessage message) {
        // The second argument is passed as null on this path.
        this.listener.onMessageReceived(message, null);
    }

    /**
     * Session callback: authentication failed. Stores the failure and releases both opening
     * latches.
     *
     * @param transportException the authentication failure
     */
    @Override
    public void onAuthenticationFailed(TransportException transportException) {
        savedException = transportException;

        // Release both latches; a waiting open() call checks savedException after each await.
        releaseLatch(authenticationSessionOpenedLatch);
        releaseLatch(deviceSessionsOpenedLatch);
    }

    /**
     * Session callback: a session closed without being asked to. The error condition is
     * translated and saved, and the whole connection is closed.
     *
     * @param errorCondition the condition reported for the closed session
     */
    @Override
    public void onSessionClosedUnexpectedly(ErrorCondition errorCondition) {
        TransportException translated = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        savedException = translated;
        log.error("Amqp session closed unexpectedly. Closing this connection...", translated);
        connection.close();
    }

    /**
     * Adds a web socket layer to the transport, configured for this connection's host with the
     * IoT hub web socket path, query and sub-protocol on port 443.
     *
     * @param transport the transport to add the layer to; cast to {@link TransportInternal}
     */
    private void addWebSocketLayer(Transport transport) {
        log.debug("Adding websocket layer to amqp transport");

        // 262144 is the buffer size argument for the web socket layer.
        WebSocketImpl webSocketLayer = new WebSocketImpl(262144);
        webSocketLayer.configure(this.hostName, WEB_SOCKET_PATH, WEB_SOCKET_QUERY, 443, WEB_SOCKET_SUB_PROTOCOL, null, null);

        TransportInternal internalTransport = (TransportInternal) transport;
        internalTransport.addTransportLayer(webSocketLayer);
    }

    /**
     * Adds a proxy layer to the transport. Basic proxy authentication is configured only when
     * the proxy settings carry both a username and a password.
     *
     * @param transport the transport to add the layer to; cast to {@link TransportInternal}
     * @param hostName the {@code host:port} string handed to the proxy layer's configuration
     */
    private void addProxyLayer(Transport transport, String hostName) {
        log.debug("Adding proxy layer to amqp transport");
        ProxySettings settings = this.deviceClientConfig.getProxySettings();

        ProxyImpl proxyLayer;
        if (settings.getUsername() == null || settings.getPassword() == null) {
            log.trace("No proxy username and password will be used amqp proxy configuration");
            proxyLayer = new ProxyImpl();
        } else {
            log.trace("Adding proxy username and password to amqp proxy configuration");
            String password = new String(settings.getPassword());
            ProxyConfiguration basicAuthConfiguration = new ProxyConfiguration(ProxyAuthenticationType.BASIC, settings.getProxy(), settings.getUsername(), password);
            proxyLayer = new ProxyImpl(basicAuthConfiguration);
        }

        ProxyHandler handler = new ProxyHandlerImpl();
        proxyLayer.configure(hostName, null, handler, transport);
        ((TransportInternal) transport).addTransportLayer(proxyLayer);
    }

    /**
     * Drains the outgoing queue: sends queued messages one by one until the queue is empty,
     * 1000 sends have been attempted, or a send fails.
     *
     * <p>A message that fails to send is appended back to the queue. Any message that was
     * already polled when the loop stops is appended back as well, so nothing is dropped.
     */
    private void sendQueuedMessages() {
        int attempts = 0;
        Message current = this.messagesToSend.poll();

        while (current != null && attempts < 1000) {
            attempts++;
            boolean sent = sendQueuedMessage(current);
            if (!sent) {
                log.trace("Amqp message failed to send, adding it back to messages to send queue ({})", current);
                this.messagesToSend.add(current);
            }

            // The next message is polled before the stop check, exactly like the loop header
            // of the original form; it is handed back to the queue below if left unprocessed.
            current = this.messagesToSend.poll();
            if (!sent) {
                break;
            }
        }

        if (current != null) {
            this.messagesToSend.add(current);
        }
    }

    /**
     * Offers the message to each session handler in turn until one of them accepts it.
     *
     * @param message the message to send
     * @return {@code true} as soon as a session handler reports a successful send;
     *         {@code false} if none did (including when there are no session handlers)
     */
    private boolean sendQueuedMessage(Message message) {
        log.trace("Sending message over amqp ({})", message);
        for (AmqpsSessionHandler candidate : this.sessionHandlerList) {
            if (candidate.sendMessage(message)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the proton reactor with this object as its handler. For x509 certificate
     * authentication the reactor is created with SASL disabled by default.
     *
     * @return the newly created reactor
     * @throws TransportException if proton fails to create the reactor
     */
    private Reactor createReactor() throws TransportException {
        Handler[] handlers = new Handler[]{this};
        boolean usesX509 = this.deviceClientConfig.getAuthenticationType() == DeviceClientConfig.AuthType.X509_CERTIFICATE;
        try {
            if (!usesX509) {
                return Proton.reactor(handlers);
            }
            ReactorOptions reactorOptions = new ReactorOptions();
            reactorOptions.setEnableSaslByDefault(false);
            return Proton.reactor(reactorOptions, handlers);
        }
        catch (IOException e) {
            throw new TransportException("Could not create Proton reactor", e);
        }
    }

    /**
     * Notifies the listener asynchronously that the connection was lost, at most once per
     * {@code open} call (the flag is reset there).
     *
     * @param throwable the failure that caused the disconnect
     */
    private void scheduleReconnection(Throwable throwable) {
        if (reconnectionScheduled) {
            return;
        }
        reconnectionScheduled = true;
        log.warn("Amqp connection was closed, creating a thread to notify transport layer", throwable);
        ReconnectionNotifier.notifyDisconnectAsync(throwable, listener, connectionId);
    }

    /**
     * Counts the given latch all the way down to zero so that every waiting thread is released.
     *
     * <p>The previous loop compared an increasing index against the shrinking
     * {@code getCount()}, so it stopped halfway: a latch at 2 was left at 1, one at 4 was
     * left at 2, and waiters stayed blocked.
     *
     * @param latch the latch to release; {@code null} is ignored
     */
    private static void releaseLatch(CountDownLatch latch) {
        if (latch == null) {
            return;
        }
        // countDown() is a no-op at zero, so looping on the live count is safe even if
        // another thread counts down concurrently.
        while (latch.getCount() > 0) {
            latch.countDown();
        }
    }

    /**
     * Ensures a session handler exists for the given device, reusing the handler already
     * registered for the same device id or creating and registering a new one.
     *
     * @param deviceClientConfig config of the device that needs a session
     * @param afterOpen when {@code true}, a new proton session is created on the current
     *        connection and assigned to the handler right away
     */
    private void addDeviceSession(DeviceClientConfig deviceClientConfig, boolean afterOpen) {
        AmqpsSessionHandler handler = null;
        Iterator<AmqpsSessionHandler> registered = this.sessionHandlerList.iterator();
        while (handler == null && registered.hasNext()) {
            AmqpsSessionHandler candidate = registered.next();
            if (candidate.getDeviceId().equals(deviceClientConfig.getDeviceId())) {
                handler = candidate;
            }
        }

        if (handler == null) {
            handler = new AmqpsSessionHandler(deviceClientConfig, this);
            this.sessionHandlerList.add(handler);
        }

        if (!afterOpen) {
            return;
        }
        handler.setSession(this.connection.session());
    }

    /**
     * Re-creates the three state latches: the reactor-close latch (count 1), the
     * authentication latch (count 1 for SAS token providers, otherwise 0) and the device
     * session latch (one count per registered session handler).
     */
    private void initializeStateLatches() {
        closeReactorLatch = new CountDownLatch(1);

        boolean usesSasTokenProvider = deviceClientConfig.getAuthenticationProvider() instanceof IotHubSasTokenAuthenticationProvider;
        if (!usesSasTokenProvider) {
            log.trace("Initializing authentication link latch count to 0 because x509 connections don't have authentication links");
            authenticationSessionOpenedLatch = new CountDownLatch(0);
        } else {
            log.trace("Initializing authentication link latch count to {}", 1);
            authenticationSessionOpenedLatch = new CountDownLatch(1);
        }

        int deviceSessionCount = sessionHandlerList.size();
        deviceSessionsOpenedLatch = new CountDownLatch(deviceSessionCount);
        log.trace("Initializing device session latch count to {}", deviceSessionCount);
    }

    /**
     * Logs the error, closes this connection and then throws a {@link TransportException}
     * carrying the given message. This method never returns normally.
     *
     * @param errorMessage message for the log entry and the thrown exception
     * @param isRetryable retryable flag to set on the thrown exception
     * @throws TransportException always; if closing the connection itself fails, that failure
     *         is the one that propagates
     */
    private void closeConnectionWithException(String errorMessage, boolean isRetryable) throws TransportException {
        TransportException failure = new TransportException(errorMessage);
        failure.setRetryable(isRetryable);
        log.error(errorMessage, failure);

        close();
        throw failure;
    }

    /**
     * Starts the proton reactor on the executor without waiting for it to connect. The
     * single-thread executor is created lazily on first use.
     *
     * @throws TransportException if the reactor cannot be created
     */
    private void openAsync() throws TransportException {
        // Log text fixed: it used to read "OpenAsnyc".
        log.trace("OpenAsync called for amqp connection");
        if (this.executorService == null) {
            log.trace("Creating new executor service");
            this.executorService = Executors.newFixedThreadPool(1);
        }
        ReactorRunner reactorRunner = new ReactorRunner(new IotHubReactor(this.createReactor()), this.listener, this.connectionId);
        this.executorService.submit(reactorRunner);
    }

    /**
     * Initiates the shutdown without waiting for it. If both ends of the connection are
     * already closed the reactor is stopped directly; otherwise the connection is closed.
     *
     * <p>Does nothing when no connection has been created yet (the reactor never reached
     * connection-init).
     */
    private void closeAsync() {
        // Log text fixed: it used to say "OpenAsync" in the close path.
        log.trace("CloseAsync called for amqp connection");
        if (this.connection == null) {
            // Previously this path failed with a NullPointerException.
            log.debug("No amqp connection has been created yet, nothing to close");
            return;
        }
        boolean alreadyClosed = this.connection.getLocalState() == EndpointState.CLOSED
                && this.connection.getRemoteState() == EndpointState.CLOSED;
        if (alreadyClosed) {
            log.trace("Closing amqp reactor since the connection was already closed");
            this.connection.getReactor().stop();
        } else {
            log.trace("Closing amqp connection");
            this.connection.close();
        }
    }

    /**
     * Shuts down the executor, if one exists, and clears the reference to it.
     *
     * <p>An orderly shutdown is attempted first with a 10 second wait; if the pool has not
     * terminated by then it is shut down forcibly and given another 10 seconds.
     *
     * @throws TransportException if the calling thread is interrupted while waiting; the
     *         executor is shut down forcibly and the interrupt flag is restored before throwing
     */
    private void executorServicesCleanup() throws TransportException {
        if (this.executorService != null) {
            log.trace("Shutdown of executor service has started");
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    this.executorService.shutdownNow();
                    if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                        log.trace("Pool did not terminate");
                    }
                }
                this.executorService = null;
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while cleaning up executor services", e);
                this.executorService.shutdownNow();
                this.executorService = null;
                // Restore the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new TransportException("Waited too long for the connection to close.", e);
            }
            log.trace("Shutdown of executor service completed");
        }
    }

    private class ReactorRunner
    implements Callable {
        private static final String THREAD_NAME = "azure-iot-sdk-ReactorRunner";
        private final IotHubReactor iotHubReactor;
        private final IotHubListener listener;
        private String connectionId;

        ReactorRunner(IotHubReactor iotHubReactor, IotHubListener listener, String connectionId) {
            this.listener = listener;
            this.iotHubReactor = iotHubReactor;
            this.connectionId = connectionId;
        }

        public Object call() {
            try {
                Thread.currentThread().setName(THREAD_NAME);
                this.iotHubReactor.run();
            }
            catch (HandlerException e) {
                this.listener.onConnectionLost(new TransportException(e), this.connectionId);
            }
            return null;
        }
    }
}

