/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class LimitedConnectionsFileSystemTest {
    /** JUnit rule providing the temporary files that the tests in this class read and write. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Constructs a file system with every stream limit set to {@code Integer.MAX_VALUE} and
     * both timeouts set to {@code Long.MAX_VALUE - 1}, then checks that the limits are
     * reported unchanged and that both reported timeouts are still positive.
     */
    @Test
    public void testConstructionNumericOverflow() {
        final int maxStreams = Integer.MAX_VALUE;
        final long hugeTimeout = Long.MAX_VALUE - 1L;
        final FileSystem localFs = LocalFileSystem.getSharedInstance();

        final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
                localFs, maxStreams, maxStreams, maxStreams, hugeTimeout, hugeTimeout);

        // the stream limits must come back exactly as configured
        Assert.assertEquals(maxStreams, fs.getMaxNumOpenStreamsTotal());
        Assert.assertEquals(maxStreams, fs.getMaxNumOpenOutputStreams());
        Assert.assertEquals(maxStreams, fs.getMaxNumOpenInputStreams());

        // the timeouts must not have wrapped around to zero or a negative value
        Assert.assertTrue(fs.getStreamOpenTimeout() > 0L);
        Assert.assertTrue(fs.getStreamInactivityTimeout() > 0L);
    }

    /**
     * Runs many concurrent writers against a file system whose output stream limit is two.
     * Each {@code WriterThread} asserts the limit while it holds its stream; the test thread
     * then waits for every writer via {@code sync()}.
     */
    @Test
    public void testLimitingOutputStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 61;

        // only the output stream limit is restricted; both timeouts are passed as 0
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(),
                Integer.MAX_VALUE, maxConcurrentOpen, Integer.MAX_VALUE, 0L, 0L);

        final WriterThread[] threads = new WriterThread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            final Path path = new Path(tempFolder.newFile().toURI());
            threads[i] = new WriterThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
        }

        for (WriterThread t : threads) {
            t.start();
        }

        for (WriterThread t : threads) {
            t.sync();
        }
    }

    /**
     * Runs many concurrent readers against a file system whose input stream limit is two.
     * Each {@code ReaderThread} asserts the limit while it holds its stream; the test thread
     * then waits for every reader via {@code sync()}.
     */
    @Test
    public void testLimitingInputStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 61;

        // only the input stream limit is restricted; both timeouts are passed as 0
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(),
                Integer.MAX_VALUE, Integer.MAX_VALUE, maxConcurrentOpen, 0L, 0L);

        final Random rnd = new Random();
        final ReaderThread[] threads = new ReaderThread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // every reader gets its own file with random contents
            final File file = tempFolder.newFile();
            createRandomContents(file, rnd);
            final Path path = new Path(file.toURI());
            threads[i] = new ReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
        }

        for (ReaderThread t : threads) {
            t.start();
        }

        for (ReaderThread t : threads) {
            t.sync();
        }
    }

    /**
     * Runs a random mix of readers and writers against a file system whose total stream
     * limit is two. Each worker asserts the total limit while it holds its stream; the test
     * thread then waits for every worker via {@code sync()}.
     */
    @Test
    public void testLimitingMixedStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 61;

        // only the total number of streams is limited
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), maxConcurrentOpen);

        final Random rnd = new Random();
        final CheckedThread[] threads = new CheckedThread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            final File file = tempFolder.newFile();
            final Path path = new Path(file.toURI());

            if (rnd.nextBoolean()) {
                // readers need something to read
                createRandomContents(file, rnd);
                threads[i] = new ReaderThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
            } else {
                threads[i] = new WriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
            }
        }

        for (CheckedThread t : threads) {
            t.start();
        }

        for (CheckedThread t : threads) {
            t.sync();
        }
    }

    /**
     * Occupies both available stream slots with blocked writers and then checks that a
     * further {@code create} call fails with an {@link IOException} instead of succeeding.
     *
     * <p>The blocked writers are released in a {@code finally} block so that they do not stay
     * parked forever when the assertion fails or the wait is interrupted.
     */
    @Test
    public void testOpenTimeoutOutputStreams() throws Exception {
        final long openTimeout = 50L;
        final int maxConcurrentOpen = 2;

        // total limit of two streams, 50 ms open timeout, inactivity timeout passed as 0
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), maxConcurrentOpen, openTimeout, 0L);

        // create the threads that block all streams
        final BlockingWriterThread[] threads = new BlockingWriterThread[maxConcurrentOpen];
        for (int i = 0; i < maxConcurrentOpen; i++) {
            final Path path = new Path(tempFolder.newFile().toURI());
            threads[i] = new BlockingWriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
            threads[i].start();
        }

        try {
            // wait until all slots are taken
            while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
                Thread.sleep(1L);
            }

            // opening one more stream must not succeed; close it if it unexpectedly does
            try (FSDataOutputStream unexpected = limitedFs.create(
                    new Path(tempFolder.newFile().toURI()), FileSystem.WriteMode.OVERWRITE)) {
                Assert.fail("this should have timed out");
            } catch (IOException expected) {
                // expected: no stream slot became available
            }
        } finally {
            // always release the blockers, even when the check above failed
            for (BlockingWriterThread t : threads) {
                t.wakeup();
            }
        }

        for (BlockingWriterThread t : threads) {
            t.sync();
        }
    }

    /**
     * Occupies both available stream slots with blocked readers and then checks that a
     * further {@code open} call fails with an {@link IOException} instead of succeeding.
     *
     * <p>The blocked readers are released in a {@code finally} block so that they do not stay
     * parked forever when the assertion fails or the wait is interrupted.
     */
    @Test
    public void testOpenTimeoutInputStreams() throws Exception {
        final long openTimeout = 50L;
        final int maxConcurrentOpen = 2;

        // total limit of two streams, 50 ms open timeout, inactivity timeout passed as 0
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), maxConcurrentOpen, openTimeout, 0L);

        // create the threads that block all streams
        final Random rnd = new Random();
        final BlockingReaderThread[] threads = new BlockingReaderThread[maxConcurrentOpen];
        for (int i = 0; i < maxConcurrentOpen; i++) {
            final File file = tempFolder.newFile();
            createRandomContents(file, rnd);
            final Path path = new Path(file.toURI());
            threads[i] = new BlockingReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
            threads[i].start();
        }

        try {
            // wait until all slots are taken
            while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
                Thread.sleep(1L);
            }

            final File file = tempFolder.newFile();
            createRandomContents(file, rnd);

            // opening one more stream must not succeed; close it if it unexpectedly does
            try (FSDataInputStream unexpected = limitedFs.open(new Path(file.toURI()))) {
                Assert.fail("this should have timed out");
            } catch (IOException expected) {
                // expected: no stream slot became available
            }
        } finally {
            // always release the blockers, even when the check above failed
            for (BlockingReaderThread t : threads) {
                t.wakeup();
            }
        }

        for (BlockingReaderThread t : threads) {
            t.sync();
        }
    }

    /**
     * Mixes regular writers with writers that stall after their first write, on a file
     * system limited to two output streams and configured with a 50 ms timeout as its last
     * constructor argument. The regular writers must all finish; a
     * {@code StreamTimeoutException} from any thread is tolerated.
     */
    @Test
    public void testTerminateStalledOutputStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 20;

        // output stream limit of two, open timeout passed as 0, last timeout 50 ms
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(),
                Integer.MAX_VALUE, maxConcurrentOpen, Integer.MAX_VALUE, 0L, 50L);

        final WriterThread[] threads = new WriterThread[numThreads];
        final BlockingWriterThread[] blockers = new BlockingWriterThread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            final Path path1 = new Path(tempFolder.newFile().toURI());
            final Path path2 = new Path(tempFolder.newFile().toURI());

            threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
            blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
        }

        // start normal and blocking threads interleaved
        for (int i = 0; i < numThreads; i++) {
            blockers[i].start();
            threads[i].start();
        }

        // all normal threads need to be able to finish because
        // the blockers eventually time out
        for (WriterThread t : threads) {
            try {
                t.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // also the regular threads may occasionally get a timeout on slower machines
            }
        }

        // unblock all the blocking threads; the loop variable must be a BlockingThread
        // subtype because wakeup() is not declared on CheckedThread
        for (BlockingWriterThread blocker : blockers) {
            blocker.wakeup();
        }

        for (BlockingWriterThread blocker : blockers) {
            try {
                blocker.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // a stalled stream may have been terminated in the meantime
            }
        }
    }

    /**
     * Mixes regular readers with readers that stall before their last read, on a file
     * system limited to two input streams and configured with a 50 ms timeout as its last
     * constructor argument. The regular readers must all finish; a
     * {@code StreamTimeoutException} from any thread is tolerated.
     */
    @Test
    public void testTerminateStalledInputStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 20;

        // input stream limit of two, open timeout passed as 0, last timeout 50 ms
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(),
                Integer.MAX_VALUE, Integer.MAX_VALUE, maxConcurrentOpen, 0L, 50L);

        final Random rnd = new Random();
        final ReaderThread[] threads = new ReaderThread[numThreads];
        final BlockingReaderThread[] blockers = new BlockingReaderThread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            final File file1 = tempFolder.newFile();
            final File file2 = tempFolder.newFile();

            createRandomContents(file1, rnd);
            createRandomContents(file2, rnd);

            final Path path1 = new Path(file1.toURI());
            final Path path2 = new Path(file2.toURI());

            threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
            blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
        }

        // start normal and blocking threads interleaved
        for (int i = 0; i < numThreads; i++) {
            blockers[i].start();
            threads[i].start();
        }

        // all normal threads need to be able to finish because
        // the blockers eventually time out
        for (ReaderThread t : threads) {
            try {
                t.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // also the regular threads may occasionally get a timeout on slower machines
            }
        }

        // unblock all the blocking threads; the loop variable must be a BlockingThread
        // subtype because wakeup() is not declared on CheckedThread
        for (BlockingReaderThread blocker : blockers) {
            blocker.wakeup();
        }

        for (BlockingReaderThread blocker : blockers) {
            try {
                blocker.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // a stalled stream may have been terminated in the meantime
            }
        }
    }

    /**
     * Mixes regular and stalling readers and writers on a file system limited to two streams
     * in total and configured with a 50 ms timeout as its last constructor argument. The
     * regular workers must all finish; a {@code StreamTimeoutException} from any thread is
     * tolerated.
     */
    @Test
    public void testTerminateStalledMixedStreams() throws Exception {
        final int maxConcurrentOpen = 2;
        final int numThreads = 20;

        // total limit of two streams, open timeout passed as 0, last timeout 50 ms
        final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), maxConcurrentOpen, 0L, 50L);

        final Random rnd = new Random();
        final CheckedThread[] threads = new CheckedThread[numThreads];
        final BlockingThread[] blockers = new BlockingThread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            final File file1 = tempFolder.newFile();
            final File file2 = tempFolder.newFile();
            final Path path1 = new Path(file1.toURI());
            final Path path2 = new Path(file2.toURI());

            if (rnd.nextBoolean()) {
                // reader pair: both files need contents
                createRandomContents(file1, rnd);
                createRandomContents(file2, rnd);
                threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
                blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
            } else {
                threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
                blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
            }
        }

        // start normal and blocking threads interleaved
        for (int i = 0; i < numThreads; i++) {
            blockers[i].start();
            threads[i].start();
        }

        // all normal threads need to be able to finish because
        // the blockers eventually time out
        for (CheckedThread t : threads) {
            try {
                t.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // also the regular threads may occasionally get a timeout on slower machines
            }
        }

        // unblock all the blocking threads; the loop variable must be a BlockingThread
        // because wakeup() is not declared on CheckedThread
        for (BlockingThread blocker : blockers) {
            blocker.wakeup();
        }

        for (BlockingThread blocker : blockers) {
            try {
                blocker.sync();
            } catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {
                // a stalled stream may have been terminated in the meantime
            }
        }
    }

    /**
     * Wraps a file system whose {@code open} and {@code create} always throw and checks that
     * the open-stream counters read zero both before and after the failed attempts.
     */
    @Test
    public void testFailingStreamsUnregister() throws Exception {
        final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(new FailFs(), 1);

        assertNoOpenStreams(fs);

        final Path inputPath = new Path(tempFolder.newFile().toURI());
        try {
            fs.open(inputPath);
            Assert.fail("this is expected to fail with an exception");
        } catch (IOException expected) {
            // expected: FailFs never opens a stream
        }

        final Path outputPath = new Path(tempFolder.newFile().toURI());
        try {
            fs.create(outputPath, FileSystem.WriteMode.NO_OVERWRITE);
            Assert.fail("this is expected to fail with an exception");
        } catch (IOException expected) {
            // expected: FailFs never creates a stream
        }

        assertNoOpenStreams(fs);
    }

    /** Asserts that the input, output and total open-stream counters of {@code fs} are all zero. */
    private static void assertNoOpenStreams(LimitedConnectionsFileSystem fs) {
        Assert.assertEquals(0, fs.getNumberOfOpenInputStreams());
        Assert.assertEquals(0, fs.getNumberOfOpenOutputStreams());
        Assert.assertEquals(0, fs.getTotalNumberOfOpenStreams());
    }

    /**
     * Holds the single available stream slot with an output stream that writes one byte
     * every 5 ms while ten readers wait for a slot. The writes must all succeed and the
     * readers must all complete once the slow stream has been closed.
     */
    @Test
    public void testSlowOutputStreamNotClosed() throws Exception {
        // one stream in total, open timeout passed as 0, last timeout 1000 ms
        final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);

        // some competing threads
        final Random rnd = new Random();
        final ReaderThread[] threads = new ReaderThread[10];
        for (int i = 0; i < threads.length; i++) {
            final File file = tempFolder.newFile();
            createRandomContents(file, rnd);
            // declared as Path: the previous ReaderThread[] declaration was a type error
            final Path path = new Path(file.toURI());
            threads[i] = new ReaderThread(fs, path, 1, Integer.MAX_VALUE);
        }

        // open the stream we test
        try (FSDataOutputStream out = fs.create(
                new Path(tempFolder.newFile().toURI()), FileSystem.WriteMode.OVERWRITE)) {

            // start the other threads that will try to shoot this stream down
            for (ReaderThread t : threads) {
                t.start();
            }

            // read the stream slowly
            Thread.sleep(5L);
            for (int bytesLeft = 50; bytesLeft > 0; bytesLeft--) {
                out.write(bytesLeft);
                Thread.sleep(5L);
            }
        }

        // wait for clean shutdown
        for (ReaderThread t : threads) {
            t.sync();
        }
    }

    /**
     * Holds the single available stream slot with an input stream that reads a 50-byte file
     * one byte every 5 ms while ten writers wait for a slot. The reads must run to the end of
     * the file and the writers must all complete once the slow stream has been closed.
     */
    @Test
    public void testSlowInputStreamNotClosed() throws Exception {
        final File slowFile = tempFolder.newFile();
        createRandomContents(slowFile, new Random(), 50);

        // one stream in total, open timeout passed as 0, last timeout 1000 ms
        final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
                LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);

        // writers that compete for the single slot
        final WriterThread[] writers = new WriterThread[10];
        for (int idx = 0; idx < writers.length; idx++) {
            final Path target = new Path(tempFolder.newFile().toURI());
            writers[idx] = new WriterThread(fs, target, 1, Integer.MAX_VALUE);
        }

        try (FSDataInputStream in = fs.open(new Path(slowFile.toURI()))) {
            for (WriterThread writer : writers) {
                writer.start();
            }

            // consume the stream slowly, one byte per iteration, until end of file
            Thread.sleep(5L);
            for (int next = in.read(); next != -1; next = in.read()) {
                Thread.sleep(5L);
            }
        }

        // wait for every writer to finish
        for (WriterThread writer : writers) {
            writer.sync();
        }
    }

    /**
     * Fills {@code file} with random bytes, choosing a random length between 1 and 10000
     * bytes (inclusive).
     *
     * @param file the file to write
     * @param rnd the source of both the length and the contents
     * @throws IOException if writing the file fails
     */
    private void createRandomContents(File file, Random rnd) throws IOException {
        final int size = 1 + rnd.nextInt(10000);
        createRandomContents(file, rnd, size);
    }

    /**
     * Writes exactly {@code size} random bytes to {@code file}.
     *
     * @param file the file to write
     * @param rnd the source of the random bytes
     * @param size the number of bytes to write
     * @throws IOException if the file cannot be opened or written
     */
    private void createRandomContents(File file, Random rnd, int size) throws IOException {
        final byte[] randomBytes = new byte[size];
        rnd.nextBytes(randomBytes);

        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(randomBytes);
        }
    }

    /**
     * A {@link LocalFileSystem} whose {@code create} and {@code open} methods always throw an
     * {@link IOException}, used to exercise the failure path of stream opening.
     */
    private static class FailFs extends LocalFileSystem {

        /** Message carried by every exception this file system throws. */
        private static final String FAILURE_MESSAGE = "test exception";

        /** Always fails; no stream is ever created. */
        public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite) throws IOException {
            throw new IOException(FAILURE_MESSAGE);
        }

        /** Always fails; no stream is ever opened. */
        public FSDataInputStream open(Path f) throws IOException {
            throw new IOException(FAILURE_MESSAGE);
        }
    }

    /**
     * A reader that opens its file, checks the stream limits, reads into a buffer one byte
     * shorter than the file, and then parks until {@code wakeup()} is called before
     * attempting one more single-byte read.
     */
    private static final class BlockingReaderThread extends BlockingThread {

        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentInputStreams;
        private final int maxConcurrentStreamsTotal;

        BlockingReaderThread(
                LimitedConnectionsFileSystem fs,
                Path path,
                int maxConcurrentInputStreams,
                int maxConcurrentStreamsTotal) {

            this.fs = fs;
            this.path = path;
            this.maxConcurrentInputStreams = maxConcurrentInputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataInputStream in = fs.open(path)) {
                // while the stream is held, the configured limits must be respected
                Assert.assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
                Assert.assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);

                // buffer is sized to leave the last byte of the file unread
                final long fileLength = fs.getFileStatus(path).getLen();
                final byte[] buffer = new byte[(int) fileLength - 1];
                final int firstRead = in.read(buffer);
                Assert.assertTrue(firstRead != -1);

                waitTillWokenUp();

                // touch the stream once more after having been stalled
                in.read();
            }
        }
    }

    /**
     * A writer that creates its file, checks the stream limits, writes a random chunk of
     * 1 to 10000 bytes, and then parks until {@code wakeup()} is called before writing one
     * more byte.
     */
    private static final class BlockingWriterThread extends BlockingThread {

        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentOutputStreams;
        private final int maxConcurrentStreamsTotal;

        BlockingWriterThread(
                LimitedConnectionsFileSystem fs,
                Path path,
                int maxConcurrentOutputStreams,
                int maxConcurrentStreamsTotal) {

            this.fs = fs;
            this.path = path;
            this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE)) {
                // while the stream is held, the configured limits must be respected
                Assert.assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
                Assert.assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);

                final Random random = new Random();
                final byte[] chunk = new byte[random.nextInt(10000) + 1];
                random.nextBytes(chunk);
                out.write(chunk);

                waitTillWokenUp();

                // touch the stream once more after having been stalled
                out.write(random.nextInt());
            }
        }
    }

    /**
     * Base class for worker threads that can park themselves on a latch until another thread
     * releases them through {@link #wakeup()}.
     */
    private abstract static class BlockingThread extends CheckedThread {

        /** Latch the worker waits on; triggered by {@link #wakeup()}. */
        private final OneShotLatch waiter = new OneShotLatch();

        /** Blocks the calling thread until the latch has been triggered. */
        public void waitTillWokenUp() throws InterruptedException {
            waiter.await();
        }

        /** Triggers the latch that {@link #waitTillWokenUp()} waits on. */
        public void wakeup() {
            waiter.trigger();
        }
    }

    /**
     * A reader that opens its file, checks the stream limits while the stream is open, and
     * then reads the file to the end in 4096-byte chunks.
     */
    private static final class ReaderThread extends CheckedThread {

        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentInputStreams;
        private final int maxConcurrentStreamsTotal;

        ReaderThread(
                LimitedConnectionsFileSystem fs,
                Path path,
                int maxConcurrentInputStreams,
                int maxConcurrentStreamsTotal) {

            this.fs = fs;
            this.path = path;
            this.maxConcurrentInputStreams = maxConcurrentInputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataInputStream in = fs.open(path)) {
                // while the stream is held, the configured limits must be respected
                Assert.assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
                Assert.assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);

                // drain the stream; the data itself is irrelevant
                final byte[] chunk = new byte[4096];
                int bytesRead;
                do {
                    bytesRead = in.read(chunk);
                } while (bytesRead != -1);
            }
        }
    }

    /**
     * A writer that creates its file, checks the stream limits while the stream is open, and
     * then writes a random chunk of 1 to 10000 bytes.
     */
    private static final class WriterThread extends CheckedThread {

        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentOutputStreams;
        private final int maxConcurrentStreamsTotal;

        WriterThread(
                LimitedConnectionsFileSystem fs,
                Path path,
                int maxConcurrentOutputStreams,
                int maxConcurrentStreamsTotal) {

            this.fs = fs;
            this.path = path;
            this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE)) {
                // while the stream is held, the configured limits must be respected
                Assert.assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
                Assert.assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);

                final Random random = new Random();
                final byte[] chunk = new byte[random.nextInt(10000) + 1];
                random.nextBytes(chunk);
                out.write(chunk);
            }
        }
    }
}

