/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.virtualhost;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SynchronousMessageStoreRecoverer
implements MessageStoreRecoverer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);

    /**
     * Recovers the contents of the virtual host's message store on the calling thread.
     *
     * <p>The steps run in this order: every stored message is visited and wrapped in a
     * {@link ServerMessage}; every enqueue record is visited and either delivered to its queue
     * or discarded; {@code completeRecovery()} is invoked on every queue of the virtual host;
     * distributed transactions are visited; finally every stored message that nothing referenced
     * is removed from the store. Progress is reported through the virtual host's
     * {@link EventLogger}.
     *
     * <p>NOTE(review): this file is decompiler output and the decompiler reported that it
     * removed a "try catching itself" from this method (possible behaviour change). Compare
     * against the upstream source before relying on the exact exception behaviour.
     *
     * <p>NOTE(review): the {@code MessageStoreReader} obtained here is never released; if the
     * reader type exposes a close operation it probably should be called - confirm.
     *
     * @param virtualHost the virtual host whose message store is recovered
     * @return an already completed future holding {@code null}
     */
    @Override
    public ListenableFuture<Void> recover(QueueManagingVirtualHost<?> virtualHost) {
        EventLogger eventLogger = virtualHost.getEventLogger();
        MessageStore store = virtualHost.getMessageStore();
        MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader();
        MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName());

        // Fully parameterized so the visitors' constructor signatures are satisfied and the
        // loops below can iterate without casts.
        Map<Queue<?>, Integer> queueRecoveries = new TreeMap<>();
        Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>();
        // Starts as "every stored message"; the visitors remove each id they find a reference to.
        Map<Long, StoredMessage<?>> unusedMessages = new TreeMap<>();
        Map<UUID, Integer> unknownQueuesWithMessages = new HashMap<>();
        Map<Queue<?>, Integer> queuesWithUnknownMessages = new HashMap<>();

        eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
        storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));

        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
        try {
            storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries, recoveredMessages, unusedMessages, unknownQueuesWithMessages, queuesWithUnknownMessages));
        }
        finally {
            // Report discarded entries even when the visit fails part-way through.
            unknownQueuesWithMessages.forEach((queueId, count) -> LOGGER.info("Discarded {} entry(s) associated with queue id '{}' as a queue with this id does not appear in the configuration.", count, queueId));
            queuesWithUnknownMessages.forEach((queue, count) -> LOGGER.info("Discarded {} entry(s) associated with queue '{}' as the referenced message does not exist.", count, queue.getName()));
        }

        // Queues that received at least one recovered message.
        for (Map.Entry<Queue<?>, Integer> entry : queueRecoveries.entrySet()) {
            Queue<?> queue = entry.getKey();
            Integer deliveredCount = entry.getValue();
            eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(deliveredCount, queue.getName()));
            eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
            queue.completeRecovery();
        }

        // Remaining queues received nothing but still have their recovery completed.
        for (Queue<?> q : virtualHost.getChildren(Queue.class)) {
            if (!queueRecoveries.containsKey(q)) {
                q.completeRecovery();
            }
        }

        storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, eventLogger, logSubject, recoveredMessages, unusedMessages));

        // Whatever is still in unusedMessages was referenced by no enqueue record and no
        // distributed transaction record.
        for (StoredMessage<?> m : unusedMessages.values()) {
            LOGGER.debug("Message id '{}' is orphaned, removing", m.getMessageNumber());
            m.remove();
        }
        if (!unusedMessages.isEmpty()) {
            LOGGER.info("Discarded {} orphaned message(s).", unusedMessages.size());
        }

        eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
        eventLogger.message(logSubject, MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
        eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
        return Futures.immediateFuture(null);
    }

    /**
     * Does nothing.
     *
     * <p>{@code recover} in this class performs all of its work before returning an already
     * completed future, so there is no pending work for this method to interrupt.
     */
    @Override
    public void cancel() {
    }

    /**
     * Store handler that rebuilds distributed transaction branches from stored XID records and
     * re-attaches their enqueue and dequeue work to the virtual host's {@link DtxRegistry}.
     */
    private static class DistributedTransactionVisitor
    implements DistributedTransactionHandler {
        private final QueueManagingVirtualHost<?> _virtualHost;
        private final EventLogger _eventLogger;
        private final MessageStoreLogSubject _logSubject;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        private DistributedTransactionVisitor(QueueManagingVirtualHost<?> virtualHost, EventLogger eventLogger, MessageStoreLogSubject logSubject, Map<Long, ServerMessage<?>> recoveredMessages, Map<Long, StoredMessage<?>> unusedMessages) {
            this._virtualHost = virtualHost;
            this._eventLogger = eventLogger;
            this._logSubject = logSubject;
            this._recoveredMessages = recoveredMessages;
            this._unusedMessages = unusedMessages;
        }

        /**
         * Restores one stored distributed transaction.
         *
         * <p>Looks the branch up in the registry (creating and registering it when absent),
         * replays each enqueue and dequeue record onto it, then marks the branch
         * {@code PREPARED} and calls {@code prePrepareTransaction()}.
         *
         * @param storedXid the stored transaction identifier
         * @param enqueues  enqueue records belonging to the transaction
         * @param dequeues  dequeue records belonging to the transaction
         * @return always {@code true} (presumably tells the reader to keep visiting - confirm
         *         against {@code DistributedTransactionHandler})
         * @throws ServerScopedRuntimeException if a queue entry named by a dequeue record
         *         cannot be acquired
         */
        @Override
        public boolean handle(Transaction.StoredXidRecord storedXid, Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) {
            Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
            DtxRegistry dtxRegistry = this._virtualHost.getDtxRegistry();
            DtxBranch branch = dtxRegistry.getBranch(id);
            if (branch == null) {
                branch = new DtxBranch(storedXid, dtxRegistry);
                dtxRegistry.registerBranch(branch);
            }
            for (Transaction.EnqueueRecord enqueueRecord : enqueues) {
                this.restoreEnqueue(branch, id, enqueueRecord);
            }
            for (Transaction.DequeueRecord dequeueRecord : dequeues) {
                this.restoreDequeue(branch, id, dequeueRecord);
            }
            branch.setState(DtxBranch.State.PREPARED);
            branch.prePrepareTransaction();
            return true;
        }

        /**
         * Re-attaches one stored enqueue to {@code branch}, or logs why it could not be restored.
         */
        private void restoreEnqueue(DtxBranch branch, Xid id, Transaction.EnqueueRecord enqueueRecord) {
            // Locals are final so the anonymous post-transaction action below may capture them.
            final Queue<?> queue = this._virtualHost.getAttainedQueue(enqueueRecord.getResource().getId());
            if (queue == null) {
                this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(this.xidAsString(id).toString(), enqueueRecord.getResource().getId().toString()));
                return;
            }
            final long messageId = enqueueRecord.getMessage().getMessageNumber();
            final ServerMessage<?> message = this._recoveredMessages.get(messageId);
            // The id is referenced by a transaction record, so it is no longer an orphan candidate.
            this._unusedMessages.remove(messageId);
            if (message == null) {
                this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(this.xidAsString(id).toString(), Long.toString(messageId)));
                return;
            }
            final MessageReference ref = message.newReference();
            // One-element array: lets the callback hand the record to the post-commit action.
            final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
            branch.enqueue(queue, message, new Action<MessageEnqueueRecord>(){

                @Override
                public void performAction(MessageEnqueueRecord record) {
                    records[0] = record;
                }
            });
            branch.addPostTransactionAction(new ServerTransaction.Action(){

                @Override
                public void postCommit() {
                    queue.enqueue(message, null, records[0]);
                    ref.release();
                }

                @Override
                public void onRollback() {
                    ref.release();
                }
            });
        }

        /**
         * Re-attaches one stored dequeue to {@code branch}, or logs why it could not be restored.
         */
        private void restoreDequeue(DtxBranch branch, Xid id, Transaction.DequeueRecord dequeueRecord) {
            final Queue<?> queue = this._virtualHost.getAttainedQueue(dequeueRecord.getEnqueueRecord().getQueueId());
            if (queue == null) {
                this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(this.xidAsString(id).toString(), dequeueRecord.getEnqueueRecord().getQueueId().toString()));
                return;
            }
            final long messageId = dequeueRecord.getEnqueueRecord().getMessageNumber();
            final ServerMessage<?> message = this._recoveredMessages.get(messageId);
            this._unusedMessages.remove(messageId);
            if (message == null) {
                this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(this.xidAsString(id).toString(), Long.toString(messageId)));
                return;
            }
            // NOTE(review): if getMessageOnTheQueue can return null (message known to the store
            // but not present on this queue) the acquire() call below throws
            // NullPointerException - confirm against the Queue API.
            final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
            if (!entry.acquire()) {
                throw new ServerScopedRuntimeException("Distributed transaction dequeue handler failed to acquire " + entry + " during recovery of queue " + queue);
            }
            branch.dequeue(entry.getEnqueueRecord());
            branch.addPostTransactionAction(new ServerTransaction.Action(){

                @Override
                public void postCommit() {
                    entry.delete();
                }

                @Override
                public void onRollback() {
                    entry.release();
                }
            });
        }

        /**
         * Renders an XID as {@code (format,globalId,branchId)} for log messages.
         */
        private StringBuilder xidAsString(Xid id) {
            return new StringBuilder("(").append(id.getFormat()).append(',').append(Functions.str(id.getGlobalId())).append(',').append(Functions.str(id.getBranchId())).append(')');
        }
    }

    /**
     * Store handler for enqueue records: hands each record's message to its queue when both the
     * queue and the message are known, and otherwise removes the record from the store while
     * counting the discard.
     */
    private static class MessageInstanceVisitor
    implements MessageInstanceHandler {
        private final QueueManagingVirtualHost<?> _virtualHost;
        private final MessageStore _store;
        private final Map<Queue<?>, Integer> _queueRecoveries;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;
        private final Map<UUID, Integer> _unknownQueuesWithMessages;
        private final Map<Queue<?>, Integer> _queuesWithUnknownMessages;

        private MessageInstanceVisitor(QueueManagingVirtualHost<?> virtualHost, MessageStore store, Map<Queue<?>, Integer> queueRecoveries, Map<Long, ServerMessage<?>> recoveredMessages, Map<Long, StoredMessage<?>> unusedMessages, Map<UUID, Integer> unknownQueuesWithMessages, Map<Queue<?>, Integer> queuesWithUnknownMessages) {
            _virtualHost = virtualHost;
            _store = store;
            _queueRecoveries = queueRecoveries;
            _recoveredMessages = recoveredMessages;
            _unusedMessages = unusedMessages;
            _unknownQueuesWithMessages = unknownQueuesWithMessages;
            _queuesWithUnknownMessages = queuesWithUnknownMessages;
        }

        /**
         * Processes a single enqueue record.
         *
         * @param record the enqueue record read from the store
         * @return always {@code true}
         */
        @Override
        public boolean handle(MessageEnqueueRecord record) {
            final UUID queueId = record.getQueueId();
            final long messageId = record.getMessageNumber();
            final Queue<?> target = _virtualHost.getAttainedQueue(queueId);

            if (target == null) {
                // No queue with this id: count the discard per queue id and drop the record.
                LOGGER.debug("Message id '{}' in log references queue with id '{}' which is not in the configuration, entry will be discarded", messageId, queueId);
                _unknownQueuesWithMessages.merge(queueId, 1, Integer::sum);
                discard(record);
                return true;
            }

            final String queueName = target.getName();
            final ServerMessage<?> recovered = _recoveredMessages.get(messageId);
            // The message id is referenced by a record for a known queue, so stop treating it
            // as an orphan candidate whether or not the message itself was found.
            _unusedMessages.remove(messageId);

            if (recovered == null) {
                LOGGER.debug("Message id '{}' referenced in log as enqueued in queue '{}' is unknown, entry will be discarded", messageId, queueName);
                _queuesWithUnknownMessages.merge(target, 1, Integer::sum);
                discard(record);
                return true;
            }

            LOGGER.debug("Delivering message id '{}' to queue '{}'", recovered.getMessageNumber(), queueName);
            _queueRecoveries.merge(target, 1, Integer::sum);
            target.recover(recovered, record);
            return true;
        }

        /**
         * Removes an unusable enqueue record from the store in its own transaction.
         */
        private void discard(MessageEnqueueRecord record) {
            Transaction removal = _store.newTransaction();
            removal.dequeueMessage(record);
            removal.commitTranAsync(null);
        }
    }

    /**
     * Store handler for stored messages: wraps each one in a {@link ServerMessage} and records
     * it both as recovered and as not yet referenced.
     */
    private static class MessageVisitor
    implements MessageHandler {
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        MessageVisitor(Map<Long, ServerMessage<?>> recoveredMessages, Map<Long, StoredMessage<?>> unusedMessages) {
            this._recoveredMessages = recoveredMessages;
            this._unusedMessages = unusedMessages;
        }

        /**
         * Creates the server-side message for {@code message} and indexes it by message number.
         *
         * @param message the stored message read from the store
         * @return always {@code true}
         */
        @Override
        public boolean handle(StoredMessage<?> message) {
            // Call getType() on the statically typed metadata; holding the metadata in an
            // Object-typed local would hide the method from the compiler.
            MessageMetaDataType type = message.getMetaData().getType();
            ServerMessage<?> serverMessage = type.createMessage(message);
            final long messageNumber = message.getMessageNumber();
            this._recoveredMessages.put(messageNumber, serverMessage);
            // Every message starts out as "unused"; later visitors remove the ids they reference.
            this._unusedMessages.put(messageNumber, message);
            return true;
        }
    }
}

