/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultCompletedCheckpointStore}, driven through a {@link
 * TestingStateHandleStore} whose behaviour each test configures via {@link #builder}.
 */
class DefaultCompletedCheckpointStoreTest {

    /** Timeout in milliseconds used when waiting on futures in the tests below. */
    private final long timeout = 100L;

    /** Fresh builder per test; individual tests plug in the callbacks they want to observe. */
    private TestingStateHandleStore.Builder<CompletedCheckpoint> builder;

    /** Wraps checkpoints into retrievable handles; can be told to fail on retrieval. */
    private TestingRetrievableStateStorageHelper<CompletedCheckpoint> checkpointStorageHelper;

    /** Executor handed to every store created by {@link #createCompletedCheckpointStore}. */
    private ExecutorService executorService;

    @BeforeEach
    void setup() {
        builder = TestingStateHandleStore.newBuilder();
        checkpointStorageHelper = new TestingRetrievableStateStorageHelper<>();
        executorService =
                Executors.newFixedThreadPool(2, new ExecutorThreadFactory("IO-Executor"));
    }

    @AfterEach
    void after() {
        executorService.shutdownNow();
    }

    /**
     * Retaining one entry: after two checkpoints followed by three savepoints, the store is
     * expected to hold the last checkpoint and the last savepoint.
     */
    @Test
    void testAtLeastOneCheckpointRetained() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
        CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
        CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
        testCheckpointRetention(
                1, Arrays.asList(cp1, cp2, sp1, sp2, sp3), Arrays.asList(cp2, sp3));
    }

    /** Retaining one entry: a savepoint followed by a newer checkpoint leaves only the latter. */
    @Test
    void testOlderSavepointSubsumed() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
        testCheckpointRetention(1, Arrays.asList(cp1, sp1, cp2), Arrays.asList(cp2));
    }

    /** Retaining one entry: a final synchronous savepoint is the only entry expected to remain. */
    @Test
    void testSubsumeAfterStoppingWithSavepoint() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
        CompletedCheckpoint stop =
                getCheckpoint(
                        CheckpointProperties.forSyncSavepoint(
                                false, false, SavepointFormatType.CANONICAL),
                        3L);
        testCheckpointRetention(1, Arrays.asList(cp1, sp1, stop), Arrays.asList(stop));
    }

    /** Retaining three entries: adding exactly three checkpoints keeps all of them. */
    @Test
    void testNotSubsumedIfNotNeeded() throws Exception {
        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
        CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
        testCheckpointRetention(
                3, Arrays.asList(cp1, cp2, cp3), Arrays.asList(cp1, cp2, cp3));
    }

    /**
     * Creates a store that recovers three pre-existing handles, adds every entry of {@code
     * completed} in order, and asserts the store then contains exactly {@code expectedRetained}.
     *
     * @param numRetain number of checkpoints the store is configured to retain
     * @param completed checkpoints to add, in order
     * @param expectedRetained expected content of the store afterwards
     */
    private void testCheckpointRetention(
            int numRetain,
            List<CompletedCheckpoint> completed,
            List<CompletedCheckpoint> expectedRetained)
            throws Exception {
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore, numRetain);

        for (CompletedCheckpoint checkpoint : completed) {
            completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                    checkpoint, new CheckpointsCleaner(), () -> {});
        }

        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints())
                .isEqualTo(expectedRetained);
    }

    /** Recovering three handles yields three checkpoints with IDs 1, 2, 3 in that order. */
    @Test
    void testRecoverSortedCheckpoints() throws Exception {
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        List<CompletedCheckpoint> recoveredCompletedCheckpoint =
                completedCheckpointStore.getAllCheckpoints();
        Assertions.assertThat(recoveredCompletedCheckpoint).hasSize(3);

        List<Long> checkpointIds =
                recoveredCompletedCheckpoint.stream()
                        .map(CompletedCheckpoint::getCheckpointID)
                        .collect(Collectors.toList());
        Assertions.assertThat(checkpointIds).containsExactly(1L, 2L, 3L);
    }

    /**
     * When retrieving one of the stored handles fails with an {@link IOException}, creating the
     * store must fail with that exception as the cause.
     */
    @Test
    void testCorruptDataInStateHandleStoreShouldNotBeSkipped() throws Exception {
        final long corruptCkpId = 2L;
        checkpointStorageHelper.setRetrieveStateFunction(
                state -> {
                    if (state.getCheckpointID() == corruptCkpId) {
                        throw new IOException("Failed to retrieve checkpoint " + corruptCkpId);
                    }
                    return state;
                });

        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3)).build();

        Assertions.assertThatThrownBy(() -> createCompletedCheckpointStore(stateHandleStore))
                .as("Exception should have been thrown.")
                .hasCauseInstanceOf(IOException.class);
    }

    /**
     * With one retained slot and one recovered checkpoint, a successfully added checkpoint is
     * passed to the state handle store and becomes the only checkpoint in the store.
     */
    @Test
    void testAddCheckpointSuccessfullyShouldRemoveOldOnes() throws Exception {
        final int num = 1;
        final CompletableFuture<CompletedCheckpoint> addFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(num))
                        .setAddFunction(
                                (ignore, ckp) -> {
                                    // Expose what was handed to the handle store to the test.
                                    addFuture.complete(ckp);
                                    return null;
                                })
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).hasSize(num);
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID())
                .isOne();

        final long ckpId = 100L;
        CompletedCheckpoint ckp2 =
                CompletedCheckpointStoreTest.createCheckpoint(ckpId, new SharedStateRegistryImpl());
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                ckp2, new CheckpointsCleaner(), () -> {});

        // The add callback must have received the new checkpoint.
        CompletedCheckpoint addedCkp = addFuture.get(timeout, TimeUnit.MILLISECONDS);
        Assertions.assertThat(addedCkp.getCheckpointID()).isEqualTo(ckpId);

        // Only the new checkpoint is left in the store.
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).hasSize(num);
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID())
                .isEqualTo(ckpId);
    }

    /**
     * When the state handle store rejects the add with a {@link FlinkException}, the exception
     * reaches the caller and the previously recovered checkpoint stays in the store.
     */
    @Test
    void testAddCheckpointFailedShouldNotRemoveOldOnes() throws Exception {
        final int num = 1;
        final String errMsg = "Add to state handle failed.";
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(num))
                        .setAddFunction(
                                (ignore, ckp) -> {
                                    throw new FlinkException(errMsg);
                                })
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).hasSize(num);
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID())
                .isOne();

        final long ckpId = 100L;
        CompletedCheckpoint ckp2 =
                CompletedCheckpointStoreTest.createCheckpoint(ckpId, new SharedStateRegistryImpl());

        Assertions.assertThatThrownBy(
                        () ->
                                completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                                        ckp2, new CheckpointsCleaner(), () -> {}))
                .as("We should get an exception when add checkpoint to failed..")
                .hasMessageContaining(errMsg)
                .isInstanceOf(FlinkException.class);

        // The old checkpoint must still be the only one present.
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).hasSize(num);
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID())
                .isOne();
    }

    /**
     * Shutting down with {@link JobStatus#CANCELED}: every handle is removed, the entries are
     * cleared and the store is empty. Shared state registered for the checkpoint just before the
     * retained one is disposed, while state registered for the retained checkpoint and the one
     * after it is not.
     */
    @Test
    void testShutdownShouldDiscardStateHandleWhenJobIsGloballyTerminalState() throws Exception {
        final int numBeforeRetained = 3;
        final int numAfterRetained = 4;
        final long retainedCheckpointID = numBeforeRetained + 1;
        final int numCheckpoints = numBeforeRetained + 1 + numAfterRetained;

        final AtomicInteger removeCalledNum = new AtomicInteger(0);
        final CompletableFuture<Void> clearEntriesAllFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(
                                () -> createStateHandles(numBeforeRetained, numAfterRetained))
                        .setRemoveFunction(
                                ignore -> {
                                    removeCalledNum.incrementAndGet();
                                    return true;
                                })
                        .setClearEntriesRunnable(() -> clearEntriesAllFuture.complete(null))
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints())
                .hasSize(numCheckpoints);

        TestingStreamStateHandle nonRetainedState =
                registerState(completedCheckpointStore, retainedCheckpointID - 1L);
        TestingStreamStateHandle retainedState =
                registerState(completedCheckpointStore, retainedCheckpointID);
        TestingStreamStateHandle beyondRetained =
                registerState(completedCheckpointStore, retainedCheckpointID + 1L);

        completedCheckpointStore.shutdown(JobStatus.CANCELED, new CheckpointsCleaner());

        Assertions.assertThat(removeCalledNum).hasValue(numCheckpoints);
        Assertions.assertThat(clearEntriesAllFuture).isDone();
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).isEmpty();
        Assertions.assertThat(nonRetainedState.isDisposed()).isTrue();
        Assertions.assertThat(retainedState.isDisposed()).isFalse();
        Assertions.assertThat(beyondRetained.isDisposed()).isFalse();
    }

    /**
     * Shutting down with {@link JobStatus#CANCELLING}: no handle is removed and the entries are
     * not cleared, but all handles are released, the store is empty and registered shared state
     * is not disposed.
     */
    @Test
    void testShutdownShouldNotDiscardStateHandleWhenJobIsNotGloballyTerminalState()
            throws Exception {
        final AtomicInteger removeCalledNum = new AtomicInteger(0);
        final CompletableFuture<Void> removeAllFuture = new CompletableFuture<>();
        final CompletableFuture<Void> releaseAllFuture = new CompletableFuture<>();
        TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                builder.setGetAllSupplier(() -> createStateHandles(3))
                        .setRemoveFunction(
                                ignore -> {
                                    removeCalledNum.incrementAndGet();
                                    return true;
                                })
                        .setReleaseAllHandlesRunnable(() -> releaseAllFuture.complete(null))
                        .setClearEntriesRunnable(() -> removeAllFuture.complete(null))
                        .build();
        CompletedCheckpointStore completedCheckpointStore =
                createCompletedCheckpointStore(stateHandleStore);

        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).hasSize(3);

        TestingStreamStateHandle streamStateHandle = registerState(completedCheckpointStore, 3L);

        completedCheckpointStore.shutdown(JobStatus.CANCELLING, new CheckpointsCleaner());

        FlinkAssertions.assertThatFuture(removeAllFuture)
                .as("We should get an expected timeout because the job is not globally terminated.")
                .failsWithin(timeout, TimeUnit.MILLISECONDS);
        Assertions.assertThat(removeCalledNum).hasValue(0);
        Assertions.assertThat(removeAllFuture).isNotDone();
        Assertions.assertThat(releaseAllFuture).isDone();
        Assertions.assertThat(completedCheckpointStore.getAllCheckpoints()).isEmpty();
        Assertions.assertThat(streamStateHandle.isDisposed()).isFalse();
    }

    /**
     * For every {@link JobStatus}: once the store was shut down with that status, adding a
     * checkpoint must throw {@link IllegalStateException}.
     */
    @Test
    void testShutdownFailsAnyFutureCallsToAddCheckpoint() throws Exception {
        final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        for (JobStatus status : JobStatus.values()) {
            final CompletedCheckpointStore completedCheckpointStore =
                    createCompletedCheckpointStore(builder.build());
            completedCheckpointStore.shutdown(status, checkpointsCleaner);

            Assertions.assertThatExceptionOfType(IllegalStateException.class)
                    .isThrownBy(
                            () ->
                                    completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                                            CompletedCheckpointStoreTest.createCheckpoint(
                                                    0L, new SharedStateRegistryImpl()),
                                            checkpointsCleaner,
                                            () -> {}));
        }
    }

    /**
     * Builds {@code num} state handles for checkpoints with IDs {@code 1..num}; each handle is
     * paired with its checkpoint ID rendered as a string.
     */
    private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
            int num) {
        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandles =
                new ArrayList<>();
        for (int i = 1; i <= num; ++i) {
            final CompletedCheckpoint completedCheckpoint =
                    CompletedCheckpointStoreTest.createCheckpoint(i, new SharedStateRegistryImpl());
            final RetrievableStateHandle<CompletedCheckpoint> checkpointStateHandle =
                    checkpointStorageHelper.store(completedCheckpoint);
            stateHandles.add(new Tuple2<>(checkpointStateHandle, String.valueOf(i)));
        }
        return stateHandles;
    }

    /** Creates a store over {@code stateHandleStore} that retains a single checkpoint. */
    private CompletedCheckpointStore createCompletedCheckpointStore(
            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) throws Exception {
        return createCompletedCheckpointStore(stateHandleStore, 1);
    }

    /**
     * Creates a {@link DefaultCompletedCheckpointStore} that is initialised with the checkpoints
     * retrieved from {@code stateHandleStore} and retains {@code toRetain} checkpoints.
     *
     * @param stateHandleStore backing handle store, also used to retrieve the initial checkpoints
     * @param toRetain number of checkpoints to retain
     */
    private CompletedCheckpointStore createCompletedCheckpointStore(
            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain)
            throws Exception {
        // Handle names are simply the decimal checkpoint ID.
        final CheckpointStoreUtil checkpointStoreUtil =
                new CheckpointStoreUtil() {

                    @Override
                    public String checkpointIDToName(long checkpointId) {
                        return String.valueOf(checkpointId);
                    }

                    @Override
                    public long nameToCheckpointID(String name) {
                        return Long.parseLong(name);
                    }
                };

        return new DefaultCompletedCheckpointStore<>(
                toRetain,
                stateHandleStore,
                checkpointStoreUtil,
                DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                        stateHandleStore, checkpointStoreUtil),
                SharedStateRegistry.DEFAULT_FACTORY.create(
                        org.apache.flink.util.concurrent.Executors.directExecutor(),
                        Collections.emptyList(),
                        RestoreMode.DEFAULT),
                executorService);
    }

    /**
     * Creates a completed checkpoint with the given ID, using savepoint properties when {@code
     * isSavepoint} is set and never-retained checkpoint properties otherwise.
     */
    private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
        final CheckpointProperties props =
                isSavepoint
                        ? CheckpointProperties.forSavepoint(false, SavepointFormatType.CANONICAL)
                        : CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        return getCheckpoint(props, id);
    }

    /** Creates an otherwise empty completed checkpoint with the given properties and ID. */
    private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
        return new CompletedCheckpoint(
                new JobID(),
                id,
                0L,
                0L,
                Collections.emptyMap(),
                Collections.emptyList(),
                props,
                new TestCompletedCheckpointStorageLocation(),
                null);
    }

    /**
     * Builds {@code numBeforeRetained} regular handles, then one handle for a checkpoint created
     * with {@link CheckpointRetentionPolicy#RETAIN_ON_CANCELLATION} and ID {@code
     * numBeforeRetained + 1}, then {@code numAfterRetained} further regular handles.
     *
     * <p>NOTE(review): the trailing handles come from {@link #createStateHandles(int)} and
     * therefore carry IDs {@code 1..numAfterRetained} again instead of continuing after the
     * retained ID - confirm that the duplicated IDs are intended.
     */
    private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
            int numBeforeRetained, int numAfterRetained) {
        final long retainedCheckpointID = numBeforeRetained + 1;

        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> cpHandles =
                new ArrayList<>(createStateHandles(numBeforeRetained));

        final CompletedCheckpoint completedCheckpoint =
                CompletedCheckpointStoreTest.createCheckpoint(
                        retainedCheckpointID,
                        new SharedStateRegistryImpl(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION));
        cpHandles.add(
                Tuple2.of(
                        checkpointStorageHelper.store(completedCheckpoint),
                        String.valueOf(retainedCheckpointID)));

        cpHandles.addAll(createStateHandles(numAfterRetained));
        return cpHandles;
    }

    /**
     * Registers a fresh {@link TestingStreamStateHandle} in the store's shared state registry for
     * {@code checkpointID} and returns it so the test can check whether it got disposed.
     */
    private TestingStreamStateHandle registerState(
            CompletedCheckpointStore completedCheckpointStore, long checkpointID) {
        final TestingStreamStateHandle handle = new TestingStreamStateHandle();
        // The registry key is derived from the identity hash of a throw-away object.
        final SharedStateRegistryKey key =
                new SharedStateRegistryKey(String.valueOf(new Object().hashCode()));
        completedCheckpointStore
                .getSharedStateRegistry()
                .registerReference(key, handle, checkpointID);
        return handle;
    }
}

