/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpoint;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointContextImpl;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointPagesInfoHolder;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointPagesWriterFactory;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;

public class CheckpointWorkflow {
    public static final int DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD = 524288;
    private static final DataRegion NO_REGION = new DataRegion(null, null, null, null);
    private final int parallelSortThreshold = IgniteSystemProperties.getInteger("CHECKPOINT_PARALLEL_SORT_THRESHOLD", 524288);
    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
    private final boolean skipSync = IgniteSystemProperties.getBoolean("IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC");
    private final IgniteLogger log;
    private final IgniteWriteAheadLogManager wal;
    private final CheckpointReadWriteLock checkpointReadWriteLock;
    private final Supplier<Collection<DataRegion>> dataRegions;
    private final Supplier<Collection<CacheGroupContext>> cacheGroupsContexts;
    private final CheckpointMarkersStorage checkpointMarkersStorage;
    private final CheckpointWriteOrder checkpointWriteOrder;
    private final Map<CheckpointListener, DataRegion> lsnrs = new ConcurrentLinkedHashMap<CheckpointListener, DataRegion>();
    private final String igniteInstanceName;
    private final int checkpointCollectInfoThreads;
    @Nullable
    private volatile IgniteThreadPoolExecutor checkpointCollectPagesInfoPool;
    private volatile WALPointer memoryRecoveryRecordPtr;

    CheckpointWorkflow(Function<Class<?>, IgniteLogger> logger, IgniteWriteAheadLogManager wal, CheckpointMarkersStorage checkpointMarkersStorage, CheckpointReadWriteLock checkpointReadWriteLock, CheckpointWriteOrder checkpointWriteOrder, Supplier<Collection<DataRegion>> dataRegions, Supplier<Collection<CacheGroupContext>> cacheGroupContexts, int checkpointCollectInfoThreads, String igniteInstanceName) {
        this.wal = wal;
        this.checkpointReadWriteLock = checkpointReadWriteLock;
        this.dataRegions = dataRegions;
        this.cacheGroupsContexts = cacheGroupContexts;
        this.checkpointCollectInfoThreads = checkpointCollectInfoThreads;
        this.log = logger.apply(this.getClass());
        this.checkpointMarkersStorage = checkpointMarkersStorage;
        this.checkpointWriteOrder = checkpointWriteOrder;
        this.igniteInstanceName = igniteInstanceName;
        this.checkpointCollectPagesInfoPool = this.initializeCheckpointPool();
    }

