/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Functions;

public final class OperationGroupBy {
    /**
     * Builds the subscribe function for grouping {@code source} by key while
     * projecting each item to a value.
     *
     * <p>Every source item is first mapped to a {@link KeyValue} holding
     * {@code keySelector.call(item)} and {@code elementSelector.call(item)}
     * (the key selector is invoked first); the resulting stream is handed to
     * {@link GroupBy}.
     *
     * @param source          the observable whose items are grouped
     * @param keySelector     computes the group key of an item
     * @param elementSelector computes the value emitted inside the group for an item
     * @param <K>             key type
     * @param <T>             source item type
     * @param <R>             type of the values emitted by each group
     * @return the subscribe function producing one {@link GroupedObservable} per key
     */
    public static <K, T, R> Observable.OnSubscribeFunc<GroupedObservable<K, R>> groupBy(Observable<? extends T> source, final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
        // Pair each item with its key up front so the grouping stage only has to deal with KeyValue.
        final Observable<KeyValue<K, R>> keyval = source.map(new Func1<T, KeyValue<K, R>>() {

            @Override
            public KeyValue<K, R> call(T t) {
                K key = keySelector.call(t);
                R value = elementSelector.call(t);
                return new KeyValue<K, R>(key, value);
            }
        });
        return new GroupBy<K, R>(keyval);
    }

    /**
     * Groups {@code source} by key without projecting the items: delegates to
     * the three-argument overload, passing {@code Functions.identity()} as the
     * element selector.
     *
     * @param source      the observable whose items are grouped
     * @param keySelector computes the group key of an item
     * @param <K>         key type
     * @param <T>         source item type, also the type emitted by each group
     * @return the subscribe function producing one {@link GroupedObservable} per key
     */
    public static <K, T> Observable.OnSubscribeFunc<GroupedObservable<K, T>> groupBy(Observable<? extends T> source, Func1<? super T, ? extends K> keySelector) {
        Func1<T, T> passThrough = Functions.identity();
        return groupBy(source, keySelector, passThrough);
    }

    /**
     * Creates an observer whose three callbacks all have empty bodies, so any
     * notification delivered to it is discarded.
     *
     * @param <T> item type accepted by the observer
     * @return a new do-nothing observer
     */
    private static <T> Observer<T> emptyObserver() {
        Observer<T> discarding = new Observer<T>() {

            @Override
            public void onNext(T t) {
                // intentionally empty
            }

            @Override
            public void onError(Throwable e) {
                // intentionally empty
            }

            @Override
            public void onCompleted() {
                // intentionally empty
            }
        };
        return discarding;
    }

    public static class UnitTest {
        /** Key selector shared by the tests: maps a string to its length. */
        final Func1<String, Integer> length = new Func1<String, Integer>() {

            @Override
            public Integer call(String s) {
                return Integer.valueOf(s.length());
            }
        };

        /**
         * Groups six words by length and checks that three groups (lengths 3,
         * 4 and 5) are produced, each holding its words in emission order.
         */
        @Test
        public void testGroupBy() {
            Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(source, this.length));
            // Typed map: with a raw Map, get() returns Object and toArray() would not compile.
            Map<Integer, Collection<String>> map = UnitTest.toMap(grouped);
            Assert.assertEquals(3, map.size());
            Assert.assertArrayEquals(new Object[] { "one", "two", "six" }, map.get(3).toArray());
            Assert.assertArrayEquals(new Object[] { "four", "five" }, map.get(4).toArray());
            Assert.assertArrayEquals(new Object[] { "three" }, map.get(5).toArray());
        }

