/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class FileMergingSnapshotManagerTest {
    private final String tmId = "Testing";
    private final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
    private FileMergingSnapshotManager.SubtaskKey subtaskKey1;
    private FileMergingSnapshotManager.SubtaskKey subtaskKey2;
    private Path checkpointBaseDir;
    private int writeBufferSize;

    @BeforeEach
    public void setup(@TempDir java.nio.file.Path tempFolder) {
        long jobId = 1L;
        this.subtaskKey1 = new FileMergingSnapshotManager.SubtaskKey(this.operatorID, (TaskInfo)new TaskInfoImpl("TestingTask", 128, 0, 128, 3));
        this.subtaskKey2 = new FileMergingSnapshotManager.SubtaskKey(this.operatorID, (TaskInfo)new TaskInfoImpl("TestingTask", 128, 1, 128, 3));
        this.checkpointBaseDir = new Path(tempFolder.toString(), String.valueOf(jobId));
        this.writeBufferSize = 4096;
    }

    @Test
    void testCreateFileMergingSnapshotManager() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            Assertions.assertThat((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE)).isEqualTo((Object)new Path(this.checkpointBaseDir, "taskowned/Testing"));
            Assertions.assertThat((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED)).isEqualTo((Object)new Path(this.checkpointBaseDir, "shared/" + this.subtaskKey1.getManagedDirName()));
        }
    }

    @Test
    void testCreateAndReuseFiles() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            fmsm.registerSubtaskForSharedStates(this.subtaskKey2);
            PhysicalFile file1 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file1.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            PhysicalFile file2 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file2.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file2).isNotEqualTo((Object)file1);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 0L, file1);
            PhysicalFile file3 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey2, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file3.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file3).isNotEqualTo((Object)file1);
            PhysicalFile file4 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file4.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file4).isNotEqualTo((Object)file1);
            PhysicalFile file5 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file5.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file5).isEqualTo((Object)file1);
            PhysicalFile file6 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file6.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            PhysicalFile file7 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file7.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file7).isNotEqualTo((Object)file6);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 0L, file6);
            PhysicalFile file8 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file8.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file8).isNotEqualTo((Object)file6);
            PhysicalFile file9 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey2, 0L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file9.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file9).isEqualTo((Object)file6);
            Assertions.assertThat((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.EXCLUSIVE)).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        }
    }

    @Test
    void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            fmsm.registerSubtaskForSharedStates(this.subtaskKey2);
            PhysicalFile physicalFile1 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((boolean)physicalFile1.isOpen()).isTrue();
            LogicalFile logicalFile1 = fmsm.createLogicalFile(physicalFile1, 0L, 10L, this.subtaskKey1);
            Assertions.assertThat((Object)logicalFile1.getSubtaskKey()).isEqualTo((Object)this.subtaskKey1);
            Assertions.assertThat((Object)logicalFile1.getPhysicalFile()).isEqualTo((Object)physicalFile1);
            Assertions.assertThat((long)logicalFile1.getStartOffset()).isZero();
            Assertions.assertThat((long)logicalFile1.getLength()).isEqualTo(10L);
            Assertions.assertThat((int)physicalFile1.getRefCount()).isOne();
            Assertions.assertThat((boolean)logicalFile1.isDiscarded()).isFalse();
            logicalFile1.advanceLastCheckpointId(2L);
            Assertions.assertThat((long)logicalFile1.getLastUsedCheckpointID()).isEqualTo(2L);
            logicalFile1.advanceLastCheckpointId(1L);
            Assertions.assertThat((long)logicalFile1.getLastUsedCheckpointID()).isEqualTo(2L);
            logicalFile1.discardWithCheckpointId(1L);
            Assertions.assertThat((boolean)logicalFile1.isDiscarded()).isFalse();
            logicalFile1.discardWithCheckpointId(2L);
            Assertions.assertThat((boolean)logicalFile1.isDiscarded()).isTrue();
            Assertions.assertThat((boolean)physicalFile1.isOpen()).isTrue();
            Assertions.assertThat((boolean)physicalFile1.isDeleted()).isFalse();
            Assertions.assertThat((int)physicalFile1.getRefCount()).isZero();
            physicalFile1.close();
            Assertions.assertThat((boolean)physicalFile1.isOpen()).isFalse();
            Assertions.assertThat((boolean)physicalFile1.isDeleted()).isTrue();
            PhysicalFile physicalFile2 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            LogicalFile logicalFile2 = fmsm.createLogicalFile(physicalFile2, 0L, 10L, this.subtaskKey1);
            Assertions.assertThat((Object)logicalFile2.getPhysicalFile()).isEqualTo((Object)physicalFile2);
            Assertions.assertThat((long)logicalFile2.getStartOffset()).isZero();
            Assertions.assertThat((long)logicalFile2.getLength()).isEqualTo(10L);
            Assertions.assertThat((int)physicalFile2.getRefCount()).isOne();
            logicalFile2.advanceLastCheckpointId(2L);
            Assertions.assertThat((boolean)physicalFile2.isOpen()).isTrue();
            Assertions.assertThat((boolean)physicalFile2.isDeleted()).isFalse();
            physicalFile2.close();
            Assertions.assertThat((boolean)physicalFile2.isOpen()).isFalse();
            Assertions.assertThat((boolean)physicalFile2.isDeleted()).isFalse();
            Assertions.assertThat((int)physicalFile2.getRefCount()).isOne();
            logicalFile2.discardWithCheckpointId(2L);
            Assertions.assertThat((boolean)logicalFile2.isDiscarded()).isTrue();
            Assertions.assertThat((boolean)physicalFile2.isDeleted()).isTrue();
            Assertions.assertThat((int)physicalFile2.getRefCount()).isZero();
        }
    }

    @Test
    void testSizeStatsInPhysicalFile() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            fmsm.registerSubtaskForSharedStates(this.subtaskKey2);
            PhysicalFile physicalFile = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((long)physicalFile.getSize()).isZero();
            physicalFile.incSize(123L);
            Assertions.assertThat((long)physicalFile.getSize()).isEqualTo(123L);
            physicalFile.incSize(456L);
            Assertions.assertThat((long)physicalFile.getSize()).isEqualTo(579L);
        }
    }

    @Test
    public void testReusedFileWriting() throws Exception {
        long checkpointId = 1L;
        int streamNum = 10;
        int perStreamWriteNum = 128;
        byte[] bytes = new byte[streamNum * perStreamWriteNum];
        Random rd = new Random();
        rd.nextBytes(bytes);
        int byteIndex = 0;
        SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];
        try (FileMergingSnapshotManager fmsm = this.createFileMergingSnapshotManager(this.checkpointBaseDir);
             CloseableRegistry closeableRegistry = new CloseableRegistry();){
            for (int i = 0; i < streamNum; ++i) {
                FileMergingCheckpointStateOutputStream stream = fmsm.createCheckpointStateOutputStream(this.subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
                try {
                    closeableRegistry.registerCloseable((AutoCloseable)stream);
                    for (int j = 0; j < perStreamWriteNum; ++j) {
                        stream.write((int)bytes[byteIndex++]);
                    }
                    handles[i] = stream.closeAndGetHandle();
                    continue;
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            byteIndex = 0;
            Path filePath = null;
            for (SegmentFileStateHandle handle : handles) {
                int readValue;
                Path thisFilePath = handle.getFilePath();
                Assertions.assertThat((filePath == null || filePath.equals((Object)thisFilePath) ? 1 : 0) != 0).isTrue();
                filePath = thisFilePath;
                FSDataInputStream is = handle.openInputStream();
                closeableRegistry.registerCloseable((AutoCloseable)is);
                while ((readValue = is.read()) != -1) {
                    Assertions.assertThat((byte)((byte)readValue)).isEqualTo(bytes[byteIndex++]);
                }
            }
        }
    }

    @Test
    public void testConcurrentWriting() throws Exception {
        long checkpointId = 1L;
        int numThreads = 12;
        int perStreamWriteNum = 128;
        HashSet<CompletableFuture<SegmentFileStateHandle>> futures = new HashSet<CompletableFuture<SegmentFileStateHandle>>();
        try (FileMergingSnapshotManager fmsm = this.createFileMergingSnapshotManager(this.checkpointBaseDir);
             CloseableRegistry closeableRegistry = new CloseableRegistry();){
            for (int i = 0; i < numThreads; ++i) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    FileMergingCheckpointStateOutputStream stream = fmsm.createCheckpointStateOutputStream(this.subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
                    try {
                        closeableRegistry.registerCloseable((AutoCloseable)stream);
                        for (int j = 0; j < perStreamWriteNum; ++j) {
                            stream.write(j);
                        }
                        return stream.closeAndGetHandle();
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));
            }
            for (Future future : futures) {
                int readValue;
                SegmentFileStateHandle segmentFileStateHandle = (SegmentFileStateHandle)future.get();
                FSDataInputStream is = segmentFileStateHandle.openInputStream();
                closeableRegistry.registerCloseable((AutoCloseable)is);
                int expected = 0;
                while ((readValue = is.read()) != -1) {
                    Assertions.assertThat((int)readValue).isEqualTo(expected++);
                }
            }
        }
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException {
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        Path sharedStateDir = new Path(checkpointBaseDir, "shared");
        Path taskOwnedStateDir = new Path(checkpointBaseDir, "taskowned");
        if (!fs.exists(checkpointBaseDir)) {
            fs.mkdirs(checkpointBaseDir);
            fs.mkdirs(sharedStateDir);
            fs.mkdirs(taskOwnedStateDir);
        }
        FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder("Testing").build();
        fmsm.initFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, this.writeBufferSize);
        Assertions.assertThat((Object)fmsm).isNotNull();
        return fmsm;
    }
}