    private IgniteThreadPoolExecutor initializeCheckpointPool() {
        if (this.checkpointCollectInfoThreads > 1) {
            return new IgniteThreadPoolExecutor("checkpoint-runner-cpu", this.igniteInstanceName, this.checkpointCollectInfoThreads, this.checkpointCollectInfoThreads, 30000L, new LinkedBlockingQueue<Runnable>());
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Checkpoint markCheckpointBegin(long cpTs, CheckpointProgressImpl curr, CheckpointMetricsTracker tracker, WorkProgressDispatcher workProgressDispatcher) throws IgniteCheckedException {
        boolean hasPartitionsToDestroy;
        int dirtyPagesCount;
        CheckpointPagesInfoHolder cpPagesHolder;
        Collection<DataRegion> checkpointedRegions = this.dataRegions.get();
        List<CheckpointListener> dbLsnrs = this.getRelevantCheckpointListeners(checkpointedRegions);
        CheckpointRecord cpRec = new CheckpointRecord(this.memoryRecoveryRecordPtr);
        this.memoryRecoveryRecordPtr = null;
        WALPointer cpPtr = null;
        CheckpointContextImpl ctx0 = new CheckpointContextImpl(curr, new PartitionAllocationMap(), this.checkpointCollectPagesInfoPool, workProgressDispatcher);
        this.checkpointReadWriteLock.readLock();
        try {
            for (CheckpointListener lsnr : dbLsnrs) {
                lsnr.beforeCheckpointBegin(ctx0);
            }
            ctx0.awaitPendingTasksFinished();
        }
        finally {
            this.checkpointReadWriteLock.readUnlock();
        }
        tracker.onLockWaitStart();
        this.checkpointReadWriteLock.writeLock();
        try {
            curr.transitTo(CheckpointState.LOCK_TAKEN);
            tracker.onMarkStart();
            for (CheckpointListener lsnr : dbLsnrs) {
                lsnr.onMarkCheckpointBegin(ctx0);
            }
            ctx0.awaitPendingTasksFinished();
            tracker.onListenersExecuteEnd();
            this.fillCacheGroupState(cpRec);
            cpPagesHolder = this.beginAllCheckpoints(checkpointedRegions, curr.futureFor(CheckpointState.MARKER_STORED_TO_DISK));
            curr.currentCheckpointPagesCount(cpPagesHolder.pagesNum());
            dirtyPagesCount = cpPagesHolder.pagesNum();
            boolean bl = hasPartitionsToDestroy = !curr.getDestroyQueue().pendingReqs().isEmpty();
            if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
                if (this.wal != null) {
                    cpPtr = this.wal.log(cpRec);
                }
                if (cpPtr == null) {
                    cpPtr = CheckpointStatus.NULL_PTR;
                }
            }
            curr.transitTo(CheckpointState.PAGE_SNAPSHOT_TAKEN);
        }
        finally {
            this.checkpointReadWriteLock.writeUnlock();
            tracker.onLockRelease();
        }
        curr.transitTo(CheckpointState.LOCK_RELEASED);
        for (CheckpointListener lsnr : dbLsnrs) {
            lsnr.onCheckpointBegin(ctx0);
        }
        if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
            tracker.onWalCpRecordFsyncStart();
            if (this.wal != null) {
                this.wal.flush(cpPtr, true);
            }
            tracker.onWalCpRecordFsyncEnd();
            CheckpointEntry checkpointEntry = null;
            if (this.checkpointMarkersStorage != null) {
                checkpointEntry = this.checkpointMarkersStorage.writeCheckpointEntry(cpTs, cpRec.checkpointId(), cpPtr, cpRec, CheckpointEntryType.START, this.skipSync);
            }
            curr.transitTo(CheckpointState.MARKER_STORED_TO_DISK);
            tracker.onSplitAndSortCpPagesStart();
            GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages = this.splitAndSortCpPagesIfNeeded(cpPagesHolder);
            tracker.onSplitAndSortCpPagesEnd();
            return new Checkpoint(checkpointEntry, cpPages, curr);
        }
        if (ctx0.walFlush() && this.wal != null) {
            this.wal.flush(null, true);
        }
        return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr);
    }

    private void fillCacheGroupState(CheckpointRecord cpRec) throws IgniteCheckedException {
        GridCompoundFuture grpHandleFut = this.checkpointCollectPagesInfoPool == null ? null : new GridCompoundFuture();
        for (CacheGroupContext grp : this.cacheGroupsContexts.get()) {
            if (!grp.walEnabled()) continue;
            Runnable r = () -> {
                ArrayList<GridDhtLocalPartition> parts = new ArrayList<GridDhtLocalPartition>(grp.topology().localPartitions().size());
                for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
                    parts.add(part);
                }
                CacheState state = new CacheState(parts.size());
                for (GridDhtLocalPartition part : parts) {
                    GridDhtPartitionState partState = part.state();
                    if (partState == GridDhtPartitionState.LOST) {
                        partState = GridDhtPartitionState.OWNING;
                    }
                    state.addPartitionState(part.id(), part.dataStore().fullSize(), part.highestAppliedCounter(), (byte)partState.ordinal());
                }
                CheckpointRecord checkpointRecord = cpRec;
                synchronized (checkpointRecord) {
                    cpRec.addCacheGroupState(grp.groupId(), state);
                }
            };
            if (this.checkpointCollectPagesInfoPool == null) {
                r.run();
                continue;
            }
            try {
                GridFutureAdapter res = new GridFutureAdapter();
                this.checkpointCollectPagesInfoPool.execute(U.wrapIgniteFuture(r, res));
                grpHandleFut.add(res);
            }
            catch (RejectedExecutionException e) {
                assert (false) : "Task should never be rejected by async runner";
                throw new IgniteException(e);
            }
        }
        if (grpHandleFut != null) {
            grpHandleFut.markInitialized();
            grpHandleFut.get();
        }
    }

    private CheckpointPagesInfoHolder beginAllCheckpoints(Collection<DataRegion> regions, IgniteInternalFuture<?> allowToReplace) {
        ArrayList<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> res = new ArrayList<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>>(regions.size());
        int pagesNum = 0;
        for (DataRegion reg : regions) {
            if (!reg.config().isPersistenceEnabled()) continue;
            GridMultiCollectionWrapper<FullPageId> nextCpPages = ((PageMemoryEx)reg.pageMemory()).beginCheckpoint(allowToReplace);
            pagesNum += nextCpPages.size();
            res.add(new T2<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>((PageMemoryEx)reg.pageMemory(), nextCpPages));
        }
        return new CheckpointPagesInfoHolder(res, pagesNum);
    }

    private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> splitAndSortCpPagesIfNeeded(CheckpointPagesInfoHolder cpPages) throws IgniteCheckedException {
        HashSet cpPagesPerRegion = new HashSet();
        int realPagesArrSize = 0;
        int totalPagesCnt = cpPages.pagesNum();
        for (Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>> regPages : cpPages.cpPages()) {
            FullPageId[] pages = new FullPageId[regPages.getValue().size()];
            int n = 0;
            for (int i = 0; i < regPages.getValue().collectionsSize(); ++i) {
                for (FullPageId page : regPages.getValue().innerCollection(i)) {
                    if (realPagesArrSize++ == totalPagesCnt) {
                        throw new AssertionError((Object)("Incorrect estimated dirty pages number: " + totalPagesCnt));
                    }
                    pages[n++] = page;
                }
            }
            if (n != pages.length) {
                cpPagesPerRegion.add(new T2<PageMemoryEx, FullPageId[]>(regPages.getKey(), Arrays.copyOf(pages, n)));
                continue;
            }
            cpPagesPerRegion.add(new T2<PageMemoryEx, FullPageId[]>(regPages.getKey(), pages));
        }
        if (this.checkpointWriteOrder == CheckpointWriteOrder.SEQUENTIAL) {
            Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId);
            ForkJoinPool pool = null;
            for (T2 t2 : cpPagesPerRegion) {
                if (((FullPageId[])t2.getValue()).length >= this.parallelSortThreshold) {
                    pool = CheckpointWorkflow.parallelSortInIsolatedPool((FullPageId[])t2.get2(), cmp, pool);
                    continue;
                }
                Arrays.sort((Object[])t2.get2(), cmp);
            }
            if (pool != null) {
                pool.shutdown();
            }
        }
        return new GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId>(cpPagesPerRegion);
    }

    private static ForkJoinPool parallelSortInIsolatedPool(FullPageId[] pagesArr, Comparator<FullPageId> cmp, ForkJoinPool pool) throws IgniteCheckedException {
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory(){

            @Override
            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());
                return worker;
            }
        };
        ForkJoinPool execPool = pool == null ? new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
        Future sortTask = execPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
        try {
            sortTask.get();
        }
        catch (InterruptedException e) {
            throw new IgniteInterruptedCheckedException(e);
        }
        catch (ExecutionException e) {
            throw new IgniteCheckedException("Failed to perform pages array parallel sort", e.getCause());
        }
        return execPool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
        CheckpointWorkflow checkpointWorkflow = this;
        synchronized (checkpointWorkflow) {
            chp.progress.clearCounters();
            for (DataRegion memPlc : this.dataRegions.get()) {
                if (!memPlc.config().isPersistenceEnabled()) continue;
                ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
            }
        }
        if (chp.hasDelta()) {
            if (this.checkpointMarkersStorage != null) {
                this.checkpointMarkersStorage.writeCheckpointEntry(chp.cpEntry.timestamp(), chp.cpEntry.checkpointId(), chp.cpEntry.checkpointMark(), null, CheckpointEntryType.END, this.skipSync);
            }
            if (this.wal != null) {
                this.wal.notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
            }
        }
        if (this.checkpointMarkersStorage != null) {
            this.checkpointMarkersStorage.onCheckpointFinished(chp);
        }
        CheckpointContextImpl emptyCtx = new CheckpointContextImpl(chp.progress, null, null, null);
        Collection<DataRegion> checkpointedRegions = this.dataRegions.get();
        List<CheckpointListener> dbLsnrs = this.getRelevantCheckpointListeners(checkpointedRegions);
        for (CheckpointListener lsnr : dbLsnrs) {
            lsnr.afterCheckpointEnd(emptyCtx);
        }
        chp.progress.transitTo(CheckpointState.FINISHED);
    }

    public List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
        return this.lsnrs.entrySet().stream().filter(entry -> entry.getValue() == NO_REGION || checkpointedRegions.contains(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toList());
    }

    public void finalizeCheckpointOnRecovery(long cpTs, UUID cpId, WALPointer walPtr, StripedExecutor exec, CheckpointPagesWriterFactory checkpointPagesWriterFactory) throws IgniteCheckedException {
        assert (cpTs != 0L);
        long start = System.currentTimeMillis();
        Collection<DataRegion> regions = this.dataRegions.get();
        CheckpointPagesInfoHolder cpPagesHolder = this.beginAllCheckpoints(regions, new GridFinishedFuture());
        GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pages = this.splitAndSortCpPagesIfNeeded(cpPagesHolder);
        GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<PageStore>();
        AtomicInteger cpPagesCnt = new AtomicInteger();
        AtomicReference<Throwable> writePagesError = new AtomicReference<Throwable>();
        for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); ++stripeIdx) {
            exec.execute(stripeIdx, checkpointPagesWriterFactory.buildRecovery(pages, updStores, writePagesError, cpPagesCnt));
        }
        this.awaitApplyComplete(exec, writePagesError);
        long written = U.currentTimeMillis();
        for (PageStore updStore : updStores) {
            updStore.sync();
        }
        long fsync = U.currentTimeMillis();
        for (DataRegion memPlc : regions) {
            if (!memPlc.config().isPersistenceEnabled()) continue;
            ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
        }
        this.checkpointMarkersStorage.writeCheckpointEntry(cpTs, cpId, walPtr, null, CheckpointEntryType.END, this.skipSync);
        if (this.log.isInfoEnabled()) {
            this.log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, pagesWrite=%dms, fsync=%dms, total=%dms]", cpId, cpPagesCnt.get(), walPtr, written - start, fsync - written, fsync - start));
        }
    }

    private void awaitApplyComplete(StripedExecutor exec, AtomicReference<Throwable> applyError) throws IgniteCheckedException {
        try {
            exec.awaitComplete(new int[0]);
        }
        catch (InterruptedException e) {
            throw new IgniteInterruptedException(e);
        }
        Throwable error = applyError.get();
        if (error != null) {
            throw error instanceof IgniteCheckedException ? (IgniteCheckedException)error : new IgniteCheckedException(error);
        }
    }

    public void memoryRecoveryRecordPtr(WALPointer memoryRecoveryRecordPtr) {
        this.memoryRecoveryRecordPtr = memoryRecoveryRecordPtr;
    }

    public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) {
        this.lsnrs.put(lsnr, dataRegion == null ? NO_REGION : dataRegion);
    }

    public void removeCheckpointListener(CheckpointListener lsnr) {
        this.lsnrs.remove(lsnr);
    }

    public void stop() {
        IgniteThreadPoolExecutor pool = this.checkpointCollectPagesInfoPool;
        if (pool != null) {
            pool.shutdownNow();
            try {
                pool.awaitTermination(2L, TimeUnit.MINUTES);
            }
            catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            }
            this.checkpointCollectPagesInfoPool = null;
        }
        for (CheckpointListener lsnr : this.lsnrs.keySet()) {
            this.lsnrs.remove(lsnr);
        }
    }

    public void start() {
        if (this.checkpointCollectPagesInfoPool == null) {
            this.checkpointCollectPagesInfoPool = this.initializeCheckpointPool();
        }
    }
}

