/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

public final class OperationTimeout {
    public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit) {
        return new Timeout(source, timeout, timeUnit, null, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
        return new Timeout(sequence, timeout, timeUnit, other, Schedulers.threadPoolForComputation());
    }

    public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
        return new Timeout(source, timeout, timeUnit, null, scheduler);
    }

    public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
        return new Timeout(sequence, timeout, timeUnit, other, scheduler);
    }

    private static class Timeout<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> source;
        private final long timeout;
        private final TimeUnit timeUnit;
        private final Scheduler scheduler;
        private final Observable<? extends T> other;

        private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
            this.source = source;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.other = other;
            this.scheduler = scheduler;
        }

        @Override
        public Subscription onSubscribe(final Observer<? super T> observer) {
            final AtomicBoolean terminated = new AtomicBoolean(false);
            final AtomicLong actual = new AtomicLong(0L);
            final SerialSubscription serial = new SerialSubscription();
            final Object gate = new Object();
            CompositeSubscription composite = new CompositeSubscription(new Subscription[0]);
            final Func0<Subscription> schedule = new Func0<Subscription>(){

                @Override
                public Subscription call() {
                    final long expected = actual.get();
                    return Timeout.this.scheduler.schedule(new Action0(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void call() {
                            boolean timeoutWins = false;
                            Object object = gate;
                            synchronized (object) {
                                if (expected == actual.get() && !terminated.getAndSet(true)) {
                                    timeoutWins = true;
                                }
                            }
                            if (timeoutWins) {
                                if (Timeout.this.other == null) {
                                    observer.onError(new TimeoutException());
                                } else {
                                    serial.setSubscription(Timeout.this.other.subscribe(observer));
                                }
                            }
                        }
                    }, Timeout.this.timeout, Timeout.this.timeUnit);
                }
            };
            SafeObservableSubscription subscription = new SafeObservableSubscription();
            composite.add(subscription.wrap(this.source.subscribe(new Observer<T>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onNext(T value) {
                    boolean onNextWins = false;
                    Object object = gate;
                    synchronized (object) {
                        if (!terminated.get()) {
                            actual.incrementAndGet();
                            onNextWins = true;
                        }
                    }
                    if (onNextWins) {
                        serial.setSubscription((Subscription)schedule.call());
                        observer.onNext(value);
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onError(Throwable error) {
                    boolean onErrorWins = false;
                    Object object = gate;
                    synchronized (object) {
                        if (!terminated.getAndSet(true)) {
                            onErrorWins = true;
                        }
                    }
                    if (onErrorWins) {
                        serial.unsubscribe();
                        observer.onError(error);
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onCompleted() {
                    boolean onCompletedWins = false;
                    Object object = gate;
                    synchronized (object) {
                        if (!terminated.getAndSet(true)) {
                            onCompletedWins = true;
                        }
                    }
                    if (onCompletedWins) {
                        serial.unsubscribe();
                        observer.onCompleted();
                    }
                }
            })));
            composite.add(serial);
            serial.setSubscription((Subscription)schedule.call());
            return composite;
        }
    }
}