        /** An empty source must produce no groups at all. */
        @Test
        public void testEmpty() {
            Observable<String> source = Observable.empty();
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(source, this.length));
            Map<Integer, Collection<String>> map = UnitTest.toMap(grouped);
            Assert.assertTrue(map.isEmpty());
        }

        /**
         * Concatenates six words with a failing observable and checks that the
         * three groups and six events are seen before the error reaches the
         * final subscriber.
         */
        @Test
        public void testError() {
            Observable<String> sourceStrings = Observable.from("one", "two", "three", "four", "five", "six");
            Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
            Observable<String> source = Observable.concat(sourceStrings, errorSource);
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(source, this.length));
            final AtomicInteger groupCounter = new AtomicInteger();
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
            grouped.mapMany(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {

                @Override
                public Observable<String> call(final GroupedObservable<Integer, String> o) {
                    // Called once per emitted group.
                    groupCounter.incrementAndGet();
                    return o.map(new Func1<String, String>() {

                        @Override
                        public String call(String v) {
                            return "Event => key: " + o.getKey() + " value: " + v;
                        }
                    });
                }
            }).subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    error.set(e);
                }

                @Override
                public void onNext(String v) {
                    eventCounter.incrementAndGet();
                    System.out.println(v);
                }
            });
            Assert.assertEquals(3, groupCounter.get());
            Assert.assertEquals(6, eventCounter.get());
            Assert.assertNotNull(error.get());
        }

        /**
         * Collects a stream of groups into a map from group key to the values
         * that group emitted.
         *
         * <p>The outer observable is consumed through
         * {@code toBlockingObservable().forEach(...)}; for every group a fresh
         * queue is registered under the group's key and the group is
         * subscribed so that each value is appended to that queue.
         *
         * @param observable the stream of groups to collect
         * @param <K>        key type
         * @param <V>        value type
         * @return map of key to the collected values of that key's group
         */
        private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
            final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
            observable.toBlockingObservable().forEach(new Action1<GroupedObservable<K, V>>() {

                @Override
                public void call(final GroupedObservable<K, V> o) {
                    // Keep a direct reference to the queue so each value does not need a map lookup or cast.
                    final Collection<V> values = new ConcurrentLinkedQueue<V>();
                    result.put(o.getKey(), values);
                    o.subscribe(new Action1<V>() {

                        @Override
                        public void call(V v) {
                            values.add(v);
                        }
                    });
                }
            });
            return result;
        }

        /**
         * Emits {@code count} events from a background thread, alternating
         * between {@code groupCount} sources, groups them by source and
         * flattens the groups again. Expects a single subscription to the
         * event stream, one group per source and every event delivered.
         */
        @Test
        public void testGroupedEventStream() throws Throwable {
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicInteger subscribeCounter = new AtomicInteger();
            final AtomicInteger groupCounter = new AtomicInteger();
            final CountDownLatch latch = new CountDownLatch(1);
            final int count = 100;
            final int groupCount = 2;
            Observable<Event> es = Observable.create(new Observable.OnSubscribeFunc<Event>() {

                @Override
                public Subscription onSubscribe(final Observer<? super Event> observer) {
                    System.out.println("*** Subscribing to EventStream ***");
                    subscribeCounter.incrementAndGet();
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            for (int i = 0; i < count; ++i) {
                                Event e = new Event();
                                e.source = i % groupCount;
                                e.message = "Event-" + i;
                                observer.onNext(e);
                            }
                            observer.onCompleted();
                        }
                    }).start();
                    return Subscriptions.empty();
                }
            });
            es.groupBy(new Func1<Event, Integer>() {

                @Override
                public Integer call(Event e) {
                    return e.source;
                }
            }).mapMany(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {

                @Override
                public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
                    System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey());
                    groupCounter.incrementAndGet();
                    return eventGroupedObservable.map(new Func1<Event, String>() {

                        @Override
                        public String call(Event event) {
                            return "Source: " + event.source + "  Message: " + event.message;
                        }
                    });
                }
            }).subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onNext(String outputMessage) {
                    System.out.println(outputMessage);
                    eventCounter.incrementAndGet();
                }
            });
            // Fail explicitly on timeout instead of falling through to count assertions on a still-running stream.
            Assert.assertTrue("timed out waiting for the grouped stream to terminate", latch.await(5000L, TimeUnit.MILLISECONDS));
            Assert.assertEquals(1, subscribeCounter.get());
            Assert.assertEquals(groupCount, groupCounter.get());
            Assert.assertEquals(count, eventCounter.get());
        }

        /**
         * Takes only the first group and only 20 events from it, then checks
         * that the producer thread stops early once the group is unsubscribed.
         *
         * <p>The producer alternates between {@code groupCount} sources
         * starting with source 0, so the 20th event of the first group is the
         * 39th event overall; the final assertion allows a tolerance of 10
         * because the producer runs on its own thread and may emit a few more
         * events before it observes the unsubscription.
         */
        @Test
        public void testUnsubscribe() throws InterruptedException {
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicInteger subscribeCounter = new AtomicInteger();
            final AtomicInteger groupCounter = new AtomicInteger();
            final AtomicInteger sentEventCounter = new AtomicInteger();
            final CountDownLatch latch = new CountDownLatch(1);
            final int count = 100;
            final int groupCount = 2;
            Observable<Event> es = Observable.create(new Observable.OnSubscribeFunc<Event>() {

                @Override
                public Subscription onSubscribe(final Observer<? super Event> observer) {
                    final BooleanSubscription s = new BooleanSubscription();
                    System.out.println("testUnsubscribe => *** Subscribing to EventStream ***");
                    subscribeCounter.incrementAndGet();
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            // Stop producing as soon as the subscription has been cancelled.
                            for (int i = 0; i < count && !s.isUnsubscribed(); ++i) {
                                Event e = new Event();
                                e.source = i % groupCount;
                                e.message = "Event-" + i;
                                observer.onNext(e);
                                sentEventCounter.incrementAndGet();
                            }
                            observer.onCompleted();
                        }
                    }).start();
                    return s;
                }
            });
            es.groupBy(new Func1<Event, Integer>() {

                @Override
                public Integer call(Event e) {
                    return e.source;
                }
            }).take(1).mapMany(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {

                @Override
                public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
                    System.out.println("testUnsubscribe => GroupedObservable Key: " + eventGroupedObservable.getKey());
                    groupCounter.incrementAndGet();
                    return eventGroupedObservable.take(20).map(new Func1<Event, String>() {

                        @Override
                        public String call(Event event) {
                            return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
                        }
                    });
                }
            }).subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onNext(String outputMessage) {
                    System.out.println(outputMessage);
                    eventCounter.incrementAndGet();
                }
            });
            // Fail explicitly on timeout instead of asserting counts on a stream that never terminated.
            Assert.assertTrue("timed out waiting for the grouped stream to terminate", latch.await(5000L, TimeUnit.MILLISECONDS));
            Assert.assertEquals(1, subscribeCounter.get());
            Assert.assertEquals(1, groupCounter.get());
            Assert.assertEquals(20, eventCounter.get());
            Assert.assertEquals(39.0, sentEventCounter.get(), 10.0);
        }

        /** Mutable test fixture: a numeric source id plus a message. */
        private static class Event {
            int source;
            String message;

            private Event() {
            }

            @Override
            public String toString() {
                StringBuilder text = new StringBuilder("Event => source: ");
                text.append(this.source);
                text.append(" message: ");
                text.append(this.message);
                return text.toString();
            }
        }
    }

    /**
     * Immutable pair of a group key and the value to emit inside that key's
     * group. Both fields are read directly by the enclosing class.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class KeyValue<K, V> {
        /** Key of the group the value belongs to. */
        private final K key;
        /** Value to forward to the group. */
        private final V value;

        private KeyValue(K key, V value) {
            this.value = value;
            this.key = key;
        }
    }

    /**
     * A {@link GroupedObservable} that is also an {@link Observer}: whatever
     * is pushed into it is forwarded to the observer currently stored in
     * {@code subscribedObserver}.
     *
     * <p>The reference starts out holding an empty observer and is reset to
     * one on unsubscribe, so notifications arriving while nobody is subscribed
     * are dropped. Subscribing and unsubscribing are reported to the parent
     * {@link GroupBy} via {@code subscribeKey} / {@code unsubscribeKey}.
     *
     * @param <K> key type
     * @param <T> type of the values emitted by the group
     */
    private static class GroupedSubject<K, T>
    extends GroupedObservable<K, T>
    implements Observer<T> {
        /** Target of all forwarded notifications; swapped on subscribe/unsubscribe. */
        private final AtomicReference<Observer<? super T>> subscribedObserver;

        public GroupedSubject(K key, Observable.OnSubscribeFunc<T> onSubscribe, AtomicReference<Observer<? super T>> subscribedObserver) {
            super(key, onSubscribe);
            this.subscribedObserver = subscribedObserver;
        }

        /**
         * Creates the subject for {@code key}, wiring its subscribe function to
         * the same observer reference the subject forwards to.
         */
        static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> parent) {
            final AtomicReference<Observer<? super T>> target = new AtomicReference<Observer<? super T>>(OperationGroupBy.<T>emptyObserver());
            Observable.OnSubscribeFunc<T> subscribeFunc = new Observable.OnSubscribeFunc<T>() {
                private final SafeObservableSubscription subscription = new SafeObservableSubscription();

                @Override
                public Subscription onSubscribe(Observer<? super T> observer) {
                    // Route subsequent notifications to the new subscriber, then tell the parent.
                    target.set(observer);
                    parent.subscribeKey(key);
                    Subscription detach = new Subscription() {

                        @Override
                        public void unsubscribe() {
                            // Go back to discarding notifications before informing the parent.
                            target.set(OperationGroupBy.<T>emptyObserver());
                            parent.unsubscribeKey(key);
                        }
                    };
                    return this.subscription.wrap(detach);
                }
            };
            return new GroupedSubject<K, T>(key, subscribeFunc, target);
        }

        @Override
        public void onNext(T v) {
            Observer<? super T> current = this.subscribedObserver.get();
            current.onNext(v);
        }

        @Override
        public void onError(Throwable e) {
            Observer<? super T> current = this.subscribedObserver.get();
            current.onError(e);
        }

        @Override
        public void onCompleted() {
            Observer<? super T> current = this.subscribedObserver.get();
            current.onCompleted();
        }
    }

    /**
     * Subscribe function that turns a stream of {@link KeyValue} pairs into a
     * stream of {@link GroupedObservable}s, one per distinct key.
     *
     * <p>The first time a key is seen a {@link GroupedSubject} is created,
     * stored in {@code groupedObservables} and emitted to the outer observer;
     * every value is then forwarded to the subject of its key. Completion and
     * errors of the source are propagated to every group and then to the outer
     * observer.
     *
     * <p>The source subscription is released when the outer subscriber
     * unsubscribes while no group is subscribed, or when the last subscribed
     * group unsubscribes.
     *
     * <p>NOTE(review): all state lives in instance fields, so it is shared by
     * every call to {@link #onSubscribe} on the same instance; confirm that
     * callers subscribe only once per instance.
     *
     * @param <K> key type
     * @param <V> type of the values emitted by each group
     */
    private static class GroupBy<K, V>
    implements Observable.OnSubscribeFunc<GroupedObservable<K, V>> {
        private final Observable<KeyValue<K, V>> source;
        /** Groups created so far, by key. */
        private final ConcurrentHashMap<K, GroupedSubject<K, V>> groupedObservables = new ConcurrentHashMap<K, GroupedSubject<K, V>>();
        /** Subscription to {@code source}. */
        private final SafeObservableSubscription actualParentSubscription = new SafeObservableSubscription();
        /** Number of groups that currently have a subscriber. */
        private final AtomicInteger numGroupSubscriptions = new AtomicInteger();
        /** Set once the outer subscriber has unsubscribed; stops creation of new groups. */
        private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false);

        private GroupBy(Observable<KeyValue<K, V>> source) {
            this.source = source;
        }

        @Override
        public Subscription onSubscribe(final Observer<? super GroupedObservable<K, V>> observer) {
            actualParentSubscription.wrap(source.subscribe(new Observer<KeyValue<K, V>>() {

                @Override
                public void onCompleted() {
                    // Terminate every group first, then the outer stream.
                    for (GroupedSubject<K, V> group : groupedObservables.values()) {
                        group.onCompleted();
                    }
                    observer.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    // Fail every group first, then the outer stream.
                    for (GroupedSubject<K, V> group : groupedObservables.values()) {
                        group.onError(e);
                    }
                    observer.onError(e);
                }

                @Override
                public void onNext(KeyValue<K, V> value) {
                    GroupedSubject<K, V> gs = groupedObservables.get(value.key);
                    if (gs == null) {
                        if (unsubscribeRequested.get()) {
                            // The outer subscriber is gone: only already-created groups keep receiving data.
                            return;
                        }
                        // putIfAbsent guards against two threads racing to create the same group.
                        GroupedSubject<K, V> newGs = GroupedSubject.<K, V>create(value.key, GroupBy.this);
                        GroupedSubject<K, V> existing = groupedObservables.putIfAbsent(value.key, newGs);
                        if (existing == null) {
                            // We registered the group, so we are the one to emit it.
                            gs = newGs;
                            observer.onNext(gs);
                        } else {
                            gs = existing;
                        }
                    }
                    gs.onNext(value.value);
                }
            }));
            return new Subscription() {

                @Override
                public void unsubscribe() {
                    // Always record the request so no further groups are created or emitted
                    // to the unsubscribed observer, even while existing groups are still active.
                    unsubscribeRequested.set(true);
                    if (numGroupSubscriptions.get() == 0) {
                        // Nobody is listening to any group either: release the source now.
                        // Otherwise unsubscribeKey releases it when the last group unsubscribes.
                        actualParentSubscription.unsubscribe();
                    }
                }
            };
        }

        /** Records that a group gained a subscriber. */
        private void subscribeKey(K key) {
            this.numGroupSubscriptions.incrementAndGet();
        }

        /** Records that a group lost its subscriber; releases the source when none remain. */
        private void unsubscribeKey(K key) {
            int remaining = this.numGroupSubscriptions.decrementAndGet();
            if (remaining == 0) {
                this.actualParentSubscription.unsubscribe();
            }
        }
    }
}

