/*
 * Decompiled with CFR 0.152.
 */
package rx.subjects;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subjects.Subject;
import rx.subjects.UnsubscribeTester;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class PublishSubject<T>
extends Subject<T, T> {
    // Currently registered observers, keyed by the Subscription handed back to each of them by create()'s
    // subscribe function; removing a key (on unsubscribe) or clearing the map detaches observers.
    private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;

    /**
     * Creates a new {@link PublishSubject} with no registered observers.
     * <p>
     * The subject and its subscribe function share one map: every subscriber is stored under the
     * {@link Subscription} that is returned to it, and unsubscribing removes exactly that entry.
     *
     * @param <T> type of the values relayed by the subject
     * @return a freshly created subject
     */
    public static <T> PublishSubject<T> create() {
        final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
        // Declared with its type argument (not as a raw type) so the constructor call below is fully type-checked.
        Observable.OnSubscribeFunc<T> onSubscribe = new Observable.OnSubscribeFunc<T>() {

            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                final SafeObservableSubscription subscription = new SafeObservableSubscription();
                // The wrapped action removes the entry keyed by the same subscription object returned to the caller.
                subscription.wrap(new Subscription() {

                    @Override
                    public void unsubscribe() {
                        observers.remove(subscription);
                    }
                });
                observers.put(subscription, observer);
                return subscription;
            }
        };
        return new PublishSubject<T>(onSubscribe, observers);
    }

    /**
     * Builds a subject around an already wired subscribe function.
     *
     * @param onSubscribe function handed to the {@link Subject} superclass constructor
     * @param observers map holding the current observers; {@link #create()} passes the same map instance that its
     *            subscribe function registers observers in
     */
    protected PublishSubject(Observable.OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
        super(onSubscribe);
        this.observers = observers;
    }

    /**
     * Forwards the completion signal to every observer registered at the time of the call, then drops all
     * registrations.
     * <p>
     * Notification runs over a copy of the observer collection. An observer added to the map after the copy is
     * taken but before {@code clear()} runs is removed without having been notified.
     */
    @Override
    public void onCompleted() {
        // The snapshot holds Observer<? super T>; the loop variable must use the same wildcard type to compile.
        for (Observer<? super T> observer : this.snapshotOfValues()) {
            observer.onCompleted();
        }
        this.observers.clear();
    }

    /**
     * Forwards the error to every observer registered at the time of the call, then drops all registrations.
     * <p>
     * Notification runs over a copy of the observer collection. An observer added to the map after the copy is
     * taken but before {@code clear()} runs is removed without having been notified.
     *
     * @param e the error to relay to the observers
     */
    @Override
    public void onError(Throwable e) {
        // The snapshot holds Observer<? super T>; the loop variable must use the same wildcard type to compile.
        for (Observer<? super T> observer : this.snapshotOfValues()) {
            observer.onError(e);
        }
        this.observers.clear();
    }

    /**
     * Relays a value to every observer registered at the time of the call.
     * <p>
     * Iteration runs over a copy of the observer collection, so observers that subscribe or unsubscribe while the
     * value is being delivered do not affect the set of recipients for this particular value.
     *
     * @param args the value to relay
     */
    @Override
    public void onNext(T args) {
        // The snapshot holds Observer<? super T>; the loop variable must use the same wildcard type to compile.
        for (Observer<? super T> observer : this.snapshotOfValues()) {
            observer.onNext(args);
        }
    }

    /**
     * Copies the observers currently held in the map into a new, independent list.
     *
     * @return a new list containing the map's values at the moment of the call
     */
    private Collection<Observer<? super T>> snapshotOfValues() {
        final Collection<Observer<? super T>> registered = this.observers.values();
        final ArrayList<Observer<? super T>> copy = new ArrayList<Observer<? super T>>(registered);
        return copy;
    }

    public static class UnitTest {
        // Error instance pushed through the subject by testError() and matched by assertErrorObserver().
        private final Throwable testException = new Throwable();

        /**
         * Feeds the subject from two sources: a background thread that emits increasing positive integers until it
         * is unsubscribed, and a synchronous three-element sequence (-1, -2, -3).
         * <p>
         * The subject's materialized notifications are collected into a list, and the test checks that this list
         * contains the three negative values and a completion notification.
         */
        @Test
        public void test() {
            PublishSubject<Integer> subject = PublishSubject.create();
            final AtomicReference<List<Notification<Integer>>> actualRef = new AtomicReference<List<Notification<Integer>>>();
            Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
            wNotificationsList.subscribe(new Action1<List<Notification<Integer>>>() {

                @Override
                public void call(List<Notification<Integer>> actual) {
                    actualRef.set(actual);
                }
            });
            Subscription sub = Observable.create(new Observable.OnSubscribeFunc<Integer>() {

                @Override
                public Subscription onSubscribe(final Observer<? super Integer> observer) {
                    final AtomicBoolean stop = new AtomicBoolean(false);
                    // Producer thread: spins emitting values until the returned subscription flips the flag.
                    new Thread() {

                        @Override
                        public void run() {
                            int i = 1;
                            while (!stop.get()) {
                                observer.onNext(i++);
                            }
                            observer.onCompleted();
                        }
                    }.start();
                    return new Subscription() {

                        @Override
                        public void unsubscribe() {
                            stop.set(true);
                        }
                    };
                }
            }).subscribe(subject);
            try {
                Observable.from(-1, -2, -3).subscribe(subject);
                List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
                expected.add(new Notification<Integer>(-1));
                expected.add(new Notification<Integer>(-2));
                expected.add(new Notification<Integer>(-3));
                expected.add(new Notification<Integer>());
                Assert.assertTrue(actualRef.get().containsAll(expected));
            } finally {
                // Always stop the producer thread, even when the assertion fails; otherwise it would spin forever.
                sub.unsubscribe();
            }
        }

        /**
         * Checks that an observer subscribed before completion receives "one", "two", "three" and exactly one
         * onCompleted, and nothing from the events pushed into the subject afterwards.
         */
        @Test
        public void testCompleted() {
            final PublishSubject<String> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<String> first = Mockito.mock(Observer.class);
            publisher.subscribe(first);
            for (String value : new String[] {"one", "two", "three"}) {
                publisher.onNext(value);
            }
            publisher.onCompleted();

            // A second observer joins after completion and more events follow; only the first observer is verified.
            @SuppressWarnings("unchecked")
            final Observer<String> late = Mockito.mock(Observer.class);
            publisher.subscribe(late);
            publisher.onNext("four");
            publisher.onCompleted();
            publisher.onError(new Throwable());

            this.assertCompletedObserver(first);
        }

        /**
         * Verifies that the mock saw "one", "two" and "three" exactly once each, no error, and exactly one
         * onCompleted.
         *
         * @param aObserver the Mockito mock to inspect
         */
        private void assertCompletedObserver(Observer<String> aObserver) {
            for (String expected : new String[] {"one", "two", "three"}) {
                Mockito.verify(aObserver, Mockito.times(1)).onNext(expected);
            }
            Mockito.verify(aObserver, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(aObserver, Mockito.times(1)).onCompleted();
        }

        /**
         * Checks that an observer subscribed before the error receives "one", "two", "three" and exactly one
         * onError carrying the shared test exception, and nothing from the events pushed afterwards.
         */
        @Test
        public void testError() {
            final PublishSubject<String> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<String> first = Mockito.mock(Observer.class);
            publisher.subscribe(first);
            for (String value : new String[] {"one", "two", "three"}) {
                publisher.onNext(value);
            }
            publisher.onError(this.testException);

            // A second observer joins after the error and more events follow; only the first observer is verified.
            @SuppressWarnings("unchecked")
            final Observer<String> late = Mockito.mock(Observer.class);
            publisher.subscribe(late);
            publisher.onNext("four");
            publisher.onError(new Throwable());
            publisher.onCompleted();

            this.assertErrorObserver(first);
        }

        /**
         * Verifies that the mock saw "one", "two" and "three" exactly once each, exactly one onError with the
         * shared test exception, and no onCompleted.
         *
         * @param aObserver the Mockito mock to inspect
         */
        private void assertErrorObserver(Observer<String> aObserver) {
            for (String expected : new String[] {"one", "two", "three"}) {
                Mockito.verify(aObserver, Mockito.times(1)).onNext(expected);
            }
            Mockito.verify(aObserver, Mockito.times(1)).onError(this.testException);
            Mockito.verify(aObserver, Mockito.never()).onCompleted();
        }

        /**
         * Checks that an observer joining after "one" and "two" were published only receives "three" and the
         * completion, while the earlier observer receives the whole sequence.
         */
        @Test
        public void testSubscribeMidSequence() {
            final PublishSubject<String> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<String> early = Mockito.mock(Observer.class);
            publisher.subscribe(early);
            publisher.onNext("one");
            publisher.onNext("two");
            this.assertObservedUntilTwo(early);

            @SuppressWarnings("unchecked")
            final Observer<String> late = Mockito.mock(Observer.class);
            publisher.subscribe(late);
            publisher.onNext("three");
            publisher.onCompleted();

            this.assertCompletedObserver(early);
            this.assertCompletedStartingWithThreeObserver(late);
        }

        /**
         * Verifies that the mock never saw "one" or "two", saw "three" exactly once, saw no error, and saw exactly
         * one onCompleted.
         *
         * @param aObserver the Mockito mock to inspect
         */
        private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver) {
            for (String missed : new String[] {"one", "two"}) {
                Mockito.verify(aObserver, Mockito.never()).onNext(missed);
            }
            Mockito.verify(aObserver, Mockito.times(1)).onNext("three");
            Mockito.verify(aObserver, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(aObserver, Mockito.times(1)).onCompleted();
        }

        /**
         * Checks that an observer which unsubscribes after "one" and "two" receives nothing further, while an
         * observer that subscribes afterwards receives "three" and the completion.
         */
        @Test
        public void testUnsubscribeFirstObserver() {
            final PublishSubject<String> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<String> early = Mockito.mock(Observer.class);
            final Subscription earlyHandle = publisher.subscribe(early);
            publisher.onNext("one");
            publisher.onNext("two");
            earlyHandle.unsubscribe();
            this.assertObservedUntilTwo(early);

            @SuppressWarnings("unchecked")
            final Observer<String> late = Mockito.mock(Observer.class);
            publisher.subscribe(late);
            publisher.onNext("three");
            publisher.onCompleted();

            // The unsubscribed observer must still show only the first two values.
            this.assertObservedUntilTwo(early);
            this.assertCompletedStartingWithThreeObserver(late);
        }

        /**
         * Verifies that the mock saw "one" and "two" exactly once each and has seen neither "three", nor an error,
         * nor a completion.
         *
         * @param aObserver the Mockito mock to inspect
         */
        private void assertObservedUntilTwo(Observer<String> aObserver) {
            for (String seen : new String[] {"one", "two"}) {
                Mockito.verify(aObserver, Mockito.times(1)).onNext(seen);
            }
            Mockito.verify(aObserver, Mockito.never()).onNext("three");
            Mockito.verify(aObserver, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(aObserver, Mockito.never()).onCompleted();
        }

        /**
         * Delegates to {@link UnsubscribeTester#test}, supplying a factory for fresh subjects and three callbacks
         * that trigger onCompleted, onError and onNext("one") on a subject.
         */
        @Test
        public void testUnsubscribe() {
            final Func0<PublishSubject<Object>> factory = new Func0<PublishSubject<Object>>() {

                @Override
                public PublishSubject<Object> call() {
                    return PublishSubject.create();
                }
            };
            final Action1<PublishSubject<Object>> complete = new Action1<PublishSubject<Object>>() {

                @Override
                public void call(PublishSubject<Object> subject) {
                    subject.onCompleted();
                }
            };
            final Action1<PublishSubject<Object>> fail = new Action1<PublishSubject<Object>>() {

                @Override
                public void call(PublishSubject<Object> subject) {
                    subject.onError(new Throwable());
                }
            };
            final Action1<PublishSubject<Object>> emit = new Action1<PublishSubject<Object>>() {

                @Override
                public void call(PublishSubject<Object> subject) {
                    subject.onNext("one");
                }
            };
            UnsubscribeTester.test(factory, complete, fail, emit);
        }

        /**
         * Subscribes to the subject from inside its own mapMany callback: every value published creates a child
         * stream mapped over the same subject. After publishing 0..9 and completing, 45 combined strings are
         * expected.
         * <p>
         * NOTE(review): 45 presumably equals 9+8+...+0, i.e. the child created for value i only sees the values
         * published after i; confirm against mapMany's semantics.
         */
        @Test
        public void testNestedSubscribe() {
            final PublishSubject<Integer> s = PublishSubject.create();
            // Counters are updated by the callbacks but are not asserted on.
            final AtomicInteger parentCalls = new AtomicInteger();
            final AtomicInteger childCalls = new AtomicInteger();
            final AtomicInteger received = new AtomicInteger();
            final List<String> collected = new ArrayList<String>();

            final Observable<String> combined = s.mapMany(new Func1<Integer, Observable<String>>() {

                @Override
                public Observable<String> call(final Integer parent) {
                    parentCalls.incrementAndGet();
                    return s.map(new Func1<Integer, String>() {

                        @Override
                        public String call(Integer child) {
                            childCalls.incrementAndGet();
                            return "Parent: " + parent + " Child: " + child;
                        }
                    });
                }
            });
            combined.subscribe(new Action1<String>() {

                @Override
                public void call(String line) {
                    received.incrementAndGet();
                    collected.add(line);
                }
            });

            int next = 0;
            while (next < 10) {
                s.onNext(next);
                next++;
            }
            s.onCompleted();

            Assert.assertEquals(45, collected.size());
        }

        /**
         * Checks that after one observer unsubscribes, a value published while nobody is subscribed is not
         * delivered to a later observer, which only receives values published after it subscribed.
         */
        @Test
        public void testReSubscribe() {
            final PublishSubject<Integer> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<Integer> first = Mockito.mock(Observer.class);
            final Subscription firstHandle = publisher.subscribe(first);
            publisher.onNext(1);

            final InOrder firstOrder = Mockito.inOrder(first);
            firstOrder.verify(first, Mockito.times(1)).onNext(1);
            firstOrder.verifyNoMoreInteractions();
            firstHandle.unsubscribe();

            // Published while no observer is registered.
            publisher.onNext(2);

            @SuppressWarnings("unchecked")
            final Observer<Integer> second = Mockito.mock(Observer.class);
            final Subscription secondHandle = publisher.subscribe(second);
            publisher.onNext(3);

            final InOrder secondOrder = Mockito.inOrder(second);
            secondOrder.verify(second, Mockito.times(1)).onNext(3);
            secondOrder.verifyNoMoreInteractions();
            secondHandle.unsubscribe();
        }

        /**
         * Pins the behavior that the subject keeps relaying values after onCompleted(): an observer subscribing
         * after completion is expected to receive the next published value (3) and nothing else.
         */
        @Test
        public void testReSubscribeAfterTerminalState() {
            final PublishSubject<Integer> publisher = PublishSubject.create();

            @SuppressWarnings("unchecked")
            final Observer<Integer> first = Mockito.mock(Observer.class);
            final Subscription firstHandle = publisher.subscribe(first);
            publisher.onNext(1);

            final InOrder firstOrder = Mockito.inOrder(first);
            firstOrder.verify(first, Mockito.times(1)).onNext(1);
            firstOrder.verifyNoMoreInteractions();
            firstHandle.unsubscribe();

            // Terminal event, then a value, both sent while no observer is registered.
            publisher.onCompleted();
            publisher.onNext(2);

            @SuppressWarnings("unchecked")
            final Observer<Integer> second = Mockito.mock(Observer.class);
            final Subscription secondHandle = publisher.subscribe(second);
            publisher.onNext(3);

            final InOrder secondOrder = Mockito.inOrder(second);
            secondOrder.verify(second, Mockito.times(1)).onNext(3);
            secondOrder.verifyNoMoreInteractions();
            secondHandle.unsubscribe();
        }
    }
}

