/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NetworkBufferAllocator;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
import org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class CreditBasedPartitionRequestClientHandlerTest {
    CreditBasedPartitionRequestClientHandlerTest() {
    }

    @Test
    @Timeout(value=60L)
    void testReleaseInputChannelDuringDecode() throws Exception {
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)bufferProvider.requestBuffer()).thenReturn(null);
        Mockito.when((Object)bufferProvider.isDestroyed()).thenReturn((Object)true);
        Mockito.when((Object)bufferProvider.addBufferListener((BufferListener)ArgumentMatchers.any(BufferListener.class))).thenReturn((Object)false);
        RemoteInputChannel inputChannel = (RemoteInputChannel)Mockito.mock(RemoteInputChannel.class);
        Mockito.when((Object)inputChannel.getInputChannelId()).thenReturn((Object)new InputChannelID());
        Mockito.when((Object)inputChannel.getBufferProvider()).thenReturn((Object)bufferProvider);
        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);
        NettyMessage.BufferResponse receivedBuffer = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2, new NetworkBufferAllocator((NetworkClientHandler)client));
        client.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)receivedBuffer);
    }

    @Test
    void testReceiveEmptyBuffer() throws Exception {
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)bufferProvider.requestBuffer()).thenReturn((Object)TestBufferFactory.createBuffer(0));
        RemoteInputChannel inputChannel = (RemoteInputChannel)Mockito.mock(RemoteInputChannel.class);
        Mockito.when((Object)inputChannel.getInputChannelId()).thenReturn((Object)new InputChannelID());
        Mockito.when((Object)inputChannel.getBufferProvider()).thenReturn((Object)bufferProvider);
        Buffer emptyBuffer = TestBufferFactory.createBuffer(0);
        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);
        int backlog = 2;
        NettyMessage.BufferResponse receivedBuffer = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(emptyBuffer, 0, inputChannel.getInputChannelId(), 2, new NetworkBufferAllocator((NetworkClientHandler)client));
        client.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)receivedBuffer);
        ((RemoteInputChannel)Mockito.verify((Object)inputChannel, (VerificationMode)Mockito.never())).onError((Throwable)ArgumentMatchers.any(Throwable.class));
        ((RemoteInputChannel)Mockito.verify((Object)inputChannel, (VerificationMode)Mockito.times((int)1))).onEmptyBuffer(0, 2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testReceiveBuffer() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);
            int backlog = 2;
            NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), 2, new NetworkBufferAllocator((NetworkClientHandler)handler));
            handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse);
            Assertions.assertThat((int)inputChannel.getNumberOfQueuedBuffers()).isOne();
            Assertions.assertThat((int)inputChannel.getSenderBacklog()).isEqualTo(2);
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @ValueSource(strings={"LZ4", "LZO", "ZSTD"})
    void testReceiveCompressedBuffer(String compressionCodec) throws Exception {
        int bufferSize = 1024;
        BufferCompressor compressor = new BufferCompressor(bufferSize, NettyShuffleEnvironmentOptions.CompressionCodec.valueOf((String)compressionCodec));
        BufferDecompressor decompressor = new BufferDecompressor(bufferSize, NettyShuffleEnvironmentOptions.CompressionCodec.valueOf((String)compressionCodec));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize);
        SingleInputGate inputGate = new SingleInputGateBuilder().setBufferDecompressor(decompressor).setSegmentProvider((MemorySegmentProvider)networkBufferPool).build();
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, null);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);
            Buffer buffer = compressor.compressToOriginalBuffer(TestBufferFactory.createBuffer(bufferSize));
            NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(buffer, 0, inputChannel.getInputChannelId(), 2, new NetworkBufferAllocator((NetworkClientHandler)handler));
            Assertions.assertThat((boolean)bufferResponse.isCompressed).isTrue();
            handler.channelRead(null, (Object)bufferResponse);
            Buffer receivedBuffer = inputChannel.getNextReceivedBuffer();
            Assertions.assertThat((Object)receivedBuffer).isNotNull();
            Assertions.assertThat((boolean)receivedBuffer.isCompressed()).isTrue();
            receivedBuffer.recycleBuffer();
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testReceiveBacklogAnnouncement() throws Exception {
        int bufferSize = 1024;
        int numBuffers = 10;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, bufferSize);
        SingleInputGate inputGate = new SingleInputGateBuilder().setSegmentProvider((MemorySegmentProvider)networkBufferPool).build();
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, null);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);
            Assertions.assertThat((int)inputChannel.getNumberOfAvailableBuffers()).isEqualTo(2);
            Assertions.assertThat((int)inputChannel.unsynchronizedGetFloatingBuffersAvailable()).isZero();
            int backlog = 5;
            NettyMessage.BacklogAnnouncement announcement = new NettyMessage.BacklogAnnouncement(backlog, inputChannel.getInputChannelId());
            handler.channelRead(null, (Object)announcement);
            Assertions.assertThat((int)inputChannel.getNumberOfAvailableBuffers()).isEqualTo(7);
            Assertions.assertThat((int)inputChannel.getNumberOfRequiredBuffers()).isEqualTo(7);
            Assertions.assertThat((int)inputChannel.getSenderBacklog()).isEqualTo(backlog);
            Assertions.assertThat((int)inputChannel.unsynchronizedGetFloatingBuffersAvailable()).isEqualTo(5);
            backlog = 12;
            announcement = new NettyMessage.BacklogAnnouncement(backlog, inputChannel.getInputChannelId());
            handler.channelRead(null, (Object)announcement);
            Assertions.assertThat((int)inputChannel.getNumberOfAvailableBuffers()).isEqualTo(10);
            Assertions.assertThat((int)inputChannel.getNumberOfRequiredBuffers()).isEqualTo(14);
            Assertions.assertThat((int)inputChannel.getSenderBacklog()).isEqualTo(backlog);
            Assertions.assertThat((int)inputChannel.unsynchronizedGetFloatingBuffersAvailable()).isEqualTo(8);
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
        }
    }

    @Test
    void testThrowExceptionForNoAvailableBuffer() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel inputChannel = (RemoteInputChannel)Mockito.spy((Object)InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate));
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);
        ((AbstractIntegerAssert)Assertions.assertThat((int)inputChannel.getNumberOfAvailableBuffers()).as("There should be no buffers available in the channel.", new Object[0])).isZero();
        NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2, new NetworkBufferAllocator((NetworkClientHandler)handler));
        Assertions.assertThat((Object)bufferResponse.getBuffer()).isNull();
        handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse);
        ((RemoteInputChannel)Mockito.verify((Object)inputChannel, (VerificationMode)Mockito.times((int)1))).onError((Throwable)ArgumentMatchers.any(IllegalStateException.class));
    }

    @Test
    void testReceivePartitionNotFoundException() throws Exception {
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)bufferProvider.requestBuffer()).thenReturn((Object)TestBufferFactory.createBuffer(0));
        RemoteInputChannel inputChannel = (RemoteInputChannel)Mockito.mock(RemoteInputChannel.class);
        Mockito.when((Object)inputChannel.getInputChannelId()).thenReturn((Object)new InputChannelID());
        Mockito.when((Object)inputChannel.getBufferProvider()).thenReturn((Object)bufferProvider);
        NettyMessage.ErrorResponse partitionNotFound = new NettyMessage.ErrorResponse((Throwable)new PartitionNotFoundException(new ResultPartitionID()), inputChannel.getInputChannelId());
        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);
        ChannelHandlerContext ctx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Mockito.when((Object)ctx.channel()).thenReturn((Object)((Channel)Mockito.mock(Channel.class)));
        client.channelActive(ctx);
        client.channelRead(ctx, (Object)partitionNotFound);
        ((RemoteInputChannel)Mockito.verify((Object)inputChannel, (VerificationMode)Mockito.times((int)1))).onFailedPartitionRequest();
    }

    @Test
    void testCancelBeforeActive() throws Exception {
        RemoteInputChannel inputChannel = (RemoteInputChannel)Mockito.mock(RemoteInputChannel.class);
        Mockito.when((Object)inputChannel.getInputChannelId()).thenReturn((Object)new InputChannelID());
        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);
        client.cancelRequestFor(null);
        client.cancelRequestFor(inputChannel.getInputChannelId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testNotifyCreditAvailable() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        NetworkBufferAllocator allocator = new NetworkBufferAllocator((NetworkClientHandler)handler);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = new NettyPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(2, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel[] inputChannels = new RemoteInputChannel[]{InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client), InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client)};
        try {
            inputGate.setInputChannels((InputChannel[])inputChannels);
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannels[0].requestSubpartitions();
            inputChannels[1].requestSubpartitions();
            Assertions.assertThat((boolean)channel.isWritable()).isTrue();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)inputChannels[0].getInputChannelId()).isEqualTo((Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)inputChannels[1].getInputChannelId()).isEqualTo((Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            NettyMessage.BufferResponse bufferResponse1 = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32), 0, inputChannels[0].getInputChannelId(), 1, allocator);
            NettyMessage.BufferResponse bufferResponse2 = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32), 0, inputChannels[1].getInputChannelId(), 1, allocator);
            handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse1);
            handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse2);
            Assertions.assertThat((int)inputChannels[0].getUnannouncedCredit()).isEqualTo(2);
            Assertions.assertThat((int)inputChannels[1].getUnannouncedCredit()).isEqualTo(2);
            channel.runPendingTasks();
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.AddCredit.class);
            Assertions.assertThat((Comparable)inputChannels[0].getInputChannelId()).isEqualTo((Object)((NettyMessage.AddCredit)readFromOutbound).receiverId);
            Assertions.assertThat((int)((NettyMessage.AddCredit)readFromOutbound).credit).isEqualTo(2);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.AddCredit.class);
            Assertions.assertThat((Comparable)inputChannels[1].getInputChannelId()).isEqualTo((Object)((NettyMessage.AddCredit)readFromOutbound).receiverId);
            Assertions.assertThat((int)((NettyMessage.AddCredit)readFromOutbound).credit).isEqualTo(2);
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
            ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);
            NettyMessage.BufferResponse bufferResponse3 = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32), 1, inputChannels[0].getInputChannelId(), 1, allocator);
            handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse3);
            Assertions.assertThat((int)inputChannels[0].getUnannouncedCredit()).isOne();
            Assertions.assertThat((int)inputChannels[1].getUnannouncedCredit()).isZero();
            channel.runPendingTasks();
            Assertions.assertThat((boolean)channel.isWritable()).isFalse();
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
            channel.flush();
            Assertions.assertThat((Comparable)channelBlockingBuffer).isSameAs(channel.readOutbound());
            Assertions.assertThat((boolean)channel.isWritable()).isTrue();
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.AddCredit.class);
            Assertions.assertThat((int)((NettyMessage.AddCredit)readFromOutbound).credit).isOne();
            Assertions.assertThat((int)inputChannels[0].getUnannouncedCredit()).isZero();
            Assertions.assertThat((int)inputChannels[1].getUnannouncedCredit()).isZero();
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
            channel.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testNotifyCreditAvailableAfterReleased() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = new NettyPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), 1, new NetworkBufferAllocator((NetworkClientHandler)handler));
            handler.channelRead((ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class), (Object)bufferResponse);
            Assertions.assertThat((int)inputChannel.getUnannouncedCredit()).isEqualTo(2);
            inputGate.close();
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.CloseRequest.class);
            channel.runPendingTasks();
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
            channel.close();
        }
    }

    @Test
    void testReadBufferResponseBeforeReleasingChannel() throws Exception {
        this.testReadBufferResponseWithReleasingOrRemovingChannel(false, true);
    }

    @Test
    void testReadBufferResponseBeforeRemovingChannel() throws Exception {
        this.testReadBufferResponseWithReleasingOrRemovingChannel(true, true);
    }

    @Test
    void testReadBufferResponseAfterReleasingChannel() throws Exception {
        this.testReadBufferResponseWithReleasingOrRemovingChannel(false, false);
    }

    @Test
    void testReadBufferResponseAfterRemovingChannel() throws Exception {
        this.testReadBufferResponseWithReleasingOrRemovingChannel(true, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDoNotFailHandlerOnSingleChannelFailure() throws Exception {
        int bufferSize = 1024;
        String expectedMessage = "test exception on buffer";
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 1024);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        TestRemoteInputChannelForError inputChannel = new TestRemoteInputChannelForError(inputGate, "test exception on buffer");
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            inputGate.setup();
            inputGate.requestPartitions();
            handler.addInputChannel((RemoteInputChannel)inputChannel);
            NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(1024), 0, inputChannel.getInputChannelId(), 1, new NetworkBufferAllocator((NetworkClientHandler)handler));
            handler.channelRead(null, (Object)bufferResponse);
            handler.checkError();
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((SingleInputGate)inputGate).getNext()).isInstanceOf(IOException.class)).hasMessage("test exception on buffer");
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
        }
    }

    @Test
    void testExceptionWrap() {
        this.testExceptionWrap(LocalTransportException.class, new Exception());
        this.testExceptionWrap(LocalTransportException.class, new Exception("some error"));
        this.testExceptionWrap(RemoteTransportException.class, new IOException("Connection reset by peer"));
        Assumptions.assumeThat((boolean)Epoll.isAvailable()).isTrue();
        this.testExceptionWrap(RemoteTransportException.class, (Exception)new Errors.NativeIoException("readAddress", Errors.ERRNO_ECONNRESET_NEGATIVE));
    }

    private void testExceptionWrap(Class<? extends TransportException> expectedClass, final Exception cause) {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.setConnectionId(new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0));
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                throw cause;
            }
        }, handler});
        embeddedChannel.writeInbound(new Object[]{1});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> handler.checkError()).isInstanceOf(expectedClass)).withFailMessage(String.format("The handler should wrap the exception %s as %s, but it does not.", cause, expectedClass), new Object[0]);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testAnnounceBufferSize() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = new NettyPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(2, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel[] inputChannels = new RemoteInputChannel[]{InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client), InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client)};
        try {
            inputGate.setInputChannels((InputChannel[])inputChannels);
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannels[0].requestSubpartitions();
            inputChannels[1].requestSubpartitions();
            channel.readOutbound();
            channel.readOutbound();
            inputGate.announceBufferSize(333);
            channel.runPendingTasks();
            NettyMessage.NewBufferSize readOutbound = (NettyMessage.NewBufferSize)channel.readOutbound();
            Assertions.assertThat((Object)readOutbound).isInstanceOf(NettyMessage.NewBufferSize.class);
            Assertions.assertThat((Comparable)inputChannels[0].getInputChannelId()).isEqualTo((Object)readOutbound.receiverId);
            Assertions.assertThat((int)readOutbound.bufferSize).isEqualTo(333);
            readOutbound = (NettyMessage.NewBufferSize)channel.readOutbound();
            Assertions.assertThat((Comparable)inputChannels[1].getInputChannelId()).isEqualTo((Object)readOutbound.receiverId);
            Assertions.assertThat((int)readOutbound.bufferSize).isEqualTo(333);
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
            channel.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testReadBufferResponseWithReleasingOrRemovingChannel(boolean isRemoved, boolean readBeforeReleasingOrRemoving) throws Exception {
        int bufferSize = 1024;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = new InputChannelBuilder().buildRemoteChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{inputChannel});
        inputGate.setup();
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{handler});
        handler.addInputChannel(inputChannel);
        try {
            if (!readBeforeReleasingOrRemoving) {
                inputGate.close();
                if (isRemoved) {
                    handler.removeInputChannel(inputChannel);
                }
            }
            NettyMessage.BufferResponse bufferResponse = CreditBasedPartitionRequestClientHandlerTest.createBufferResponse(TestBufferFactory.createBuffer(bufferSize), 0, inputChannel.getInputChannelId(), 1, new NetworkBufferAllocator((NetworkClientHandler)handler));
            if (readBeforeReleasingOrRemoving) {
                inputGate.close();
                if (isRemoved) {
                    handler.removeInputChannel(inputChannel);
                }
            }
            handler.channelRead(null, (Object)bufferResponse);
            Assertions.assertThat((int)inputChannel.getNumberOfQueuedBuffers()).isZero();
            if (!readBeforeReleasingOrRemoving) {
                Assertions.assertThat((Object)bufferResponse.getBuffer()).isNull();
            } else {
                Assertions.assertThat((Object)bufferResponse.getBuffer()).isNotNull();
                Assertions.assertThat((boolean)bufferResponse.getBuffer().isRecycled()).isTrue();
            }
            embeddedChannel.runScheduledPendingTasks();
            NettyMessage.CancelPartitionRequest cancelPartitionRequest = (NettyMessage.CancelPartitionRequest)embeddedChannel.readOutbound();
            Assertions.assertThat((Object)cancelPartitionRequest).isNotNull();
            Assertions.assertThat((Comparable)inputChannel.getInputChannelId()).isEqualTo((Object)cancelPartitionRequest.receiverId);
        }
        finally {
            CreditBasedPartitionRequestClientHandlerTest.releaseResource(inputGate, networkBufferPool);
            embeddedChannel.close();
        }
    }

    private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool networkBufferPool) throws IOException {
        inputGate.close();
        networkBufferPool.destroyAllBufferPools();
        networkBufferPool.destroy();
    }

    private static NettyMessage.BufferResponse createBufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receivingChannelId, int backlog, NetworkBufferAllocator allocator) throws IOException {
        NettyMessage.BufferResponse resp = new NettyMessage.BufferResponse(buffer, sequenceNumber, receivingChannelId, 0, backlog);
        ByteBuf serialized = resp.write((ByteBufAllocator)UnpooledByteBufAllocator.DEFAULT);
        serialized.readBytes(9);
        return NettyMessage.BufferResponse.readFrom((ByteBuf)serialized, (NetworkBufferAllocator)allocator);
    }

    private static class TestRemoteInputChannelForError
    extends RemoteInputChannel {
        private final String expectedMessage;

        TestRemoteInputChannelForError(SingleInputGate inputGate, String expectedMessage) {
            super(inputGate, 0, new ResultPartitionID(), new ResultSubpartitionIndexSet(0), InputChannelBuilder.STUB_CONNECTION_ID, (ConnectionManager)new TestingConnectionManager(), 0, 100, 100, 2, (Counter)new SimpleCounter(), (Counter)new SimpleCounter(), ChannelStateWriter.NO_OP);
            this.expectedMessage = expectedMessage;
        }

        public void onBuffer(Buffer buffer, int sequenceNumber, int backlog, int subpartitionId) throws IOException {
            buffer.recycleBuffer();
            throw new IOException(this.expectedMessage);
        }
    }
}

