/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;

public class MQTTRetainMessageManager {
    private MQTTSession session;

    public MQTTRetainMessageManager(MQTTSession session) {
        this.session = session;
    }

    void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception {
        String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, this.session.getWildcardConfiguration());
        Queue queue = this.session.getServer().locateQueue(retainAddress);
        if (queue == null) {
            queue = this.session.getServer().createQueue(new QueueConfiguration(retainAddress).setAutoCreated(Boolean.valueOf(true)));
        }
        queue.deleteAllReferences();
        if (!reset) {
            Message message = LargeServerMessageImpl.checkLargeMessage((Message)messageParameter, (StorageManager)this.session.getServer().getStorageManager());
            this.sendToQueue(message.copy(this.session.getServer().getStorageManager().generateID()), queue, tx);
        }
    }

    void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
        String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, this.session.getWildcardConfiguration());
        BindingQueryResult bindingQueryResult = this.session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
        Transaction tx = this.session.getServerSession().newTransaction();
        try {
            for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
                Queue retainedQueue = this.session.getServer().locateQueue(retainedQueueName);
                LinkedListIterator i = retainedQueue.iterator();
                try {
                    if (!i.hasNext()) continue;
                    MessageReference ref = (MessageReference)i.next();
                    while (i.hasNext()) {
                        ref = (MessageReference)i.next();
                        if (!i.hasNext()) continue;
                        i.remove();
                    }
                    Message message = ref.getMessage().copy(this.session.getServer().getStorageManager().generateID());
                    message.putStringProperty(MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY, (String)null);
                    this.sendToQueue(message, queue, tx);
                }
                finally {
                    if (i == null) continue;
                    i.close();
                }
            }
        }
        catch (Exception t) {
            tx.rollback();
            throw t;
        }
        tx.commit();
    }

    private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception {
        RoutingContextImpl context = new RoutingContextImpl(tx);
        queue.route(message, (RoutingContext)context);
        this.session.getServer().getPostOffice().processRoute(message, (RoutingContext)context, false);
    }
}

