/*
 * Decompiled with CFR 0.152.
 */
package rx.subjects;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

public class AsyncSubject<T>
extends Subject<T, T> {
    private final AsyncSubjectState<T> state;

    public static <T> AsyncSubject<T> create() {
        final AsyncSubjectState state = new AsyncSubjectState();
        Observable.OnSubscribeFunc onSubscribe = new Observable.OnSubscribeFunc<T>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                state.SUBSCRIPTION_LOCK.lock();
                try {
                    if (state.completed.get()) {
                        AsyncSubject.emitNotificationToObserver(state, observer);
                        Subscription subscription = Subscriptions.empty();
                        return subscription;
                    }
                    final SafeObservableSubscription subscription = new SafeObservableSubscription();
                    subscription.wrap(new Subscription(){

                        @Override
                        public void unsubscribe() {
                            state.observers.remove(subscription);
                        }
                    });
                    state.observers.put(subscription, observer);
                    SafeObservableSubscription safeObservableSubscription = subscription;
                    return safeObservableSubscription;
                }
                finally {
                    state.SUBSCRIPTION_LOCK.unlock();
                }
            }
        };
        return new AsyncSubject(onSubscribe, state);
    }

    private static <T> void emitNotificationToObserver(AsyncSubjectState<T> state, Observer<? super T> observer) {
        Notification finalValue = (Notification)((AsyncSubjectState)state).currentValue.get();
        if (finalValue != null) {
            if (finalValue.isOnNext()) {
                observer.onNext(finalValue.getValue());
            } else if (finalValue.isOnError()) {
                observer.onError(finalValue.getThrowable());
            }
        }
        observer.onCompleted();
    }

    protected AsyncSubject(Observable.OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) {
        super(onSubscribe);
        this.state = state;
    }

    @Override
    public void onCompleted() {
        this.terminalState();
    }

    @Override
    public void onError(Throwable e) {
        ((AsyncSubjectState)this.state).currentValue.set(new Notification(e));
        this.terminalState();
    }

    @Override
    public void onNext(T v) {
        ((AsyncSubjectState)this.state).currentValue.set(new Notification<T>(v));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void terminalState() {
        ((AsyncSubjectState)this.state).SUBSCRIPTION_LOCK.lock();
        try {
            if (((AsyncSubjectState)this.state).completed.compareAndSet(false, true)) {
                for (Subscription s : ((AsyncSubjectState)this.state).observers.keySet()) {
                    AsyncSubject.emitNotificationToObserver(this.state, (Observer)((AsyncSubjectState)this.state).observers.get(s));
                    ((AsyncSubjectState)this.state).observers.remove(s);
                }
            }
        }
        finally {
            ((AsyncSubjectState)this.state).SUBSCRIPTION_LOCK.unlock();
        }
    }

    private static class AsyncSubjectState<T> {
        private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap();
        private final AtomicReference<Notification<T>> currentValue = new AtomicReference();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();

        private AsyncSubjectState() {
        }
    }
}

