/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;

public final class OperationNext {
    /**
     * Exposes an {@link Observable} as a blocking {@link Iterable}.
     *
     * <p>The source is materialized and subscribed to immediately, before this method returns.
     * Every call to {@link Iterable#iterator()} on the result hands back the same iterator
     * instance, which is backed by that single subscription.
     *
     * @param items the source sequence to iterate over
     * @param <T> the element type produced by the iterator
     * @return an iterable whose single shared iterator reads from {@code items}
     */
    public static <T> Iterable<T> next(Observable<? extends T> items) {
        // Explicit type arguments (no raw types) keep the observer/iterator pairing type-checked.
        NextObserver<T> nextObserver = new NextObserver<T>();
        final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
        items.materialize().subscribe(nextObserver);
        return new Iterable<T>(){

            @Override
            public Iterator<T> iterator() {
                return nextIterator;
            }
        };
    }

    /**
     * JUnit tests for {@link OperationNext#next(Observable)}.
     *
     * <p>Blocking {@code Iterator.next()} calls are run on a single-threaded executor so the test
     * thread stays free to push values into the source.
     */
    public static class UnitTest {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        /**
         * Shuts the executor down after each test. {@code shutdownNow()} interrupts a worker that
         * is still blocked inside {@code Iterator.next()}, so no thread outlives the test.
         */
        @After
        public void shutdownExecutor() {
            this.executor.shutdownNow();
        }

        /** Values pushed while a consumer is waiting are delivered; completion ends iteration. */
        @Test
        public void testNext() throws Throwable {
            PublishSubject<String> obs = PublishSubject.create();
            Iterator<String> it = OperationNext.next(obs).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> next = this.nextAsync(it);
            // Give the worker time to block in next() before the value is published.
            Thread.sleep(100L);
            obs.onNext("one");
            Assert.assertEquals("one", next.get());
            Assert.assertTrue(it.hasNext());
            next = this.nextAsync(it);
            Thread.sleep(100L);
            obs.onNext("two");
            Assert.assertEquals("two", next.get());
            Assert.assertTrue(it.hasNext());
            obs.onCompleted();
            Assert.assertFalse(it.hasNext());
        }

        /** An error published while a consumer is blocked in next() is rethrown to that consumer. */
        @Test(expected=TestException.class)
        public void testOnError() throws Throwable {
            PublishSubject<String> obs = PublishSubject.create();
            Iterator<String> it = OperationNext.next(obs).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> next = this.nextAsync(it);
            Thread.sleep(100L);
            obs.onNext("one");
            Assert.assertEquals("one", next.get());
            Assert.assertTrue(it.hasNext());
            next = this.nextAsync(it);
            Thread.sleep(100L);
            obs.onError(new TestException());
            try {
                next.get();
            }
            catch (ExecutionException e) {
                // Unwrap so the expected=TestException check sees the original failure.
                throw e.getCause();
            }
        }

        /** After an error, hasNext() must return false rather than throw. */
        @Test
        public void testOnErrorViaHasNext() throws Throwable {
            PublishSubject<String> obs = PublishSubject.create();
            Iterator<String> it = OperationNext.next(obs).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> next = this.nextAsync(it);
            Thread.sleep(100L);
            obs.onNext("one");
            Assert.assertEquals("one", next.get());
            Assert.assertTrue(it.hasNext());
            // Park a consumer in next() before the error is published; its result is not inspected.
            this.nextAsync(it);
            Thread.sleep(100L);
            obs.onError(new TestException());
            // Only the hasNext() call is guarded, so a failing assertion below is reported as
            // itself and not as "should not have received exception".
            boolean hasNext;
            try {
                hasNext = it.hasNext();
            }
            catch (Throwable e) {
                AssertionError failure = new AssertionError("should not have received exception");
                failure.initCause(e);
                throw failure;
            }
            Assert.assertFalse(hasNext);
        }

        /**
         * Runs {@code it.next()} on the executor thread.
         *
         * @param it the iterator to pull one element from
         * @return a future completed with the element, or failed with whatever next() threw
         */
        private Future<String> nextAsync(final Iterator<String> it) throws Throwable {
            return this.executor.submit(new Callable<String>(){

                @Override
                public String call() throws Exception {
                    return it.next();
                }
            });
        }

