/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageDataIdentifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingTieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierProducerAgent;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class MemoryTierProducerAgentTest {
    private static final int NUM_SUBPARTITIONS = 10;
    private static final int BUFFER_SIZE = 1024;
    private static final int SEGMENT_SIZE_BYTES = 2048;
    private static final int MEMORY_TIER_SUBPARTITION_MAX_QUEUED_BUFFERS = 3;
    private static final TieredStoragePartitionId PARTITION_ID = TieredStorageIdMappingUtils.convertId((ResultPartitionID)new ResultPartitionID());
    private static final TieredStorageSubpartitionId SUBPARTITION_ID = new TieredStorageSubpartitionId(0);

    MemoryTierProducerAgentTest() {
    }

    @Test
    void testTryStartNewSegment() {
        try (MemoryTierProducerAgent memoryTierProducerAgent = MemoryTierProducerAgentTest.createMemoryTierProducerAgent(false);){
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isFalse();
            memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)new TestingNettyConnectionWriter.Builder().build());
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isTrue();
        }
    }

    @Test
    void testStartSegmentSuccessWhenSubpartitionOccupyFewBuffers() {
        int numQueuedBuffers = 2;
        try (MemoryTierProducerAgent memoryTierProducerAgent = MemoryTierProducerAgentTest.createMemoryTierProducerAgent(false, 2048, 3, new TieredStorageResourceRegistry());){
            TestingNettyConnectionWriter connectionWriter = new TestingNettyConnectionWriter.Builder().setNumQueuedBufferPayloadsSupplier(() -> numQueuedBuffers).build();
            memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)connectionWriter);
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isTrue();
        }
    }

    @Test
    void testStartSegmentFailedWhenSubpartitionOccupyTooManyBuffers() {
        int numQueuedBuffers = 3;
        try (MemoryTierProducerAgent memoryTierProducerAgent = MemoryTierProducerAgentTest.createMemoryTierProducerAgent(false, 2048, numQueuedBuffers, new TieredStorageResourceRegistry());){
            TestingNettyConnectionWriter connectionWriter = new TestingNettyConnectionWriter.Builder().setNumQueuedBufferPayloadsSupplier(() -> numQueuedBuffers).build();
            memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)connectionWriter);
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isFalse();
        }
    }

    @Test
    void testStartSegmentFailedWithInsufficientMemory() {
        TestingTieredStorageMemoryManager memoryManager = new TestingTieredStorageMemoryManager.Builder().setGetMaxNonReclaimableBuffersFunction(ignore -> 1).setEnsureCapacityFunction(num -> false).build();
        TestingTieredStorageNettyService nettyService = new TestingTieredStorageNettyService.Builder().build();
        nettyService.registerProducer(PARTITION_ID, new TestingNettyServiceProducer.Builder().build());
        try (MemoryTierProducerAgent memoryTierProducerAgent = new MemoryTierProducerAgent(PARTITION_ID, 10, 1024, 2048, 3, false, (TieredStorageMemoryManager)memoryManager, (TieredStorageNettyService)nettyService, new TieredStorageResourceRegistry());){
            memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)new TestingNettyConnectionWriter.Builder().build());
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isFalse();
        }
    }

    @Test
    void testTryWrite() {
        try (MemoryTierProducerAgent memoryTierProducerAgent = MemoryTierProducerAgentTest.createMemoryTierProducerAgent(false, 1024, new TieredStorageResourceRegistry());){
            memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)new TestingNettyConnectionWriter.Builder().build());
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryWrite(SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer(), (Object)this, 0)).isTrue();
            Assertions.assertThat((boolean)memoryTierProducerAgent.tryWrite(SUBPARTITION_ID, BufferBuilderTestUtils.buildSomeBuffer(), (Object)this, 0)).isFalse();
        }
    }

    @Test
    void testBroadcastOnlyPartitionCanNotUseMemoryTier() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> MemoryTierProducerAgentTest.createMemoryTierProducerAgent(true)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("not allowed to use the memory tier");
    }

    @Test
    void testRelease() {
        TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry();
        MemoryTierProducerAgent memoryTierProducerAgent = MemoryTierProducerAgentTest.createMemoryTierProducerAgent(false, 2048, resourceRegistry);
        AtomicBoolean isClosed = new AtomicBoolean(false);
        memoryTierProducerAgent.connectionEstablished(SUBPARTITION_ID, (NettyConnectionWriter)new TestingNettyConnectionWriter.Builder().setCloseFunction(throwable -> {
            isClosed.set(true);
            return null;
        }).build());
        resourceRegistry.clearResourceFor((TieredStorageDataIdentifier)PARTITION_ID);
        Assertions.assertThat((AtomicBoolean)isClosed).isTrue();
    }

    private static MemoryTierProducerAgent createMemoryTierProducerAgent(boolean isBroadcastOnly) {
        return MemoryTierProducerAgentTest.createMemoryTierProducerAgent(isBroadcastOnly, 2048, new TieredStorageResourceRegistry());
    }

    private static MemoryTierProducerAgent createMemoryTierProducerAgent(boolean isBroadcastOnly, int segmentSizeBytes, TieredStorageResourceRegistry resourceRegistry) {
        return MemoryTierProducerAgentTest.createMemoryTierProducerAgent(isBroadcastOnly, segmentSizeBytes, 3, resourceRegistry);
    }

    private static MemoryTierProducerAgent createMemoryTierProducerAgent(boolean isBroadcastOnly, int segmentSizeBytes, int memoryTierSubpartitionMaxQueuedBuffers, TieredStorageResourceRegistry resourceRegistry) {
        TestingTieredStorageMemoryManager memoryManager = new TestingTieredStorageMemoryManager.Builder().setGetMaxNonReclaimableBuffersFunction(ignore -> Integer.MAX_VALUE).build();
        TestingTieredStorageNettyService nettyService = new TestingTieredStorageNettyService.Builder().build();
        TestingNettyServiceProducer nettyServiceProducer = new TestingNettyServiceProducer.Builder().build();
        nettyService.registerProducer(PARTITION_ID, nettyServiceProducer);
        return new MemoryTierProducerAgent(PARTITION_ID, 10, 1024, segmentSizeBytes, memoryTierSubpartitionMaxQueuedBuffers, isBroadcastOnly, (TieredStorageMemoryManager)memoryManager, (TieredStorageNettyService)nettyService, resourceRegistry);
    }
}

