/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.lang.reflect.Field;
import java.net.InetAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@link NettyConnectionManager} sizes its buffer-pool arenas and its client/server
 * event loop groups according to the supplied {@link NettyConfig}.
 *
 * <p>The number of event loops is not exposed through a public accessor on the event loop group,
 * so the tests read the group's internal {@code children} array via reflection.
 */
class NettyConnectionManagerTest {

    /** Name of the reflectively accessed field holding a group's event executors. */
    private static final String CHILDREN_FIELD = "children";

    /**
     * Without explicit arena/thread options, the number of arenas and the number of client and
     * server threads are all expected to equal the number of slots passed to {@link NettyConfig}.
     */
    @Test
    void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
        int numberOfSlots = 2;
        NettyConfig config =
                new NettyConfig(InetAddress.getLocalHost(), 0, 1024, numberOfSlots, new Configuration());
        NettyConnectionManager connectionManager = createNettyConnectionManager(config);
        // Check for null before the first dereference so the failure message can actually surface.
        Assertions.assertThat(connectionManager)
                .withFailMessage("connectionManager is null due to fail to get a free port")
                .isNotNull();
        try {
            connectionManager.start();
            Assertions.assertThat(connectionManager.getBufferPool().getNumberOfArenas())
                    .isEqualTo(numberOfSlots);
            assertEventLoopSizes(connectionManager, numberOfSlots, numberOfSlots);
        } finally {
            // Release the server socket and event loop threads even when an assertion fails.
            connectionManager.shutdown();
        }
    }

    /**
     * Explicitly configured arena and thread counts must take precedence over the number of slots
     * (deliberately set to an unrelated value here).
     */
    @Test
    void testManualConfiguration() throws Exception {
        int numberOfArenas = 1;
        int numberOfClientThreads = 3;
        int numberOfServerThreads = 4;
        Configuration flinkConfig = new Configuration();
        flinkConfig.set(NettyShuffleEnvironmentOptions.NUM_ARENAS, numberOfArenas);
        flinkConfig.set(NettyShuffleEnvironmentOptions.NUM_THREADS_CLIENT, numberOfClientThreads);
        flinkConfig.set(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, numberOfServerThreads);
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1337, flinkConfig);
        NettyConnectionManager connectionManager = createNettyConnectionManager(config);
        // Check for null before the first dereference so the failure message can actually surface.
        Assertions.assertThat(connectionManager)
                .withFailMessage("connectionManager is null due to fail to get a free port")
                .isNotNull();
        try {
            connectionManager.start();
            Assertions.assertThat(connectionManager.getBufferPool().getNumberOfArenas())
                    .isEqualTo(numberOfArenas);
            assertEventLoopSizes(connectionManager, numberOfClientThreads, numberOfServerThreads);
        } finally {
            // Release the server socket and event loop threads even when an assertion fails.
            connectionManager.shutdown();
        }
    }

    /**
     * Asserts the sizes of the client group, the server (parent) group and the server child
     * group of a started connection manager.
     *
     * @param connectionManager started manager whose bootstraps are inspected
     * @param expectedClientThreads expected number of executors in the client group
     * @param expectedServerThreads expected number of executors in both server groups
     * @throws IllegalAccessException if the reflective read of the executor array is denied
     */
    private static void assertEventLoopSizes(
            NettyConnectionManager connectionManager,
            int expectedClientThreads,
            int expectedServerThreads)
            throws IllegalAccessException {
        Bootstrap clientBootstrap = connectionManager.getClient().getBootstrap();
        Assertions.assertThat(eventExecutorsOf(clientBootstrap.config().group()))
                .hasSize(expectedClientThreads);

        ServerBootstrap serverBootstrap = connectionManager.getServer().getBootstrap();
        Assertions.assertThat(eventExecutorsOf(serverBootstrap.config().group()))
                .hasSize(expectedServerThreads);
        Assertions.assertThat(eventExecutorsOf(serverBootstrap.childGroup()))
                .hasSize(expectedServerThreads);
    }

    /**
     * Reads the {@code children} array of the given group via reflection.
     *
     * <p>The class hierarchy is searched upwards from the group's runtime class, so the lookup
     * does not depend on the field being declared exactly two superclasses up.
     *
     * @param group event loop group to inspect
     * @return the group's internal executor array
     * @throws IllegalAccessException if the field cannot be read
     * @throws AssertionError if no class in the hierarchy declares the field
     */
    private static Object[] eventExecutorsOf(EventLoopGroup group) throws IllegalAccessException {
        for (Class<?> type = group.getClass(); type != null; type = type.getSuperclass()) {
            Field children;
            try {
                children = type.getDeclaredField(CHILDREN_FIELD);
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; keep climbing towards the declaring superclass.
                continue;
            }
            children.setAccessible(true);
            return (Object[]) children.get(group);
        }
        throw new AssertionError(
                "No '" + CHILDREN_FIELD + "' field found in hierarchy of " + group.getClass().getName());
    }

    /**
     * Builds a connection manager over fresh partition-manager and event-dispatcher instances.
     *
     * @param config Netty configuration under test
     * @return a new, not yet started connection manager
     */
    private NettyConnectionManager createNettyConnectionManager(NettyConfig config) {
        return new NettyConnectionManager(
                new ResultPartitionManager(), new TaskEventDispatcher(), config, 1, true);
    }
}

