/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.amq;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.amq.AMQTransactionStore;
import org.apache.activemq.store.amq.RecoveryListenerAdapter;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Message store that journals every add/remove through the
 * {@link AMQPersistenceAdapter} and buffers the resulting reference updates in
 * memory until a checkpoint ({@link #asyncWrite()}) pushes them into the
 * backing {@link ReferenceStore}.
 *
 * <p>All mutable buffering state ({@code messages}, {@code messageAcks},
 * {@code inFlightTxLocations}, {@code lastLocation},
 * {@code lastWrittenLocation}, {@code flushLatch}, {@code cpAddedMessageIds})
 * is guarded by {@link #lock}, which is the lock handed out by the reference
 * store.
 */
public class AMQMessageStore extends AbstractMessageStore {
    private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);

    // NOTE: the misspelt name is kept on purpose; it is protected and may be
    // referenced by subclasses.
    protected final AMQPersistenceAdapter peristenceAdapter;
    protected final AMQTransactionStore transactionStore;
    protected final ReferenceStore referenceStore;
    protected final TransactionTemplate transactionTemplate;
    /** Journal location of the most recent buffered add/remove. */
    protected Location lastLocation;
    /** Value of {@link #lastLocation} captured by the last completed checkpoint. */
    protected Location lastWrittenLocation;
    /** Journal locations written on behalf of transactions that have not completed yet. */
    protected Set<Location> inFlightTxLocations = new HashSet<Location>();
    protected final TaskRunner asyncWriteTask;
    /** Latch shared by all threads currently waiting in {@link #flush()}; null when nobody waits. */
    protected CountDownLatch flushLatch;
    /** Adds journalled but not yet written to the reference store, in insertion order. */
    private Map<MessageId, ReferenceStore.ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceStore.ReferenceData>();
    /** Acks journalled for messages that were not found in {@link #messages}. */
    private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
    /** Batch of adds currently being checkpointed; non-null only while a checkpoint is running. */
    private Map<MessageId, ReferenceStore.ReferenceData> cpAddedMessageIds;
    // Sampled once at construction time; later log-level changes are not picked up.
    private final boolean debug = LOG.isDebugEnabled();
    private final AtomicReference<Location> mark = new AtomicReference<Location>();
    protected final Lock lock;

    /**
     * Creates the store and registers its checkpoint task with the adapter's
     * task runner factory.
     *
     * @param adapter        owning persistence adapter (journal access, transaction store, task runners)
     * @param referenceStore store that keeps the long-term message references; also supplies the lock
     * @param destination    destination this store serves
     */
    public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
        super(destination);
        this.peristenceAdapter = adapter;
        this.lock = referenceStore.getStoreLock();
        this.transactionStore = adapter.getTransactionStore();
        this.referenceStore = referenceStore;
        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
        this.asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {

            public boolean iterate() {
                AMQMessageStore.this.asyncWrite();
                // One checkpoint per wakeup; returning false tells the runner there is no more work.
                return false;
            }
        }, "Checkpoint: " + destination);
    }

    /** Forwards the memory usage tracker to the reference store. */
    public void setMemoryUsage(MemoryUsage memoryUsage) {
        this.referenceStore.setMemoryUsage(memoryUsage);
    }

    /**
     * Journals the message and then either buffers it immediately
     * (non-transacted) or registers it with the transaction store and defers
     * the buffering to the transaction's commit callback.
     *
     * @param context connection context; decides whether the add is transacted
     * @param message message to store
     * @throws IOException if journalling fails or the calling thread is interrupted
     */
    public final void addMessage(ConnectionContext context, final Message message) throws IOException {
        final MessageId id = message.getMessageId();
        final Location location = this.peristenceAdapter.writeCommand(message, message.isResponseRequired());

        if (!context.isInTransaction()) {
            if (this.debug) {
                LOG.debug("Journalled message add for: " + id + ", at: " + location);
            }
            this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
            this.addMessage(message, location);
            return;
        }

        if (this.debug) {
            LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
        }
        this.trackInFlightLocation(location);
        this.transactionStore.addMessage(this, message, location);
        context.getTransaction().addSynchronization(new Synchronization() {

            public void afterCommit() throws Exception {
                if (AMQMessageStore.this.debug) {
                    LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
                }
                AMQMessageStore.this.releaseInFlightLocation(location);
                AMQMessageStore.this.addMessage(message, location);
            }

            public void afterRollback() throws Exception {
                if (AMQMessageStore.this.debug) {
                    LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
                }
                AMQMessageStore.this.releaseInFlightLocation(location);
            }
        });
    }

    /**
     * Buffers a journalled add for the next checkpoint. Forces a synchronous
     * {@link #flush()} once the buffer exceeds the adapter's
     * {@code maxCheckpointMessageAddSize}; otherwise just wakes the checkpoint task.
     *
     * @param message  the journalled message
     * @param location where the message was written in the journal
     * @throws InterruptedIOException if the thread is interrupted while flushing or waking the task
     */
    final void addMessage(Message message, Location location) throws InterruptedIOException {
        ReferenceStore.ReferenceData data = new ReferenceStore.ReferenceData();
        data.setExpiration(message.getExpiration());
        data.setFileId(location.getDataFileId());
        data.setOffset(location.getOffset());

        final int pendingAdds;
        this.lock.lock();
        try {
            this.lastLocation = location;
            this.messages.put(message.getMessageId(), data);
            // Sample the size while holding the lock: 'messages' is swapped by the
            // checkpoint under this lock, so reading it afterwards would be a data race.
            pendingAdds = this.messages.size();
        } finally {
            this.lock.unlock();
        }

        if (pendingAdds > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
            this.flush();
        } else {
            this.wakeupAsyncWriter();
        }
    }

    /**
     * Re-applies a journalled add during replay: adds the reference to the
     * reference store unless one already exists for the message id.
     *
     * @return true if a reference was added; false if it already existed or the add failed
     */
    public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
        MessageId id = message.getMessageId();
        try {
            if (this.referenceStore.getMessageReference(id) == null) {
                ReferenceStore.ReferenceData data = new ReferenceStore.ReferenceData();
                data.setExpiration(message.getExpiration());
                data.setFileId(location.getDataFileId());
                data.setOffset(location.getOffset());
                this.referenceStore.addMessageReference(context, id, data);
                return true;
            }
        } catch (Throwable e) {
            // Replay is best effort: log and report "not applied".
            LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: " + e, e);
        }
        return false;
    }

    /**
     * Journals the ack (wrapped in a {@link JournalQueueAck}) and then either
     * applies it to the in-memory buffers immediately (non-transacted) or
     * registers it with the transaction store and defers to the commit callback.
     *
     * @param context connection context; decides whether the remove is transacted
     * @param ack     acknowledgement identifying the message to remove
     * @throws IOException if journalling fails or the calling thread is interrupted
     */
    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
        JournalQueueAck remove = new JournalQueueAck();
        remove.setDestination(this.destination);
        remove.setMessageAck(ack);
        final Location location = this.peristenceAdapter.writeCommand(remove, ack.isResponseRequired());

        if (!context.isInTransaction()) {
            if (this.debug) {
                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
            }
            this.removeMessage(ack, location);
            return;
        }

        if (this.debug) {
            LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
        }
        this.trackInFlightLocation(location);
        this.transactionStore.removeMessage(this, ack, location);
        context.getTransaction().addSynchronization(new Synchronization() {

            public void afterCommit() throws Exception {
                if (AMQMessageStore.this.debug) {
                    LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
                }
                AMQMessageStore.this.releaseInFlightLocation(location);
                AMQMessageStore.this.removeMessage(ack, location);
            }

            public void afterRollback() throws Exception {
                if (AMQMessageStore.this.debug) {
                    LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
                }
                AMQMessageStore.this.releaseInFlightLocation(location);
            }
        });
    }

    /**
     * Applies a journalled ack to the in-memory buffers. If the message is
     * still in the pending-add buffer the add is simply cancelled; otherwise
     * the ack is queued for the next checkpoint.
     *
     * @param ack      the journalled acknowledgement
     * @param location where the ack was written in the journal
     * @throws InterruptedIOException if the thread is interrupted while flushing or waking the task
     */
    final void removeMessage(MessageAck ack, Location location) throws InterruptedIOException {
        final ReferenceStore.ReferenceData data;
        final int pendingAcks;
        this.lock.lock();
        try {
            this.lastLocation = location;
            MessageId id = ack.getLastMessageId();
            data = this.messages.remove(id);
            if (data == null) {
                this.messageAcks.add(new MessageAckWithLocation(ack, location));
            } else {
                // The add never reached the reference store, so its data file is no longer in progress for it.
                this.peristenceAdapter.removeInProgressDataFile(this, data.getFileId());
            }
            // Sample under the lock; 'messageAcks' is swapped by the checkpoint under this lock.
            pendingAcks = this.messageAcks.size();
        } finally {
            this.lock.unlock();
        }

        if (pendingAcks > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
            this.flush();
        } else if (data == null) {
            // Only a newly queued ack gives the checkpoint task something to do.
            this.wakeupAsyncWriter();
        }
    }

    /**
     * Re-applies a journalled ack during replay: removes the message from the
     * reference store if a reference for it still exists.
     *
     * @return true if the message was removed; false if it was absent or the removal failed
     */
    public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
        try {
            if (this.referenceStore.getMessageReference(messageAck.getLastMessageId()) != null) {
                this.referenceStore.removeMessage(context, messageAck);
                return true;
            }
        } catch (Throwable e) {
            // Pass the throwable so the stack trace is logged, as replayAddMessage does.
            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e, e);
        }
        return false;
    }

    /**
     * Blocks until a checkpoint has run, unless the last checkpoint already
     * covered the most recent buffered location. Concurrent callers share one
     * latch, which {@link #asyncWrite()} releases.
     *
     * @throws InterruptedIOException if the thread is interrupted while waking the task or waiting
     */
    public void flush() throws InterruptedIOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("flush starting ...");
        }

        final CountDownLatch countDown;
        this.lock.lock();
        try {
            // Identity comparison is intentional: doAsyncWrite stores the very
            // same Location instance it snapshotted from lastLocation.
            if (this.lastWrittenLocation == this.lastLocation) {
                return;
            }
            if (this.flushLatch == null) {
                this.flushLatch = new CountDownLatch(1);
            }
            countDown = this.flushLatch;
        } finally {
            this.lock.unlock();
        }

        try {
            this.asyncWriteTask.wakeup();
            countDown.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("flush finished");
        }
    }

    /**
     * Runs one checkpoint: claims the current flush latch, writes the buffered
     * changes via {@link #doAsyncWrite()}, records the returned location as the
     * mark and releases any threads waiting in {@link #flush()}.
     *
     * <p>I/O failures are logged, not propagated. Waiters are released even
     * when the checkpoint fails; previously they would block forever because
     * the latch had already been detached from {@code flushLatch}.
     */
    synchronized void asyncWrite() {
        CountDownLatch countDown = null;
        try {
            this.lock.lock();
            try {
                countDown = this.flushLatch;
                this.flushLatch = null;
            } finally {
                this.lock.unlock();
            }
            this.mark.set(this.doAsyncWrite());
        } catch (IOException e) {
            LOG.error("Checkpoint failed: " + e, e);
        } finally {
            // Always release waiters. On failure lastWrittenLocation is not
            // advanced, so a later flush() will request another checkpoint.
            if (countDown != null) {
                countDown.countDown();
            }
        }
    }

    /**
     * Swaps out the pending add/ack buffers under the lock and writes them to
     * the reference store inside the transaction template. Adds are committed
     * in chunks of {@code maxCheckpointMessageAddSize}; acks are applied after
     * the adds have been committed. Failures on individual entries are logged
     * and skipped.
     *
     * @return the smallest in-flight transaction location if any transaction
     *         was in flight when the buffers were swapped, otherwise the last
     *         buffered location at that moment
     * @throws IOException if the transaction template fails
     */
    protected Location doAsyncWrite() throws IOException {
        final int maxCheckpointMessageAddSize = this.peristenceAdapter.getMaxCheckpointMessageAddSize();

        final Map<MessageId, ReferenceStore.ReferenceData> addedBatch;
        final List<MessageAckWithLocation> removedBatch;
        final List<Location> inFlightSnapshot;
        final Location checkpointLocation;
        this.lock.lock();
        try {
            addedBatch = this.messages;
            // Publish the batch so getLocation() can still resolve these ids while they are being written.
            this.cpAddedMessageIds = addedBatch;
            removedBatch = this.messageAcks;
            inFlightSnapshot = new ArrayList<Location>(this.inFlightTxLocations);
            this.messages = new LinkedHashMap<MessageId, ReferenceStore.ReferenceData>();
            this.messageAcks = new ArrayList<MessageAckWithLocation>();
            checkpointLocation = this.lastLocation;
        } finally {
            this.lock.unlock();
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Doing batch update... adding: " + addedBatch.size() + " removing: " + removedBatch.size() + " ");
        }

        this.transactionTemplate.run(new Callback() {

            public void execute() throws Exception {
                int size = 0;
                PersistenceAdapter persistenceAdapter = AMQMessageStore.this.transactionTemplate.getPersistenceAdapter();
                ConnectionContext context = AMQMessageStore.this.transactionTemplate.getContext();

                for (Map.Entry<MessageId, ReferenceStore.ReferenceData> entry : addedBatch.entrySet()) {
                    try {
                        if (AMQMessageStore.this.referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("adding message ref:" + entry.getKey());
                            }
                            ++size;
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
                        }
                        AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry.getValue().getFileId());
                    } catch (Throwable e) {
                        LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
                    }
                    // Commit in chunks so a single transaction never grows past the configured size.
                    if (size >= maxCheckpointMessageAddSize) {
                        persistenceAdapter.commitTransaction(context);
                        persistenceAdapter.beginTransaction(context);
                        size = 0;
                    }
                }

                // Make all adds durable before applying the removals.
                persistenceAdapter.commitTransaction(context);
                persistenceAdapter.beginTransaction(context);

                for (MessageAckWithLocation ack : removedBatch) {
                    try {
                        AMQMessageStore.this.referenceStore.removeMessage(context, ack);
                    } catch (Throwable e) {
                        LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
                    }
                }
            }
        });

        if (LOG.isDebugEnabled()) {
            LOG.debug("Batch update done. lastLocation:" + checkpointLocation);
        }

        this.lock.lock();
        try {
            this.cpAddedMessageIds = null;
            this.lastWrittenLocation = checkpointLocation;
        } finally {
            this.lock.unlock();
        }

        if (!inFlightSnapshot.isEmpty()) {
            // Same element as sorting and taking index 0, without the O(n log n) sort.
            return Collections.min(inFlightSnapshot);
        }
        return checkpointLocation;
    }

    /**
     * Loads a message from the journal by id.
     *
     * @return the message, or null if no location is known for the id
     * @throws IOException if the journal read fails or the record at the location is not a message
     */
    public Message getMessage(MessageId identity) throws IOException {
        Location location = this.getLocation(identity);
        if (location == null) {
            return null;
        }
        DataStructure rc = this.peristenceAdapter.readCommand(location);
        // A null command is passed through as null, exactly as the plain cast did.
        if (rc != null && !(rc instanceof Message)) {
            throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
        }
        return (Message) rc;
    }

    /**
     * Resolves the journal location of a message, looking first at the pending
     * add buffer, then at the batch currently being checkpointed, and finally
     * at the reference store.
     *
     * @return the location, or null if the id is unknown everywhere
     * @throws IOException if the reference store lookup fails
     */
    protected Location getLocation(MessageId messageId) throws IOException {
        ReferenceStore.ReferenceData data;
        this.lock.lock();
        try {
            data = this.messages.get(messageId);
            if (data == null && this.cpAddedMessageIds != null) {
                data = this.cpAddedMessageIds.get(messageId);
            }
        } finally {
            this.lock.unlock();
        }

        if (data == null) {
            data = this.referenceStore.getMessageReference(messageId);
            if (data == null) {
                return null;
            }
        }

        Location location = new Location();
        location.setDataFileId(data.getFileId());
        location.setOffset(data.getOffset());
        return location;
    }

    /** Flushes pending changes, then replays all stored references to the listener. */
    public void recover(MessageRecoveryListener listener) throws Exception {
        this.flush();
        this.referenceStore.recover(new RecoveryListenerAdapter(this, listener));
    }

    /** Starts the underlying reference store. */
    public void start() throws Exception {
        this.referenceStore.start();
    }

    /** Flushes pending changes, shuts the checkpoint task down and stops the reference store. */
    public void stop() throws Exception {
        this.flush();
        this.asyncWriteTask.shutdown();
        this.referenceStore.stop();
    }

    /** @return the reference store backing this message store */
    public ReferenceStore getReferenceStore() {
        return this.referenceStore;
    }

    /** Flushes pending changes, then removes every message from the reference store. */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        this.flush();
        this.referenceStore.removeAllMessages(context);
    }

    /**
     * Not supported by this store.
     *
     * @throws IOException always
     */
    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    /**
     * Not supported by this store.
     *
     * @throws IOException always
     */
    public String getMessageReference(MessageId identity) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    /** Flushes pending changes, then returns the reference store's message count. */
    public int getMessageCount() throws IOException {
        this.flush();
        return this.referenceStore.getMessageCount();
    }

    /**
     * Recovers the next batch of messages. If the first attempt yields nothing
     * and the listener still has space, pending changes are flushed and the
     * recovery is attempted once more.
     */
    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
        this.referenceStore.recoverNextMessages(maxReturned, recoveryListener);
        if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
            this.flush();
            this.referenceStore.recoverNextMessages(maxReturned, recoveryListener);
        }
    }

    /**
     * Loads a message from the journal using the file id and offset in the given reference data.
     *
     * @throws IOException if the journal read fails or the record at the location is not a message
     */
    Message getMessage(ReferenceStore.ReferenceData data) throws IOException {
        Location location = new Location();
        location.setDataFileId(data.getFileId());
        location.setOffset(data.getOffset());
        DataStructure rc = this.peristenceAdapter.readCommand(location);
        // A null command is passed through as null, exactly as the plain cast did.
        if (rc != null && !(rc instanceof Message)) {
            throw new IOException("Could not read message  at location " + location + ", expected a message, but got: " + rc);
        }
        return (Message) rc;
    }

    /** Resets the reference store's batching state. */
    public void resetBatching() {
        this.referenceStore.resetBatching();
    }

    /** @return the location returned by the last successful checkpoint, or null if none has completed */
    public Location getMark() {
        return this.mark.get();
    }

    /**
     * Flushes pending changes (restoring the interrupt flag if interrupted)
     * and disposes the reference store and the superclass state.
     */
    public void dispose(ConnectionContext context) {
        try {
            this.flush();
        } catch (InterruptedIOException e) {
            Thread.currentThread().interrupt();
        }
        this.referenceStore.dispose(context);
        super.dispose(context);
    }

    /**
     * Flushes pending changes and positions the reference store's batch cursor
     * at the given message id. The batch is set even if the flush is interrupted.
     */
    public void setBatch(MessageId messageId) {
        try {
            this.flush();
        } catch (InterruptedIOException e) {
            LOG.debug("flush on setBatch resulted in exception", e);
            // Do not swallow the interruption; keep it visible to callers, as dispose() does.
            Thread.currentThread().interrupt();
        }
        this.getReferenceStore().setBatch(messageId);
    }

    /** Records a journal location as belonging to a transaction that has not completed yet. */
    private void trackInFlightLocation(Location location) {
        this.lock.lock();
        try {
            this.inFlightTxLocations.add(location);
        } finally {
            this.lock.unlock();
        }
    }

    /** Forgets a journal location once its transaction has committed or rolled back. */
    private void releaseInFlightLocation(Location location) {
        this.lock.lock();
        try {
            this.inFlightTxLocations.remove(location);
        } finally {
            this.lock.unlock();
        }
    }

    /** Wakes the checkpoint task, translating an interruption into {@link InterruptedIOException}. */
    private void wakeupAsyncWriter() throws InterruptedIOException {
        try {
            this.asyncWriteTask.wakeup();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }
}

