/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.txn;

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;

/**
 * {@link TransactionObserver} that keeps a running total of the size (content plus metadata)
 * of messages enqueued in transactions that have not yet been discharged, and calls
 * {@code flowToDisk()} on message handles once that total would exceed a configured maximum.
 *
 * <p>The first time the limit is exceeded a {@code LARGE_TRANSACTION_WARN} message is sent to
 * the event logger; the warning is re-armed when the running total is below the limit again.
 */
public class FlowToDiskTransactionObserver
implements TransactionObserver {
    /** Running total of the sizes recorded for transactions not yet discharged (or reset). */
    private final AtomicLong _uncommittedMessageSize = new AtomicLong();
    /** Per-transaction record of the recorded message handles and their accumulated size. */
    private final ConcurrentMap<ServerTransaction, TransactionDetails> _uncommittedMessages = new ConcurrentHashMap<>();
    private final LogSubject _logSubject;
    private final EventLogger _eventLogger;
    /** Limit above which newly enqueued and already recorded messages are flowed to disk. */
    private final long _maxUncommittedInMemorySize;
    /** {@code true} once the large-transaction warning has been logged and not yet re-armed. */
    private volatile boolean _reported;

    /**
     * @param maxUncommittedInMemorySize limit that the running total of recorded message sizes
     *                                   is compared against
     * @param logSubject                 subject passed to the event logger with the warning
     * @param eventLogger                logger that receives the large-transaction warning
     */
    public FlowToDiskTransactionObserver(long maxUncommittedInMemorySize, LogSubject logSubject, EventLogger eventLogger) {
        _logSubject = logSubject;
        _eventLogger = eventLogger;
        _maxUncommittedInMemorySize = maxUncommittedInMemorySize;
    }

    /**
     * Accounts for a message enqueued within {@code transaction}.
     *
     * <p>If adding the message's size to the running total would exceed the limit, the message
     * and every handle still recorded for any transaction are flowed to disk, the warning is
     * logged once, and the message is <em>not</em> added to the running total. Otherwise the
     * size is added to the total and the handle is recorded against the transaction.
     *
     * @param transaction the transaction the message was enqueued in
     * @param message     the enqueued message
     */
    @Override
    public void onMessageEnqueue(ServerTransaction transaction, EnqueueableMessage<? extends StorableMessageMetaData> message) {
        StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
        long messageSize = handle.getContentSize() + handle.getMetadataSize();
        // NOTE: the total is read here and updated further below in a separate step, so
        // concurrent callers can both pass this check; the limit is not enforced atomically.
        long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
        if (newUncommittedSize > _maxUncommittedInMemorySize) {
            handle.flowToDisk();
            if (!_reported) {
                _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
                _reported = true;
            }
            // Iterating an empty map is a no-op, so no emptiness check is needed.
            for (TransactionDetails transactionDetails : _uncommittedMessages.values()) {
                transactionDetails.flowToDisk();
            }
        } else {
            _uncommittedMessageSize.addAndGet(messageSize);
            TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
            details.messageEnqueued(handle);
        }
    }

    /**
     * Stops tracking {@code transaction}: its accumulated size is subtracted from the running
     * total, and the warning is re-armed if the total is now below the limit.
     *
     * @param transaction the transaction being discharged
     */
    @Override
    public void onDischarge(ServerTransaction transaction) {
        TransactionDetails transactionDetails = _uncommittedMessages.remove(transaction);
        if (transactionDetails != null) {
            _uncommittedMessageSize.addAndGet(-transactionDetails.getUncommittedMessageSize());
        }
        if (_maxUncommittedInMemorySize > _uncommittedMessageSize.get()) {
            _reported = false;
        }
    }

    /**
     * Forgets all tracked transactions and zeroes the running total. The warning is re-armed
     * under the same condition {@link #onDischarge} uses (total below the limit).
     */
    @Override
    public void reset() {
        _uncommittedMessages.clear();
        _uncommittedMessageSize.set(0L);
        // The total is now zero; apply the same "below the limit" rule as onDischarge so the
        // next overflow after a reset is reported again.
        if (_maxUncommittedInMemorySize > 0L) {
            _reported = false;
        }
    }

    /** Message handles and accumulated size recorded for a single transaction. */
    private static class TransactionDetails {
        /** Sum of the sizes of every handle passed to {@link #messageEnqueued}. */
        private final AtomicLong _uncommittedMessageSize = new AtomicLong();
        /** Handles recorded for this transaction that have not yet been flowed to disk here. */
        private final Queue<StoredMessage<? extends StorableMessageMetaData>> _uncommittedMessages = new ConcurrentLinkedQueue<>();

        private TransactionDetails() {
        }

        /** Adds the handle's content plus metadata size to the total and records the handle. */
        private void messageEnqueued(StoredMessage<? extends StorableMessageMetaData> handle) {
            long size = handle.getContentSize() + handle.getMetadataSize();
            _uncommittedMessageSize.addAndGet(size);
            _uncommittedMessages.add(handle);
        }

        /**
         * Calls {@code flowToDisk()} on every recorded handle and removes it from the queue.
         * The accumulated size is intentionally left unchanged.
         */
        private void flowToDisk() {
            // Drain with poll() instead of iterate-then-clear(): a handle added while an
            // iteration is in progress could otherwise be discarded by clear() without
            // flowToDisk() ever having been called on it.
            StoredMessage<? extends StorableMessageMetaData> handle;
            while ((handle = _uncommittedMessages.poll()) != null) {
                handle.flowToDisk();
            }
        }

        /** @return the sum of the sizes of every handle recorded for this transaction */
        private long getUncommittedMessageSize() {
            return _uncommittedMessageSize.get();
        }
    }
}

