/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link PartitionRequestServerHandler}.
 *
 * <p>Each test installs the handler in an {@link EmbeddedChannel}, writes a single inbound
 * {@link NettyMessage}, runs the channel's pending tasks and then checks the observable effect
 * on the view reader or the result partition.
 */
class PartitionRequestServerHandlerTest {

    /** Initial credit handed to every view reader created by these tests. */
    private static final int INITIAL_CREDIT = 2;

    /** Buffer size announced through {@link NettyMessage.NewBufferSize} messages. */
    private static final int NEW_BUFFER_SIZE = 666;

    /** Value of {@code TestViewReader.bufferSize} while no new buffer size has been recorded. */
    private static final int UNSET_BUFFER_SIZE = -1;

    /**
     * Writes a {@link NettyMessage.ResumeConsumption} message for a registered reader and checks
     * that the reader's {@code resumeConsumption()} was invoked.
     */
    @Test
    void testResumeConsumption() {
        final InputChannelID inputChannelID = new InputChannelID();
        final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        final TestViewReader testViewReader =
                new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        final PartitionRequestServerHandler serverHandler =
                createServerHandler(partitionRequestQueue);
        final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
        partitionRequestQueue.notifyReaderCreated(testViewReader);

        channel.writeInbound(new NettyMessage.ResumeConsumption(inputChannelID));
        channel.runPendingTasks();

        Assertions.assertThat(testViewReader.consumptionResumed).isTrue();
    }

    /**
     * Checks that the partition's all-data-processed future is not done after
     * {@code notifyEndOfData(StopMode.DRAIN)} and is done, without an exception, once a
     * {@link NettyMessage.AckAllUserRecordsProcessed} message for the reader's channel has been
     * written inbound and the pending tasks have run.
     */
    @Test
    void testAcknowledgeAllRecordsProcessed() throws IOException {
        final InputChannelID inputChannelID = new InputChannelID();
        final ResultPartition resultPartition =
                PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
        final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        final PartitionRequestServerHandler serverHandler =
                createServerHandler(partitionRequestQueue);
        // Unlike the other tests, the request queue itself is part of the pipeline here.
        final EmbeddedChannel channel = new EmbeddedChannel(serverHandler, partitionRequestQueue);

        final CreditBasedSequenceNumberingViewReader viewReader =
                new CreditBasedSequenceNumberingViewReader(
                        inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        viewReader.notifySubpartitionsCreated(resultPartition, new ResultSubpartitionIndexSet(0));
        partitionRequestQueue.notifyReaderCreated(viewReader);

        resultPartition.notifyEndOfData(StopMode.DRAIN);
        final CompletableFuture<?> allRecordsProcessedFuture =
                resultPartition.getAllDataProcessedFuture();
        Assertions.assertThat(allRecordsProcessedFuture).isNotDone();

        channel.writeInbound(new NettyMessage.AckAllUserRecordsProcessed(inputChannelID));
        channel.runPendingTasks();

        Assertions.assertThat(allRecordsProcessedFuture).isDone().isNotCompletedExceptionally();
    }

    /**
     * Writes a {@link NettyMessage.NewBufferSize} message for a registered reader and checks that
     * the announced size reached the reader's {@code notifyNewBufferSize(int)}.
     */
    @Test
    public void testNewBufferSize() {
        final InputChannelID inputChannelID = new InputChannelID();
        final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        final TestViewReader testViewReader =
                new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        final PartitionRequestServerHandler serverHandler =
                createServerHandler(partitionRequestQueue);
        final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
        partitionRequestQueue.notifyReaderCreated(testViewReader);

        channel.writeInbound(new NettyMessage.NewBufferSize(NEW_BUFFER_SIZE, inputChannelID));
        channel.runPendingTasks();

        Assertions.assertThat(testViewReader.bufferSize).isEqualTo(NEW_BUFFER_SIZE);
    }

    /**
     * Writes a {@link NettyMessage.NewBufferSize} message for a reader that was never registered
     * with the request queue and checks that nothing is written outbound and that the reader's
     * recorded buffer size is left at its initial value.
     */
    @Test
    void testReceivingNewBufferSizeBeforeReaderIsCreated() {
        final InputChannelID inputChannelID = new InputChannelID();
        final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        // Deliberately NOT passed to notifyReaderCreated(...): the queue does not know this reader.
        final TestViewReader testViewReader =
                new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        final PartitionRequestServerHandler serverHandler =
                createServerHandler(partitionRequestQueue);
        final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);

        channel.writeInbound(new NettyMessage.NewBufferSize(NEW_BUFFER_SIZE, inputChannelID));
        channel.runPendingTasks();

        // On failure, report the unexpected outbound messages themselves.
        Assertions.assertThat(channel.outboundMessages())
                .withFailMessage(channel.outboundMessages().toString())
                .isEmpty();
        Assertions.assertThat(testViewReader.bufferSize).isEqualTo(UNSET_BUFFER_SIZE);
    }

    /**
     * Creates the handler under test, backed by a fresh {@link ResultPartitionManager} and a fresh
     * {@link TaskEventDispatcher}.
     *
     * @param requestQueue the queue the handler is constructed with
     * @return a new handler instance
     */
    private static PartitionRequestServerHandler createServerHandler(
            PartitionRequestQueue requestQueue) {
        return new PartitionRequestServerHandler(
                new ResultPartitionManager(), new TaskEventDispatcher(), requestQueue);
    }

    /**
     * View reader that records calls to {@link #resumeConsumption()} and
     * {@link #notifyNewBufferSize(int)} instead of delegating to the superclass.
     */
    private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
        /** Set to {@code true} once {@link #resumeConsumption()} has been called. */
        private boolean consumptionResumed = false;

        /** Last size passed to {@link #notifyNewBufferSize(int)}; {@code -1} until then. */
        private int bufferSize = UNSET_BUFFER_SIZE;

        TestViewReader(
                InputChannelID receiverId, int initialCredit, PartitionRequestQueue requestQueue) {
            super(receiverId, initialCredit, requestQueue);
        }

        @Override
        public void resumeConsumption() {
            this.consumptionResumed = true;
        }

        @Override
        public void notifyNewBufferSize(int newBufferSize) {
            this.bufferSize = newBufferSize;
        }
    }
}

