/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action1;

public class OperationCache {
    public static <T> Observable.OnSubscribeFunc<T> cache(final Observable<? extends T> source) {
        return new Observable.OnSubscribeFunc<T>(){
            final AtomicBoolean subscribed = new AtomicBoolean(false);
            private final ReplaySubject<T> cache = ReplaySubject.create();

            @Override
            public Subscription onSubscribe(Observer<? super T> observer) {
                if (this.subscribed.compareAndSet(false, true)) {
                    source.subscribe(this.cache);
                }
                return this.cache.subscribe(observer);
            }
        };
    }

    public static class UnitTest {
        @Test
        public void testCache() throws InterruptedException {
            final AtomicInteger counter = new AtomicInteger();
            Observable<String> o = Observable.create(OperationCache.cache(Observable.create(new Observable.OnSubscribeFunc<String>(){

                @Override
                public Subscription onSubscribe(final Observer<? super String> observer) {
                    BooleanSubscription subscription = new BooleanSubscription();
                    new Thread(new Runnable(){

                        @Override
                        public void run() {
                            counter.incrementAndGet();
                            System.out.println("published observable being executed");
                            observer.onNext("one");
                            observer.onCompleted();
                        }
                    }).start();
                    return subscription;
                }
            })));
            final CountDownLatch latch = new CountDownLatch(2);
            o.subscribe(new Action1<String>(){

                @Override
                public void call(String v) {
                    Assert.assertEquals((Object)"one", (Object)v);
                    System.out.println("v: " + v);
                    latch.countDown();
                }
            });
            o.subscribe(new Action1<String>(){

                @Override
                public void call(String v) {
                    Assert.assertEquals((Object)"one", (Object)v);
                    System.out.println("v: " + v);
                    latch.countDown();
                }
            });
            if (!latch.await(1000L, TimeUnit.MILLISECONDS)) {
                Assert.fail((String)"subscriptions did not receive values");
            }
            Assert.assertEquals((long)1L, (long)counter.get());
        }
    }
}

