/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListenerManager;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.TestingResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class PartitionRequestQueueTest {
    /** Buffer size in bytes: 0x100000 = 1,048,576 (1 MiB). */
    private static final int BUFFER_SIZE = 0x100000;
    /** Shared by the whole class: assigned in {@code setUp} and closed in {@code shutdown}. */
    private static FileChannelManager fileChannelManager;

    /** Package-private no-argument constructor; it performs no initialization. */
    PartitionRequestQueueTest() {
    }

    /**
     * Creates the shared {@link FileChannelManager} once for the whole class, using the
     * absolute path of the temporary directory injected by JUnit as its only directory.
     *
     * @param temporaryFolder temporary directory provided through {@link TempDir}
     */
    @BeforeAll
    static void setUp(@TempDir File temporaryFolder) {
        String[] tempDirs = {temporaryFolder.getAbsolutePath()};
        fileChannelManager = new FileChannelManagerImpl(tempDirs, "testing");
    }

    /**
     * Closes the shared file channel manager after all tests of this class have run.
     *
     * @throws Exception if closing the manager fails
     */
    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    /**
     * Requests a subpartition view for a freshly created partition id, checks that exactly one
     * partition request listener is registered for that id, then notifies the reader of a
     * partition request timeout and expects an {@link NettyMessage.ErrorResponse} whose cause is
     * a {@link PartitionNotFoundException} on the outbound side of the channel.
     */
    @Test
    public void testNotifyReaderPartitionTimeout() throws Exception {
        PartitionRequestQueue queue = new PartitionRequestQueue();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartitionID resultPartitionId = new ResultPartitionID();
        InputChannelID receiverId = new InputChannelID(0L, 0L);
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 10, queue);

        reader.requestSubpartitionViewOrRegisterListener(resultPartitionManager, resultPartitionId, new ResultSubpartitionIndexSet(0));
        PartitionRequestListenerManager listenerManager = (PartitionRequestListenerManager)resultPartitionManager.getListenerManagers().get(resultPartitionId);
        Assertions.assertThat((Collection)listenerManager.getPartitionRequestListeners()).hasSize(1);

        // Time out the single listener that was registered above.
        PartitionRequestListener registeredListener = (PartitionRequestListener)listenerManager.getPartitionRequestListeners().iterator().next();
        reader.notifyPartitionRequestTimeout(registeredListener);
        channel.runPendingTasks();

        Object read = channel.readOutbound();
        Assertions.assertThat(read)
                .isNotNull()
                .isInstanceOf(NettyMessage.ErrorResponse.class)
                .isInstanceOfSatisfying(
                        NettyMessage.ErrorResponse.class,
                        r -> Assertions.assertThat(r.cause).isInstanceOf(PartitionNotFoundException.class));
    }

    /**
     * Drives two readers through the same queue while the channel's writability is toggled.
     *
     * <p>The first reader uses a view that always reports itself as available; the second uses a
     * view holding {@code buffersToWrite} buffers. After writability is restored and pending
     * tasks have run, exactly {@code buffersToWrite} outbound messages are expected.
     */
    @Test
    void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
        final int buffersToWrite = 5;
        PartitionRequestQueue queue = new PartitionRequestQueue();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0L, 0L), 10, queue);
        CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1L, 1L), 10, queue);

        ResultSubpartitionView view1 = new EmptyAlwaysAvailableResultSubpartitionView();
        reader1.notifySubpartitionsCreated(
                TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view1).build(),
                new ResultSubpartitionIndexSet(0));
        reader1.notifyDataAvailable(view1);
        Assertions.assertThat(reader1.getAvailabilityAndBacklog().isAvailable()).isTrue();
        Assertions.assertThat(reader1.isRegisteredAsAvailable()).isFalse();

        // Make the channel non-writable through the user-defined writability flag.
        channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
        Assertions.assertThat(channel.isWritable()).isFalse();

        reader1.notifyDataAvailable(view1);
        channel.runPendingTasks();

        // The second reader is notified once before and once after its view is set up;
        // this order is kept exactly as in the original test.
        ResultSubpartitionView view2 = new DefaultBufferResultSubpartitionView(buffersToWrite);
        reader2.notifyDataAvailable(view2);
        reader2.notifySubpartitionsCreated(
                TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view2).build(),
                new ResultSubpartitionIndexSet(0));
        Assertions.assertThat(reader2.getAvailabilityAndBacklog().isAvailable()).isTrue();
        Assertions.assertThat(reader2.isRegisteredAsAvailable()).isFalse();

        reader2.notifyDataAvailable(view2);

        // Restore writability and let the queue flush what is pending.
        channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
        channel.runPendingTasks();

        Assertions.assertThat(channel.outboundMessages()).hasSize(buffersToWrite);
    }

    /** Runs the buffer-writing scenario against a view that holds one regular buffer. */
    @Test
    void testDefaultBufferWriting() throws Exception {
        ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(1);
        testBufferWriting(view);
    }

    /** Runs the buffer-writing scenario against a view that hands out one read-only slice. */
    @Test
    void testReadOnlyBufferWriting() throws Exception {
        ResultSubpartitionView view = new ReadOnlyBufferResultSubpartitionView(1);
        testBufferWriting(view);
    }

    /**
     * Notifies a reader backed by {@code view} that data is available and expects exactly one
     * {@link NettyMessage.BufferResponse} on the outbound side of the channel.
     *
     * <p>If an {@link NettyMessage.ErrorResponse} arrives instead, the test fails with the
     * response's cause attached, so the underlying stack trace shows up in the test report
     * rather than only on stderr.
     *
     * @param view the subpartition view the reader consumes from
     * @throws IOException declared for the reader set-up calls
     */
    private void testBufferWriting(ResultSubpartitionView view) throws IOException {
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();
        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, Integer.MAX_VALUE, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});

        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
        reader.notifyDataAvailable(view);
        channel.runPendingTasks();

        Object read = channel.readOutbound();
        Assertions.assertThat(read).isNotNull();
        if (read instanceof NettyMessage.ErrorResponse) {
            // Surface the server-side cause as part of the failure instead of printing it.
            Assertions.fail(
                    "Expected a BufferResponse but received an ErrorResponse",
                    ((NettyMessage.ErrorResponse)read).cause);
        }
        Assertions.assertThat(read).isInstanceOf(NettyMessage.BufferResponse.class);

        // Nothing else must have been written.
        Assertions.assertThat((Object)channel.readOutbound()).isNull();
    }

    /**
     * Uses a reader created with zero credits and a view that reports itself as available
     * regardless of credit. While the channel is blocked, a data notification must put the
     * reader into the queue's available readers; after flushing, only the blocking buffer is
     * read back and the reader is no longer listed as available.
     */
    @Test
    void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
        ResultSubpartitionView view = new NextIsEventResultSubpartitionView();
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();
        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));

        // Fill the channel up to its high water mark so nothing gets written out yet.
        ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);
        Assertions.assertThat((Object)channel.readOutbound()).isNull();

        reader.notifyDataAvailable(view);
        channel.runPendingTasks();

        // The reader is enqueued although it has no credit.
        Assertions.assertThat(queue.getAvailableReaders()).contains(reader);
        Assertions.assertThat(reader.getNumCreditsAvailable()).isZero();

        channel.flush();
        Assertions.assertThat((ByteBuf)channel.readOutbound()).isSameAs(channelBlockingBuffer);

        Assertions.assertThat(queue.getAvailableReaders()).isEmpty();
        Assertions.assertThat(reader.getNumCreditsAvailable()).isZero();
        Assertions.assertThat((Object)channel.readOutbound()).isNull();
    }

    /**
     * Checks how buffer notifications and credit announcements enqueue a reader.
     *
     * <p>The reader starts with two credits that are immediately taken away again via
     * {@code addCredit(-2)}. With the channel blocked, {@code notifyNumBuffers} data
     * notifications must not register the reader as available. Each of the following
     * {@code notifyNumCredits} single-credit announcements must register it. After flushing,
     * the blocking buffer is followed by exactly {@code notifyNumCredits} buffer responses.
     */
    @Test
    void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
        final int notifyNumBuffers = 5;
        final int notifyNumCredits = 3;

        ResultSubpartitionView view = new DefaultBufferResultSubpartitionView(10);
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();
        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 2, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});

        // Cancel out the two initial credits; the zero-credit assertion below relies on this.
        reader.addCredit(-2);
        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
        queue.notifyReaderCreated(reader);

        // Fill the channel up to its high water mark so nothing gets written out yet.
        ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);
        Assertions.assertThat((Object)channel.readOutbound()).isNull();

        for (int i = 0; i < notifyNumBuffers; ++i) {
            reader.notifyDataAvailable(view);
        }
        channel.runPendingTasks();

        // Buffers are available, but without credit the reader must not be enqueued.
        Assertions.assertThat(queue.getAvailableReaders()).isEmpty();
        Assertions.assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
        Assertions.assertThat(reader.isRegisteredAsAvailable()).isFalse();
        Assertions.assertThat(reader.getNumCreditsAvailable()).isZero();

        for (int i = 1; i <= notifyNumCredits; ++i) {
            queue.addCreditOrResumeConsumption(receiverId, viewReader -> viewReader.addCredit(1));
            Assertions.assertThat(reader.isRegisteredAsAvailable()).isTrue();
            Assertions.assertThat(queue.getAvailableReaders()).contains(reader);
            Assertions.assertThat(reader.getNumCreditsAvailable()).isEqualTo(i);
            Assertions.assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
        }

        channel.flush();
        Assertions.assertThat((ByteBuf)channel.readOutbound()).isSameAs(channelBlockingBuffer);

        // All credits are used up; buffers remain, so the reader is dequeued again.
        Assertions.assertThat(queue.getAvailableReaders()).isEmpty();
        Assertions.assertThat(reader.getNumCreditsAvailable()).isZero();
        Assertions.assertThat(reader.hasBuffersAvailable().isAvailable()).isTrue();
        Assertions.assertThat(reader.isRegisteredAsAvailable()).isFalse();

        for (int i = 1; i <= notifyNumCredits; ++i) {
            Assertions.assertThat((Object)channel.readOutbound()).isInstanceOf(NettyMessage.BufferResponse.class);
        }
        Assertions.assertThat((Object)channel.readOutbound()).isNull();
    }

    /**
     * Adds an {@code ALIGNED_CHECKPOINT_BARRIER} buffer followed by a {@code DATA_BUFFER} buffer
     * to a pipelined subpartition. After the first data notification, one buffer is expected to
     * remain queued and the reader to be unavailable. After resuming consumption, no buffer may
     * remain queued, and both buffers must appear on the channel in insertion order.
     */
    @Test
    void testEnqueueReaderByResumingConsumption() throws Exception {
        Buffer.DataType barrierType = Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER;
        Buffer.DataType dataBufferType = Buffer.DataType.DATA_BUFFER;

        PipelinedSubpartition subpartition = PipelinedSubpartitionTest.createPipelinedSubpartition();
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(4096, barrierType));
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(4096, dataBufferType));

        BufferAvailabilityListener availabilityListener = new NoOpBufferAvailablityListener();
        PipelinedSubpartitionView view = subpartition.createReadView(availabilityListener);
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();

        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 2, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
        queue.notifyReaderCreated(reader);
        Assertions.assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isTrue();

        reader.notifyDataAvailable(view);
        channel.runPendingTasks();

        // One of the two buffers is still queued and the reader reports itself unavailable.
        Assertions.assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse();
        Assertions.assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isOne();

        queue.addCreditOrResumeConsumption(receiverId, NetworkSequenceViewReader::resumeConsumption);
        Assertions.assertThat(reader.getAvailabilityAndBacklog().isAvailable()).isFalse();
        Assertions.assertThat(subpartition.unsynchronizedGetNumberOfQueuedBuffers()).isZero();

        Object firstResponse = channel.readOutbound();
        Assertions.assertThat(((NettyMessage.BufferResponse)firstResponse).buffer.getDataType()).isEqualTo(barrierType);
        Object secondResponse = channel.readOutbound();
        Assertions.assertThat(((NettyMessage.BufferResponse)secondResponse).buffer.getDataType()).isEqualTo(dataBufferType);
    }

    /**
     * With a zero-credit reader and two data buffers in the subpartition, a data notification
     * is expected to produce a {@link NettyMessage.BacklogAnnouncement}. It must carry the
     * reader's receiver id and the subpartition's current backlog. After the subpartition is
     * released, another notification must still produce some outbound message.
     */
    @Test
    void testAnnounceBacklog() throws Exception {
        PipelinedSubpartition subpartition = PipelinedSubpartitionTest.createPipelinedSubpartition();
        for (int added = 0; added < 2; ++added) {
            subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(4096, Buffer.DataType.DATA_BUFFER));
        }

        BufferAvailabilityListener availabilityListener = new NoOpBufferAvailablityListener();
        PipelinedSubpartitionView view = subpartition.createReadView(availabilityListener);
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();

        PartitionRequestQueue queue = new PartitionRequestQueue();
        InputChannelID receiverId = new InputChannelID();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
        queue.notifyReaderCreated(reader);

        reader.notifyDataAvailable(view);
        channel.runPendingTasks();

        Object firstMessage = channel.readOutbound();
        Assertions.assertThat(firstMessage).isInstanceOf(NettyMessage.BacklogAnnouncement.class);
        NettyMessage.BacklogAnnouncement announcement = (NettyMessage.BacklogAnnouncement)firstMessage;
        Assertions.assertThat(announcement.receiverId).isEqualTo(receiverId);
        Assertions.assertThat(announcement.backlog).isEqualTo(subpartition.getBuffersInBacklogUnsafe());

        // Notify once more after the subpartition has been released.
        subpartition.release();
        reader.notifyDataAvailable(view);
        channel.runPendingTasks();
        Assertions.assertThat((Object)channel.readOutbound()).isNotNull();
    }

    /** Cancels a partition request whose reader was never made available. */
    @Test
    void testCancelPartitionRequestForUnavailableView() throws Exception {
        final boolean isAvailableView = false;
        testCancelPartitionRequest(isAvailableView);
    }

    /** Cancels a partition request whose reader was first enqueued as available. */
    @Test
    void testCancelPartitionRequestForAvailableView() throws Exception {
        final boolean isAvailableView = true;
        testCancelPartitionRequest(isAvailableView);
    }

    /**
     * Cancels the partition request of a reader and verifies that the reader is removed from
     * the queue's available readers and released.
     *
     * <p>The partition and the channel are cleaned up in a {@code finally} block so that a
     * failing assertion does not leave them open.
     *
     * @param isAvailableView whether to add one credit first, so that the reader is enqueued as
     *     available before the request is cancelled
     * @throws Exception if setting up the partition or the reader fails
     */
    private void testCancelPartitionRequest(boolean isAvailableView) throws Exception {
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        ResultPartition partition = PartitionRequestQueueTest.createFinishedPartitionWithFilledData(partitionManager);
        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 2, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});

        try {
            reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
            queue.notifyReaderCreated(reader);

            // Block the channel so that the enqueued reader is not drained right away.
            PartitionRequestQueueTest.blockChannel(channel);

            if (isAvailableView) {
                queue.addCreditOrResumeConsumption(receiverId, viewReader -> viewReader.addCredit(1));
                Assertions.assertThat(queue.getAvailableReaders()).contains(reader);
            }

            queue.cancel(receiverId);
            channel.runPendingTasks();

            Assertions.assertThat(queue.getAvailableReaders()).doesNotContain(reader);
            Assertions.assertThat(reader.isReleased()).isTrue();
        } finally {
            // Always clean up, even when an assertion above fails.
            partition.release();
            channel.close();
        }
    }

    /**
     * Announces a new buffer size of 65 for the receiver, then emits records of 128, 10 and 60
     * bytes into subpartition 0. The first two buffer responses are expected to have sizes 128
     * and 65 respectively.
     */
    @Test
    void testNotifyNewBufferSize() throws Exception {
        ResultPartition parent = PartitionRequestQueueTest.createResultPartition();
        BufferAvailabilityListener availabilityListener = new NoOpBufferAvailablityListener();
        ResultSubpartitionView view = parent.createSubpartitionView(new ResultSubpartitionIndexSet(0), availabilityListener);
        TestingResultPartition partition = TestingResultPartition.newBuilder().setCreateSubpartitionViewFunction((index, listener) -> view).build();

        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader = new CreditBasedSequenceNumberingViewReader(receiverId, 2, queue);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{queue});
        reader.notifySubpartitionsCreated(partition, new ResultSubpartitionIndexSet(0));
        queue.notifyReaderCreated(reader);

        // The new size is announced before any record is emitted.
        queue.notifyNewBufferSize(receiverId, 65);

        int[] recordSizes = {128, 10, 60};
        for (int recordSize : recordSizes) {
            parent.emitRecord(ByteBuffer.allocate(recordSize), 0);
        }

        reader.notifyDataAvailable(view);
        channel.runPendingTasks();

        Object firstResponse = channel.readOutbound();
        Assertions.assertThat(((NettyMessage.BufferResponse)firstResponse).buffer.getSize()).isEqualTo(128);
        Object secondResponse = channel.readOutbound();
        Assertions.assertThat(((NettyMessage.BufferResponse)secondResponse).buffer.getSize()).isEqualTo(65);
    }

    /**
     * Builds a shuffle environment with 10 network buffers of 0x100000 bytes each and creates a
     * {@code PIPELINED} partition on top of it; {@code setup()} has already been called on the
     * returned partition.
     *
     * @return the set-up result partition
     * @throws IOException if setting up the partition fails
     */
    private static ResultPartition createResultPartition() throws IOException {
        NettyShuffleEnvironmentBuilder environmentBuilder = new NettyShuffleEnvironmentBuilder();
        NettyShuffleEnvironment network = environmentBuilder.setNumNetworkBuffers(10).setBufferSize(0x100000).build();
        FileChannelManager channelManager = NoOpFileChannelManager.INSTANCE;
        ResultPartition pipelinedPartition = PartitionTestUtils.createPartition(network, channelManager, ResultPartitionType.PIPELINED, 2);
        pipelinedPartition.setup();
        return pipelinedPartition;
    }

    /**
     * Creates a {@code BLOCKING} partition backed by the shared file channel manager, emits one
     * record of 0x100000 bytes into subpartition 0 and finishes the partition.
     *
     * @param partitionManager result partition manager handed to the shuffle environment
     * @return the finished partition
     * @throws Exception if setting up, writing to or finishing the partition fails
     */
    private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
        NettyShuffleEnvironmentBuilder environmentBuilder = new NettyShuffleEnvironmentBuilder();
        NettyShuffleEnvironment environment = environmentBuilder.setResultPartitionManager(partitionManager).build();
        ResultPartition blockingPartition = PartitionTestUtils.createPartition(environment, fileChannelManager, ResultPartitionType.BLOCKING, 1);
        blockingPartition.setup();

        ByteBuffer record = ByteBuffer.allocate(0x100000);
        blockingPartition.emitRecord(record, 0);
        blockingPartition.finish();
        return blockingPartition;
    }

    /**
     * Writes, without flushing, a buffer whose readable size equals the channel's write-buffer
     * high water mark, and asserts that the channel is no longer writable afterwards.
     *
     * @param channel the channel to block
     * @return the buffer that was written to the channel
     */
    static ByteBuf blockChannel(EmbeddedChannel channel) {
        final int blockingSize = channel.config().getWriteBufferHighWaterMark();
        // Moving the writer index to the end makes the whole capacity count as readable bytes.
        ByteBuf blockingBuffer = Unpooled.buffer(blockingSize).writerIndex(blockingSize);
        channel.write(blockingBuffer);
        Assertions.assertThat(channel.isWritable()).isFalse();
        return blockingBuffer;
    }

    /**
     * Decompiler-emitted body of a lambda used in
     * {@code testEnqueueReaderByNotifyingBufferAndCredit}: returns the given view unchanged and
     * ignores {@code index} and {@code listener}.
     */
    private static /* synthetic */ ResultSubpartitionView lambda$testEnqueueReaderByNotifyingBufferAndCredit$5(ResultSubpartitionView view, int index, BufferAvailabilityListener listener) throws IOException {
        return view;
    }

    /**
     * Decompiler-emitted body of a lambda used in
     * {@code testEnqueueReaderByNotifyingEventBuffer}: returns the given view unchanged and
     * ignores {@code index} and {@code listener}.
     */
    private static /* synthetic */ ResultSubpartitionView lambda$testEnqueueReaderByNotifyingEventBuffer$4(ResultSubpartitionView view, int index, BufferAvailabilityListener listener) throws IOException {
        return view;
    }

    /**
     * Decompiler-emitted body of the second lambda used in
     * {@code testNotifyReaderNonEmptyOnEmptyReaders}: returns {@code view2} unchanged and
     * ignores {@code index} and {@code listener}.
     */
    private static /* synthetic */ ResultSubpartitionView lambda$testNotifyReaderNonEmptyOnEmptyReaders$2(ResultSubpartitionView view2, int index, BufferAvailabilityListener listener) throws IOException {
        return view2;
    }

    /**
     * Decompiler-emitted body of the first lambda used in
     * {@code testNotifyReaderNonEmptyOnEmptyReaders}: returns {@code view1} unchanged and
     * ignores {@code index} and {@code listener}.
     */
    private static /* synthetic */ ResultSubpartitionView lambda$testNotifyReaderNonEmptyOnEmptyReaders$1(ResultSubpartitionView view1, int index, BufferAvailabilityListener listener) throws IOException {
        return view1;
    }

    /**
     * View stub that always reports itself as available with a backlog of zero, whether or not
     * credit is available. NOTE(review): judging by the name, this is meant to mimic a view
     * whose next buffer is an event; confirm against the reader implementation.
     */
    private static class NextIsEventResultSubpartitionView extends NoOpResultSubpartitionView {

        /** Ignores {@code isCreditAvailable}; always returns "available, backlog 0". */
        public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
            final boolean available = true;
            final int backlog = 0;
            return new ResultSubpartitionView.AvailabilityWithBacklog(available, backlog);
        }
    }

    /**
     * View stub that is never released and always reports itself as available with a backlog
     * of zero, whether or not credit is available.
     */
    private static class EmptyAlwaysAvailableResultSubpartitionView extends NoOpResultSubpartitionView {

        /** Always {@code false}: this stub never counts as released. */
        public boolean isReleased() {
            return false;
        }

        /** Ignores {@code isCreditAvailable}; always returns "available, backlog 0". */
        public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
            final boolean available = true;
            final int backlog = 0;
            return new ResultSubpartitionView.AvailabilityWithBacklog(available, backlog);
        }
    }

    private static class ReadOnlyBufferResultSubpartitionView
    extends DefaultBufferResultSubpartitionView {
        private ReadOnlyBufferResultSubpartitionView(int buffersInBacklog) {
            super(buffersInBacklog);
        }

        @Override
        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() {
            ResultSubpartition.BufferAndBacklog nextBuffer = super.getNextBuffer();
            return new ResultSubpartition.BufferAndBacklog(nextBuffer.buffer().readOnlySlice(), nextBuffer.buffersInBacklog(), nextBuffer.getNextDataType(), 0);
        }
    }

    /**
     * View stub backed by a counter of remaining buffers. Every call to {@link #getNextBuffer()}
     * decrements the counter and returns a fresh test buffer; availability requires both credit
     * and a positive counter.
     */
    private static class DefaultBufferResultSubpartitionView extends NoOpResultSubpartitionView {
        /** Number of buffers this view still reports as backlog. */
        private final AtomicInteger buffersInBacklog;

        private DefaultBufferResultSubpartitionView(int buffersInBacklog) {
            this.buffersInBacklog = new AtomicInteger(buffersInBacklog);
        }

        /**
         * Decrements the backlog counter and returns a new buffer created with
         * {@code TestBufferFactory.createBuffer(10)}. The next data type is {@code DATA_BUFFER}
         * while the decremented counter is still positive and {@code NONE} otherwise.
         */
        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() {
            int remaining = this.buffersInBacklog.decrementAndGet();
            Buffer.DataType nextDataType;
            if (remaining > 0) {
                nextDataType = Buffer.DataType.DATA_BUFFER;
            } else {
                nextDataType = Buffer.DataType.NONE;
            }
            return new ResultSubpartition.BufferAndBacklog(TestBufferFactory.createBuffer(10), remaining, nextDataType, 0);
        }

        /** Reports availability only when credit is available and the counter is positive. */
        public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
            int backlog = this.buffersInBacklog.get();
            boolean available = isCreditAvailable && backlog > 0;
            return new ResultSubpartitionView.AvailabilityWithBacklog(available, backlog);
        }
    }
}

