/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.operators.OperatorTester;
import rx.util.functions.Action0;
import rx.util.functions.Func2;

public class OperationSubscribeOn {
    public static <T> Observable.OnSubscribeFunc<T> subscribeOn(Observable<? extends T> source, Scheduler scheduler) {
        return new SubscribeOn<T>(source, scheduler);
    }

    public static class UnitTest {
        @Test
        public void testSubscribeOn() {
            Observable<Integer> w = Observable.from(1, Integer.valueOf(2), 3);
            Scheduler scheduler = (Scheduler)Mockito.spy((Object)OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate()));
            Observer observer = (Observer)Mockito.mock(Observer.class);
            Subscription subscription = Observable.create(OperationSubscribeOn.subscribeOn(w, scheduler)).subscribe(observer);
            ((Scheduler)Mockito.verify((Object)scheduler, (VerificationMode)Mockito.times((int)1))).schedule(Mockito.isNull(), (Func2)Mockito.any(Func2.class));
            subscription.unsubscribe();
            ((Scheduler)Mockito.verify((Object)scheduler, (VerificationMode)Mockito.times((int)1))).schedule((Action0)Mockito.any(Action0.class));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{scheduler});
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(1);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(2);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onNext(3);
            ((Observer)Mockito.verify((Object)observer, (VerificationMode)Mockito.times((int)1))).onCompleted();
        }
    }

    private static class ScheduledSubscription
    implements Subscription {
        private final Subscription underlying;
        private final Scheduler scheduler;

        private ScheduledSubscription(Subscription underlying, Scheduler scheduler) {
            this.underlying = underlying;
            this.scheduler = scheduler;
        }

        @Override
        public void unsubscribe() {
            this.scheduler.schedule(new Action0(){

                @Override
                public void call() {
                    ScheduledSubscription.this.underlying.unsubscribe();
                }
            });
        }
    }

    private static class SubscribeOn<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> source;
        private final Scheduler scheduler;

        public SubscribeOn(Observable<? extends T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        public Subscription onSubscribe(final Observer<? super T> observer) {
            return this.scheduler.schedule(null, new Func2<Scheduler, T, Subscription>(){

                @Override
                public Subscription call(Scheduler s, T t) {
                    return new ScheduledSubscription(SubscribeOn.this.source.subscribe(observer), SubscribeOn.this.scheduler);
                }
            });
        }
    }
}

