/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class AbstractRecoverableWriterTest
extends TestLogger {
    private static final Random RND = new Random();
    private static final String testData1 = "THIS IS A TEST 1.";
    private static final String testData2 = "THIS IS A TEST 2.";
    private static final String testData3 = "THIS IS A TEST 3.";
    private Path basePathForTest;
    private static FileSystem fileSystem;
    private static final String INIT_EMPTY_PERSIST = "EMPTY";
    private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
    private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
    private static final String FINAL_WITH_EXTRA_STATE = "FINAL";

    public abstract Path getBasePath() throws Exception;

    public abstract FileSystem initializeFileSystem() throws Exception;

    public Path getBasePathForTest() {
        return this.basePathForTest;
    }

    private FileSystem getFileSystem() throws Exception {
        if (fileSystem == null) {
            fileSystem = this.initializeFileSystem();
        }
        return fileSystem;
    }

    private RecoverableWriter getNewFileSystemWriter() throws Exception {
        return this.getFileSystem().createRecoverableWriter();
    }

    @Before
    public void prepare() throws Exception {
        this.basePathForTest = new Path(this.getBasePath(), AbstractRecoverableWriterTest.randomName());
        this.getFileSystem().mkdirs(this.basePathForTest);
    }

    @After
    public void cleanup() throws Exception {
        this.getFileSystem().delete(this.basePathForTest, true);
    }

    @Test
    public void testCloseWithNoData() throws Exception {
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path testDir = this.getBasePathForTest();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = writer.open(path);
        for (Map.Entry<Path, String> fileContents : this.getFileContentByPath(testDir).entrySet()) {
            Assert.assertTrue((boolean)fileContents.getKey().getName().startsWith(".part-0.inprogress."));
            Assert.assertTrue((boolean)fileContents.getValue().isEmpty());
        }
        stream.closeForCommit().commit();
        for (Map.Entry<Path, String> fileContents : this.getFileContentByPath(testDir).entrySet()) {
            Assert.assertEquals((Object)"part-0", (Object)fileContents.getKey().getName());
            Assert.assertTrue((boolean)fileContents.getValue().isEmpty());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCommitAfterNormalClose() throws Exception {
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path testDir = this.getBasePathForTest();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().commit();
            for (Map.Entry<Path, String> fileContents : this.getFileContentByPath(testDir).entrySet()) {
                Assert.assertEquals((Object)"part-0", (Object)fileContents.getKey().getName());
                Assert.assertEquals((Object)testData1, (Object)fileContents.getValue());
            }
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCommitAfterPersist() throws Exception {
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path testDir = this.getBasePathForTest();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().commit();
            for (Map.Entry<Path, String> fileContents : this.getFileContentByPath(testDir).entrySet()) {
                Assert.assertEquals((Object)"part-0", (Object)fileContents.getKey().getName());
                Assert.assertEquals((Object)"THIS IS A TEST 1.THIS IS A TEST 2.", (Object)fileContents.getValue());
            }
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
    }

    @Test
    public void testRecoverWithEmptyState() throws Exception {
        this.testResumeAfterMultiplePersist(INIT_EMPTY_PERSIST, "", testData3);
    }

    @Test
    public void testRecoverWithState() throws Exception {
        this.testResumeAfterMultiplePersist(INTERM_WITH_STATE_PERSIST, testData1, "THIS IS A TEST 1.THIS IS A TEST 3.");
    }

    @Test
    public void testRecoverFromIntermWithoutAdditionalState() throws Exception {
        this.testResumeAfterMultiplePersist(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, testData1, "THIS IS A TEST 1.THIS IS A TEST 3.");
    }

    @Test
    public void testRecoverAfterMultiplePersistsState() throws Exception {
        this.testResumeAfterMultiplePersist(FINAL_WITH_EXTRA_STATE, "THIS IS A TEST 1.THIS IS A TEST 2.", "THIS IS A TEST 1.THIS IS A TEST 2.THIS IS A TEST 3.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testResumeAfterMultiplePersist(String persistName, String expectedPostRecoveryContents, String expectedFinalContents) throws Exception {
        Path testDir = this.getBasePathForTest();
        Path path = new Path(testDir, "part-0");
        RecoverableWriter initWriter = this.getNewFileSystemWriter();
        HashMap<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<String, RecoverableWriter.ResumeRecoverable>(4);
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = initWriter.open(path);
            recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
            recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
        SimpleVersionedSerializer serializer = initWriter.getResumeRecoverableSerializer();
        byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));
        RecoverableWriter newWriter = this.getNewFileSystemWriter();
        SimpleVersionedSerializer deserializer = newWriter.getResumeRecoverableSerializer();
        RecoverableWriter.ResumeRecoverable recoveredRecoverable = (RecoverableWriter.ResumeRecoverable)deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
        RecoverableFsDataOutputStream recoveredStream = null;
        try {
            recoveredStream = newWriter.recover(recoveredRecoverable);
            Map<Path, String> files = this.getFileContentByPath(testDir);
            Assert.assertEquals((long)1L, (long)files.size());
            for (Map.Entry<Path, String> fileContents : files.entrySet()) {
                Assert.assertTrue((boolean)fileContents.getKey().getName().startsWith(".part-0.inprogress."));
                Assert.assertEquals((Object)expectedPostRecoveryContents, (Object)fileContents.getValue());
            }
            recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
            recoveredStream.closeForCommit().commit();
            files = this.getFileContentByPath(testDir);
            Assert.assertEquals((long)1L, (long)files.size());
            for (Map.Entry<Path, String> fileContents : files.entrySet()) {
                Assert.assertEquals((Object)"part-0", (Object)fileContents.getKey().getName());
                Assert.assertEquals((Object)expectedFinalContents, (Object)fileContents.getValue());
            }
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)recoveredStream);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCommitAfterRecovery() throws Exception {
        RecoverableWriter.CommitRecoverable recoverable;
        Path testDir = this.getBasePathForTest();
        Path path = new Path(testDir, "part-0");
        RecoverableWriter initWriter = this.getNewFileSystemWriter();
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = initWriter.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.persist();
            stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            recoverable = stream.closeForCommit().getRecoverable();
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
        byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize((Object)recoverable);
        RecoverableWriter newWriter = this.getNewFileSystemWriter();
        SimpleVersionedSerializer deserializer = newWriter.getCommitRecoverableSerializer();
        RecoverableWriter.CommitRecoverable recoveredRecoverable = (RecoverableWriter.CommitRecoverable)deserializer.deserialize(deserializer.getVersion(), serializedRecoverable);
        RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable);
        committer.commitAfterRecovery();
        Map<Path, String> files = this.getFileContentByPath(testDir);
        Assert.assertEquals((long)1L, (long)files.size());
        for (Map.Entry<Path, String> fileContents : files.entrySet()) {
            Assert.assertEquals((Object)"part-0", (Object)fileContents.getKey().getName());
            Assert.assertEquals((Object)"THIS IS A TEST 1.THIS IS A TEST 2.", (Object)fileContents.getValue());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IOException.class)
    public void testExceptionWritingAfterCloseForCommit() throws Exception {
        Path testDir = this.getBasePathForTest();
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().getRecoverable();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            Assert.fail();
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IOException.class)
    public void testResumeAfterCommit() throws Exception {
        RecoverableWriter.ResumeRecoverable recoverable;
        Path testDir = this.getBasePathForTest();
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            recoverable = stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            stream.closeForCommit().commit();
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
        writer.recover(recoverable);
        Assert.fail();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResumeWithWrongOffset() throws Exception {
        Throwable throwable;
        RecoverableFsDataOutputStream ignored;
        RecoverableWriter.ResumeRecoverable recoverable2;
        RecoverableWriter.ResumeRecoverable recoverable1;
        Path testDir = this.getBasePathForTest();
        RecoverableWriter writer = this.getNewFileSystemWriter();
        Path path = new Path(testDir, "part-0");
        RecoverableFsDataOutputStream stream = null;
        try {
            stream = writer.open(path);
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));
            recoverable1 = stream.persist();
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));
            recoverable2 = stream.persist();
            stream.write(testData3.getBytes(StandardCharsets.UTF_8));
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)stream);
        }
        try {
            ignored = writer.recover(recoverable1);
            throwable = null;
            if (ignored != null) {
                if (throwable != null) {
                    try {
                        ignored.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    ignored.close();
                }
            }
        }
        catch (Exception e) {
            Assert.fail();
        }
        try {
            ignored = writer.recover(recoverable2);
            throwable = null;
            try {
                Assert.fail();
            }
            catch (Throwable throwable3) {
                throwable = throwable3;
                throw throwable3;
            }
            finally {
                if (ignored != null) {
                    if (throwable != null) {
                        try {
                            ignored.close();
                        }
                        catch (Throwable throwable4) {
                            throwable.addSuppressed(throwable4);
                        }
                    } else {
                        ignored.close();
                    }
                }
            }
        }
        catch (IOException e) {
            return;
        }
        Assert.fail();
    }

    private Map<Path, String> getFileContentByPath(Path directory) throws Exception {
        FileStatus[] filesInBucket;
        HashMap<Path, String> contents = new HashMap<Path, String>();
        for (FileStatus file : filesInBucket = this.getFileSystem().listStatus(directory)) {
            long fileLength = file.getLen();
            byte[] serContents = new byte[(int)fileLength];
            try (FSDataInputStream stream = this.getFileSystem().open(file.getPath());){
                stream.read(serContents);
            }
            contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8));
        }
        return contents;
    }

    private static String randomName() {
        return StringUtils.getRandomString((Random)RND, (int)16, (int)16, (char)'a', (char)'z');
    }
}

