/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter
implements MqttCallback,
ApplicationEventPublisherAware {
    private static final int DEFAULT_COMPLETION_TIMEOUT = 30000;
    private static final int DEFAULT_RECOVERY_INTERVAL = 10000;
    private final MqttPahoClientFactory clientFactory;
    private volatile IMqttClient client;
    private volatile ScheduledFuture<?> reconnectFuture;
    private volatile boolean connected;
    private volatile int completionTimeout = 30000;
    private volatile int recoveryInterval = 10000;
    private volatile boolean cleanSession;
    private volatile ConsumerStopAction consumerStopAction;
    private ApplicationEventPublisher applicationEventPublisher;

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(url, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(null, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String ... topic) {
        this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
    }

    public void setCompletionTimeout(int completionTimeout) {
        this.completionTimeout = completionTimeout;
    }

    public void setRecoveryInterval(int recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    protected void doStart() {
        Assert.state((this.getTaskScheduler() != null ? 1 : 0) != 0, (String)"A 'taskScheduler' is required");
        super.doStart();
        try {
            this.connectAndSubscribe();
        }
        catch (Exception e) {
            this.logger.error((Object)"Exception while connecting and subscribing, retrying", (Throwable)e);
            this.scheduleReconnect();
        }
    }

    protected synchronized void doStop() {
        this.cancelReconnect();
        super.doStop();
        if (this.client != null) {
            try {
                if (this.consumerStopAction.equals((Object)ConsumerStopAction.UNSUBSCRIBE_ALWAYS) || this.consumerStopAction.equals((Object)ConsumerStopAction.UNSUBSCRIBE_CLEAN) && this.cleanSession) {
                    this.client.unsubscribe(this.getTopic());
                }
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while unsubscribing", (Throwable)e);
            }
            try {
                this.client.disconnectForcibly((long)this.completionTimeout);
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while disconnecting", (Throwable)e);
            }
            try {
                this.client.close();
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while closing", (Throwable)e);
            }
            this.connected = false;
            this.client = null;
        }
    }

    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            if (this.client != null && this.client.isConnected()) {
                this.client.subscribe(topic, qos);
            }
        }
        catch (MqttException e) {
            super.removeTopic(topic);
            throw new MessagingException("Failed to subscribe to topic " + topic, (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    @Override
    public void removeTopic(String ... topic) {
        this.topicLock.lock();
        try {
            if (this.client != null && this.client.isConnected()) {
                this.client.unsubscribe(topic);
            }
            super.removeTopic(topic);
        }
        catch (MqttException e) {
            throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    private synchronized void connectAndSubscribe() throws MqttException {
        MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
        this.cleanSession = connectionOptions.isCleanSession();
        this.consumerStopAction = this.clientFactory.getConsumerStopAction();
        if (this.consumerStopAction == null) {
            this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
        }
        Assert.state((this.getUrl() != null || connectionOptions.getServerURIs() != null ? 1 : 0) != 0, (String)"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
        this.client = this.clientFactory.getClientInstance(this.getUrl(), this.getClientId());
        this.client.setCallback((MqttCallback)this);
        if (this.client instanceof MqttClient) {
            ((MqttClient)this.client).setTimeToWait((long)this.completionTimeout);
        }
        this.topicLock.lock();
        Object[] topics = this.getTopic();
        try {
            this.client.connect(connectionOptions);
            int[] requestedQos = this.getQos();
            int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
            this.client.subscribe((String[])topics, grantedQos);
            for (int i = 0; i < requestedQos.length; ++i) {
                if (grantedQos[i] == requestedQos[i]) continue;
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((Object)("Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics) + " requested: " + Arrays.toString(requestedQos) + " granted: " + Arrays.toString(grantedQos)));
                }
                break;
            }
        }
        catch (MqttException e) {
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent((Object)this, e));
            }
            this.logger.error((Object)("Error connecting or subscribing to " + Arrays.toString(topics)), (Throwable)e);
            this.client.disconnectForcibly((long)this.completionTimeout);
            throw e;
        }
        finally {
            this.topicLock.unlock();
        }
        if (this.client.isConnected()) {
            this.connected = true;
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)message);
            }
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttSubscribedEvent((Object)this, message));
            }
        }
    }

    private synchronized void cancelReconnect() {
        if (this.reconnectFuture != null) {
            this.reconnectFuture.cancel(false);
            this.reconnectFuture = null;
        }
    }

    private void scheduleReconnect() {
        try {
            this.reconnectFuture = this.getTaskScheduler().schedule(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        if (MqttPahoMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                            MqttPahoMessageDrivenChannelAdapter.this.logger.debug((Object)"Attempting reconnect");
                        }
                        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = MqttPahoMessageDrivenChannelAdapter.this;
                        synchronized (mqttPahoMessageDrivenChannelAdapter) {
                            if (!MqttPahoMessageDrivenChannelAdapter.this.connected) {
                                MqttPahoMessageDrivenChannelAdapter.this.connectAndSubscribe();
                                MqttPahoMessageDrivenChannelAdapter.this.reconnectFuture = null;
                            }
                        }
                    }
                    catch (MqttException e) {
                        MqttPahoMessageDrivenChannelAdapter.this.logger.error((Object)"Exception while connecting and subscribing", (Throwable)e);
                        MqttPahoMessageDrivenChannelAdapter.this.scheduleReconnect();
                    }
                }
            }, new Date(System.currentTimeMillis() + (long)this.recoveryInterval));
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to schedule reconnect", (Throwable)e);
        }
    }

    public synchronized void connectionLost(Throwable cause) {
        this.logger.error((Object)("Lost connection:" + cause.getMessage() + "; retrying..."));
        this.connected = false;
        this.scheduleReconnect();
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent((Object)this, cause));
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
        try {
            this.sendMessage(message);
        }
        catch (RuntimeException e) {
            this.logger.error((Object)("Unhandled exception for " + message.toString()), (Throwable)e);
            throw e;
        }
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

