/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.io;

import java.util.Arrays;
import java.util.Calendar;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.junit.Assert;
import org.junit.Test;

public class LocatableSplitAssignerTest {
    @Test
    public void testSerialSplitAssignmentWithNullHost() {
        try {
            int NUM_SPLITS = 50;
            String[][] hosts = new String[][]{{"localhost"}, new String[0], null};
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i = 0; i < 50; ++i) {
                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            while ((is = ia.getNextInputSplit(null, 0)) != null) {
                Assert.assertTrue((boolean)splits.remove(is));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("", 0));
            Assert.assertEquals((long)50L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)0L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSerialSplitAssignmentAllForSameHost() {
        try {
            int NUM_SPLITS = 50;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i = 0; i < 50; ++i) {
                splits.add(new LocatableInputSplit(i, "testhost"));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
                Assert.assertTrue((boolean)splits.remove(is));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("", 0));
            Assert.assertEquals((long)0L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)50L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSerialSplitAssignmentAllForRemoteHost() {
        try {
            String[] hosts = new String[]{"host1", "host1", "host1", "host2", "host2", "host3"};
            int NUM_SPLITS = 10 * hosts.length;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i = 0; i < NUM_SPLITS; ++i) {
                splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
                Assert.assertTrue((boolean)splits.remove(is));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("anotherHost", 0));
            Assert.assertEquals((long)NUM_SPLITS, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)0L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSerialSplitAssignmentSomeForRemoteHost() {
        try {
            int i;
            String[] hosts = new String[]{"host1", "host2", "host3"};
            int NUM_LOCAL_HOST1_SPLITS = 20;
            int NUM_LOCAL_HOST2_SPLITS = 10;
            int NUM_REMOTE_SPLITS = 30;
            int NUM_LOCAL_SPLITS = 30;
            int splitCnt = 0;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (i = 0; i < 20; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, "host1"));
            }
            for (i = 0; i < 10; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, "host2"));
            }
            for (i = 0; i < 30; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            int i2 = 0;
            while ((is = ia.getNextInputSplit(hosts[i2++ % hosts.length], 0)) != null) {
                Assert.assertTrue((boolean)splits.remove(is));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("anotherHost", 0));
            Assert.assertEquals((long)30L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)30L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSerialSplitAssignmentMultiLocalHost() {
        try {
            int i;
            Object[] localHosts = new String[]{"local1", "local2", "local3"};
            Object[] remoteHosts = new String[]{"remote1", "remote2", "remote3"};
            String[] requestingHosts = new String[]{"local3", "local2", "local1", "other"};
            int NUM_THREE_LOCAL_SPLITS = 10;
            int NUM_TWO_LOCAL_SPLITS = 10;
            int NUM_ONE_LOCAL_SPLITS = 10;
            int NUM_LOCAL_SPLITS = 30;
            int NUM_REMOTE_SPLITS = 10;
            int NUM_SPLITS = 40;
            Object[] threeLocalHosts = localHosts;
            Object[] twoLocalHosts = new String[]{localHosts[0], localHosts[1], remoteHosts[0]};
            Object[] oneLocalHost = new String[]{localHosts[0], remoteHosts[0], remoteHosts[1]};
            Object[] noLocalHost = remoteHosts;
            int splitCnt = 0;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (i = 0; i < 10; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, (String[])threeLocalHosts));
            }
            for (i = 0; i < 10; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, (String[])twoLocalHosts));
            }
            for (i = 0; i < 10; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, (String[])oneLocalHost));
            }
            for (i = 0; i < 10; ++i) {
                splits.add(new LocatableInputSplit(splitCnt++, (String[])noLocalHost));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            for (int i2 = 0; i2 < 40; ++i2) {
                String host = requestingHosts[i2 % requestingHosts.length];
                is = ia.getNextInputSplit(host, 0);
                Assert.assertTrue((is != null ? 1 : 0) != 0);
                Assert.assertTrue((boolean)splits.remove(is));
                if (host.equals(localHosts[0])) {
                    Assert.assertTrue((boolean)Arrays.equals(is.getHostnames(), oneLocalHost));
                    continue;
                }
                if (host.equals(localHosts[1])) {
                    Assert.assertTrue((boolean)Arrays.equals(is.getHostnames(), twoLocalHosts));
                    continue;
                }
                if (host.equals(localHosts[2])) {
                    Assert.assertTrue((boolean)Arrays.equals(is.getHostnames(), threeLocalHosts));
                    continue;
                }
                Assert.assertTrue((boolean)Arrays.equals(is.getHostnames(), noLocalHost));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("anotherHost", 0));
            Assert.assertEquals((long)10L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)30L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSerialSplitAssignmentMixedLocalHost() {
        try {
            String[] hosts = new String[]{"host1", "host1", "host1", "host2", "host2", "host3"};
            int NUM_SPLITS = 10 * hosts.length;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i = 0; i < NUM_SPLITS; ++i) {
                splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
            }
            LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            LocatableInputSplit is = null;
            int i = 0;
            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != null) {
                Assert.assertTrue((boolean)splits.remove(is));
            }
            Assert.assertTrue((boolean)splits.isEmpty());
            Assert.assertNull((Object)ia.getNextInputSplit("anotherHost", 0));
            Assert.assertEquals((long)0L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)NUM_SPLITS, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testConcurrentSplitAssignmentNullHost() {
        try {
            int i;
            int NUM_THREADS = 10;
            int NUM_SPLITS = 500;
            int SUM_OF_IDS = 124750;
            String[][] hosts = new String[][]{{"localhost"}, new String[0], null};
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i2 = 0; i2 < 500; ++i2) {
                splits.add(new LocatableInputSplit(i2, hosts[i2 % 3]));
            }
            final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
            final AtomicInteger sumOfIds = new AtomicInteger(0);
            Runnable retriever = new Runnable(){

                @Override
                public void run() {
                    LocatableInputSplit split;
                    while ((split = ia.getNextInputSplit(null, 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                }
            };
            Thread[] threads = new Thread[10];
            for (i = 0; i < 10; ++i) {
                threads[i] = new Thread(retriever);
                threads[i].setDaemon(true);
            }
            for (i = 0; i < 10; ++i) {
                threads[i].start();
            }
            for (i = 0; i < 10; ++i) {
                threads[i].join(5000L);
            }
            for (i = 0; i < 10; ++i) {
                if (!threads[i].isAlive()) continue;
                Assert.fail((String)"The concurrency test case is erroneous, the thread did not respond in time.");
            }
            Assert.assertEquals((long)500L, (long)splitsRetrieved.get());
            Assert.assertEquals((long)124750L, (long)sumOfIds.get());
            Assert.assertNull((Object)ia.getNextInputSplit("", 0));
            Assert.assertEquals((long)500L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)0L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testConcurrentSplitAssignmentForSingleHost() {
        try {
            int i;
            int NUM_THREADS = 10;
            int NUM_SPLITS = 500;
            int SUM_OF_IDS = 124750;
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i2 = 0; i2 < 500; ++i2) {
                splits.add(new LocatableInputSplit(i2, "testhost"));
            }
            final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
            final AtomicInteger sumOfIds = new AtomicInteger(0);
            Runnable retriever = new Runnable(){

                @Override
                public void run() {
                    LocatableInputSplit split;
                    while ((split = ia.getNextInputSplit("testhost", 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                }
            };
            Thread[] threads = new Thread[10];
            for (i = 0; i < 10; ++i) {
                threads[i] = new Thread(retriever);
                threads[i].setDaemon(true);
            }
            for (i = 0; i < 10; ++i) {
                threads[i].start();
            }
            for (i = 0; i < 10; ++i) {
                threads[i].join(5000L);
            }
            for (i = 0; i < 10; ++i) {
                if (!threads[i].isAlive()) continue;
                Assert.fail((String)"The concurrency test case is erroneous, the thread did not respond in time.");
            }
            Assert.assertEquals((long)500L, (long)splitsRetrieved.get());
            Assert.assertEquals((long)124750L, (long)sumOfIds.get());
            Assert.assertNull((Object)ia.getNextInputSplit("testhost", 0));
            Assert.assertEquals((long)0L, (long)ia.getNumberOfRemoteAssignments());
            Assert.assertEquals((long)500L, (long)ia.getNumberOfLocalAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testConcurrentSplitAssignmentForMultipleHosts() {
        try {
            int i;
            int NUM_THREADS = 10;
            int NUM_SPLITS = 500;
            int SUM_OF_IDS = 124750;
            final String[] hosts = new String[]{"host1", "host1", "host1", "host2", "host2", "host3"};
            HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
            for (int i2 = 0; i2 < 500; ++i2) {
                splits.add(new LocatableInputSplit(i2, hosts[i2 % hosts.length]));
            }
            final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
            final AtomicInteger sumOfIds = new AtomicInteger(0);
            Runnable retriever = new Runnable(){

                @Override
                public void run() {
                    LocatableInputSplit split;
                    String threadHost = hosts[(int)(Math.random() * (double)hosts.length)];
                    while ((split = ia.getNextInputSplit(threadHost, 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                }
            };
            Thread[] threads = new Thread[10];
            for (i = 0; i < 10; ++i) {
                threads[i] = new Thread(retriever);
                threads[i].setDaemon(true);
            }
            for (i = 0; i < 10; ++i) {
                threads[i].start();
            }
            for (i = 0; i < 10; ++i) {
                threads[i].join(5000L);
            }
            for (i = 0; i < 10; ++i) {
                if (!threads[i].isAlive()) continue;
                Assert.fail((String)"The concurrency test case is erroneous, the thread did not respond in time.");
            }
            Assert.assertEquals((long)500L, (long)splitsRetrieved.get());
            Assert.assertEquals((long)124750L, (long)sumOfIds.get());
            Assert.assertNull((Object)ia.getNextInputSplit("testhost", 0));
            Assert.assertTrue((ia.getNumberOfLocalAssignments() >= 500 / hosts.length ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testAssignmentOfManySplitsRandomly() {
        int i;
        long seed = Calendar.getInstance().getTimeInMillis();
        int NUM_SPLITS = 65536;
        String[] splitHosts = new String[256];
        String[] requestingHosts = new String[256];
        Random rand = new Random(seed);
        for (i = 0; i < splitHosts.length; ++i) {
            splitHosts[i] = "localHost" + i;
        }
        for (i = 0; i < requestingHosts.length; ++i) {
            requestingHosts[i] = i % 2 == 0 ? "localHost" + i : "remoteHost" + i;
        }
        String[] stringArray = new String[]{};
        HashSet<String> hosts = new HashSet<String>();
        HashSet<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
        for (int i2 = 0; i2 < 65536; ++i2) {
            while (hosts.size() < 3) {
                hosts.add(splitHosts[rand.nextInt(splitHosts.length)]);
            }
            splits.add(new LocatableInputSplit(i2, hosts.toArray(stringArray)));
            hosts.clear();
        }
        LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
        for (int i3 = 0; i3 < 65536; ++i3) {
            LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)], 0);
            Assert.assertTrue((split != null ? 1 : 0) != 0);
            Assert.assertTrue((boolean)splits.remove(split));
        }
        Assert.assertTrue((boolean)splits.isEmpty());
        Assert.assertNull((Object)ia.getNextInputSplit("testHost", 0));
    }
}

