/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FileMergingCheckpointStateOutputStreamTest {
    @Rule
    public final TemporaryFolder tempDir = new TemporaryFolder();
    private static boolean failWhenClosePhysicalFile = false;
    private static final String CLOSE_FILE_FAILURE_MESSAGE = "Cannot close physical file.";
    private static final int WRITE_BUFFER_SIZE = 256;
    private static boolean isPhysicalFileProvided = false;
    private static boolean physicalFileCanBeReused;
    private static PhysicalFile lastPhysicalFile;

    @Before
    public void setEnv() {
        failWhenClosePhysicalFile = false;
        physicalFileCanBeReused = false;
    }

    private FileMergingCheckpointStateOutputStream getNewStream() throws IOException {
        return this.getNewStream(false);
    }

    private FileMergingCheckpointStateOutputStream getNewStream(boolean reuseLastPhysicalFile) throws IOException {
        PhysicalFile physicalFile;
        if (reuseLastPhysicalFile) {
            Assertions.assertThat((Object)lastPhysicalFile).isNotNull();
            physicalFile = lastPhysicalFile;
        } else {
            Path dirPath = Path.fromLocalFile((File)this.tempDir.newFolder());
            String fileName = UUID.randomUUID().toString();
            Path physicalFilePath = new Path(dirPath, fileName);
            OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware((FileSystem)dirPath.getFileSystem(), (Path)physicalFilePath, (FileSystem.WriteMode)FileSystem.WriteMode.NO_OVERWRITE);
            physicalFile = new PhysicalFile(streamAndPath.stream(), physicalFilePath, path -> {}, CheckpointedStateScope.EXCLUSIVE);
        }
        isPhysicalFileProvided = false;
        return new FileMergingCheckpointStateOutputStream(256, new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy(){

            public Tuple2<FSDataOutputStream, Path> providePhysicalFile() {
                isPhysicalFileProvided = true;
                lastPhysicalFile = physicalFile;
                Preconditions.checkArgument((boolean)physicalFile.isOpen());
                return new Tuple2((Object)physicalFile.getOutputStream(), (Object)physicalFile.getFilePath());
            }

            public SegmentFileStateHandle closeStreamAndCreateStateHandle(Path filePath, long startPos, long stateSize) throws IOException {
                if (isPhysicalFileProvided) {
                    if (failWhenClosePhysicalFile) {
                        throw new IOException(FileMergingCheckpointStateOutputStreamTest.CLOSE_FILE_FAILURE_MESSAGE);
                    }
                    if (!physicalFileCanBeReused) {
                        physicalFile.close();
                    }
                }
                return new SegmentFileStateHandle(filePath, startPos, stateSize, CheckpointedStateScope.EXCLUSIVE);
            }

            public void closeStreamExceptionally() throws IOException {
                if (isPhysicalFileProvided) {
                    if (failWhenClosePhysicalFile) {
                        throw new IOException(FileMergingCheckpointStateOutputStreamTest.CLOSE_FILE_FAILURE_MESSAGE);
                    }
                    physicalFile.close();
                }
            }
        });
    }

    @Test
    public void testGetHandleFromStream() throws IOException {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isFalse();
        Assertions.assertThat((Object)stream.closeAndGetHandle()).isNull();
        stream = this.getNewStream();
        stream.flush();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isFalse();
        Assertions.assertThat((Object)stream.closeAndGetHandle()).isNull();
        stream = this.getNewStream();
        stream.flushToFile();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
        SegmentFileStateHandle stateHandle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)stateHandle).isNotNull();
        Assertions.assertThat((long)stateHandle.getStateSize()).isEqualTo(0L);
        stream = this.getNewStream();
        stream.write(new byte[0]);
        stream.flushToFile();
        stateHandle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)stateHandle).isNotNull();
        Assertions.assertThat((long)stateHandle.getStateSize()).isEqualTo(0L);
        stream = this.getNewStream();
        stream.write(new byte[10]);
        stream.flushToFile();
        stateHandle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)stateHandle).isNotNull();
        Assertions.assertThat((long)stateHandle.getStateSize()).isEqualTo(10L);
        stream = this.getNewStream();
        stream.write(new byte[10]);
        stateHandle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)stateHandle).isNotNull();
        Assertions.assertThat((long)stateHandle.getStateSize()).isEqualTo(10L);
    }

    @Test
    public void testGetHandleFromClosedStream() throws IOException {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        stream.close();
        try {
            stream.closeAndGetHandle();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testWhetherFileIsCreatedWhenWritingStream() throws IOException {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        stream.write(new byte[255]);
        Assertions.assertThat((boolean)isPhysicalFileProvided).isFalse();
        stream.write(new byte[2]);
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
        stream = this.getNewStream();
        stream.write(new byte[256]);
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
        stream = this.getNewStream();
        stream.write(new byte[255]);
        stream.close();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isFalse();
        stream = this.getNewStream();
        stream.write(new byte[255]);
        stream.closeAndGetHandle();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
    }

    @Test
    public void testCloseStream() throws IOException {
        block4: {
            FileMergingCheckpointStateOutputStream stream = this.getNewStream();
            stream.flushToFile();
            stream.close();
            stream.write(new byte[0]);
            try {
                stream.write(new byte[1]);
            }
            catch (IOException e) {
                Assertions.assertThat((String)e.getMessage()).isEqualTo("Cannot call flushToFile() to a closed stream.");
            }
            failWhenClosePhysicalFile = true;
            stream = this.getNewStream();
            stream.flushToFile();
            Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
            stream.close();
            stream = this.getNewStream();
            stream.flushToFile();
            Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
            try {
                stream.closeAndGetHandle();
            }
            catch (IOException e) {
                if (e.getMessage().equals(CLOSE_FILE_FAILURE_MESSAGE)) break block4;
                throw e;
            }
        }
    }

    @Test
    public void testStateAboveBufferSize() throws Exception {
        this.runTest(576446);
    }

    @Test
    public void testStateUnderBufferSize() throws Exception {
        this.runTest(100);
    }

    @Test
    public void testGetPos() throws Exception {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        for (int i = 0; i < 64; ++i) {
            Assertions.assertThat((long)stream.getPos()).isEqualTo((long)i);
            stream.write(66);
        }
        stream.closeAndGetHandle();
        stream = this.getNewStream();
        Random rnd = new Random();
        long expectedPos = 0L;
        for (int i = 0; i < 7; ++i) {
            int numBytes = rnd.nextInt(16);
            stream.write(new byte[numBytes]);
            Assertions.assertThat((long)stream.getPos()).isEqualTo(expectedPos += (long)numBytes);
        }
        physicalFileCanBeReused = true;
        SegmentFileStateHandle stateHandle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)stateHandle).isNotNull();
        expectedPos = 0L;
        stream = this.getNewStream(true);
        stream.flushToFile();
        for (int i = 0; i < 7; ++i) {
            int numBytes = rnd.nextInt(16);
            stream.write(new byte[numBytes]);
            Assertions.assertThat((long)stream.getPos()).isEqualTo(expectedPos += (long)numBytes);
        }
        stream.closeAndGetHandle();
    }

    @Test
    public void testCannotReuseClosedFile() throws IOException {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        stream.flushToFile();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
        stream.close();
        stream = this.getNewStream(true);
        try {
            stream.flushToFile();
            Assertions.fail((String)"Cannot reuse a closed physical file.");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testWriteFailsFastWhenClosed() throws Exception {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        stream.flushToFile();
        Assertions.assertThat((boolean)isPhysicalFileProvided).isTrue();
        stream.close();
        try {
            stream.write(1);
            Assertions.fail((String)"Cannot reuse a closed physical file.");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    private void runTest(int numBytes) throws Exception {
        FileMergingCheckpointStateOutputStream stream = this.getNewStream();
        Random rnd = new Random();
        byte[] original = new byte[numBytes];
        byte[] bytes = new byte[original.length];
        rnd.nextBytes(original);
        System.arraycopy(original, 0, bytes, 0, original.length);
        int pos = 0;
        while (pos < bytes.length) {
            boolean single = rnd.nextBoolean();
            if (single) {
                stream.write((int)bytes[pos++]);
                continue;
            }
            int num = rnd.nextBoolean() ? bytes.length - pos : rnd.nextInt(bytes.length - pos);
            stream.write(bytes, pos, num);
            pos += num;
        }
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assertions.assertThat((Object)handle).isNotNull();
        Assertions.assertThat((byte[])bytes).containsExactly(original);
        try (FSDataInputStream inStream = handle.openInputStream();){
            byte[] validation = new byte[bytes.length];
            DataInputStream dataInputStream = new DataInputStream((InputStream)inStream);
            dataInputStream.readFully(validation);
            Assertions.assertThat((byte[])validation).containsExactly(bytes);
        }
        handle.discardState();
    }
}

