/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.IgnoreShutdownRejectedExecutionHandler;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class HsResultPartitionTest {
    // Sizing constants for the test fixture (all compile-time constants).
    private static final int bufferSize = 1024;
    private static final int totalBuffers = 1000;
    // 0x2000000 == 33,554,432, i.e. 32 MiB if interpreted as a byte count.
    private static final int totalBytes = 0x2000000;
    private static final int numThreads = 4;
    // Per-test fixture objects: assigned in before() and cleaned up in after().
    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ScheduledExecutorService readIOExecutor;
    // Re-assigned by createHsResultPartition(...) every time a partition is built;
    // the metrics tests read their counters from it.
    private TaskIOMetricGroup taskIOMetricGroup;
    // Temporary directory annotated with JUnit's @TempDir; before() passes it to the
    // FileChannelManagerImpl constructor as its only directory.
    @TempDir
    public Path tempDataPath;

    // Explicit no-argument constructor with an empty body.
    HsResultPartitionTest() {
    }

    /**
     * Creates the fixture used by every test: a file channel manager rooted at the temporary
     * directory, the global network buffer pool, the batch-shuffle read buffer pool and the
     * scheduled executor handed to each partition as its read-IO executor.
     */
    @BeforeEach
    void before() {
        final String[] tempDirs = {tempDataPath.toString()};
        fileChannelManager = new FileChannelManagerImpl(tempDirs, "testing");
        globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
        readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        readIOExecutor =
                new ScheduledThreadPoolExecutor(
                        numThreads,
                        new ExecutorThreadFactory("test-io-scheduler-thread"),
                        new IgnoreShutdownRejectedExecutionHandler());
    }

    /**
     * Tears down the fixture created in {@link #before()}.
     *
     * <p>The pools and the executor are released in a {@code finally} block so that a failure
     * while closing the file channel manager cannot leave them alive for the following tests.
     * On the success path the order of the four calls is unchanged.
     *
     * @throws Exception if closing the file channel manager fails
     */
    @AfterEach
    void after() throws Exception {
        try {
            fileChannelManager.close();
        } finally {
            globalPool.destroy();
            readBufferPool.destroy();
            readIOExecutor.shutdown();
        }
    }

    /**
     * Emits a random mix of unicast and broadcast records, finishes the partition, reads every
     * subpartition back and verifies that the bytes and events read match what was written.
     */
    @Test
    void testEmit() throws Exception {
        final int numBuffers = 100;
        final int numSubpartitions = 10;
        final int numRecords = 1000;
        final Random random = new Random();

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        try (HsResultPartition partition = createHsResultPartition(numSubpartitions, bufferPool)) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten = new Queue[numSubpartitions];
            @SuppressWarnings({"unchecked", "rawtypes"})
            Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
            for (int index = 0; index < numSubpartitions; index++) {
                dataWritten[index] = new ArrayDeque<>();
                buffersRead[index] = new ArrayDeque<>();
            }

            // Freshly allocated int arrays are already zero-filled.
            final int[] numBytesWritten = new int[numSubpartitions];
            final int[] numBytesRead = new int[numSubpartitions];

            for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
                // Record length is drawn from [1, 2 * bufferSize].
                ByteBuffer record = generateRandomData(random.nextInt(2 * bufferSize) + 1, random);
                if (random.nextBoolean()) {
                    partition.broadcastRecord(record);
                    // A broadcast record is expected in every subpartition.
                    for (int target = 0; target < numSubpartitions; target++) {
                        recordDataWritten(
                                record,
                                dataWritten,
                                target,
                                numBytesWritten,
                                Buffer.DataType.DATA_BUFFER);
                    }
                } else {
                    int target = random.nextInt(numSubpartitions);
                    partition.emitRecord(record, target);
                    recordDataWritten(
                            record,
                            dataWritten,
                            target,
                            numBytesWritten,
                            Buffer.DataType.DATA_BUFFER);
                }
            }

            partition.finish();
            // After finish() the test expects one serialized EndOfPartitionEvent per subpartition.
            for (int target = 0; target < numSubpartitions; target++) {
                ByteBuffer endOfPartition =
                        EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
                recordDataWritten(
                        endOfPartition,
                        dataWritten,
                        target,
                        numBytesWritten,
                        Buffer.DataType.EVENT_BUFFER);
            }

            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
                    createSubpartitionViews(partition, numSubpartitions);
            readData(
                    viewAndListeners,
                    (buffer, subpartitionId) -> {
                        int numBytes = buffer.readableBytes();
                        numBytesRead[subpartitionId] += numBytes;
                        // Copy the payload into an unpooled segment; readData recycles the
                        // original buffer right after this callback returns.
                        MemorySegment segment =
                                MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                        segment.put(0, buffer.getNioBufferReadable(), numBytes);
                        buffersRead[subpartitionId].add(
                                new NetworkBuffer(
                                        segment, buf -> {}, buffer.getDataType(), numBytes));
                    });

            checkWriteReadResult(
                    numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
        }
    }

    /**
     * Broadcasts a single {@link EndOfPartitionEvent} and verifies that each of the two
     * subpartitions delivers it as an event buffer.
     */
    @Test
    void testBroadcastEvent() throws Exception {
        final int numBuffers = 1;
        final int numSubpartitions = 2;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        try (HsResultPartition resultPartition =
                createHsResultPartition(numSubpartitions, bufferPool)) {
            resultPartition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
            // The pool must still report its single segment as available after the broadcast.
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .isEqualTo(numBuffers);

            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
                    createSubpartitionViews(resultPartition, numSubpartitions);

            final boolean[] receivedEvent = new boolean[numSubpartitions];
            readData(
                    viewAndListeners,
                    (buffer, subpartition) -> {
                        Assertions.assertThat(buffer.getDataType().isEvent()).isTrue();
                        try {
                            AbstractEvent event =
                                    EventSerializer.fromSerializedEvent(
                                            buffer.readOnlySlice().getNioBufferReadable(),
                                            HsResultPartitionTest.class.getClassLoader());
                            Assertions.assertThat(event).isInstanceOf(EndOfPartitionEvent.class);
                            receivedEvent[subpartition] = true;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });

            Assertions.assertThat(receivedEvent).containsExactly(true, true);
        }
    }

    /**
     * Writes ten records to subpartition 0 and verifies that two views created on that same
     * subpartition each read back all ten records, in order.
     */
    @Test
    void testMultipleConsumer() throws Exception {
        final int numBuffers = 10;
        final int numRecords = 10;
        final int numConsumers = 2;
        final int targetChannel = 0;
        final Random random = new Random();

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        try (HsResultPartition resultPartition = createHsResultPartition(2, bufferPool)) {
            List<ByteBuffer> dataWritten = new ArrayList<>();
            for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
                ByteBuffer record = generateRandomData(bufferSize, random);
                resultPartition.emitRecord(record, targetChannel);
                dataWritten.add(record);
            }
            resultPartition.finish();

            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
                    createMultipleConsumerView(resultPartition, targetChannel, numConsumers);

            // One list of copied data buffers per consumer; each reader thread only touches
            // the list at its own index.
            final List<List<Buffer>> dataRead = new ArrayList<>();
            for (int consumer = 0; consumer < numConsumers; consumer++) {
                dataRead.add(new ArrayList<>());
            }

            readData(
                    viewAndListeners,
                    (buffer, subpartition) -> {
                        if (!buffer.isBuffer()) {
                            // Events are not part of the comparison below.
                            return;
                        }
                        int numBytes = buffer.readableBytes();
                        MemorySegment segment =
                                MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                        segment.put(0, buffer.getNioBufferReadable(), numBytes);
                        dataRead.get(subpartition)
                                .add(
                                        new NetworkBuffer(
                                                segment,
                                                buf -> {},
                                                buffer.getDataType(),
                                                numBytes));
                    });

            for (List<Buffer> readBufferList : dataRead) {
                Assertions.assertThat(dataWritten).hasSameSizeAs(readBufferList);
                for (int j = 0; j < dataWritten.size(); j++) {
                    ByteBuffer bufferWritten = dataWritten.get(j);
                    // Reset the position so the comparison covers the whole record.
                    bufferWritten.rewind();
                    Buffer bufferRead = readBufferList.get(j);
                    Assertions.assertThat(bufferRead.getNioBufferReadable())
                            .isEqualTo(bufferWritten);
                }
            }
        }
    }

    /**
     * Broadcasts ten records through a partition created with {@code isBroadcastOnly = true} and
     * verifies that each of the two subpartitions reads back all ten records, in order.
     */
    @Test
    void testBroadcastResultPartition() throws Exception {
        final int numBuffers = 10;
        final int numRecords = 10;
        final int numConsumers = 2;
        final Random random = new Random();

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        try (HsResultPartition resultPartition =
                createHsResultPartition(numConsumers, bufferPool, true)) {
            List<ByteBuffer> dataWritten = new ArrayList<>();
            for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
                ByteBuffer record = generateRandomData(bufferSize, random);
                resultPartition.broadcastRecord(record);
                dataWritten.add(record);
            }
            resultPartition.finish();

            Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners =
                    createSubpartitionViews(resultPartition, numConsumers);

            // One list of copied data buffers per subpartition; each reader thread only touches
            // the list at its own index.
            final List<List<Buffer>> dataRead = new ArrayList<>();
            for (int consumer = 0; consumer < numConsumers; consumer++) {
                dataRead.add(new ArrayList<>());
            }

            readData(
                    viewAndListeners,
                    (buffer, subpartition) -> {
                        if (!buffer.isBuffer()) {
                            // Events are not part of the comparison below.
                            return;
                        }
                        int numBytes = buffer.readableBytes();
                        MemorySegment segment =
                                MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                        segment.put(0, buffer.getNioBufferReadable(), numBytes);
                        dataRead.get(subpartition)
                                .add(
                                        new NetworkBuffer(
                                                segment,
                                                buf -> {},
                                                buffer.getDataType(),
                                                numBytes));
                    });

            for (List<Buffer> readBufferList : dataRead) {
                Assertions.assertThat(dataWritten).hasSameSizeAs(readBufferList);
                for (int j = 0; j < dataWritten.size(); j++) {
                    ByteBuffer bufferWritten = dataWritten.get(j);
                    // Reset the position so the comparison covers the whole record.
                    bufferWritten.rewind();
                    Buffer bufferRead = readBufferList.get(j);
                    Assertions.assertThat(bufferRead.getNioBufferReadable())
                            .isEqualTo(bufferWritten);
                }
            }
        }
    }

    /** Verifies that emitting a record into an already closed partition throws. */
    @Test
    void testClose() throws Exception {
        final int numBuffers = 1;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition partition = createHsResultPartition(1, bufferPool);

        partition.close();

        // assertThatThrownBy itself fails the test when the lambda completes normally; the
        // concrete exception type is intentionally not pinned here.
        Assertions.assertThatThrownBy(
                () -> partition.emitRecord(ByteBuffer.allocate(bufferSize), 0));
    }

    /**
     * Emits a five-buffer record, closes and releases the partition, waits for the data
     * directory to become empty and checks that all segments are back in the global pool.
     *
     * <p>The final assertion now passes the observed value to {@code assertThat} and the expected
     * constant to {@code isEqualTo}, so a failure message reports them the right way round.
     */
    @Test
    void testRelease() throws Exception {
        final int numSubpartitions = 2;
        final int numBuffers = 10;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, readBufferPool.getNumBuffersPerRequest())
                        .setFullStrategyNumBuffersTriggerSpillingRatio(0.6f)
                        .setFullStrategyReleaseBufferRatio(0.8f)
                        .build();
        HsResultPartition partition =
                createHsResultPartition(numSubpartitions, bufferPool, configuration);

        partition.emitRecord(ByteBuffer.allocate(bufferSize * 5), 1);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(5);

        partition.close();
        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();

        partition.release();

        // Poll until the first data directory of the file channel manager contains no files.
        // NOTE(review): there is no upper bound on this wait; the test hangs if the files are
        // never deleted.
        final File dataDirectory = fileChannelManager.getPaths()[0];
        while (Preconditions.checkNotNull(dataDirectory.listFiles()).length != 0) {
            Thread.sleep(10L);
        }

        Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                .isEqualTo(totalBuffers);
    }

    /**
     * Verifies that creating a subpartition view on a released partition fails with an
     * {@link IllegalStateException}.
     */
    @Test
    void testCreateSubpartitionViewAfterRelease() throws Exception {
        final int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition resultPartition = createHsResultPartition(2, bufferPool);

        resultPartition.release();

        Assertions.assertThatThrownBy(
                        () ->
                                resultPartition.createSubpartitionView(
                                        0, new NoOpBufferAvailablityListener()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Verifies that creating a subpartition view fails with a {@link PartitionNotFoundException}
     * once the temporary data directory has been deleted.
     */
    @Test
    void testCreateSubpartitionViewLostData() throws Exception {
        final int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        HsResultPartition resultPartition = createHsResultPartition(2, bufferPool);

        // Remove the directory the file channel manager was created on.
        IOUtils.deleteFilesRecursively(tempDataPath);

        Assertions.assertThatThrownBy(
                        () ->
                                resultPartition.createSubpartitionView(
                                        0, new NoOpBufferAvailablityListener()))
                .isInstanceOf(PartitionNotFoundException.class);
    }

    /**
     * Asserts that the partition reports itself unavailable after a record the size of the whole
     * two-buffer pool is emitted (with a release-buffer ratio of 0), and available again after
     * it has been closed and released.
     */
    @Test
    void testAvailability() throws Exception {
        final int numBuffers = 2;
        final int numSubpartitions = 1;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, readBufferPool.getNumBuffersPerRequest())
                        .setFullStrategyReleaseBufferRatio(0.0f)
                        .build();
        HsResultPartition partition =
                createHsResultPartition(numSubpartitions, bufferPool, configuration);

        partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 0);
        Assertions.assertThat(partition.isAvailable()).isFalse();

        partition.close();
        partition.release();
        Assertions.assertThat(partition.isAvailable()).isTrue();
    }

    /**
     * Emits one unicast and one broadcast record of {@code bufferSize} bytes into a
     * two-subpartition partition and checks the buffer/byte counters and the per-subpartition
     * byte statistics.
     */
    @Test
    void testMetricsUpdate() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        try (HsResultPartition partition = createHsResultPartition(2, bufferPool)) {
            partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));

            long numBuffersOut = taskIOMetricGroup.getNumBuffersOutCounter().getCount();
            Assertions.assertThat(numBuffersOut).isEqualTo(3L);
            long numBytesOut = taskIOMetricGroup.getNumBytesOutCounter().getCount();
            Assertions.assertThat(numBytesOut).isEqualTo(3L * bufferSize);

            IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
            Assertions.assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
            ResultPartitionBytes partitionBytes =
                    ioMetrics.getResultPartitionBytes().values().iterator().next();
            // Subpartition 0 received the unicast and the broadcast record, subpartition 1 only
            // the broadcast record.
            Assertions.assertThat(partitionBytes.getSubpartitionBytes())
                    .containsExactly(2L * bufferSize, (long) bufferSize);
        }
    }

    /**
     * Verifies that, with the SELECTIVE spilling strategy, creating a second view on the same
     * subpartition is rejected with an {@link IllegalStateException}.
     */
    @Test
    void testSelectiveSpillingStrategyRegisterMultipleConsumer() throws Exception {
        final int numSubpartitions = 2;
        BufferPool bufferPool = globalPool.createBufferPool(2, 2);
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, readBufferPool.getNumBuffersPerRequest())
                        .setSpillingStrategyType(
                                HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE)
                        .build();

        try (HsResultPartition partition =
                createHsResultPartition(numSubpartitions, bufferPool, configuration)) {
            // First consumer of subpartition 0 registers normally.
            partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());

            // Second consumer of the same subpartition must be refused.
            Assertions.assertThatThrownBy(
                            () ->
                                    partition.createSubpartitionView(
                                            0, new NoOpBufferAvailablityListener()))
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageContaining("Multiple consumer is not allowed");
        }
    }

    /**
     * Verifies that, with the FULL spilling strategy, a second view on the same subpartition can
     * be created without any exception.
     */
    @Test
    void testFullSpillingStrategyRegisterMultipleConsumer() throws Exception {
        final int numSubpartitions = 2;
        BufferPool bufferPool = globalPool.createBufferPool(2, 2);
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, readBufferPool.getNumBuffersPerRequest())
                        .setSpillingStrategyType(
                                HybridShuffleConfiguration.SpillingStrategyType.FULL)
                        .build();

        try (HsResultPartition partition =
                createHsResultPartition(numSubpartitions, bufferPool, configuration)) {
            // First consumer of subpartition 0.
            partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());

            // Second consumer of the same subpartition is accepted as well.
            Assertions.assertThatNoException()
                    .isThrownBy(
                            () ->
                                    partition.createSubpartitionView(
                                            0, new NoOpBufferAvailablityListener()));
        }
    }

    /**
     * Broadcasts one {@code bufferSize}-byte record through a broadcast-only partition and checks
     * that the buffer/byte counters count it once while the per-subpartition statistics report it
     * for both subpartitions.
     */
    @Test
    void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        try (HsResultPartition partition = createHsResultPartition(2, bufferPool, true)) {
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));

            long numBuffersOut = taskIOMetricGroup.getNumBuffersOutCounter().getCount();
            Assertions.assertThat(numBuffersOut).isEqualTo(1L);
            long numBytesOut = taskIOMetricGroup.getNumBytesOutCounter().getCount();
            Assertions.assertThat(numBytesOut).isEqualTo((long) bufferSize);

            IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
            Assertions.assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
            ResultPartitionBytes partitionBytes =
                    ioMetrics.getResultPartitionBytes().values().iterator().next();
            Assertions.assertThat(partitionBytes.getSubpartitionBytes())
                    .containsExactly((long) bufferSize, (long) bufferSize);
        }
    }

    /**
     * Remembers a record as written to one subpartition.
     *
     * <p>The record is rewound first, so the byte count added to {@code numBytesWritten} is the
     * record's full length from position 0 to its limit.
     *
     * @param record the record that was emitted; its position is reset to 0
     * @param dataWritten per-subpartition queues of written records and their data types
     * @param subpartition index of the subpartition the record was written to
     * @param numBytesWritten per-subpartition byte counters, updated in place
     * @param dataType the data type to store alongside the record
     */
    private static void recordDataWritten(ByteBuffer record, Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten, int subpartition, int[] numBytesWritten, Buffer.DataType dataType) {
        record.rewind();
        Queue<Tuple2<ByteBuffer, Buffer.DataType>> writtenToSubpartition = dataWritten[subpartition];
        writtenToSubpartition.add(Tuple2.of(record, dataType));
        numBytesWritten[subpartition] += record.remaining();
    }

    /**
     * Drains every given subpartition view on its own thread until the first non-data buffer
     * (an event) has been read, handing each buffer to {@code bufferProcessor}.
     *
     * <p>Each buffer is recycled in a {@code finally} block, so it is returned even when the
     * processor throws (for example on a failed assertion). Whether the buffer is an event is
     * determined before it is recycled.
     *
     * @param viewAndListeners one view together with its availability listener per reader
     * @param bufferProcessor callback receiving each buffer and the index of the reader it came
     *     from; it must not keep the buffer, which is recycled right after the call
     * @return the total number of readable bytes seen across all readers
     * @throws Exception if any reader thread failed
     */
    private long readData(final Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners, final BiConsumer<Buffer, Integer> bufferProcessor) throws Exception {
        final AtomicInteger dataSize = new AtomicInteger(0);
        CheckedThread[] subpartitionViewThreads = new CheckedThread[viewAndListeners.length];
        for (int i = 0; i < viewAndListeners.length; i++) {
            final int subpartition = i;
            CheckedThread subpartitionViewThread = new CheckedThread() {

                @Override
                public void go() throws Exception {
                    ResultSubpartitionView view = viewAndListeners[subpartition].f0;
                    TestingBufferAvailabilityListener listener = viewAndListeners[subpartition].f1;
                    while (true) {
                        ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
                        if (bufferAndBacklog == null) {
                            // Nothing to read yet: block until the view signals new data.
                            listener.waitForData();
                            continue;
                        }
                        Buffer buffer = bufferAndBacklog.buffer();
                        // Decide before recycling; the buffer must not be queried afterwards.
                        final boolean isEvent = !buffer.isBuffer();
                        try {
                            bufferProcessor.accept(buffer, subpartition);
                            dataSize.addAndGet(buffer.readableBytes());
                        } finally {
                            buffer.recycleBuffer();
                        }
                        if (isEvent) {
                            // The first event ends this reader.
                            break;
                        }
                        if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) {
                            // The view announced that no further data is ready right now.
                            listener.waitForData();
                        }
                    }
                    view.releaseAllResources();
                }
            };
            subpartitionViewThreads[subpartition] = subpartitionViewThread;
            subpartitionViewThread.start();
        }
        // sync() rethrows any exception raised inside the corresponding reader thread.
        for (CheckedThread thread : subpartitionViewThreads) {
            thread.sync();
        }
        return dataSize.get();
    }

    /**
     * Creates a heap buffer of {@code dataSize} random bytes.
     *
     * @param dataSize number of bytes to generate
     * @param random source of the random bytes
     * @return a buffer wrapping the generated array, positioned at 0 with limit {@code dataSize}
     */
    private static ByteBuffer generateRandomData(int dataSize, Random random) {
        final byte[] payload = new byte[dataSize];
        random.nextBytes(payload);
        final ByteBuffer wrapped = ByteBuffer.wrap(payload);
        return wrapped;
    }

    /**
     * Creates a partition that is not broadcast-only, using the configuration the three-argument
     * boolean overload builds.
     */
    private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool) throws IOException {
        final boolean isBroadcastOnly = false;
        return createHsResultPartition(numSubpartitions, bufferPool, isBroadcastOnly);
    }

    /** Creates a partition that is not broadcast-only, using the given configuration. */
    private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool, HybridShuffleConfiguration hybridShuffleConfiguration) throws IOException {
        final boolean isBroadcastOnly = false;
        return createHsResultPartition(
                numSubpartitions, bufferPool, isBroadcastOnly, hybridShuffleConfiguration);
    }

    /**
     * Creates a partition with a configuration built from {@code numSubpartitions} and the read
     * buffer pool's buffers-per-request value, leaving all other builder settings untouched.
     */
    private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly) throws IOException {
        HybridShuffleConfiguration configuration =
                HybridShuffleConfiguration.builder(
                                numSubpartitions, readBufferPool.getNumBuffersPerRequest())
                        .build();
        return createHsResultPartition(
                numSubpartitions, bufferPool, isBroadcastOnly, configuration);
    }

    /**
     * Builds an {@link HsResultPartition} on top of the test fixture, sets it up and attaches a
     * fresh unregistered IO metric group.
     *
     * <p>Side effect: {@code taskIOMetricGroup} is replaced on every call, so the field always
     * refers to the metric group of the most recently created partition.
     *
     * @param numSubpartitions passed to the constructor twice, in two consecutive positions
     * @param bufferPool the pool returned by the buffer-pool factory given to the partition
     * @param isBroadcastOnly forwarded to the constructor
     * @param hybridShuffleConfiguration forwarded to the constructor
     * @return the partition, after {@code setup()} and {@code setMetricGroup(...)}
     */
    private HsResultPartition createHsResultPartition(int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly, HybridShuffleConfiguration hybridShuffleConfiguration) throws IOException {
        HsResultPartition hsResultPartition =
                new HsResultPartition(
                        "HsResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_FULL,
                        numSubpartitions,
                        numSubpartitions,
                        readBufferPool,
                        readIOExecutor,
                        new ResultPartitionManager(),
                        fileChannelManager.createChannel().getPath(),
                        bufferSize,
                        hybridShuffleConfiguration,
                        null,
                        isBroadcastOnly,
                        () -> bufferPool);
        taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        hsResultPartition.setup();
        hsResultPartition.setMetricGroup(taskIOMetricGroup);
        return hsResultPartition;
    }

    /*
     * WARNING - void declaration
     */
    private static void checkWriteReadResult(int numSubpartitions, int[] numBytesWritten, int[] numBytesRead, Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten, Queue<Buffer>[] buffersRead) {
        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
            void var10_10;
            Assertions.assertThat((int)numBytesWritten[subpartitionIndex]).isEqualTo(numBytesRead[subpartitionIndex]);
            ArrayList<Tuple2> eventsWritten = new ArrayList<Tuple2>();
            ArrayList<Buffer> eventsRead = new ArrayList<Buffer>();
            ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
            for (Tuple2 tuple2 : dataWritten[subpartitionIndex]) {
                subpartitionDataWritten.put((ByteBuffer)tuple2.f0);
                ((ByteBuffer)tuple2.f0).rewind();
                if (!((Buffer.DataType)tuple2.f1).isEvent()) continue;
                eventsWritten.add(tuple2);
            }
            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
            for (Buffer buffer : buffersRead[subpartitionIndex]) {
                subpartitionDataRead.put(buffer.getNioBufferReadable());
                if (buffer.isBuffer()) continue;
                eventsRead.add(buffer);
            }
            subpartitionDataWritten.flip();
            subpartitionDataRead.flip();
            Assertions.assertThat((Comparable)subpartitionDataWritten).isEqualTo((Object)subpartitionDataRead);
            Assertions.assertThat((int)eventsWritten.size()).isEqualTo(eventsRead.size());
            boolean bl = false;
            while (var10_10 < eventsWritten.size()) {
                Assertions.assertThat((Comparable)((Comparable)((Tuple2)eventsWritten.get((int)var10_10)).f1)).isEqualTo((Object)((Buffer)eventsRead.get((int)var10_10)).getDataType());
                Assertions.assertThat((Comparable)((Comparable)((Tuple2)eventsWritten.get((int)var10_10)).f0)).isEqualTo((Object)((Buffer)eventsRead.get((int)var10_10)).getNioBufferReadable());
                ++var10_10;
            }
        }
    }

    /**
     * Creates one view, each with its own availability listener, for every subpartition index in
     * {@code [0, numSubpartitions)}.
     *
     * @return an array whose element {@code i} holds the view and listener of subpartition
     *     {@code i}
     */
    private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] createSubpartitionViews(HsResultPartition partition, int numSubpartitions) throws Exception {
        @SuppressWarnings({"unchecked", "rawtypes"})
        Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] result =
                new Tuple2[numSubpartitions];
        for (int index = 0; index < result.length; index++) {
            TestingBufferAvailabilityListener listener = new TestingBufferAvailabilityListener();
            ResultSubpartitionView view = partition.createSubpartitionView(index, listener);
            result[index] = Tuple2.of(view, listener);
        }
        return result;
    }

    /**
     * Creates {@code numConsumers} views, each with its own availability listener, that all
     * target the same subpartition.
     *
     * @return an array whose element {@code i} holds the view and listener of consumer {@code i}
     */
    private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] createMultipleConsumerView(HsResultPartition partition, int subpartitionId, int numConsumers) throws Exception {
        @SuppressWarnings({"unchecked", "rawtypes"})
        Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] result =
                new Tuple2[numConsumers];
        for (int index = 0; index < result.length; index++) {
            TestingBufferAvailabilityListener listener = new TestingBufferAvailabilityListener();
            ResultSubpartitionView view =
                    partition.createSubpartitionView(subpartitionId, listener);
            result[index] = Tuple2.of(view, listener);
        }
        return result;
    }

    private static final class TestingBufferAvailabilityListener
    implements BufferAvailabilityListener {
        private int numNotifications;

        private TestingBufferAvailabilityListener() {
        }

        public synchronized void notifyDataAvailable() {
            if (this.numNotifications == 0) {
                this.notifyAll();
            }
            ++this.numNotifications;
        }

        public synchronized void waitForData() throws InterruptedException {
            if (this.numNotifications == 0) {
                this.wait();
            }
            this.numNotifications = 0;
        }
    }
}

