/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;

/**
 * Adapts an {@link Observable} that emits a single value into a {@link Future}.
 *
 * <p>The unit tests for the operator live in this same class, so the class must stay
 * instantiable through its implicit public no-arg constructor for JUnit to run them.
 */
public class OperationToFuture {
    /**
     * Subscribes to {@code that} immediately and returns a {@link Future} that completes when
     * the observable terminates.
     *
     * <p>The future yields the single value passed to {@code onNext}. If the observable
     * signals {@code onError}, or delivers a second value after a non-null first value,
     * {@code get()} throws an {@link ExecutionException} wrapping the cause. If the future
     * was cancelled before any of that happened, {@code get()} throws a
     * {@link CancellationException}. If the observable completes without emitting,
     * {@code get()} returns {@code null}.
     *
     * @param that the source observable; subscribed to before this method returns
     * @param <T> the element type of the observable and of the returned future
     * @return a future backed by the subscription to {@code that}
     */
    public static <T> Future<T> toFuture(Observable<? extends T> that) {
        // Released exactly once the outcome (value, error or cancellation) is known.
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<T>();
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        final Subscription s = that.subscribe(new Observer<T>() {

            @Override
            public void onCompleted() {
                finished.countDown();
            }

            @Override
            public void onError(Throwable e) {
                // Keep only the first failure; later ones must not overwrite it.
                error.compareAndSet(null, e);
                finished.countDown();
            }

            @Override
            public void onNext(T v) {
                // The first value claims the slot. If the slot is already taken, the source
                // emitted more than one value, which this operator reports as an error.
                if (!value.compareAndSet(null, v)) {
                    error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
                    finished.countDown();
                }
            }
        });

        return new Future<T>() {
            private volatile boolean cancelled = false;

            /**
             * Unsubscribes from the source if the future has not finished yet.
             *
             * @param mayInterruptIfRunning ignored by this implementation
             * @return {@code true} if the future was still pending and is now cancelled
             */
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (finished.getCount() > 0L) {
                    this.cancelled = true;
                    s.unsubscribe();
                    // Wake up anyone blocked in get(); they will see the cancelled flag.
                    finished.countDown();
                    return true;
                }
                return false;
            }

            @Override
            public boolean isCancelled() {
                return this.cancelled;
            }

            @Override
            public boolean isDone() {
                return finished.getCount() == 0L;
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                finished.await();
                return this.getValue();
            }

            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                if (finished.await(timeout, unit)) {
                    return this.getValue();
                }
                throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
            }

            /**
             * Translates the recorded outcome into the result of {@code get()}.
             * Must only be called after the latch has been released.
             */
            private T getValue() throws ExecutionException {
                // Read once so the null check and the thrown cause refer to the same object.
                final Throwable failure = error.get();
                if (failure != null) {
                    throw new ExecutionException("Observable onError", failure);
                }
                if (this.cancelled) {
                    // Future.get() must not return normally once the task was cancelled.
                    throw new CancellationException("Subscription unsubscribed");
                }
                return value.get();
            }
        };
    }

    /** A single-element observable yields that element from the future. */
    @Test
    public void testToFuture() throws InterruptedException, ExecutionException {
        Observable<String> obs = Observable.from("one");
        Future<String> f = OperationToFuture.toFuture(obs);
        Assert.assertEquals("one", f.get());
    }

    /** Multiple elements are supported by collecting them with {@code toList()} first. */
    @Test
    public void testToFutureList() throws InterruptedException, ExecutionException {
        Observable<String> obs = Observable.from("one", "two", "three");
        Future<List<String>> f = OperationToFuture.toFuture(obs.toList());
        Assert.assertEquals("one", f.get().get(0));
        Assert.assertEquals("two", f.get().get(1));
        Assert.assertEquals("three", f.get().get(2));
    }

    /** More than one element without {@code toList()} surfaces as an ExecutionException. */
    @Test(expected=ExecutionException.class)
    public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
        Observable<String> obs = Observable.from("one", "two");
        Future<String> f = OperationToFuture.toFuture(obs);
        Assert.assertEquals("one", f.get());
    }

    /** An {@code onError} from the source is wrapped as the cause of an ExecutionException. */
    @Test
    public void testToFutureWithException() {
        Observable<String> obs = Observable.create(new Observable.OnSubscribeFunc<String>() {

            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                observer.onNext("one");
                observer.onError(new TestException());
                return Subscriptions.empty();
            }
        });
        Future<String> f = OperationToFuture.toFuture(obs);
        try {
            f.get();
        }
        catch (ExecutionException e) {
            Assert.assertEquals(TestException.class, e.getCause().getClass());
            return;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assert.fail("interrupted while waiting for the future");
        }
        // Kept outside the try block so this failure is never caught and masked.
        Assert.fail("expected exception");
    }

    /** After a successful cancel, {@code get()} throws instead of returning {@code null}. */
    @Test(expected=CancellationException.class)
    public void testGetAfterCancelThrowsCancellationException() throws InterruptedException, ExecutionException {
        // A source that never emits and never terminates keeps the future pending.
        Observable<String> never = Observable.create(new Observable.OnSubscribeFunc<String>() {

            @Override
            public Subscription onSubscribe(Observer<? super String> observer) {
                return Subscriptions.empty();
            }
        });
        Future<String> f = OperationToFuture.toFuture(never);
        Assert.assertTrue(f.cancel(true));
        Assert.assertTrue(f.isCancelled());
        Assert.assertTrue(f.isDone());
        f.get();
    }

    /** Marker exception used to verify that the original cause is propagated. */
    private static class TestException
    extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private TestException() {
        }
    }
}

