/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.observables.GroupedObservable;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public final class OperationParallel<T> {
    public static <T, R> Observable<R> parallel(Observable<T> source, Func1<Observable<T>, Observable<R>> f) {
        return OperationParallel.parallel(source, f, Schedulers.threadPoolForComputation());
    }

    public static <T, R> Observable<R> parallel(final Observable<T> source, final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
        return Observable.defer(new Func0<Observable<R>>(){

            @Override
            public Observable<R> call() {
                final AtomicInteger i = new AtomicInteger(0);
                return source.groupBy(new Func1<T, Integer>(){

                    @Override
                    public Integer call(T t) {
                        return i.incrementAndGet() % s.degreeOfParallelism();
                    }
                }).flatMap(new Func1<GroupedObservable<Integer, T>, Observable<R>>(){

                    @Override
                    public Observable<R> call(GroupedObservable<Integer, T> group) {
                        return (Observable)f.call(group.observeOn(s));
                    }
                }).synchronize();
            }
        });
    }

    public static class UnitTest {
        @Test
        public void testParallel() {
            int NUM = 1000;
            final AtomicInteger count = new AtomicInteger();
            Observable.range(1, NUM).parallel(new Func1<Observable<Integer>, Observable<Integer[]>>(){

                @Override
                public Observable<Integer[]> call(Observable<Integer> o) {
                    return o.map(new Func1<Integer, Integer[]>(){

                        @Override
                        public Integer[] call(Integer t) {
                            return new Integer[]{t, t * 99};
                        }
                    });
                }
            }).toBlockingObservable().forEach(new Action1<Integer[]>(){

                @Override
                public void call(Integer[] v) {
                    count.incrementAndGet();
                    System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread());
                }
            });
            Assert.assertEquals((long)NUM, (long)count.get());
        }
    }
}

