/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.ImmediateScheduler;
import rx.concurrency.Schedulers;
import rx.operators.ScheduledObserver;
import rx.subscriptions.CompositeSubscription;

public class OperationObserveOn {
    public static <T> Observable.OnSubscribeFunc<T> observeOn(Observable<? extends T> source, Scheduler scheduler) {
        return new ObserveOn<T>(source, scheduler);
    }

    public static class UnitTest {
        @Test
        public void testObserveOn() {
            Observer observer = (Observer)Mockito.mock(Observer.class);
            Observable.create(OperationObserveOn.observeOn(Observable.from(1, Integer.valueOf(2), 3), Schedulers.immediate())).subscribe(observer);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(1);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(2);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(3);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onCompleted();
        }

        @Test
        public void testOrdering() throws InterruptedException {
            Observable<String> obs = Observable.from("one", null, "two", "three", "four");
            Observer observer = (Observer)Mockito.mock(Observer.class);
            InOrder inOrder = Mockito.inOrder((Object[])new Object[]{observer});
            final CountDownLatch completedLatch = new CountDownLatch(1);
            ((Observer)Mockito.doAnswer((Answer)new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    completedLatch.countDown();
                    return null;
                }
            }).when((Object)observer)).onCompleted();
            obs.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
            if (!completedLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                Assert.fail((String)"timed out waiting");
            }
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onNext("one");
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onNext(null);
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onNext("two");
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onNext("three");
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onNext("four");
            ((Observer)inOrder.verify((Object)observer, Mockito.times((int)1))).onCompleted();
            inOrder.verifyNoMoreInteractions();
        }
    }

    private static class ObserveOn<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> source;
        private final Scheduler scheduler;

        public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            if (this.scheduler instanceof ImmediateScheduler) {
                return this.source.subscribe(observer);
            }
            CompositeSubscription s = new CompositeSubscription(new Subscription[0]);
            s.add(this.source.subscribe(new ScheduledObserver<T>(s, observer, this.scheduler)));
            return s;
        }
    }
}

