/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;

public final class OperationMaterialize {
    /**
     * Builds a subscribe function that exposes the events of {@code sequence} as
     * {@link Notification} values.
     *
     * @param sequence the source sequence handed to the returned subscribe function
     * @param <T> the element type of the source sequence
     * @return a new subscribe function wrapping {@code sequence}
     */
    public static <T> Observable.OnSubscribeFunc<Notification<T>> materialize(Observable<? extends T> sequence) {
        final Observable.OnSubscribeFunc<Notification<T>> materialized = new MaterializeObservable<T>(sequence);
        return materialized;
    }

    /**
     * Test source that emits its configured values from a newly started thread on
     * every subscription.
     *
     * <p>Values are delivered in order through {@code onNext}. The first {@code null}
     * entry makes the source pause briefly, signal a {@link NullPointerException}
     * through {@code onError}, and stop; any later values are never emitted. If no
     * {@code null} is present, {@code onCompleted} follows the last value.
     */
    private static class TestAsyncErrorObservable
    implements Observable.OnSubscribeFunc<String> {
        String[] valuesToReturn;
        // Thread started by the most recent subscription; tests join on it.
        volatile Thread t;

        TestAsyncErrorObservable(String ... values) {
            this.valuesToReturn = values;
        }

        /**
         * Starts a new producer thread that pushes the configured values to {@code observer}.
         *
         * @param observer receiver of the emitted values and the terminal event
         * @return a subscription whose {@code unsubscribe()} does nothing
         */
        @Override
        public Subscription onSubscribe(final Observer<? super String> observer) {
            final Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (String s : TestAsyncErrorObservable.this.valuesToReturn) {
                        if (s == null) {
                            System.out.println("throwing exception");
                            try {
                                Thread.sleep(100L);
                            } catch (InterruptedException e) {
                                // The pause is best-effort: keep the interrupt status
                                // visible and still deliver the error below.
                                Thread.currentThread().interrupt();
                            }
                            observer.onError(new NullPointerException());
                            return;
                        }
                        observer.onNext(s);
                    }
                    System.out.println("subscription complete");
                    observer.onCompleted();
                }
            });
            // Publish the thread before starting it, but start it through the local so a
            // concurrent subscription overwriting the field cannot redirect this start().
            this.t = producer;
            producer.start();
            return new Subscription() {
                @Override
                public void unsubscribe() {
                    // Nothing to cancel: the producer thread always runs to its terminal event.
                }
            };
        }
    }

    /**
     * Recording observer used by the tests: it remembers which terminal callbacks
     * fired and appends every received notification to a list.
     */
    private static class TestObserver implements Observer<Notification<String>> {
        // Set once onCompleted() has been invoked.
        boolean onCompleted;
        // Set once onError(Throwable) has been invoked.
        boolean onError;
        // Every value passed to onNext, in the order it was added.
        List<Notification<String>> notifications;

        private TestObserver() {
            onCompleted = false;
            onError = false;
            notifications = new Vector<Notification<String>>();
        }

        @Override
        public void onCompleted() {
            onCompleted = true;
        }

        @Override
        public void onError(Throwable e) {
            onError = true;
        }

        @Override
        public void onNext(Notification<String> value) {
            notifications.add(value);
        }
    }

    /** Tests for {@link OperationMaterialize#materialize(Observable)}. */
    public static class UnitTest {
        /**
         * A source that fails on its third entry yields two onNext notifications and one
         * onError notification, and the materialized stream itself completes normally.
         */
        @Test
        public void testMaterialize1() {
            TestAsyncErrorObservable source = new TestAsyncErrorObservable("one", "two", null, "three");
            TestObserver recorder = new TestObserver();
            Observable<String> upstream = Observable.create(source);
            Observable<Notification<String>> materialized = Observable.create(OperationMaterialize.materialize(upstream));
            materialized.subscribe(recorder);
            awaitProducer(source);

            Assert.assertFalse(recorder.onError);
            Assert.assertTrue(recorder.onCompleted);

            List<Notification<String>> seen = recorder.notifications;
            Assert.assertEquals(3, seen.size());
            Assert.assertEquals("one", seen.get(0).getValue());
            Assert.assertTrue(seen.get(0).isOnNext());
            Assert.assertEquals("two", seen.get(1).getValue());
            Assert.assertTrue(seen.get(1).isOnNext());
            Assert.assertEquals(NullPointerException.class, seen.get(2).getThrowable().getClass());
            Assert.assertTrue(seen.get(2).isOnError());
        }

        /**
         * A source that finishes cleanly yields one onNext notification per value
         * followed by a single onCompleted notification.
         */
        @Test
        public void testMaterialize2() {
            TestAsyncErrorObservable source = new TestAsyncErrorObservable("one", "two", "three");
            TestObserver recorder = new TestObserver();
            Observable<String> upstream = Observable.create(source);
            Observable<Notification<String>> materialized = Observable.create(OperationMaterialize.materialize(upstream));
            materialized.subscribe(recorder);
            awaitProducer(source);

            Assert.assertFalse(recorder.onError);
            Assert.assertTrue(recorder.onCompleted);

            List<Notification<String>> seen = recorder.notifications;
            Assert.assertEquals(4, seen.size());
            Assert.assertEquals("one", seen.get(0).getValue());
            Assert.assertTrue(seen.get(0).isOnNext());
            Assert.assertEquals("two", seen.get(1).getValue());
            Assert.assertTrue(seen.get(1).isOnNext());
            Assert.assertEquals("three", seen.get(2).getValue());
            Assert.assertTrue(seen.get(2).isOnNext());
            Assert.assertTrue(seen.get(3).isOnCompleted());
        }

        /** Subscribing twice to the same materialized observable gives three notifications each time. */
        @Test
        public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
            TestAsyncErrorObservable source = new TestAsyncErrorObservable("one", "two", null, "three");
            Observable<String> upstream = Observable.create(source);
            Observable<Notification<String>> materialized = Observable.create(OperationMaterialize.materialize(upstream));
            for (int attempt = 0; attempt < 2; attempt++) {
                List<Notification<String>> collected = materialized.toList().toBlockingObservable().toFuture().get();
                Assert.assertEquals(3, collected.size());
            }
        }

        /** Blocks until the producer thread of {@code source} ends; an interrupt is rethrown unchecked. */
        private static void awaitProducer(TestAsyncErrorObservable source) {
            try {
                source.t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Subscribe function that relays every event of the wrapped sequence to the
     * downstream observer as a {@link Notification} passed to {@code onNext}.
     *
     * <p>Both terminal events of the source (completion and error) are forwarded as a
     * notification value and then followed by {@code onCompleted()} on the downstream
     * observer; this class never calls the downstream {@code onError}.
     *
     * @param <T> element type of the wrapped sequence
     */
    private static class MaterializeObservable<T>
    implements Observable.OnSubscribeFunc<Notification<T>> {
        private final Observable<? extends T> sequence;

        /**
         * @param sequence the source sequence to subscribe to on each subscription
         */
        public MaterializeObservable(Observable<? extends T> sequence) {
            this.sequence = sequence;
        }

        /**
         * Subscribes to the wrapped sequence and forwards its events as notifications.
         *
         * @param observer downstream receiver of the notification values
         * @return the subscription returned by subscribing to the wrapped sequence
         */
        @Override
        public Subscription onSubscribe(final Observer<? super Notification<T>> observer) {
            return this.sequence.subscribe(new Observer<T>() {
                @Override
                public void onCompleted() {
                    observer.onNext(new Notification<T>());
                    observer.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    // The error travels as a value, so the downstream stream ends with
                    // onCompleted rather than onError.
                    observer.onNext(new Notification<T>(e));
                    observer.onCompleted();
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(new Notification<T>(value));
                }
            });
        }
    }
}

