/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;

public final class SynchronizedObserver<T>
implements Observer<T> {
    private final Observer<? super T> observer;
    private final SafeObservableSubscription subscription;
    private volatile boolean finishRequested = false;
    private volatile boolean finished = false;

    public SynchronizedObserver(Observer<? super T> Observer2, SafeObservableSubscription subscription) {
        this.observer = Observer2;
        this.subscription = subscription;
    }

    public SynchronizedObserver(Observer<? super T> Observer2) {
        this.observer = Observer2;
        this.subscription = new SafeObservableSubscription();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onNext(T arg) {
        if (this.finished || this.finishRequested || this.subscription.isUnsubscribed()) {
            return;
        }
        SynchronizedObserver synchronizedObserver = this;
        synchronized (synchronizedObserver) {
            if (this.finished || this.finishRequested || this.subscription.isUnsubscribed()) {
                return;
            }
            this.observer.onNext(arg);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onError(Throwable e) {
        if (this.finished || this.subscription.isUnsubscribed()) {
            return;
        }
        this.finishRequested = true;
        SynchronizedObserver synchronizedObserver = this;
        synchronized (synchronizedObserver) {
            if (this.finished || this.subscription.isUnsubscribed()) {
                return;
            }
            this.observer.onError(e);
            this.finished = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onCompleted() {
        if (this.finished || this.subscription.isUnsubscribed()) {
            return;
        }
        this.finishRequested = true;
        SynchronizedObserver synchronizedObserver = this;
        synchronized (synchronizedObserver) {
            if (this.finished || this.subscription.isUnsubscribed()) {
                return;
            }
            this.observer.onCompleted();
            this.finished = true;
        }
    }

    public static class UnitTest {
        @Mock
        Observer<String> aObserver;

        @Before
        public void before() {
            MockitoAnnotations.initMocks((Object)this);
        }

        @Test
        public void testSingleThreadedBasic() {
            Subscription s = (Subscription)Mockito.mock(Subscription.class);
            TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three");
            Observable<String> w = Observable.create(onSubscribe);
            SafeObservableSubscription as = new SafeObservableSubscription(s);
            SynchronizedObserver<String> aw = new SynchronizedObserver<String>(this.aObserver, as);
            w.subscribe(aw);
            onSubscribe.waitToFinish();
            ((Observer)Mockito.verify(this.aObserver, (VerificationMode)Mockito.times((int)1))).onNext("one");
            ((Observer)Mockito.verify(this.aObserver, (VerificationMode)Mockito.times((int)1))).onNext("two");
            ((Observer)Mockito.verify(this.aObserver, (VerificationMode)Mockito.times((int)1))).onNext("three");
            ((Observer)Mockito.verify(this.aObserver, (VerificationMode)Mockito.never())).onError((Throwable)Mockito.any(Throwable.class));
            ((Observer)Mockito.verify(this.aObserver, (VerificationMode)Mockito.times((int)1))).onCompleted();
        }

        @Test
        public void testMultiThreadedBasic() {
            Subscription s = (Subscription)Mockito.mock(Subscription.class);
            TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
            Observable<String> w = Observable.create(onSubscribe);
            SafeObservableSubscription as = new SafeObservableSubscription(s);
            BusyObserver busyObserver = new BusyObserver();
            SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
            w.subscribe(aw);
            onSubscribe.waitToFinish();
            Assert.assertEquals((long)3L, (long)busyObserver.onNextCount.get());
            Assert.assertFalse((boolean)busyObserver.onError);
            Assert.assertTrue((boolean)busyObserver.onCompleted);
            Assert.assertTrue((onSubscribe.maxConcurrentThreads.get() > 1 ? 1 : 0) != 0);
            Assert.assertEquals((long)1L, (long)busyObserver.maxConcurrentThreads.get());
        }

        @Test
        public void testMultiThreadedWithNPE() {
            Subscription s = (Subscription)Mockito.mock(Subscription.class);
            TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
            Observable<String> w = Observable.create(onSubscribe);
            SafeObservableSubscription as = new SafeObservableSubscription(s);
            BusyObserver busyObserver = new BusyObserver();
            SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
            w.subscribe(aw);
            onSubscribe.waitToFinish();
            System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            Assert.assertTrue((busyObserver.onNextCount.get() < 4 ? 1 : 0) != 0);
            Assert.assertTrue((boolean)busyObserver.onError);
            Assert.assertFalse((boolean)busyObserver.onCompleted);
            Assert.assertTrue((onSubscribe.maxConcurrentThreads.get() > 1 ? 1 : 0) != 0);
            Assert.assertEquals((long)1L, (long)busyObserver.maxConcurrentThreads.get());
        }

        @Test
        public void testMultiThreadedWithNPEinMiddle() {
            Subscription s = (Subscription)Mockito.mock(Subscription.class);
            TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
            Observable<String> w = Observable.create(onSubscribe);
            SafeObservableSubscription as = new SafeObservableSubscription(s);
            BusyObserver busyObserver = new BusyObserver();
            SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
            w.subscribe(aw);
            onSubscribe.waitToFinish();
            System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            System.out.println("onNext count: " + busyObserver.onNextCount.get());
            Assert.assertTrue((busyObserver.onNextCount.get() < 9 ? 1 : 0) != 0);
            Assert.assertTrue((boolean)busyObserver.onError);
            Assert.assertFalse((boolean)busyObserver.onCompleted);
            Assert.assertTrue((onSubscribe.maxConcurrentThreads.get() > 1 ? 1 : 0) != 0);
            Assert.assertEquals((long)1L, (long)busyObserver.maxConcurrentThreads.get());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Test
        public void runConcurrencyTest() {
            ExecutorService tp = Executors.newFixedThreadPool(20);
            try {
                TestConcurrencyObserver tw = new TestConcurrencyObserver();
                SafeObservableSubscription s = new SafeObservableSubscription();
                SynchronizedObserver<String> w = new SynchronizedObserver<String>(tw, s);
                Future<?> f1 = tp.submit(new OnNextThread(w, 12000));
                Future<?> f2 = tp.submit(new OnNextThread(w, 5000));
                Future<?> f3 = tp.submit(new OnNextThread(w, 75000));
                Future<?> f4 = tp.submit(new OnNextThread(w, 13500));
                Future<?> f5 = tp.submit(new OnNextThread(w, 22000));
                Future<?> f6 = tp.submit(new OnNextThread(w, 15000));
                Future<?> f7 = tp.submit(new OnNextThread(w, 7500));
                Future<?> f8 = tp.submit(new OnNextThread(w, 23500));
                Future<?> f10 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4));
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
                Future<?> f11 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                Future<?> f12 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                Future<?> f13 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                Future<?> f14 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                Future<?> f15 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                Future<?> f16 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                Future<?> f17 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                Future<?> f18 = tp.submit(new CompletionThread(w, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                UnitTest.waitOnThreads(f1, f2, f3, f4, f5, f6, f7, f8, f10, f11, f12, f13, f14, f15, f16, f17, f18);
                int numNextEvents = tw.assertEvents(null);
            }
            catch (Throwable e) {
                Assert.fail((String)("Concurrency test failed: " + e.getMessage()));
                e.printStackTrace();
            }
            finally {
                tp.shutdown();
                try {
                    tp.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private static void waitOnThreads(Future<?> ... futures) {
            for (Future<?> f : futures) {
                try {
                    f.get(10L, TimeUnit.SECONDS);
                }
                catch (Throwable e) {
                    System.err.println("Failed while waiting on future.");
                    e.printStackTrace();
                }
            }
        }

        private static class BusyObserver
        implements Observer<String> {
            volatile boolean onCompleted = false;
            volatile boolean onError = false;
            AtomicInteger onNextCount = new AtomicInteger();
            AtomicInteger threadsRunning = new AtomicInteger();
            AtomicInteger maxConcurrentThreads = new AtomicInteger();

            private BusyObserver() {
            }

            @Override
            public void onCompleted() {
                System.out.println(">>> BusyObserver received onCompleted");
                this.onCompleted = true;
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(">>> BusyObserver received onError: " + e.getMessage());
                this.onError = true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onNext(String args) {
                this.threadsRunning.incrementAndGet();
                try {
                    this.onNextCount.incrementAndGet();
                    System.out.println(">>> BusyObserver received onNext: " + args);
                    try {
                        Thread.sleep(200L);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                finally {
                    int concurrentThreads = this.threadsRunning.get();
                    int maxThreads = this.maxConcurrentThreads.get();
                    if (concurrentThreads > maxThreads) {
                        this.maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
                    }
                    this.threadsRunning.decrementAndGet();
                }
            }
        }

        private static class TestMultiThreadedObservable
        implements Observable.OnSubscribeFunc<String> {
            final Subscription s;
            final String[] values;
            Thread t = null;
            AtomicInteger threadsRunning = new AtomicInteger();
            AtomicInteger maxConcurrentThreads = new AtomicInteger();
            ExecutorService threadPool;

            public TestMultiThreadedObservable(Subscription s, String ... values) {
                this.s = s;
                this.values = values;
                this.threadPool = Executors.newCachedThreadPool();
            }

            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                System.out.println("TestMultiThreadedObservable subscribed to ...");
                this.t = new Thread(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            System.out.println("running TestMultiThreadedObservable thread");
                            for (final String s : TestMultiThreadedObservable.this.values) {
                                TestMultiThreadedObservable.this.threadPool.execute(new Runnable(){

                                    /*
                                     * WARNING - Removed try catching itself - possible behaviour change.
                                     */
                                    @Override
                                    public void run() {
                                        TestMultiThreadedObservable.this.threadsRunning.incrementAndGet();
                                        try {
                                            System.out.println("TestMultiThreadedObservable onNext: " + s);
                                            if (s == null) {
                                                throw new NullPointerException();
                                            }
                                            observer.onNext(s);
                                            int concurrentThreads = TestMultiThreadedObservable.this.threadsRunning.get();
                                            int maxThreads = TestMultiThreadedObservable.this.maxConcurrentThreads.get();
                                            if (concurrentThreads > maxThreads) {
                                                TestMultiThreadedObservable.this.maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
                                            }
                                        }
                                        catch (Throwable e) {
                                            observer.onError(e);
                                        }
                                        finally {
                                            TestMultiThreadedObservable.this.threadsRunning.decrementAndGet();
                                        }
                                    }
                                });
                            }
                            TestMultiThreadedObservable.this.threadPool.shutdown();
                        }
                        catch (Throwable e) {
                            throw new RuntimeException(e);
                        }
                        try {
                            TestMultiThreadedObservable.this.threadPool.awaitTermination(2L, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        observer.onCompleted();
                    }
                });
                System.out.println("starting TestMultiThreadedObservable thread");
                this.t.start();
                System.out.println("done starting TestMultiThreadedObservable thread");
                return this.s;
            }

            public void waitToFinish() {
                try {
                    this.t.join();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        private static class TestSingleThreadedObservable
        implements Observable.OnSubscribeFunc<String> {
            final Subscription s;
            final String[] values;
            private Thread t = null;

            public TestSingleThreadedObservable(Subscription s, String ... values) {
                this.s = s;
                this.values = values;
            }

            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                System.out.println("TestSingleThreadedObservable subscribed to ...");
                this.t = new Thread(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            System.out.println("running TestSingleThreadedObservable thread");
                            for (String s : TestSingleThreadedObservable.this.values) {
                                System.out.println("TestSingleThreadedObservable onNext: " + s);
                                observer.onNext(s);
                            }
                            observer.onCompleted();
                        }
                        catch (Throwable e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
                System.out.println("starting TestSingleThreadedObservable thread");
                this.t.start();
                System.out.println("done starting TestSingleThreadedObservable thread");
                return this.s;
            }

            public void waitToFinish() {
                try {
                    this.t.join();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        private static class TestConcurrencyObserver
        implements Observer<String> {
            private final LinkedBlockingQueue<TestConcurrencyObserverEvent> events = new LinkedBlockingQueue();
            private final int waitTime;

            public TestConcurrencyObserver(int waitTimeInNext) {
                this.waitTime = waitTimeInNext;
            }

            public TestConcurrencyObserver() {
                this.waitTime = 0;
            }

            @Override
            public void onCompleted() {
                this.events.add(TestConcurrencyObserverEvent.onCompleted);
            }

            @Override
            public void onError(Throwable e) {
                this.events.add(TestConcurrencyObserverEvent.onError);
            }

            @Override
            public void onNext(String args) {
                this.events.add(TestConcurrencyObserverEvent.onNext);
                int s = 0;
                for (int i = 0; i < 20; ++i) {
                    s += s * i;
                }
                if (this.waitTime > 0) {
                    try {
                        Thread.sleep(this.waitTime);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            }

            public int assertEvents(TestConcurrencyObserverEvent expectedEndingEvent) throws IllegalStateException {
                int nextCount = 0;
                boolean finished = false;
                for (TestConcurrencyObserverEvent e : this.events) {
                    if (e == TestConcurrencyObserverEvent.onNext) {
                        if (finished) {
                            throw new IllegalStateException("Received onNext but we're already finished.");
                        }
                        ++nextCount;
                        continue;
                    }
                    if (e == TestConcurrencyObserverEvent.onError) {
                        if (finished) {
                            throw new IllegalStateException("Received onError but we're already finished.");
                        }
                        if (expectedEndingEvent != null && TestConcurrencyObserverEvent.onError != expectedEndingEvent) {
                            throw new IllegalStateException("Received onError ending event but expected " + (Object)((Object)expectedEndingEvent));
                        }
                        finished = true;
                        continue;
                    }
                    if (e != TestConcurrencyObserverEvent.onCompleted) continue;
                    if (finished) {
                        throw new IllegalStateException("Received onCompleted but we're already finished.");
                    }
                    if (expectedEndingEvent != null && TestConcurrencyObserverEvent.onCompleted != expectedEndingEvent) {
                        throw new IllegalStateException("Received onCompleted ending event but expected " + (Object)((Object)expectedEndingEvent));
                    }
                    finished = true;
                }
                return nextCount;
            }
        }

        private static enum TestConcurrencyObserverEvent {
            onCompleted,
            onError,
            onNext;

        }

        public static class CompletionThread
        implements Runnable {
            private final Observer<String> Observer;
            private final TestConcurrencyObserverEvent event;
            private final Future<?>[] waitOnThese;

            CompletionThread(Observer<String> Observer2, TestConcurrencyObserverEvent event, Future<?> ... waitOnThese) {
                this.Observer = Observer2;
                this.event = event;
                this.waitOnThese = waitOnThese;
            }

            @Override
            public void run() {
                if (this.waitOnThese != null) {
                    for (Future<?> f : this.waitOnThese) {
                        try {
                            f.get();
                        }
                        catch (Throwable e) {
                            System.err.println("Error while waiting on future in CompletionThread");
                        }
                    }
                }
                if (this.event == TestConcurrencyObserverEvent.onError) {
                    this.Observer.onError(new RuntimeException("mocked exception"));
                } else if (this.event == TestConcurrencyObserverEvent.onCompleted) {
                    this.Observer.onCompleted();
                } else {
                    throw new IllegalArgumentException("Expecting either onError or onCompleted");
                }
            }
        }

        public static class OnNextThread
        implements Runnable {
            private final Observer<String> Observer;
            private final int numStringsToSend;

            OnNextThread(Observer<String> Observer2, int numStringsToSend) {
                this.Observer = Observer2;
                this.numStringsToSend = numStringsToSend;
            }

            @Override
            public void run() {
                for (int i = 0; i < this.numStringsToSend; ++i) {
                    this.Observer.onNext("aString");
                }
            }
        }
    }
}

