/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class OperationRetry {
    /** Sentinel retry count used by {@link #retry(Observable)} to request retrying without a limit. */
    private static final int INFINITE_RETRY = -1;

    /**
     * Builds the subscribe function for a retrying view of {@code observable}.
     *
     * @param observable the source to subscribe to, and to resubscribe to after an error
     * @param retryCount number of resubscriptions allowed after the initial attempt; once the
     *                   number of attempts exceeds this value the error is passed to the observer
     * @param <T> the element type of the source
     * @return a subscribe function suitable for {@code Observable.create(...)}
     */
    public static <T> Observable.OnSubscribeFunc<T> retry(Observable<T> observable, int retryCount) {
        final Observable.OnSubscribeFunc<T> retrying = new Retry<T>(observable, retryCount);
        return retrying;
    }

    /**
     * Builds the subscribe function for a view of {@code observable} that retries without a limit.
     *
     * @param observable the source to subscribe to, and to resubscribe to after every error
     * @param <T> the element type of the source
     * @return a subscribe function suitable for {@code Observable.create(...)}
     */
    public static <T> Observable.OnSubscribeFunc<T> retry(Observable<T> observable) {
        return OperationRetry.retry(observable, INFINITE_RETRY);
    }

    /**
     * Tests for the retry operator. Each test subscribes a mocked observer to a source
     * ({@link FuncWithErrors}) that fails a fixed number of times before succeeding, then
     * verifies the order and count of the notifications the observer received.
     */
    public static class UnitTest {
        /** Without retry, the source delivers its first item followed by the forced error. */
        @Test
        public void testOriginFails() {
            Observer<String> sink = UnitTest.mockObserver();
            Observable.create(new FuncWithErrors(2)).subscribe(sink);

            InOrder order = Mockito.inOrder(sink);
            order.verify(sink, Mockito.times(1)).onNext("beginningEveryTime");
            order.verify(sink, Mockito.times(1)).onError(Mockito.any(RuntimeException.class));
            order.verify(sink, Mockito.never()).onNext("onSuccessOnly");
            order.verify(sink, Mockito.never()).onCompleted();
        }

        /** With fewer retries than failures, the error reaches the observer after all attempts. */
        @Test
        public void testRetryFail() {
            final int retries = 1;
            final int failures = 2;
            Observer<String> sink = UnitTest.mockObserver();
            Observable<String> failing = Observable.create(new FuncWithErrors(failures));
            Observable.create(OperationRetry.retry(failing, retries)).subscribe(sink);

            InOrder order = Mockito.inOrder(sink);
            // One onNext for the initial attempt plus one per retry.
            order.verify(sink, Mockito.times(retries + 1)).onNext("beginningEveryTime");
            order.verify(sink, Mockito.times(1)).onError(Mockito.any(RuntimeException.class));
            order.verify(sink, Mockito.never()).onNext("onSuccessOnly");
            order.verify(sink, Mockito.never()).onCompleted();
            order.verifyNoMoreInteractions();
        }

        /** With more retries than failures, the observer sees the successful completion and no error. */
        @Test
        public void testRetrySuccess() {
            final int retries = 3;
            final int failures = 2;
            Observer<String> sink = UnitTest.mockObserver();
            Observable<String> failing = Observable.create(new FuncWithErrors(failures));
            Observable.create(OperationRetry.retry(failing, retries)).subscribe(sink);

            InOrder order = Mockito.inOrder(sink);
            // One onNext per failed attempt plus one for the attempt that succeeds.
            order.verify(sink, Mockito.times(failures + 1)).onNext("beginningEveryTime");
            order.verify(sink, Mockito.never()).onError(Mockito.any(Throwable.class));
            order.verify(sink, Mockito.times(1)).onNext("onSuccessOnly");
            order.verify(sink, Mockito.times(1)).onCompleted();
            order.verifyNoMoreInteractions();
        }

        /** The overload without a retry count keeps resubscribing until the source succeeds. */
        @Test
        public void testInfiniteRetry() {
            final int failures = 20;
            Observer<String> sink = UnitTest.mockObserver();
            Observable<String> failing = Observable.create(new FuncWithErrors(failures));
            Observable.create(OperationRetry.retry(failing)).subscribe(sink);

            InOrder order = Mockito.inOrder(sink);
            order.verify(sink, Mockito.times(failures + 1)).onNext("beginningEveryTime");
            order.verify(sink, Mockito.never()).onError(Mockito.any(Throwable.class));
            order.verify(sink, Mockito.times(1)).onNext("onSuccessOnly");
            order.verify(sink, Mockito.times(1)).onCompleted();
            order.verifyNoMoreInteractions();
        }

        /** Creates a Mockito mock of {@code Observer}, typed for the String sources used here. */
        @SuppressWarnings("unchecked")
        private static Observer<String> mockObserver() {
            return (Observer<String>) Mockito.mock(Observer.class);
        }

        /**
         * Source that emits {@code "beginningEveryTime"} on every subscription, then fails with a
         * {@link RuntimeException} for the first {@code numFailures} subscriptions and emits
         * {@code "onSuccessOnly"} followed by completion on later ones.
         */
        public static class FuncWithErrors
        implements Observable.OnSubscribeFunc<String> {
            private final int numFailures;
            /** Number of subscriptions seen so far, across all subscribers of this instance. */
            private final AtomicInteger count = new AtomicInteger(0);

            FuncWithErrors(int numFailures) {
                this.numFailures = numFailures;
            }

            @Override
            public Subscription onSubscribe(Observer<? super String> o) {
                o.onNext("beginningEveryTime");
                final boolean pastFailures = this.count.incrementAndGet() > this.numFailures;
                if (pastFailures) {
                    o.onNext("onSuccessOnly");
                    o.onCompleted();
                } else {
                    o.onError(new RuntimeException("forced failure: " + this.count.get()));
                }
                return Subscriptions.empty();
            }
        }
    }

    /**
     * Subscribe function that forwards the notifications of {@code source} to the observer and
     * resubscribes to {@code source} when it signals an error, up to {@code retryCount} times
     * (or without limit when {@code retryCount} is {@code -1}).
     *
     * <p>The attempt counter and the subscriptions are created inside
     * {@link #onSubscribe(Observer)}, so every subscriber gets its own retry budget and
     * unsubscribing one subscriber does not stop retries for another. Instances hold only
     * immutable configuration.
     */
    private static class Retry<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<T> source;
        private final int retryCount;

        /**
         * @param source the observable to subscribe to on every attempt
         * @param retryCount resubscriptions allowed after the first attempt; {@code -1} is the
         *                   value the no-count {@code retry(...)} overload passes for "no limit"
         */
        public Retry(Observable<T> source, int retryCount) {
            this.source = source;
            this.retryCount = retryCount;
        }

        /**
         * Starts the first attempt on the current-thread scheduler.
         *
         * @param observer receives items, completion, and the final error if retries run out
         * @return a subscription covering the scheduled work and the currently active attempt
         */
        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            // Per-subscriber state: keeping these as fields would let one subscriber consume
            // another's retry budget, and unsubscribing one would mark all as unsubscribed.
            final AtomicInteger attempts = new AtomicInteger(0);
            final CompositeSubscription subscription = new CompositeSubscription(new Subscription[0]);
            MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
            subscription.add(Schedulers.currentThread().schedule(recursiveSubscription, this.attemptSubscription(observer, attempts, subscription)));
            subscription.add(recursiveSubscription);
            return subscription;
        }

        /**
         * Creates the scheduled action that performs one subscription attempt against
         * {@code source} and, on error, either schedules the next attempt or reports the error.
         *
         * @param observer the downstream observer
         * @param attempts number of attempts started so far for this subscriber
         * @param subscription the subscription handed back to this subscriber; checked so that
         *                     no further attempt is scheduled once it has been unsubscribed
         */
        private Func2<Scheduler, MultipleAssignmentSubscription, Subscription> attemptSubscription(final Observer<? super T> observer, final AtomicInteger attempts, final CompositeSubscription subscription) {
            return new Func2<Scheduler, MultipleAssignmentSubscription, Subscription>(){

                @Override
                public Subscription call(final Scheduler scheduler, final MultipleAssignmentSubscription recursiveSubscription) {
                    attempts.incrementAndGet();
                    return Retry.this.source.subscribe(new Observer<T>(){

                        @Override
                        public void onCompleted() {
                            observer.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            // -1 means unlimited; otherwise stop once attempts exceed the budget.
                            boolean exhausted = Retry.this.retryCount != -1 && attempts.get() > Retry.this.retryCount;
                            if (exhausted || subscription.isUnsubscribed()) {
                                observer.onError(e);
                            } else {
                                // Replace the active attempt's subscription with the next one.
                                recursiveSubscription.setSubscription(scheduler.schedule(recursiveSubscription, Retry.this.attemptSubscription(observer, attempts, subscription)));
                            }
                        }

                        @Override
                        public void onNext(T v) {
                            observer.onNext(v);
                        }
                    });
                }
            };
        }
    }
}

