/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.grouping;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class ShuffleGroupingTest {
    @Test
    public void testShuffleGrouping() {
        int i;
        int numTasks = 6;
        ShuffleGrouping grouper = new ShuffleGrouping();
        int inputTaskId = 100;
        ArrayList availableTaskIds = Lists.newArrayList();
        for (int i2 = 0; i2 < 6; ++i2) {
            availableTaskIds.add(i2);
        }
        WorkerTopologyContext context = (WorkerTopologyContext)Mockito.mock(WorkerTopologyContext.class);
        grouper.prepare(context, null, (List)availableTaskIds);
        int[] taskCounts = new int[6];
        for (i = 1; i <= 30000; ++i) {
            List taskIds = grouper.chooseTasks(100, (List)Lists.newArrayList());
            Assert.assertNotNull((String)"Not null taskId list returned", (Object)taskIds);
            Assert.assertEquals((String)"Single task Id returned", (long)1L, (long)taskIds.size());
            int taskId = (Integer)taskIds.get(0);
            Assert.assertTrue((String)"TaskId should exist", (taskId >= 0 && taskId < 6 ? 1 : 0) != 0);
            int n = taskId;
            taskCounts[n] = taskCounts[n] + 1;
        }
        for (i = 0; i < 6; ++i) {
            Assert.assertEquals((String)"Distribution should be even for all nodes", (long)5000L, (long)taskCounts[i]);
        }
    }

    @Test
    public void testShuffleGroupMultiThreaded() throws InterruptedException, ExecutionException {
        int numTasks = 6;
        int groupingExecutionsPerThread = 30000;
        int numThreads = 10;
        ShuffleGrouping grouper = new ShuffleGrouping();
        int inputTaskId = 100;
        final ArrayList availableTaskIds = Lists.newArrayList();
        for (int i = 0; i < 6; ++i) {
            availableTaskIds.add(i);
        }
        WorkerTopologyContext context = (WorkerTopologyContext)Mockito.mock(WorkerTopologyContext.class);
        grouper.prepare(context, null, (List)availableTaskIds);
        ArrayList threadTasks = Lists.newArrayList();
        for (int x = 0; x < 10; ++x) {
            Callable<int[]> threadTask = new Callable<int[]>((CustomStreamGrouping)grouper){
                final /* synthetic */ CustomStreamGrouping val$grouper;
                {
                    this.val$grouper = customStreamGrouping;
                }

                @Override
                public int[] call() throws Exception {
                    int[] taskCounts = new int[availableTaskIds.size()];
                    for (int i = 1; i <= 30000; ++i) {
                        List taskIds = this.val$grouper.chooseTasks(100, (List)Lists.newArrayList());
                        Assert.assertNotNull((String)"Not null taskId list returned", (Object)taskIds);
                        Assert.assertEquals((String)"Single task Id returned", (long)1L, (long)taskIds.size());
                        int taskId = (Integer)taskIds.get(0);
                        Assert.assertTrue((String)"TaskId should exist", (taskId >= 0 && taskId < availableTaskIds.size() ? 1 : 0) != 0);
                        int n = taskId;
                        taskCounts[n] = taskCounts[n] + 1;
                    }
                    return taskCounts;
                }
            };
            threadTasks.add(threadTask);
        }
        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
        List taskResults = executor.invokeAll(threadTasks);
        int[] taskIdTotals = new int[6];
        for (Future taskResult : taskResults) {
            while (!taskResult.isDone()) {
                Thread.sleep(1000L);
            }
            int[] taskDistributions = (int[])taskResult.get();
            for (int i = 0; i < taskDistributions.length; ++i) {
                int n = i;
                taskIdTotals[n] = taskIdTotals[n] + taskDistributions[i];
            }
        }
        for (int i = 0; i < 6; ++i) {
            Assert.assertEquals((long)50000L, (long)taskIdTotals[i]);
        }
    }
}

