/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.MqttComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

public class Mqttv5PahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectionOptions>
implements MqttCallback,
MqttComponent<MqttConnectionOptions> {
    private final Lock lock = new ReentrantLock();
    private final MqttConnectionOptions connectionOptions;
    private List<MqttSubscription> subscriptions;
    private IMqttAsyncClient mqttClient;
    @Nullable
    private MqttClientPersistence persistence;
    private SmartMessageConverter messageConverter;
    private Class<?> payloadType = byte[].class;
    private HeaderMapper<MqttProperties> headerMapper = new MqttHeaderMapper();
    private volatile boolean readyToSubscribeOnStart;

    public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, MqttSubscription ... mqttSubscriptions) {
        this(url, clientId, (String[])Arrays.stream(mqttSubscriptions).map(MqttSubscription::getTopic).toArray(String[]::new));
        this.subscriptions = new ArrayList<MqttSubscription>();
        Collections.addAll(this.subscriptions, mqttSubscriptions);
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String ... topic) {
        super(url, clientId, topic);
        Assert.hasText((String)url, (String)"'url' cannot be null or empty");
        this.connectionOptions = new MqttConnectionOptions();
        this.connectionOptions.setServerURIs(new String[]{url});
        this.connectionOptions.setAutomaticReconnect(true);
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOptions, String clientId, MqttSubscription ... mqttSubscriptions) {
        this(connectionOptions, clientId, (String[])Arrays.stream(mqttSubscriptions).map(MqttSubscription::getTopic).toArray(String[]::new));
        this.subscriptions = new ArrayList<MqttSubscription>();
        Collections.addAll(this.subscriptions, mqttSubscriptions);
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOptions, String clientId, String ... topic) {
        super(Mqttv5PahoMessageDrivenChannelAdapter.obtainServerUrlFromOptions(connectionOptions), clientId, topic);
        this.connectionOptions = connectionOptions;
        if (!this.connectionOptions.isAutomaticReconnect()) {
            this.logger.warn((CharSequence)"It is recommended to set 'automaticReconnect' MQTT client option. Otherwise the current channel adapter restart should be used explicitly, e.g. via handling 'MqttConnectionFailedEvent' on client disconnection.");
        }
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager, MqttSubscription ... mqttSubscriptions) {
        this(clientManager, (String[])Arrays.stream(mqttSubscriptions).map(MqttSubscription::getTopic).toArray(String[]::new));
        this.subscriptions = new ArrayList<MqttSubscription>();
        Collections.addAll(this.subscriptions, mqttSubscriptions);
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager, String ... topic) {
        super(clientManager, topic);
        this.connectionOptions = (MqttConnectionOptions)clientManager.getConnectionInfo();
    }

    @Override
    public MqttConnectionOptions getConnectionInfo() {
        return this.connectionOptions;
    }

    public void setPersistence(@Nullable MqttClientPersistence persistence) {
        this.persistence = persistence;
    }

    @Override
    public void setConverter(MqttMessageConverter converter) {
        throw new UnsupportedOperationException("Use setMessageConverter(SmartMessageConverter) instead");
    }

    public void setMessageConverter(SmartMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public void setPayloadType(Class<?> payloadType) {
        Assert.notNull(payloadType, (String)"'payloadType' must not be null.");
        this.payloadType = payloadType;
    }

    public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
        Assert.notNull(headerMapper, (String)"'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    @Override
    protected void onInit() {
        super.onInit();
        if (this.getClientManager() == null && this.mqttClient == null) {
            try {
                this.mqttClient = new MqttAsyncClient(this.getUrl(), this.getClientId(), this.persistence);
                this.mqttClient.setCallback((MqttCallback)this);
                this.mqttClient.setManualAcks(this.isManualAcks());
            }
            catch (MqttException ex) {
                throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + this.getComponentName(), (Throwable)ex);
            }
        }
        if (this.messageConverter == null) {
            this.setMessageConverter((SmartMessageConverter)this.getBeanFactory().getBean("integrationArgumentResolverMessageConverter", SmartMessageConverter.class));
        }
    }

    protected void doStart() {
        try {
            this.connect();
            if (this.readyToSubscribeOnStart) {
                this.subscribe();
            }
        }
        catch (MqttException ex) {
            if (this.getConnectionInfo().isAutomaticReconnect()) {
                try {
                    this.mqttClient.reconnect();
                }
                catch (MqttException re) {
                    this.logger.error((Throwable)re, (CharSequence)"MQTT client failed to connect. Never happens.");
                }
            }
            ApplicationEventPublisher applicationEventPublisher = this.getApplicationEventPublisher();
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent(this, ex));
            }
            this.logger.error((Throwable)ex, (CharSequence)"MQTT client failed to connect.");
        }
    }

    private void connect() throws MqttException {
        this.lock.lock();
        try {
            ClientManager clientManager = this.getClientManager();
            if (clientManager == null) {
                this.mqttClient.connect(this.connectionOptions).waitForCompletion(this.getCompletionTimeout());
            } else {
                this.mqttClient = (IMqttAsyncClient)clientManager.getClient();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    protected void doStop() {
        this.topicLock.lock();
        this.readyToSubscribeOnStart = false;
        String[] topics = this.getTopic();
        try {
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                if (this.connectionOptions.isCleanStart()) {
                    this.unsubscribe(topics);
                    this.readyToSubscribeOnStart = true;
                }
                if (this.getClientManager() == null) {
                    this.mqttClient.disconnectForcibly(this.getDisconnectCompletionTimeout());
                    if (this.getConnectionInfo().isAutomaticReconnect()) {
                        MqttUtils.stopClientReconnectCycle(this.mqttClient);
                    }
                }
            }
        }
        catch (MqttException ex) {
            this.logger.error((Throwable)ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }

    private void unsubscribe(String ... topics) throws MqttException {
        try {
            this.mqttClient.unsubscribe(topics).waitForCompletion(this.getCompletionTimeout());
        }
        catch (ConcurrentModificationException ex) {
            this.logger.error((Throwable)ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        try {
            if (this.getClientManager() == null && this.mqttClient != null) {
                this.mqttClient.close();
            }
        }
        catch (MqttException ex) {
            this.logger.error((Throwable)ex, (CharSequence)"Failed to close 'MqttAsyncClient'");
        }
    }

    @Override
    public void setQos(int ... qos) {
        Assert.isNull(this.subscriptions, (String)"The 'qos' must be provided with the 'MqttSubscription'.");
        super.setQos(qos);
    }

    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            MqttSubscription subscription = new MqttSubscription(topic, qos);
            if (this.subscriptions != null) {
                this.subscriptions.add(subscription);
            }
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                MqttProperties subscriptionProperties = new MqttProperties();
                subscriptionProperties.setSubscriptionIdentifiers(List.of(Integer.valueOf(0)));
                this.mqttClient.subscribe(new MqttSubscription[]{subscription}, null, null, this::messageArrived, subscriptionProperties).waitForCompletion(this.getCompletionTimeout());
            }
        }
        catch (MqttException ex) {
            throw new MessagingException("Failed to subscribe to topic " + topic, (Throwable)ex);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    @Override
    public void removeTopic(String ... topic) {
        this.topicLock.lock();
        try {
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                this.unsubscribe(topic);
            }
            super.removeTopic(topic);
            if (!CollectionUtils.isEmpty(this.subscriptions)) {
                this.subscriptions.removeIf(sub -> ObjectUtils.containsElement((Object[])topic, (Object)sub.getTopic()));
            }
        }
        catch (MqttException ex) {
            throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), (Throwable)ex);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        Map headers = this.headerMapper.toHeaders((Object)mqttMessage.getProperties());
        headers.put("mqtt_id", mqttMessage.getId());
        headers.put("mqtt_receivedQos", mqttMessage.getQos());
        headers.put("mqtt_duplicate", mqttMessage.isDuplicate());
        headers.put("mqtt_receivedRetained", mqttMessage.isRetained());
        headers.put("mqtt_receivedTopic", topic);
        if (this.isManualAcks()) {
            headers.put("acknowledgmentCallback", new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.mqttClient));
        }
        Object payload = MqttMessage.class.isAssignableFrom(this.payloadType) ? mqttMessage : (Object)mqttMessage.getPayload();
        Object message = MqttMessage.class.isAssignableFrom(this.payloadType) || byte[].class.isAssignableFrom(this.payloadType) ? new GenericMessage(payload, headers) : this.messageConverter.toMessage(payload, new MessageHeaders(headers), this.payloadType);
        try {
            this.sendMessage((Message)message);
        }
        catch (RuntimeException ex) {
            this.logger.error((Throwable)ex, () -> Mqttv5PahoMessageDrivenChannelAdapter.lambda$messageArrived$6((Message)message));
            throw ex;
        }
    }

    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        if (this.isRunning()) {
            MqttException cause = disconnectResponse.getException();
            ApplicationEventPublisher applicationEventPublisher = this.getApplicationEventPublisher();
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent(this, cause));
            }
        } else {
            this.readyToSubscribeOnStart = false;
        }
    }

    public void mqttErrorOccurred(MqttException exception) {
        ApplicationEventPublisher applicationEventPublisher = this.getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent((ApplicationEvent)new MqttProtocolErrorEvent((Object)this, exception));
        }
    }

    public void deliveryComplete(IMqttToken token) {
    }

    @Override
    public void connectComplete(boolean isReconnect) {
        this.connectComplete(isReconnect, this.getUrl());
    }

    public void connectComplete(boolean reconnect, String serverURI) {
        if (this.isActive()) {
            this.subscribe();
        } else {
            this.readyToSubscribeOnStart = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribe() {
        Object[] mqttSubscriptions;
        ClientManager clientManager = this.getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = (IMqttAsyncClient)clientManager.getClient();
        }
        if (ObjectUtils.isEmpty((Object[])(mqttSubscriptions = this.obtainSubscriptions()))) {
            return;
        }
        ApplicationEventPublisher applicationEventPublisher = this.getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            MqttProperties subscriptionProperties = new MqttProperties();
            subscriptionProperties.setSubscriptionIdentifiers(List.of(Integer.valueOf(0)));
            this.mqttClient.subscribe((MqttSubscription[])mqttSubscriptions, null, null, this::messageArrived, subscriptionProperties).waitForCompletion(this.getCompletionTimeout());
            String message = "Connected and subscribed to " + Arrays.toString(mqttSubscriptions);
            this.logger.debug((CharSequence)message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent((ApplicationEvent)new MqttSubscribedEvent((Object)this, message));
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent(this, ex));
            }
            this.logger.error((Throwable)ex, () -> Mqttv5PahoMessageDrivenChannelAdapter.lambda$subscribe$7((MqttSubscription[])mqttSubscriptions));
        }
        finally {
            this.topicLock.unlock();
        }
    }

    private MqttSubscription[] obtainSubscriptions() {
        if (this.subscriptions != null) {
            return this.subscriptions.toArray(new MqttSubscription[0]);
        }
        String[] topics = this.getTopic();
        if (topics.length == 0) {
            return null;
        }
        int[] requestedQos = this.getQos();
        return (MqttSubscription[])IntStream.range(0, topics.length).mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])).toArray(MqttSubscription[]::new);
    }

    public void authPacketArrived(int reasonCode, MqttProperties properties) {
    }

    private static String obtainServerUrlFromOptions(MqttConnectionOptions connectionOptions) {
        Assert.notNull((Object)connectionOptions, (String)"'connectionOptions' must not be null");
        Object[] serverURIs = connectionOptions.getServerURIs();
        Assert.notEmpty((Object[])serverURIs, (String)"'serverURIs' must be provided in the 'MqttConnectionOptions'");
        return serverURIs[0];
    }

    private static /* synthetic */ CharSequence lambda$subscribe$7(MqttSubscription[] mqttSubscriptions) {
        return "Error subscribing to " + Arrays.toString(mqttSubscriptions);
    }

    private static /* synthetic */ CharSequence lambda$messageArrived$6(Message message) {
        return "Unhandled exception for " + String.valueOf(message);
    }

    private static class AcknowledgmentImpl
    implements SimpleAcknowledgment {
        private final int id;
        private final int qos;
        private final IMqttAsyncClient ackClient;

        AcknowledgmentImpl(int id, int qos, IMqttAsyncClient client) {
            this.id = id;
            this.qos = qos;
            this.ackClient = client;
        }

        public void acknowledge() {
            try {
                this.ackClient.messageArrivedComplete(this.id, this.qos);
            }
            catch (MqttException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }
}

