/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.benchmark;

import java.io.IOException;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.streaming.runtime.io.benchmark.StreamNetworkThroughputBenchmark;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Smoke tests for {@link StreamNetworkThroughputBenchmark}.
 *
 * <p>Each test sets the benchmark up with a particular configuration, executes it for a fixed
 * number of records and tears it down again. Tear-down is always performed in a {@code finally}
 * block so that a failing run does not leave the benchmark environment behind for later tests.
 *
 * <p>NOTE(review): only the {@code *Benchmark} tests obtain their instance through
 * {@link #createBenchmark()}; the remaining tests instantiate
 * {@link StreamNetworkThroughputBenchmark} directly, exactly as before. Confirm whether
 * subclasses overriding {@link #createBenchmark()} are meant to cover those tests as well.
 */
public class StreamNetworkThroughputBenchmarkTest {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Creates the benchmark instance used by the {@code *Benchmark} tests.
     *
     * @return a new {@link StreamNetworkThroughputBenchmark}
     */
    protected StreamNetworkThroughputBenchmark createBenchmark() {
        return new StreamNetworkThroughputBenchmark();
    }

    /**
     * Executes an already set-up benchmark and tears it down afterwards, even if the execution
     * throws.
     *
     * @param benchmark the benchmark on which {@code setUp} has already succeeded
     * @param records the value passed to {@code executeBenchmark}
     * @throws Exception whatever {@code executeBenchmark} or {@code tearDown} throws
     */
    private static void executeAndTearDown(StreamNetworkThroughputBenchmark benchmark, long records) throws Exception {
        try {
            benchmark.executeBenchmark(records);
        }
        finally {
            benchmark.tearDown();
        }
    }

    @Test
    public void pointToPointBenchmark() throws Exception {
        StreamNetworkThroughputBenchmark benchmark = this.createBenchmark();
        benchmark.setUp(1, 1, 100);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(benchmark, 1000L);
    }

    @Test
    public void largeLocalMode() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        env.setUp(4, 10, 100, true);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(env, 10000000L);
    }

    @Test
    public void largeRemoteMode() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        env.setUp(4, 10, 100, false);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(env, 10000000L);
    }

    @Test
    public void largeRemoteAlwaysFlush() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        // Third argument is 0 here (100 everywhere else); presumably a flush timeout of zero,
        // i.e. "always flush" as the test name suggests -- TODO confirm against setUp.
        env.setUp(1, 1, 0, false);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(env, 1000000L);
    }

    /**
     * Expects {@code setUp} to fail with an {@link IOException} when the fifth argument is one
     * below {@code writers * channels}.
     *
     * <p>No tear-down is attempted: {@code setUp} is expected to throw, and a failing tear-down
     * would mask the expected exception.
     */
    @Test
    public void remoteModeInsufficientBuffersSender() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        int writers = 2;
        int channels = 2;
        int buffersPerChannel = (Integer)NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Insufficient number of network buffers");
        env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * buffersPerChannel);
    }

    /**
     * Expects {@code setUp} to fail with an {@link IOException} when the sixth argument is one
     * below {@code writers * channels * buffersPerChannel}.
     *
     * <p>No tear-down is attempted: {@code setUp} is expected to throw, and a failing tear-down
     * would mask the expected exception.
     */
    @Test
    public void remoteModeInsufficientBuffersReceiver() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        int writers = 2;
        int channels = 2;
        int buffersPerChannel = (Integer)NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Insufficient number of network buffers");
        env.setUp(writers, channels, 100, false, writers * channels, writers * channels * buffersPerChannel - 1);
    }

    @Test
    public void remoteModeMinimumBuffers() throws Exception {
        StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
        int writers = 2;
        int channels = 2;
        int buffersPerChannel = (Integer)NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
        env.setUp(writers, channels, 100, false, writers * channels + writers, writers + writers * channels * buffersPerChannel);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(env, 10000L);
    }

    @Test
    public void pointToMultiPointBenchmark() throws Exception {
        StreamNetworkThroughputBenchmark benchmark = this.createBenchmark();
        benchmark.setUp(1, 100, 100);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(benchmark, 1000L);
    }

    @Test
    public void multiPointToPointBenchmark() throws Exception {
        StreamNetworkThroughputBenchmark benchmark = this.createBenchmark();
        benchmark.setUp(4, 1, 100);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(benchmark, 1000L);
    }

    @Test
    public void multiPointToMultiPointBenchmark() throws Exception {
        StreamNetworkThroughputBenchmark benchmark = this.createBenchmark();
        benchmark.setUp(4, 100, 100);
        StreamNetworkThroughputBenchmarkTest.executeAndTearDown(benchmark, 1000L);
    }
}