        /**
         * A source that emits continuously on its own thread must not be slowed down by the
         * iterator, and the iterator must not replay values emitted while nobody was waiting.
         */
        @Test
        public void testNoBufferingOrBlockingOfSequence() throws Throwable {
            final CountDownLatch finished = new CountDownLatch(1);
            // Number of emissions to let through before sampling the third value.
            final int minimumEmissions = 30;
            final CountDownLatch timeHasPassed = new CountDownLatch(minimumEmissions);
            final AtomicBoolean running = new AtomicBoolean(true);
            final AtomicInteger count = new AtomicInteger(0);
            Observable<Integer> obs = Observable.create(new Observable.OnSubscribeFunc<Integer>(){

                @Override
                public Subscription onSubscribe(final Observer<? super Integer> o) {
                    new Thread(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                // Emit increasing integers until the test clears the flag.
                                while (running.get()) {
                                    o.onNext(count.incrementAndGet());
                                    timeHasPassed.countDown();
                                }
                                o.onCompleted();
                            }
                            catch (Throwable e) {
                                o.onError(e);
                            }
                            finally {
                                finished.countDown();
                            }
                        }
                    }).start();
                    return Subscriptions.empty();
                }
            });
            Iterator<Integer> it = OperationNext.next(obs).iterator();
            Assert.assertTrue(it.hasNext());
            int a = it.next();
            Assert.assertTrue(it.hasNext());
            int b = it.next();
            Assert.assertTrue("a and b should be different", a != b);
            // Let the producer run ahead while no next() call is pending.
            timeHasPassed.await(8000L, TimeUnit.MILLISECONDS);
            Assert.assertTrue(it.hasNext());
            int c = it.next();
            Assert.assertTrue("c should not just be the next in sequence", c != b + 1);
            Assert.assertTrue("expected that c [" + c + "] is higher than or equal to " + minimumEmissions, c >= minimumEmissions);
            Assert.assertTrue(it.hasNext());
            running.set(false);
            finished.await();
            Assert.assertFalse(it.hasNext());
            System.out.println("a: " + a + " b: " + b + " c: " + c);
        }

        /** Marker exception used to recognise the failure injected by the tests. */
        private static class TestException
        extends RuntimeException {
            private TestException() {
            }
        }
    }

    /**
     * Observer of notifications that hands them to a consumer through a single-slot queue.
     *
     * <p>An {@code onNext} notification is stored only if a consumer has announced interest via
     * {@link #await()} since the last stored value; otherwise it is dropped. Terminal
     * notifications (error / completed) are always stored.
     *
     * @param <T> the value type carried by the notifications
     */
    private static class NextObserver<T>
    implements Observer<Notification<? extends T>> {
        /** Single-slot hand-off between the producer thread and the consumer. */
        private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
        /** Set by the consumer to request the next value; cleared by the producer on delivery. */
        private final AtomicBoolean waiting = new AtomicBoolean(false);

        private NextObserver() {
        }

        @Override
        public void onCompleted() {
            // Intentionally empty: this observer handles termination as a Notification in onNext.
        }

        @Override
        public void onError(Throwable e) {
            // Intentionally empty: this observer handles errors as a Notification in onNext.
        }

        /**
         * Stores {@code args} in the slot if a consumer is waiting or if it is terminal.
         *
         * @param args the notification received from the source
         */
        @Override
        public void onNext(Notification<? extends T> args) {
            if (this.waiting.getAndSet(false) || !args.isOnNext()) {
                Notification<? extends T> toOffer = args;
                while (!this.buf.offer(toOffer)) {
                    // Slot occupied: evict the occupant and retry.
                    Notification<? extends T> concurrentItem = this.buf.poll();
                    // poll() returns null when the consumer drained the slot between the failed
                    // offer and this poll; in that case simply retry the offer.
                    if (concurrentItem != null && !concurrentItem.isOnNext()) {
                        // A terminal notification got there first; it must not be overwritten.
                        toOffer = concurrentItem;
                    }
                }
            }
        }

        /** Signals that a consumer wants the next value emitted by the source. */
        public void await() {
            this.waiting.set(true);
        }

        /**
         * Inspects the buffered notification without removing it.
         *
         * @param rethrowExceptionIfExists if {@code true} and an error is buffered, rethrow it
         *        instead of returning
         * @return {@code true} if a terminal notification is buffered, {@code false} if the slot
         *         is empty or holds a value
         */
        public boolean isCompleted(boolean rethrowExceptionIfExists) {
            Notification<? extends T> lastItem = this.buf.peek();
            if (lastItem == null) {
                return false;
            }
            if (lastItem.isOnError()) {
                if (rethrowExceptionIfExists) {
                    throw Exceptions.propagate(lastItem.getThrowable());
                }
                return true;
            }
            return lastItem.isOnCompleted();
        }

        /**
         * Blocks until a notification is available and returns its value.
         *
         * @return the value of the next {@code onNext} notification
         * @throws InterruptedException if interrupted while waiting
         * @throws IllegalStateException if the notification is a completion
         * @throws RuntimeException the propagated source error if the notification is an error
         */
        public T takeNext() throws InterruptedException {
            Notification<? extends T> next = this.buf.take();
            if (next.isOnError() || next.isOnCompleted()) {
                // Put the terminal notification back so later isCompleted() calls still see it;
                // otherwise the slot would be empty and the next takeNext() would block forever.
                // There is still a short window between take() and this offer().
                this.buf.offer(next);
            }
            if (next.isOnError()) {
                throw Exceptions.propagate(next.getThrowable());
            }
            if (next.isOnCompleted()) {
                throw new IllegalStateException("Observable is completed");
            }
            return next.getValue();
        }
    }

    /**
     * Read-only iterator that pulls values from a {@link NextObserver}.
     *
     * @param <T> the element type returned by {@link #next()}
     */
    private static class NextIterator<T>
    implements Iterator<T> {
        private final NextObserver<? extends T> source;

        private NextIterator(NextObserver<? extends T> observer) {
            this.source = observer;
        }

        /**
         * @return {@code false} once the observer holds a terminal notification, {@code true}
         *         otherwise; a buffered error is not rethrown here
         */
        @Override
        public boolean hasNext() {
            boolean finished = this.source.isCompleted(false);
            return !finished;
        }

        /**
         * Requests the next value from the observer and blocks until it is handed over.
         *
         * @return the next value
         * @throws IllegalStateException if the observer already holds a completion
         */
        @Override
        public T next() {
            // Passing true makes a buffered error surface here as an exception.
            boolean finished = this.source.isCompleted(true);
            if (finished) {
                throw new IllegalStateException("Observable is completed");
            }
            this.source.await();
            T value;
            try {
                value = this.source.takeNext();
            }
            catch (InterruptedException interrupted) {
                // Restore the interrupt flag before converting to an unchecked exception.
                Thread.currentThread().interrupt();
                throw Exceptions.propagate(interrupted);
            }
            return value;
        }

        /** Always throws: elements cannot be removed through this iterator. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Read only iterator");
        }
    }
}

