/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.paging.cursor.impl;

import io.netty.util.collection.LongObjectHashMap;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PageCursorProviderImpl
implements PageCursorProvider {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final AtomicInteger scheduledCleanup = new AtomicInteger(0);
    protected volatile boolean cleanupEnabled = true;
    protected volatile boolean rebuildDone = true;
    protected final PagingStore pagingStore;
    protected final StorageManager storageManager;
    private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap();
    private static final long PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30L);
    private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10L);
    private static final long PAGE_READ_PERMISSION_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10L);

    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager) {
        this.pagingStore = pagingStore;
        this.storageManager = storageManager;
    }

    @Override
    public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) {
        if (logger.isTraceEnabled()) {
            logger.trace("{} creating subscription {} {} with filter {}", new Object[]{this, this.pagingStore.getAddress(), cursorID, filter});
        }
        if (this.activeCursors.containsKey(cursorID)) {
            throw new IllegalStateException("Cursor " + cursorID + " had already been created");
        }
        PageSubscriptionCounter subscriptionCounter = this.createPageCounter(cursorID, persistent);
        PageSubscriptionImpl activeCursor = new PageSubscriptionImpl(this, this.pagingStore, this.storageManager, filter, cursorID, persistent, subscriptionCounter);
        this.activeCursors.put(cursorID, (Object)activeCursor);
        return activeCursor;
    }

    private PageSubscriptionCounter createPageCounter(long cursorID, boolean persistent) {
        return new PageSubscriptionCounterImpl(this.storageManager, cursorID);
    }

    @Override
    public synchronized PageSubscription getSubscription(long cursorID) {
        return (PageSubscription)this.activeCursors.get(cursorID);
    }

    @Override
    public void forEachSubscription(Consumer<PageSubscription> consumer) {
        this.activeCursors.forEach((k, v) -> consumer.accept((PageSubscription)v));
    }

    @Override
    public PagedReference newReference(PagedMessage msg, PageSubscription subscription) {
        return new PagedReferenceImpl(msg, subscription);
    }

    @Override
    public void processReload() throws Exception {
        long cursorsMinPage;
        List cursorList = this.activeCursors.values();
        for (PageSubscription cursor : cursorList) {
            cursor.processReload();
        }
        if (!cursorList.isEmpty() && (cursorsMinPage = this.checkMinPage(cursorList)) != Long.MAX_VALUE) {
            for (long startPage = this.pagingStore.getFirstPage(); startPage < cursorsMinPage; ++startPage) {
                for (PageSubscription cursor : cursorList) {
                    cursor.reloadPageInfo(startPage);
                }
            }
        }
        this.cleanup();
    }

    @Override
    public void stop() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.stop();
        }
        int pendingCleanupTasks = this.scheduledCleanup.get();
        if (pendingCleanupTasks > 0) {
            logger.trace("Stopping with {} cleanup tasks to be completed yet", (Object)pendingCleanupTasks);
        }
    }

    @Override
    public void counterSnapshot() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.counterSnapshot();
        }
    }

    @Override
    public void flushExecutors() {
        this.pagingStore.flushExecutors();
    }

    @Override
    public void close(PageSubscription cursor) {
        this.activeCursors.remove(cursor.getId());
        this.scheduleCleanup();
    }

    @Override
    public Future<Boolean> scheduleCleanup() {
        SimpleFutureImpl future = new SimpleFutureImpl();
        if (!this.cleanupEnabled || this.scheduledCleanup.intValue() > 2) {
            this.pagingStore.execute(() -> future.set((Object)true));
            return future;
        }
        this.scheduledCleanup.incrementAndGet();
        this.pagingStore.execute(() -> {
            this.storageManager.setContext(this.storageManager.newSingleThreadContext());
            try {
                if (this.cleanupEnabled) {
                    this.cleanup();
                }
            }
            finally {
                this.storageManager.clearContext();
                this.scheduledCleanup.decrementAndGet();
                future.set((Object)true);
            }
        });
        return future;
    }

    @Override
    public void onPageModeCleared() {
        ArrayList<PageSubscription> subscriptions = this.cloneSubscriptions();
        TransactionImpl tx = new TransactionImpl(this.storageManager);
        for (PageSubscription sub : subscriptions) {
            try {
                sub.onPageModeCleared(tx);
            }
            catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorCleaningPagingOnQueue(sub.getQueue().getName().toString(), e);
            }
        }
        try {
            tx.commit();
        }
        catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorCleaningPagingDuringCommit(e);
        }
    }

    @Override
    public void disableCleanup() {
        this.cleanupEnabled = false;
    }

    @Override
    public void resumeCleanup() {
        this.cleanupEnabled = true;
        this.scheduleCleanup();
    }

    private long getNumberOfMessagesOnSubscriptions() {
        AtomicLong largerCounter = new AtomicLong();
        this.activeCursors.forEach((id, sub) -> {
            long value = sub.getCounter().getValue();
            if (value > largerCounter.get()) {
                largerCounter.set(value);
            }
        });
        return largerCounter.get();
    }

    void checkClearPageLimit() {
        this.pagingStore.checkPageLimit(this.getNumberOfMessagesOnSubscriptions());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void cleanup() {
        if (!this.rebuildDone) {
            PageCursorProviderImpl.logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", this.pagingStore != null ? this.pagingStore.getAddress() : "NULL");
            return;
        }
        depagedPages = new ArrayList<Page>();
        depagedPagesSet = new LongHashSet();
        readLock = this.storageManager.closeableReadLock();
        try {
            do {
                if (!this.pagingStore.lock(100L)) continue;
                PageCursorProviderImpl.logger.trace(">>>> Cleanup {}", (Object)this.pagingStore.getAddress());
                var4_4 = this;
                synchronized (var4_4) {
                    try {
                        if (!this.pagingStore.isStarted()) {
                            PageCursorProviderImpl.logger.trace("Paging store is not started");
                            ** break block27
                        }
                        ** GOTO lbl-1000
                    }
                    catch (Throwable ex) {
                        try {
                            ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(this.pagingStore.getAddress(), ex);
                            PageCursorProviderImpl.logger.warn(ex.getMessage(), ex);
                            // MONITOREXIT @DISABLED, blocks:[0, 19, 8, 12, 13] lbl21 : MonitorExitStatement: MONITOREXIT : var4_4
                            if (readLock == null) return;
                        }
                        catch (Throwable var10_11) {
                            throw var10_11;
                        }
                        finally {
                            PageCursorProviderImpl.logger.trace("<<<< Cleanup end on {}", (Object)this.pagingStore.getAddress());
                            this.pagingStore.unlock();
                        }
                        readLock.close();
                        return;
                    }
                }
            } while (this.pagingStore.isStarted());
            return;
lbl-1000:
            // 1 sources

            {
                block28: {
                    PageCursorProviderImpl.logger.trace("<<<< Cleanup end on {}", (Object)this.pagingStore.getAddress());
                    this.pagingStore.unlock();
                    return;
lbl-1000:
                    // 1 sources

                    {
                        if (this.pagingStore.isPaging()) ** GOTO lbl-1000
                        PageCursorProviderImpl.logger.trace("Paging Store was not paging, so no reason to retry the cleanup");
                    }
                    PageCursorProviderImpl.logger.trace("<<<< Cleanup end on {}", (Object)this.pagingStore.getAddress());
                    this.pagingStore.unlock();
                    return;
lbl-1000:
                    // 1 sources

                    {
                        cursorList = this.cloneSubscriptions();
                        minPage = this.checkMinPage(cursorList);
                        firstPage = this.pagingStore.getFirstPage();
                        this.deliverIfNecessary(cursorList, minPage);
                        if (PageCursorProviderImpl.logger.isTraceEnabled()) {
                            PageCursorProviderImpl.logger.trace("firstPage={}, minPage={}, currentWritingPage={}", new Object[]{firstPage, minPage, this.pagingStore.getCurrentWritingPage()});
                        }
                        this.cleanupRegularStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);
                        this.cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);
                        if (this.pagingStore.isPageFull()) {
                            this.checkClearPageLimit();
                        }
                        if (!PageCursorProviderImpl.$assertionsDisabled && this.pagingStore.getNumberOfPages() < 0L) {
                            throw new AssertionError();
                        }
                        if (this.pagingStore.getNumberOfPages() == 0L || this.pagingStore.getNumberOfPages() == 1L && (this.pagingStore.getCurrentPage() == null || this.pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
                            PageCursorProviderImpl.logger.trace("StopPaging being called on {}", (Object)this.pagingStore);
                            this.pagingStore.stopPaging();
                            break block28;
                        }
                        if (!PageCursorProviderImpl.logger.isTraceEnabled()) break block28;
                        PageCursorProviderImpl.logger.trace("Couldn't cleanup page on address {} as numberOfPages == {}  and currentPage.numberOfMessages = {}", new Object[]{this.pagingStore.getAddress(), this.pagingStore.getNumberOfPages(), this.pagingStore.getCurrentPage().getNumberOfMessages()});
                    }
                }
                PageCursorProviderImpl.logger.trace("<<<< Cleanup end on {}", (Object)this.pagingStore.getAddress());
                this.pagingStore.unlock();
            }
        }
        finally {
            if (readLock != null) {
                readLock.close();
            }
        }
        this.finishCleanup(depagedPages);
    }

    private void cleanupRegularStream(ArrayList<Page> depagedPages, LongHashSet depagedPagesSet, ArrayList<PageSubscription> cursorList, long minPage, long firstPage) throws Exception {
        Page page;
        boolean complete;
        Page currentPage = this.pagingStore.getCurrentPage();
        if (minPage == this.pagingStore.getCurrentWritingPage() && currentPage != null && currentPage.getNumberOfMessages() > 0 && (complete = this.checkPageCompletion(cursorList, minPage))) {
            this.cleanupComplete(cursorList);
        }
        for (long i = firstPage; i <= minPage && this.checkPageCompletion(cursorList, i) && (page = this.pagingStore.depage()) != null; ++i) {
            if (logger.isDebugEnabled()) {
                logger.debug("Depaging page {}", (Object)page.getPageId());
            }
            depagedPagesSet.add(page.getPageId());
            depagedPages.add(page);
        }
    }

    private void cleanupMiddleStream(ArrayList<Page> depagedPages, LongHashSet depagedPagesSet, ArrayList<PageSubscription> cursorList, long minPage, long firstPage) {
        long currentPageId = this.pagingStore.getCurrentWritingPage();
        LongObjectHashMap counts = new LongObjectHashMap();
        int subscriptions = cursorList.size();
        cursorList.forEach(sub -> sub.forEachConsumedPage(consumedPage -> {
            if (consumedPage.isDone()) {
                AtomicInteger count = (AtomicInteger)counts.get(consumedPage.getPageId());
                if (count == null) {
                    count = new AtomicInteger(0);
                    counts.put(consumedPage.getPageId(), (Object)count);
                }
                count.incrementAndGet();
            }
        }));
        counts.forEach((pageID, counter) -> {
            try {
                if (pageID > minPage && pageID > firstPage && pageID != currentPageId && counter.get() >= subscriptions && !depagedPagesSet.contains(pageID.longValue())) {
                    Page page = this.pagingStore.removePage(pageID.intValue());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Removing page {}", pageID);
                    }
                    if (page != null) {
                        depagedPages.add(page);
                        depagedPagesSet.add(page.getPageId());
                    }
                }
            }
            catch (Throwable e) {
                ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(e);
                logger.debug("Error while Issuing cleanupMiddlePages with {}, counter = {}", new Object[]{pageID, counter, e});
                depagedPages.forEach(p -> logger.debug("page {}", p));
            }
        });
    }

    protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
        logger.debug("Address {} is leaving page mode as all messages are consumed and acknowledged from the page store", (Object)this.pagingStore.getAddress());
        this.pagingStore.forceAnotherPage();
        Page currentPage = this.pagingStore.getCurrentPage();
        this.storeBookmark(cursorList, currentPage);
        this.pagingStore.stopPaging();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void finishCleanup(ArrayList<Page> depagedPages) {
        logger.trace("this({}) finishing cleanup on {}", (Object)this, depagedPages);
        try {
            for (Page depagedPage : depagedPages) {
                LinkedList<PagedMessage> pgdMessagesList = null;
                try {
                    depagedPage.open(false);
                    pgdMessagesList = depagedPage.read(this.storageManager, true);
                }
                finally {
                    try {
                        depagedPage.close(false, false);
                    }
                    catch (Exception exception) {}
                }
                depagedPage.delete(pgdMessagesList);
                this.onDeletePage(depagedPage);
            }
        }
        catch (Exception ex) {
            ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(this.pagingStore.getAddress(), ex);
        }
    }

    private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) throws Exception {
        logger.trace("checkPageCompletion({})", (Object)minPage);
        boolean complete = true;
        if (!this.pagingStore.checkPageFileExists(minPage)) {
            logger.trace("store {} did not have an existing file, considering it a complete file then", (Object)this.pagingStore.getAddress());
            return true;
        }
        for (PageSubscription cursor : cursorList) {
            if (!cursor.isComplete(minPage)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Cursor {} was considered incomplete at pageNr={}", (Object)cursor, (Object)minPage);
                }
                complete = false;
                break;
            }
            if (!logger.isTraceEnabled()) continue;
            logger.trace("Cursor {} was considered **complete** at pageNr={}", (Object)cursor, (Object)minPage);
        }
        return complete;
    }

    private synchronized ArrayList<PageSubscription> cloneSubscriptions() {
        ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>(this.activeCursors.values());
        return cursorList;
    }

    protected void onDeletePage(Page deletedPage) throws Exception {
        ArrayList<PageSubscription> subscriptions = this.cloneSubscriptions();
        for (PageSubscription subs : subscriptions) {
            subs.onDeletePage(deletedPage);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void storeBookmark(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception {
        try {
            for (PageSubscription cursor : cursorList) {
                cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
            }
        }
        finally {
            for (PageSubscription cursor : cursorList) {
                cursor.enableAutoCleanup();
            }
        }
    }

    public String toString() {
        return "PageCursorProviderImpl{pagingStore=" + this.pagingStore + "}";
    }

    private long checkMinPage(Collection<PageSubscription> cursorList) {
        long minPage = Long.MAX_VALUE;
        if (logger.isTraceEnabled()) {
            logger.trace("Min page cursorList size {} on {}", new Object[]{cursorList.size(), this.pagingStore.getAddress(), new Exception("trace")});
        }
        for (PageSubscription cursor : cursorList) {
            long firstPage = cursor.getFirstPage();
            if (logger.isTraceEnabled()) {
                logger.trace("{} has a cursor {} with first page={}", new Object[]{this.pagingStore.getAddress(), cursor, firstPage});
            }
            if (firstPage < 0L || firstPage >= minPage) continue;
            minPage = firstPage;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("checkMinPage({}) will have minPage={}", (Object)this.pagingStore.getAddress(), (Object)minPage);
        }
        return minPage;
    }

    private void deliverIfNecessary(Collection<PageSubscription> cursorList, long minPage) {
        boolean currentWriting = minPage == this.pagingStore.getCurrentWritingPage();
        for (PageSubscription cursor : cursorList) {
            long firstPage = cursor.getFirstPage();
            if (firstPage != minPage || cursor.getQueue().getMessageCount() != 0L || currentWriting && cursor.isComplete(firstPage)) continue;
            cursor.getQueue().deliverAsync();
            break;
        }
    }

    @Override
    public void counterRebuildStarted() {
        this.rebuildDone = false;
    }

    @Override
    public void counterRebuildDone() {
        this.rebuildDone = true;
    }

    @Override
    public boolean isRebuildDone() {
        return this.rebuildDone;
    }
}

