/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.operators.SynchronizedObserver;

public final class OperationMerge {
    public static <T> Observable.OnSubscribeFunc<T> merge(final Observable<? extends Observable<? extends T>> o) {
        return new Observable.OnSubscribeFunc<T>(){

            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                return new MergeObservable(o).onSubscribe(observer);
            }
        };
    }

    public static <T> Observable.OnSubscribeFunc<T> merge(Observable<? extends T> ... sequences) {
        return OperationMerge.merge(Arrays.asList(sequences));
    }

    public static <T> Observable.OnSubscribeFunc<T> merge(final Iterable<? extends Observable<? extends T>> sequences) {
        return OperationMerge.merge(Observable.create(new Observable.OnSubscribeFunc<Observable<? extends T>>(){
            private volatile boolean unsubscribed = false;

            @Override
            public Subscription onSubscribe(Observer<? super Observable<? extends T>> observer) {
                for (Observable o : sequences) {
                    if (this.unsubscribed) break;
                    observer.onNext(o);
                }
                if (!this.unsubscribed) {
                    observer.onCompleted();
                }
                return new Subscription(){

                    @Override
                    public void unsubscribe() {
                        unsubscribed = true;
                    }
                };
            }
        }));
    }

    public static class UnitTest {
        @Mock
        Observer<String> stringObserver;

        @Before
        public void before() {
            MockitoAnnotations.initMocks((Object)this);
        }

        @Test
        public void testMergeObservableOfObservables() {
            final Observable<String> o1 = Observable.create(new TestSynchronousObservable());
            final Observable<String> o2 = Observable.create(new TestSynchronousObservable());
            Observable<Observable<String>> observableOfObservables = Observable.create(new Observable.OnSubscribeFunc<Observable<String>>(){

                @Override
                public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
                    observer.onNext(o1);
                    observer.onNext(o2);
                    observer.onCompleted();
                    return new Subscription(){

                        @Override
                        public void unsubscribe() {
                        }
                    };
                }
            });
            Observable<String> m = Observable.create(OperationMerge.merge(observableOfObservables));
            m.subscribe(this.stringObserver);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onCompleted();
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)2))).onNext("hello");
        }

        @Test
        public void testMergeArray() {
            Observable<String> o1 = Observable.create(new TestSynchronousObservable());
            Observable<String> o2 = Observable.create(new TestSynchronousObservable());
            Observable<String> m = Observable.create(OperationMerge.merge(o1, o2));
            m.subscribe(this.stringObserver);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)2))).onNext("hello");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onCompleted();
        }

        @Test
        public void testMergeList() {
            Observable<String> o1 = Observable.create(new TestSynchronousObservable());
            Observable<String> o2 = Observable.create(new TestSynchronousObservable());
            ArrayList<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
            listOfObservables.add(o1);
            listOfObservables.add(o2);
            Observable<String> m = Observable.create(OperationMerge.merge(listOfObservables));
            m.subscribe(this.stringObserver);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onCompleted();
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)2))).onNext("hello");
        }

        @Test
        public void testUnSubscribe() {
            TestObservable tA = new TestObservable();
            TestObservable tB = new TestObservable();
            Observable<String> m = Observable.create(OperationMerge.merge(Observable.create(tA), Observable.create(tB)));
            Subscription s = m.subscribe(this.stringObserver);
            tA.sendOnNext("Aone");
            tB.sendOnNext("Bone");
            s.unsubscribe();
            tA.sendOnNext("Atwo");
            tB.sendOnNext("Btwo");
            tA.sendOnCompleted();
            tB.sendOnCompleted();
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("Aone");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("Bone");
            Assert.assertTrue((boolean)tA.unsubscribed);
            Assert.assertTrue((boolean)tB.unsubscribed);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onNext("Atwo");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onNext("Btwo");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onCompleted();
        }

        @Test
        public void testMergeArrayWithThreading() {
            TestASynchronousObservable o1 = new TestASynchronousObservable();
            TestASynchronousObservable o2 = new TestASynchronousObservable();
            Observable<String> m = Observable.create(OperationMerge.merge(Observable.create(o1), Observable.create(o2)));
            m.subscribe(this.stringObserver);
            try {
                o1.t.join();
                o2.t.join();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)2))).onNext("hello");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onCompleted();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Test
        public void testSynchronizationOfMultipleSequences() throws Throwable {
            TestASynchronousObservable o1 = new TestASynchronousObservable();
            TestASynchronousObservable o2 = new TestASynchronousObservable();
            final CountDownLatch endLatch = new CountDownLatch(1);
            final AtomicInteger concurrentCounter = new AtomicInteger();
            final AtomicInteger totalCounter = new AtomicInteger();
            Observable<String> m = Observable.create(OperationMerge.merge(Observable.create(o1), Observable.create(o2)));
            m.subscribe(new Observer<String>(){

                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    throw new RuntimeException("failed", e);
                }

                @Override
                public void onNext(String v) {
                    totalCounter.incrementAndGet();
                    concurrentCounter.incrementAndGet();
                    try {
                        endLatch.await();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new RuntimeException("failed", e);
                    }
                    finally {
                        concurrentCounter.decrementAndGet();
                    }
                }
            });
            o1.onNextBeingSent.await();
            o2.onNextBeingSent.await();
            Thread.sleep(300L);
            try {
                Assert.assertEquals((long)1L, (long)concurrentCounter.get());
            }
            finally {
                endLatch.countDown();
            }
            try {
                o1.t.join();
                o2.t.join();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            Assert.assertEquals((long)2L, (long)totalCounter.get());
            Assert.assertEquals((long)0L, (long)concurrentCounter.get());
        }

        @Test
        public void testError1() {
            Observable<String> o1 = Observable.create(new TestErrorObservable("four", null, "six"));
            Observable<String> o2 = Observable.create(new TestErrorObservable("one", "two", "three"));
            Observable<String> m = Observable.create(OperationMerge.merge(o1, o2));
            m.subscribe(this.stringObserver);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onError((Throwable)Mockito.any(NullPointerException.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onCompleted();
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("one");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("two");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("three");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("four");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("five");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("six");
        }

        @Test
        public void testError2() {
            Observable<String> o1 = Observable.create(new TestErrorObservable("one", "two", "three"));
            Observable<String> o2 = Observable.create(new TestErrorObservable("four", null, "six"));
            Observable<String> o3 = Observable.create(new TestErrorObservable("seven", "eight", null));
            Observable<String> o4 = Observable.create(new TestErrorObservable("nine"));
            Observable<String> m = Observable.create(OperationMerge.merge(o1, o2, o3, o4));
            m.subscribe(this.stringObserver);
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onError((Throwable)Mockito.any(NullPointerException.class));
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.never())).onCompleted();
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("one");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("two");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("three");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)1))).onNext("four");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("five");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("six");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("seven");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("eight");
            ((Observer)Mockito.verify(this.stringObserver, (VerificationMode)Mockito.times((int)0))).onNext("nine");
        }

        private static class TestErrorObservable
        implements Observable.OnSubscribeFunc<String> {
            String[] valuesToReturn;

            TestErrorObservable(String ... values) {
                this.valuesToReturn = values;
            }

            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                for (String s : this.valuesToReturn) {
                    if (s == null) {
                        System.out.println("throwing exception");
                        observer.onError(new NullPointerException());
                        continue;
                    }
                    observer.onNext(s);
                }
                observer.onCompleted();
                return new Subscription(){

                    @Override
                    public void unsubscribe() {
                    }
                };
            }
        }

        private static class TestObservable
        implements Observable.OnSubscribeFunc<String> {
            Observer<? super String> observer = null;
            volatile boolean unsubscribed = false;
            Subscription s = new Subscription(){

                @Override
                public void unsubscribe() {
                    TestObservable.this.unsubscribed = true;
                }
            };

            private TestObservable() {
            }

            public void sendOnCompleted() {
                this.observer.onCompleted();
            }

            public void sendOnNext(String value) {
                this.observer.onNext(value);
            }

            public void sendOnError(Throwable e) {
                this.observer.onError(e);
            }

            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                this.observer = observer;
                return this.s;
            }
        }

        private static class TestASynchronousObservable
        implements Observable.OnSubscribeFunc<String> {
            Thread t;
            final CountDownLatch onNextBeingSent = new CountDownLatch(1);

            private TestASynchronousObservable() {
            }

            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                this.t = new Thread(new Runnable(){

                    @Override
                    public void run() {
                        TestASynchronousObservable.this.onNextBeingSent.countDown();
                        observer.onNext("hello");
                        observer.onCompleted();
                    }
                });
                this.t.start();
                return new Subscription(){

                    @Override
                    public void unsubscribe() {
                    }
                };
            }
        }

        private static class TestSynchronousObservable
        implements Observable.OnSubscribeFunc<String> {
            private TestSynchronousObservable() {
            }

            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                observer.onNext("hello");
                observer.onCompleted();
                return new Subscription(){

                    @Override
                    public void unsubscribe() {
                    }
                };
            }
        }
    }

    private static final class MergeObservable<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends Observable<? extends T>> sequences;
        private final MergeSubscription ourSubscription = new MergeSubscription();
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private volatile boolean parentCompleted = false;
        private final ConcurrentHashMap<ChildObserver, ChildObserver> childObservers = new ConcurrentHashMap();
        private final ConcurrentHashMap<ChildObserver, Subscription> childSubscriptions = new ConcurrentHashMap();

        private MergeObservable(Observable<? extends Observable<? extends T>> sequences) {
            this.sequences = sequences;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> actualObserver) {
            SafeObservableSubscription subscription = new SafeObservableSubscription(this.ourSubscription);
            SynchronizedObserver<? super T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
            this.sequences.subscribe(new ParentObserver(synchronizedObserver));
            return subscription;
        }

        private class ChildObserver
        implements Observer<T> {
            private final Observer<T> actualObserver;

            public ChildObserver(Observer<T> actualObserver) {
                this.actualObserver = actualObserver;
            }

            @Override
            public void onCompleted() {
                MergeObservable.this.childObservers.remove(this);
                if (!MergeObservable.this.stopped.get() && MergeObservable.this.childObservers.size() == 0 && MergeObservable.this.parentCompleted && MergeObservable.this.ourSubscription.stop()) {
                    this.actualObserver.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!MergeObservable.this.stopped.get() && MergeObservable.this.ourSubscription.stop()) {
                    this.actualObserver.onError(e);
                }
            }

            @Override
            public void onNext(T args) {
                if (!MergeObservable.this.stopped.get()) {
                    this.actualObserver.onNext(args);
                }
            }
        }

        private class ParentObserver
        implements Observer<Observable<? extends T>> {
            private final Observer<T> actualObserver;

            public ParentObserver(Observer<T> actualObserver) {
                this.actualObserver = actualObserver;
            }

            @Override
            public void onCompleted() {
                MergeObservable.this.parentCompleted = true;
                if (MergeObservable.this.childObservers.size() == 0 && !MergeObservable.this.stopped.get() && MergeObservable.this.ourSubscription.stop()) {
                    this.actualObserver.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                this.actualObserver.onError(e);
            }

            @Override
            public void onNext(Observable<? extends T> childObservable) {
                if (MergeObservable.this.stopped.get()) {
                    return;
                }
                if (childObservable == null) {
                    throw new IllegalArgumentException("Observable<T> can not be null.");
                }
                ChildObserver _w = new ChildObserver(this.actualObserver);
                MergeObservable.this.childObservers.put(_w, _w);
                Subscription _subscription = childObservable.subscribe(_w);
                MergeObservable.this.childSubscriptions.put(_w, _subscription);
            }
        }

        private class MergeSubscription
        implements Subscription {
            private MergeSubscription() {
            }

            @Override
            public void unsubscribe() {
                this.stop();
            }

            public boolean stop() {
                boolean didSet = MergeObservable.this.stopped.compareAndSet(false, true);
                if (didSet) {
                    for (Subscription _s : MergeObservable.this.childSubscriptions.values()) {
                        _s.unsubscribe();
                    }
                    return true;
                }
                return false;
            }
        }
    }
}

