/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link BroadcastRecordWriter} covering the case where targeted {@code emit} calls are
 * interleaved with {@code broadcastEmit} calls on the same result partition.
 */
class BroadcastRecordWriterTest {

    /**
     * Emits every record once to a single subpartition and once to all subpartitions, then checks
     * the number of used buffers, the number of buffers queued per subpartition, and that each
     * subpartition deserializes exactly the records that were sent to it.
     *
     * @param tempPath temporary directory handed to the spilling deserializer
     */
    @Test
    void testBroadcastMixedRandomEmitRecord(@TempDir Path tempPath) throws Exception {
        final int numberOfSubpartitions = 8;
        final int numberOfRecords = 8;
        final int bufferSize = 32;

        final ResultPartition partition =
                RecordWriterTest.createResultPartition(bufferSize, numberOfSubpartitions);
        final BroadcastRecordWriter<SerializationTestType> writer =
                new BroadcastRecordWriter<>(partition, -1L, "test");
        final RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(
                        new String[] {tempPath.toString()});

        final Util.MockRecords records =
                Util.randomRecords(numberOfRecords, SerializationTestTypeFactory.INT);

        // Expected records per subpartition, in the order they were emitted.
        final HashMap<Integer, ArrayDeque<SerializationTestType>> serializedRecords =
                new HashMap<>();
        for (int i = 0; i < numberOfSubpartitions; i++) {
            serializedRecords.put(i, new ArrayDeque<>());
        }

        // Each record goes to one subpartition (chosen round-robin) and is then broadcast to all.
        int index = 0;
        for (SerializationTestType record : records) {
            final int targetSubpartition = index++ % numberOfSubpartitions;
            writer.emit(record, targetSubpartition);
            serializedRecords.get(targetSubpartition).add(record);

            writer.broadcastEmit(record);
            for (int i = 0; i < numberOfSubpartitions; i++) {
                serializedRecords.get(i).add(record);
            }
        }

        // Two buffers are expected per record: one for the targeted emit, one for the broadcast.
        final int numberOfCreatedBuffers =
                partition.getBufferPool().bestEffortGetNumOfUsedBuffers();
        assertThatUsedBuffers(numberOfCreatedBuffers, 2 * numberOfRecords);

        final int expectedQueuedBuffers = numberOfRecords + 1;
        for (int i = 0; i < numberOfSubpartitions; i++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(i))
                    .isEqualTo(expectedQueuedBuffers);

            // Round-robin distribution: the first (records % subpartitions) subpartitions
            // receive one targeted record more than the others.
            final int excessRandomRecords = i < numberOfRecords % numberOfSubpartitions ? 1 : 0;
            final int numberOfRandomRecords =
                    numberOfRecords / numberOfSubpartitions + excessRandomRecords;
            final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;

            RecordWriterTest.verifyDeserializationResults(
                    partition.createSubpartitionView(
                            new ResultSubpartitionIndexSet(i),
                            new NoOpBufferAvailablityListener()),
                    deserializer,
                    serializedRecords.get(i),
                    expectedQueuedBuffers,
                    numberOfTotalRecords);
        }
    }

    /**
     * Tracks the number of available memory segments in the buffer pool while records are
     * broadcast, emitted to a single subpartition, and consumed from both subpartitions.
     */
    @Test
    void testRandomEmitAndBufferRecycling() throws Exception {
        final int recordSize = 8;
        final int numberOfSubpartitions = 2;
        final int bufferSize = 2 * recordSize;

        final ResultPartition partition =
                RecordWriterTest.createResultPartition(bufferSize, numberOfSubpartitions);
        final BufferPool bufferPool = partition.getBufferPool();
        final BroadcastRecordWriter<IntType> writer =
                new BroadcastRecordWriter<>(partition, -1L, "test");

        // Request two buffers and hand them straight back before measuring availability.
        // NOTE(review): presumably this forces the pool to materialize its segments so the
        // availability counts below are stable - confirm against the BufferPool implementation.
        final List<Buffer> buffers =
                Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer());
        buffers.forEach(Buffer::recycleBuffer);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(3);

        // Two broadcast records are expected to occupy exactly one segment.
        writer.broadcastEmit(new IntType(1));
        writer.broadcastEmit(new IntType(2));
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);

        // Consuming from subpartition 0 alone must not make the segment available again.
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(0)).isOne();
        final ResultSubpartitionView view0 =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(0), new NoOpBufferAvailablityListener());
        closeConsumer(view0, bufferSize);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);

        // A targeted emit takes one more segment from the pool.
        writer.emit(new IntType(3), 0);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();

        // Once subpartition 1 has consumed its buffer too, one segment becomes available again.
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(1)).isOne();
        final ResultSubpartitionView view1 =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(1), new NoOpBufferAvailablityListener());
        closeConsumer(view1, bufferSize);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);
    }

    /**
     * Asserts the measured number of used buffers against the expected count, keeping the measured
     * value as the AssertJ "actual" so failure messages read correctly.
     *
     * @param actualUsedBuffers number of used buffers reported by the buffer pool
     * @param expectedUsedBuffers number of used buffers the test expects
     */
    private static void assertThatUsedBuffers(int actualUsedBuffers, int expectedUsedBuffers) {
        Assertions.assertThat(actualUsedBuffers).isEqualTo(expectedUsedBuffers);
    }

    /**
     * Polls the next buffer from the given view, checks its size and recycles it.
     *
     * @param view subpartition view to read from
     * @param expectedSize expected size of the polled buffer, as reported by {@code getSize()}
     * @throws IOException if reading the next buffer from the view fails
     */
    private static void closeConsumer(ResultSubpartitionView view, int expectedSize)
            throws IOException {
        final Buffer buffer = view.getNextBuffer().buffer();
        Assertions.assertThat(buffer.getSize()).isEqualTo(expectedSize);
        buffer.recycleBuffer();
    }
}

