/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=60L)
class BatchShuffleReadBufferPoolTest {
    BatchShuffleReadBufferPoolTest() {
    }

    @Test
    void testIllegalTotalBytes() {
        Assertions.assertThatThrownBy(() -> this.createBufferPool(0L, 1024)).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testIllegalBufferSize() {
        Assertions.assertThatThrownBy(() -> this.createBufferPool(0x2000000L, 0)).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testLargeTotalBytes() {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool(Long.MAX_VALUE, 1024);
        Assertions.assertThat((int)bufferPool.getNumTotalBuffers()).isEqualTo(Integer.MAX_VALUE);
        bufferPool.destroy();
    }

    @Test
    void testTotalBytesSmallerThanBufferSize() {
        Assertions.assertThatThrownBy(() -> this.createBufferPool(4096L, 32768)).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testBufferCalculation() {
        long totalBytes = 0x2000000L;
        int bufferSize = 4096;
        while ((long)bufferSize <= totalBytes) {
            BatchShuffleReadBufferPool bufferPool = this.createBufferPool(totalBytes, bufferSize);
            Assertions.assertThat((long)bufferPool.getTotalBytes()).isEqualTo(totalBytes);
            Assertions.assertThat((int)bufferPool.getNumTotalBuffers()).isEqualTo(totalBytes / (long)bufferSize);
            Assertions.assertThat((int)bufferPool.getNumBuffersPerRequest()).isLessThanOrEqualTo(bufferPool.getNumTotalBuffers());
            Assertions.assertThat((int)bufferPool.getNumBuffersPerRequest()).isGreaterThan(0);
            bufferSize += 1024;
        }
    }

    @Test
    void testRequestBuffers() throws Exception {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        ArrayList buffers = new ArrayList();
        try {
            buffers.addAll(bufferPool.requestBuffers());
            Assertions.assertThat(buffers).hasSize(bufferPool.getNumBuffersPerRequest());
        }
        finally {
            bufferPool.recycle(buffers);
            bufferPool.destroy();
        }
    }

    @Test
    void testRecycle() throws Exception {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        List buffers = bufferPool.requestBuffers();
        bufferPool.recycle((Collection)buffers);
        Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isEqualTo(bufferPool.getNumTotalBuffers());
    }

    @Test
    void testBufferOperationTimestampUpdated() throws Exception {
        BatchShuffleReadBufferPool bufferPool = new BatchShuffleReadBufferPool(1024L, 1024);
        long oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        List buffers = bufferPool.requestBuffers();
        Assertions.assertThat((List)buffers).hasSize(1);
        Assertions.assertThat((long)bufferPool.getLastBufferOperationTimestamp()).isGreaterThan(oldTimestamp);
        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        bufferPool.recycle((Collection)buffers);
        Assertions.assertThat((long)bufferPool.getLastBufferOperationTimestamp()).isGreaterThan(oldTimestamp);
        buffers = bufferPool.requestBuffers();
        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
        Thread.sleep(100L);
        Assertions.assertThat((List)bufferPool.requestBuffers()).isEmpty();
        Assertions.assertThat((long)bufferPool.getLastBufferOperationTimestamp()).isEqualTo(oldTimestamp);
        bufferPool.recycle((Collection)buffers);
        bufferPool.destroy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testBufferFulfilledByRecycledBuffers() throws Exception {
        int numRequestThreads = 2;
        final BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        final ConcurrentHashMap<Object, List> buffers = new ConcurrentHashMap<Object, List>();
        try {
            Object[] owners = new Object[8];
            for (int i = 0; i < 8; ++i) {
                owners[i] = new Object();
                buffers.put(owners[i], bufferPool.requestBuffers());
            }
            Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isZero();
            CheckedThread[] requestThreads = new CheckedThread[numRequestThreads];
            for (int i = 0; i < numRequestThreads; ++i) {
                requestThreads[i] = new CheckedThread(){

                    public void go() throws Exception {
                        Object owner = new Object();
                        List allocated = null;
                        while (allocated == null || allocated.isEmpty()) {
                            allocated = bufferPool.requestBuffers();
                        }
                        buffers.put(owner, allocated);
                    }
                };
                requestThreads[i].start();
            }
            for (MemorySegment segment : (List)buffers.remove(owners[0])) {
                bufferPool.recycle(segment);
            }
            bufferPool.recycle((Collection)buffers.remove(owners[1]));
            for (CheckedThread requestThread : requestThreads) {
                requestThread.sync();
            }
            Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isZero();
            Assertions.assertThat(buffers).hasSize(8);
        }
        finally {
            for (Object owner : buffers.keySet()) {
                bufferPool.recycle((Collection)buffers.remove(owner));
            }
            Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isEqualTo(bufferPool.getNumTotalBuffers());
            bufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testMultipleThreadRequestAndRecycle() throws Exception {
        int numRequestThreads = 10;
        final BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        try {
            CheckedThread[] requestThreads = new CheckedThread[numRequestThreads];
            for (int i = 0; i < numRequestThreads; ++i) {
                requestThreads[i] = new CheckedThread(){

                    public void go() throws Exception {
                        for (int j = 0; j < 100; ++j) {
                            List buffers = bufferPool.requestBuffers();
                            Thread.sleep(10L);
                            if (j % 2 == 0) {
                                bufferPool.recycle((Collection)buffers);
                                continue;
                            }
                            for (MemorySegment segment : buffers) {
                                bufferPool.recycle(segment);
                            }
                        }
                    }
                };
                requestThreads[i].start();
            }
            for (CheckedThread requestThread : requestThreads) {
                requestThread.sync();
            }
            Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isEqualTo(bufferPool.getNumTotalBuffers());
        }
        finally {
            bufferPool.destroy();
        }
    }

    @Test
    void testDestroy() throws Exception {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        List buffers = bufferPool.requestBuffers();
        bufferPool.recycle((Collection)buffers);
        Assertions.assertThat((boolean)bufferPool.isDestroyed()).isFalse();
        Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isEqualTo(bufferPool.getNumTotalBuffers());
        buffers = bufferPool.requestBuffers();
        Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isEqualTo(bufferPool.getNumTotalBuffers() - buffers.size());
        bufferPool.destroy();
        Assertions.assertThat((boolean)bufferPool.isDestroyed()).isTrue();
        Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isZero();
    }

    @Test
    void testRequestBuffersAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        bufferPool.requestBuffers();
        bufferPool.destroy();
        Assertions.assertThatThrownBy(() -> ((BatchShuffleReadBufferPool)bufferPool).requestBuffers()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecycleAfterDestroyed() throws Exception {
        BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        List buffers = bufferPool.requestBuffers();
        bufferPool.destroy();
        bufferPool.recycle((Collection)buffers);
        Assertions.assertThat((int)bufferPool.getAvailableBuffers()).isZero();
    }

    @Test
    void testDestroyWhileBlockingRequest() throws Exception {
        final BatchShuffleReadBufferPool bufferPool = this.createBufferPool();
        CheckedThread requestThread = new CheckedThread(){

            public void go() throws Exception {
                while (true) {
                    bufferPool.requestBuffers();
                }
            }
        };
        requestThread.start();
        Thread.sleep(1000L);
        bufferPool.destroy();
        Assertions.assertThatThrownBy(() -> ((CheckedThread)requestThread).sync()).isInstanceOf(IllegalStateException.class);
    }

    private BatchShuffleReadBufferPool createBufferPool(long totalBytes, int bufferSize) {
        return new BatchShuffleReadBufferPool(totalBytes, bufferSize);
    }

    private BatchShuffleReadBufferPool createBufferPool() {
        return this.createBufferPool(0x2000000L, 32768);
    }
}

