/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class LimitedConnectionsFileSystemTest {
    @TempDir
    public File tempFolder;

    LimitedConnectionsFileSystemTest() {
    }

    @Test
    void testConstructionNumericOverflow() {
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, 0x7FFFFFFFFFFFFFFEL, 0x7FFFFFFFFFFFFFFEL);
        Assertions.assertThat((int)limitedFs.getMaxNumOpenStreamsTotal()).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat((int)limitedFs.getMaxNumOpenOutputStreams()).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat((int)limitedFs.getMaxNumOpenInputStreams()).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat((long)limitedFs.getStreamOpenTimeout()).isPositive();
        Assertions.assertThat((long)limitedFs.getStreamInactivityTimeout()).isPositive();
    }

    @Test
    void testLimitingOutputStreams() throws Exception {
        int maxConcurrentOpen = 2;
        int numThreads = 61;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), Integer.MAX_VALUE, 2, Integer.MAX_VALUE, 0L, 0L);
        WriterThread[] threads = new WriterThread[61];
        for (int i = 0; i < 61; ++i) {
            Path path = new Path(File.createTempFile("junit", null, this.tempFolder).toURI());
            threads[i] = new WriterThread(limitedFs, path, 2, Integer.MAX_VALUE);
        }
        for (WriterThread t : threads) {
            t.start();
        }
        for (WriterThread t : threads) {
            t.sync();
        }
    }

    @Test
    void testLimitingInputStreams() throws Exception {
        int maxConcurrentOpen = 2;
        int numThreads = 61;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), Integer.MAX_VALUE, Integer.MAX_VALUE, 2, 0L, 0L);
        Random rnd = new Random();
        ReaderThread[] threads = new ReaderThread[61];
        for (int i = 0; i < 61; ++i) {
            File file = File.createTempFile("junit", null, this.tempFolder);
            this.createRandomContents(file, rnd);
            Path path = new Path(file.toURI());
            threads[i] = new ReaderThread(limitedFs, path, 2, Integer.MAX_VALUE);
        }
        for (ReaderThread t : threads) {
            t.start();
        }
        for (ReaderThread t : threads) {
            t.sync();
        }
    }

    @Test
    void testLimitingMixedStreams() throws Exception {
        int maxConcurrentOpen = 2;
        int numThreads = 61;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 2);
        Random rnd = new Random();
        CheckedThread[] threads = new CheckedThread[61];
        for (int i = 0; i < 61; ++i) {
            File file = File.createTempFile("junit", null, this.tempFolder);
            Path path = new Path(file.toURI());
            if (rnd.nextBoolean()) {
                this.createRandomContents(file, rnd);
                threads[i] = new ReaderThread(limitedFs, path, Integer.MAX_VALUE, 2);
                continue;
            }
            threads[i] = new WriterThread(limitedFs, path, Integer.MAX_VALUE, 2);
        }
        for (CheckedThread t : threads) {
            t.start();
        }
        for (CheckedThread t : threads) {
            t.sync();
        }
    }

    @Test
    void testOpenTimeoutOutputStreams() throws Exception {
        long openTimeout = 50L;
        int maxConcurrentOpen = 2;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 2, 50L, 0L);
        BlockingWriterThread[] threads = new BlockingWriterThread[2];
        for (int i = 0; i < 2; ++i) {
            Path path = new Path(File.createTempFile("junit", null, this.tempFolder).toURI());
            threads[i] = new BlockingWriterThread(limitedFs, path, Integer.MAX_VALUE, 2);
            threads[i].start();
        }
        while (limitedFs.getTotalNumberOfOpenStreams() < 2) {
            Thread.sleep(1L);
        }
        Assertions.assertThatThrownBy(() -> limitedFs.create(new Path(File.createTempFile("junit", null, this.tempFolder).toURI()), FileSystem.WriteMode.OVERWRITE)).isInstanceOf(IOException.class);
        for (BlockingWriterThread t : threads) {
            t.wakeup();
            t.sync();
        }
    }

    @Test
    void testOpenTimeoutInputStreams() throws Exception {
        long openTimeout = 50L;
        int maxConcurrentOpen = 2;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 2, 50L, 0L);
        Random rnd = new Random();
        BlockingReaderThread[] threads = new BlockingReaderThread[2];
        for (int i = 0; i < 2; ++i) {
            File file = File.createTempFile("junit", null, this.tempFolder);
            this.createRandomContents(file, rnd);
            Path path = new Path(file.toURI());
            threads[i] = new BlockingReaderThread(limitedFs, path, 2, Integer.MAX_VALUE);
            threads[i].start();
        }
        while (limitedFs.getTotalNumberOfOpenStreams() < 2) {
            Thread.sleep(1L);
        }
        File file = File.createTempFile("junit", null, this.tempFolder);
        this.createRandomContents(file, rnd);
        Assertions.assertThatThrownBy(() -> limitedFs.open(new Path(file.toURI()))).isInstanceOf(IOException.class);
        for (BlockingReaderThread t : threads) {
            t.wakeup();
            t.sync();
        }
    }

    @Test
    void testTerminateStalledOutputStreams() throws Exception {
        int i;
        int maxConcurrentOpen = 2;
        int numThreads = 20;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), Integer.MAX_VALUE, 2, Integer.MAX_VALUE, 0L, 50L);
        WriterThread[] threads = new WriterThread[20];
        BlockingWriterThread[] blockers = new BlockingWriterThread[20];
        for (i = 0; i < 20; ++i) {
            Path path1 = new Path(File.createTempFile("junit", null, this.tempFolder).toURI());
            Path path2 = new Path(File.createTempFile("junit", null, this.tempFolder).toURI());
            threads[i] = new WriterThread(limitedFs, path1, 2, Integer.MAX_VALUE);
            blockers[i] = new BlockingWriterThread(limitedFs, path2, 2, Integer.MAX_VALUE);
        }
        for (i = 0; i < 20; ++i) {
            blockers[i].start();
            threads[i].start();
        }
        for (WriterThread writerThread : threads) {
            try {
                writerThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
        for (CheckedThread checkedThread : blockers) {
            checkedThread.wakeup();
        }
        for (CheckedThread checkedThread : blockers) {
            try {
                checkedThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
    }

    @Test
    void testTerminateStalledInputStreams() throws Exception {
        int i;
        int maxConcurrentOpen = 2;
        int numThreads = 20;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), Integer.MAX_VALUE, Integer.MAX_VALUE, 2, 0L, 50L);
        Random rnd = new Random();
        ReaderThread[] threads = new ReaderThread[20];
        BlockingReaderThread[] blockers = new BlockingReaderThread[20];
        for (i = 0; i < 20; ++i) {
            File file1 = File.createTempFile("junit", null, this.tempFolder);
            File file2 = File.createTempFile("junit", null, this.tempFolder);
            this.createRandomContents(file1, rnd);
            this.createRandomContents(file2, rnd);
            Path path = new Path(file1.toURI());
            Path path2 = new Path(file2.toURI());
            threads[i] = new ReaderThread(limitedFs, path, 2, Integer.MAX_VALUE);
            blockers[i] = new BlockingReaderThread(limitedFs, path2, 2, Integer.MAX_VALUE);
        }
        for (i = 0; i < 20; ++i) {
            blockers[i].start();
            threads[i].start();
        }
        for (ReaderThread readerThread : threads) {
            try {
                readerThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
        for (CheckedThread checkedThread : blockers) {
            checkedThread.wakeup();
        }
        for (CheckedThread checkedThread : blockers) {
            try {
                checkedThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
    }

    @Test
    void testTerminateStalledMixedStreams() throws Exception {
        int i;
        int maxConcurrentOpen = 2;
        int numThreads = 20;
        LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 2, 0L, 50L);
        Random rnd = new Random();
        CheckedThread[] threads = new CheckedThread[20];
        BlockingThread[] blockers = new BlockingThread[20];
        for (i = 0; i < 20; ++i) {
            File file1 = File.createTempFile("junit", null, this.tempFolder);
            File file2 = File.createTempFile("junit", null, this.tempFolder);
            Path path = new Path(file1.toURI());
            Path path2 = new Path(file2.toURI());
            if (rnd.nextBoolean()) {
                this.createRandomContents(file1, rnd);
                this.createRandomContents(file2, rnd);
                threads[i] = new ReaderThread(limitedFs, path, 2, Integer.MAX_VALUE);
                blockers[i] = new BlockingReaderThread(limitedFs, path2, 2, Integer.MAX_VALUE);
                continue;
            }
            threads[i] = new WriterThread(limitedFs, path, 2, Integer.MAX_VALUE);
            blockers[i] = new BlockingWriterThread(limitedFs, path2, 2, Integer.MAX_VALUE);
        }
        for (i = 0; i < 20; ++i) {
            blockers[i].start();
            threads[i].start();
        }
        for (CheckedThread checkedThread : threads) {
            try {
                checkedThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
        for (CheckedThread checkedThread : blockers) {
            checkedThread.wakeup();
        }
        for (CheckedThread checkedThread : blockers) {
            try {
                checkedThread.sync();
            }
            catch (LimitedConnectionsFileSystem.StreamTimeoutException streamTimeoutException) {
                // empty catch block
            }
        }
    }

    @Test
    void testFailingStreamsUnregister() throws Exception {
        LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem((FileSystem)new FailFs(), 1);
        Assertions.assertThat((int)fs.getNumberOfOpenInputStreams()).isZero();
        Assertions.assertThat((int)fs.getNumberOfOpenOutputStreams()).isZero();
        Assertions.assertThat((int)fs.getTotalNumberOfOpenStreams()).isZero();
        Assertions.assertThatThrownBy(() -> fs.open(new Path(File.createTempFile("junit", null, this.tempFolder).toURI()))).isInstanceOf(IOException.class);
        Assertions.assertThatThrownBy(() -> fs.create(new Path(File.createTempFile("junit", null, this.tempFolder).toURI()), FileSystem.WriteMode.NO_OVERWRITE)).isInstanceOf(IOException.class);
        Assertions.assertThat((int)fs.getNumberOfOpenInputStreams()).isZero();
        Assertions.assertThat((int)fs.getNumberOfOpenOutputStreams()).isZero();
        Assertions.assertThat((int)fs.getTotalNumberOfOpenStreams()).isZero();
    }

    @Test
    void testSlowOutputStreamNotClosed() throws Exception {
        ReaderThread[] file;
        LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
        Random rnd = new Random();
        ReaderThread[] threads = new ReaderThread[10];
        for (int i = 0; i < threads.length; ++i) {
            file = File.createTempFile("junit", null, this.tempFolder);
            this.createRandomContents((File)file, rnd);
            Path path = new Path(file.toURI());
            threads[i] = new ReaderThread(fs, path, 1, Integer.MAX_VALUE);
        }
        try (FSDataOutputStream out = fs.create(new Path(File.createTempFile("junit", null, this.tempFolder).toURI()), FileSystem.WriteMode.OVERWRITE);){
            file = threads;
            int n = file.length;
            for (int i = 0; i < n; ++i) {
                ReaderThread t = file[i];
                t.start();
            }
            Thread.sleep(5L);
            for (int bytesLeft = 50; bytesLeft > 0; --bytesLeft) {
                out.write(bytesLeft);
                Thread.sleep(5L);
            }
        }
        for (ReaderThread t : threads) {
            t.sync();
        }
    }

    @Test
    void testSlowInputStreamNotClosed() throws Exception {
        File file = File.createTempFile("junit", null, this.tempFolder);
        this.createRandomContents(file, new Random(), 50);
        LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
        WriterThread[] threads = new WriterThread[10];
        for (int i = 0; i < threads.length; ++i) {
            Path path = new Path(File.createTempFile("junit", null, this.tempFolder).toURI());
            threads[i] = new WriterThread(fs, path, 1, Integer.MAX_VALUE);
        }
        try (FSDataInputStream in = fs.open(new Path(file.toURI()));){
            WriterThread[] writerThreadArray = threads;
            int n = writerThreadArray.length;
            for (int i = 0; i < n; ++i) {
                WriterThread t = writerThreadArray[i];
                t.start();
            }
            Thread.sleep(5L);
            while (in.read() != -1) {
                Thread.sleep(5L);
            }
        }
        for (WriterThread t : threads) {
            t.sync();
        }
    }

    private void createRandomContents(File file, Random rnd) throws IOException {
        this.createRandomContents(file, rnd, rnd.nextInt(10000) + 1);
    }

    private void createRandomContents(File file, Random rnd, int size) throws IOException {
        byte[] data = new byte[size];
        rnd.nextBytes(data);
        try (FileOutputStream fos = new FileOutputStream(file);){
            fos.write(data);
        }
    }

    private static class FailFs
    extends LocalFileSystem {
        private FailFs() {
        }

        public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite) throws IOException {
            throw new IOException("test exception");
        }

        public FSDataInputStream open(Path f) throws IOException {
            throw new IOException("test exception");
        }
    }

    private static final class BlockingReaderThread
    extends BlockingThread {
        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentInputStreams;
        private final int maxConcurrentStreamsTotal;

        BlockingReaderThread(LimitedConnectionsFileSystem fs, Path path, int maxConcurrentInputStreams, int maxConcurrentStreamsTotal) {
            this.fs = fs;
            this.path = path;
            this.maxConcurrentInputStreams = maxConcurrentInputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataInputStream stream = this.fs.open(this.path);){
                Assertions.assertThat((int)this.fs.getNumberOfOpenInputStreams()).isLessThanOrEqualTo(this.maxConcurrentInputStreams);
                Assertions.assertThat((int)this.fs.getTotalNumberOfOpenStreams()).isLessThanOrEqualTo(this.maxConcurrentStreamsTotal);
                byte[] readBuffer = new byte[(int)this.fs.getFileStatus(this.path).getLen() - 1];
                Assertions.assertThat((stream.read(readBuffer) != -1 ? 1 : 0) != 0).isTrue();
                this.waitTillWokenUp();
                stream.read();
            }
        }
    }

    private static final class BlockingWriterThread
    extends BlockingThread {
        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentOutputStreams;
        private final int maxConcurrentStreamsTotal;

        BlockingWriterThread(LimitedConnectionsFileSystem fs, Path path, int maxConcurrentOutputStreams, int maxConcurrentStreamsTotal) {
            this.fs = fs;
            this.path = path;
            this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataOutputStream stream = this.fs.create(this.path, FileSystem.WriteMode.OVERWRITE);){
                Assertions.assertThat((int)this.fs.getNumberOfOpenOutputStreams()).isLessThanOrEqualTo(this.maxConcurrentOutputStreams);
                Assertions.assertThat((int)this.fs.getTotalNumberOfOpenStreams()).isLessThanOrEqualTo(this.maxConcurrentStreamsTotal);
                Random rnd = new Random();
                byte[] data = new byte[rnd.nextInt(10000) + 1];
                rnd.nextBytes(data);
                stream.write(data);
                this.waitTillWokenUp();
                stream.write(rnd.nextInt());
            }
        }
    }

    private static abstract class BlockingThread
    extends CheckedThread {
        private final OneShotLatch waiter = new OneShotLatch();

        private BlockingThread() {
        }

        public void waitTillWokenUp() throws InterruptedException {
            this.waiter.await();
        }

        public void wakeup() {
            this.waiter.trigger();
        }
    }

    private static final class ReaderThread
    extends CheckedThread {
        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentInputStreams;
        private final int maxConcurrentStreamsTotal;

        ReaderThread(LimitedConnectionsFileSystem fs, Path path, int maxConcurrentInputStreams, int maxConcurrentStreamsTotal) {
            this.fs = fs;
            this.path = path;
            this.maxConcurrentInputStreams = maxConcurrentInputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataInputStream stream = this.fs.open(this.path);){
                Assertions.assertThat((int)this.fs.getNumberOfOpenInputStreams()).isLessThanOrEqualTo(this.maxConcurrentInputStreams);
                Assertions.assertThat((int)this.fs.getTotalNumberOfOpenStreams()).isLessThanOrEqualTo(this.maxConcurrentStreamsTotal);
                byte[] readBuffer = new byte[4096];
                while (stream.read(readBuffer) != -1) {
                }
            }
        }
    }

    private static final class WriterThread
    extends CheckedThread {
        private final LimitedConnectionsFileSystem fs;
        private final Path path;
        private final int maxConcurrentOutputStreams;
        private final int maxConcurrentStreamsTotal;

        WriterThread(LimitedConnectionsFileSystem fs, Path path, int maxConcurrentOutputStreams, int maxConcurrentStreamsTotal) {
            this.fs = fs;
            this.path = path;
            this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
            this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
        }

        public void go() throws Exception {
            try (FSDataOutputStream stream = this.fs.create(this.path, FileSystem.WriteMode.OVERWRITE);){
                Assertions.assertThat((int)this.fs.getNumberOfOpenOutputStreams()).isLessThanOrEqualTo(this.maxConcurrentOutputStreams);
                Assertions.assertThat((int)this.fs.getTotalNumberOfOpenStreams()).isLessThanOrEqualTo(this.maxConcurrentStreamsTotal);
                Random rnd = new Random();
                byte[] data = new byte[rnd.nextInt(10000) + 1];
                rnd.nextBytes(data);
                stream.write(data);
            }
        }
    }
}

