/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.Subscriptions;

public final class OperationTake {
    /**
     * Creates the subscribe function for an observable that relays at most the first
     * {@code num} values of {@code items}.
     * <p>
     * A fresh {@code Take} instance is built for every subscription, so the state it
     * holds is never shared between subscribers.
     *
     * @param items the source sequence to take values from
     * @param num   the maximum number of values to relay
     * @return a subscribe function that, on each subscription, delegates to a new {@code Take}
     */
    public static <T> Observable.OnSubscribeFunc<T> take(final Observable<? extends T> items, final int num) {
        return new Observable.OnSubscribeFunc<T>(){

            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                Take<T> perSubscription = new Take<T>(items, num);
                return perSubscription.onSubscribe(observer);
            }
        };
    }

    public static class UnitTest {
        /** Taking two of three values relays "one" and "two" only, then completes without an error. */
        @Test
        public void testTake1() {
            Observable<String> source = Observable.from("one", "two", "three");
            Observable<String> firstTwo = Observable.create(OperationTake.take(source, 2));

            @SuppressWarnings("unchecked")
            Observer<String> observer = Mockito.mock(Observer.class);
            firstTwo.subscribe(observer);

            Mockito.verify(observer, Mockito.times(1)).onNext("one");
            Mockito.verify(observer, Mockito.times(1)).onNext("two");
            Mockito.verify(observer, Mockito.never()).onNext("three");
            Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(observer, Mockito.times(1)).onCompleted();
        }

        /** Taking one of three values relays only "one", then completes without an error. */
        @Test
        public void testTake2() {
            Observable<String> source = Observable.from("one", "two", "three");
            Observable<String> firstOnly = Observable.create(OperationTake.take(source, 1));

            @SuppressWarnings("unchecked")
            Observer<String> observer = Mockito.mock(Observer.class);
            firstOnly.subscribe(observer);

            Mockito.verify(observer, Mockito.times(1)).onNext("one");
            Mockito.verify(observer, Mockito.never()).onNext("two");
            Mockito.verify(observer, Mockito.never()).onNext("three");
            Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(observer, Mockito.times(1)).onCompleted();
        }

        /**
         * A source that emits one value and then an error must, through {@code take(1)},
         * deliver only that value followed by completion; the error is not forwarded.
         */
        @Test
        public void testTakeDoesntLeakErrors() {
            Observable.OnSubscribeFunc<String> oneThenError = new Observable.OnSubscribeFunc<String>(){

                @Override
                public Subscription onSubscribe(Observer<? super String> observer) {
                    observer.onNext("one");
                    observer.onError(new Throwable("test failed"));
                    return Subscriptions.empty();
                }
            };
            Observable<String> source = Observable.create(oneThenError);

            @SuppressWarnings("unchecked")
            Observer<String> observer = Mockito.mock(Observer.class);
            Observable<String> firstOnly = Observable.create(OperationTake.take(source, 1));
            firstOnly.subscribe(observer);

            Mockito.verify(observer, Mockito.times(1)).onNext("one");
            Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(observer, Mockito.times(1)).onCompleted();
            Mockito.verifyNoMoreInteractions(observer);
        }

        /**
         * {@code take(0)} must still subscribe to and unsubscribe from the source, and the
         * downstream observer must see only completion even though the source signals an
         * error while being subscribed to.
         */
        @Test
        public void testTakeZeroDoesntLeakError() {
            final AtomicBoolean subscribed = new AtomicBoolean(false);
            final AtomicBoolean unSubscribed = new AtomicBoolean(false);

            Observable.OnSubscribeFunc<String> failingSource = new Observable.OnSubscribeFunc<String>(){

                @Override
                public Subscription onSubscribe(Observer<? super String> observer) {
                    subscribed.set(true);
                    observer.onError(new Throwable("test failed"));
                    // Records that the subscription handed out by the source was cancelled.
                    Subscription recording = new Subscription(){

                        @Override
                        public void unsubscribe() {
                            unSubscribed.set(true);
                        }
                    };
                    return recording;
                }
            };
            Observable<String> source = Observable.create(failingSource);

            @SuppressWarnings("unchecked")
            Observer<String> observer = Mockito.mock(Observer.class);
            Observable<String> none = Observable.create(OperationTake.take(source, 0));
            none.subscribe(observer);

            Assert.assertTrue("source subscribed", subscribed.get());
            Assert.assertTrue("source unsubscribed", unSubscribed.get());
            Mockito.verify(observer, Mockito.never()).onNext(Mockito.anyString());
            Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(observer, Mockito.times(1)).onCompleted();
            Mockito.verifyNoMoreInteractions(observer);
        }

        /**
         * With a source that emits from its own thread, {@code take(1)} must relay only the
         * first value, complete, and unsubscribe from the source exactly once.
         */
        @Test
        public void testUnsubscribeAfterTake() {
            Subscription upstream = Mockito.mock(Subscription.class);
            TestObservableFunc emitter = new TestObservableFunc(upstream, "one", "two", "three");
            Observable<String> source = Observable.create(emitter);

            @SuppressWarnings("unchecked")
            Observer<String> observer = Mockito.mock(Observer.class);
            Observable<String> firstOnly = Observable.create(OperationTake.take(source, 1));
            firstOnly.subscribe(observer);

            // Wait for the emitting thread to finish before inspecting the mocks.
            try {
                emitter.t.join();
            }
            catch (Throwable failure) {
                failure.printStackTrace();
                Assert.fail(failure.getMessage());
            }
            System.out.println("TestObservable thread finished");

            Mockito.verify(observer, Mockito.times(1)).onNext("one");
            Mockito.verify(observer, Mockito.never()).onNext("two");
            Mockito.verify(observer, Mockito.never()).onNext("three");
            Mockito.verify(observer, Mockito.times(1)).onCompleted();
            Mockito.verify(upstream, Mockito.times(1)).unsubscribe();
            Mockito.verifyNoMoreInteractions(observer);
        }

        /**
         * Test source that, on subscription, starts a new thread which pushes the configured
         * values to the observer followed by completion, and returns the subscription it was
         * constructed with. The started thread is exposed through {@link #t} so a test can
         * join on it.
         */
        private static class TestObservableFunc
        implements Observable.OnSubscribeFunc<String> {
            final Subscription s;
            final String[] values;
            Thread t = null;

            public TestObservableFunc(Subscription s, String ... values) {
                this.s = s;
                this.values = values;
            }

            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                System.out.println("TestObservable subscribed to ...");
                Runnable emission = new Runnable(){

                    @Override
                    public void run() {
                        TestObservableFunc.this.emitAll(observer);
                    }
                };
                Thread worker = new Thread(emission);
                this.t = worker;
                System.out.println("starting TestObservable thread");
                worker.start();
                System.out.println("done starting TestObservable thread");
                return this.s;
            }

            /** Pushes every configured value and then completion; any failure is rethrown wrapped in a RuntimeException. */
            private void emitAll(Observer<? super String> observer) {
                try {
                    System.out.println("running TestObservable thread");
                    for (int i = 0; i < this.values.length; i++) {
                        String value = this.values[i];
                        System.out.println("TestObservable onNext: " + value);
                        observer.onNext(value);
                    }
                    observer.onCompleted();
                }
                catch (Throwable failure) {
                    throw new RuntimeException(failure);
                }
            }
        }
    }

    /**
     * Subscribe function that relays at most {@code num} values from {@code items} to the
     * downstream observer, then completes it and unsubscribes from the source.
     * <p>
     * A {@code num} below 1 (including negative values) is treated as "take nothing": the
     * source is subscribed to and immediately unsubscribed from, and the downstream observer
     * only receives {@code onCompleted}.
     */
    private static class Take<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> items;
        private final int num;
        private final SafeObservableSubscription subscription = new SafeObservableSubscription();

        private Take(Observable<? extends T> items, int num) {
            this.items = items;
            this.num = num;
        }

        /**
         * Subscribes to the source on behalf of {@code observer}.
         *
         * @param observer the downstream observer to relay values to
         * @return an empty subscription when nothing is to be taken, otherwise the wrapped
         *         subscription to the source
         */
        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            if (this.num < 1) {
                // Still subscribe (and immediately unsubscribe) so the source observes the
                // subscription; everything it emits, including errors, is discarded.
                this.items.subscribe(new Observer<T>(){

                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(T args) {
                    }
                }).unsubscribe();
                observer.onCompleted();
                return Subscriptions.empty();
            }
            // NOTE(review): a source that emits during subscribe() can trigger
            // subscription.unsubscribe() before wrap() is called here; this presumably relies
            // on SafeObservableSubscription tolerating that order - confirm.
            return this.subscription.wrap(this.items.subscribe(new ItemObserver(observer)));
        }

        /**
         * Observer placed between the source and the downstream observer. {@code counter}
         * holds the number of values accepted so far and is pinned at {@code num} once the
         * sequence has terminated (by reaching the limit, by completion or by error).
         */
        private class ItemObserver
        implements Observer<T> {
            private final Observer<? super T> observer;
            private final AtomicInteger counter = new AtomicInteger();

            public ItemObserver(Observer<? super T> observer) {
                this.observer = observer;
            }

            @Override
            public void onCompleted() {
                // Forward only if the limit had not been reached yet; setting the counter to
                // num blocks every later notification.
                if (this.counter.getAndSet(Take.this.num) < Take.this.num) {
                    this.observer.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                // Same gate as onCompleted: errors arriving after termination are dropped.
                if (this.counter.getAndSet(Take.this.num) < Take.this.num) {
                    this.observer.onError(e);
                }
            }

            @Override
            public void onNext(T args) {
                // Claim the next slot with a CAS loop so the counter never moves past num.
                // An unbounded increment could wrap to a negative value if the source keeps
                // emitting after unsubscribe, which would let values (and a second
                // onCompleted) through after termination.
                int seen;
                int count;
                do {
                    seen = this.counter.get();
                    if (seen >= Take.this.num) {
                        // Already terminated: drop the value and (re)request unsubscription.
                        Take.this.subscription.unsubscribe();
                        return;
                    }
                    count = seen + 1;
                } while (!this.counter.compareAndSet(seen, count));

                if (count < Take.this.num) {
                    this.observer.onNext(args);
                    return;
                }

                // Last value to relay: complete downstream and always release the source,
                // even if the downstream observer throws.
                try {
                    this.observer.onNext(args);
                    this.observer.onCompleted();
                }
                finally {
                    Take.this.subscription.unsubscribe();
                }
            }
        }
    }
}

