/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Functions;

public final class OperationGroupBy {
    /**
     * Creates the subscribe function for grouping the items of {@code source}.
     *
     * <p>Each source item is mapped to a key/value pair by calling {@code keySelector} and then
     * {@code elementSelector} on it (in that order); the mapped sequence is handed to a new
     * {@link GroupBy}, which does the actual grouping when subscribed to.
     *
     * @param source          the sequence whose items are to be grouped
     * @param keySelector     computes the grouping key of an item
     * @param elementSelector computes the element that is emitted inside the item's group
     * @param <K>             key type
     * @param <T>             source item type
     * @param <R>             type of the elements emitted by each group
     * @return a subscribe function emitting one {@link GroupedObservable} per group
     */
    public static <K, T, R> Observable.OnSubscribeFunc<GroupedObservable<K, R>> groupBy(Observable<? extends T> source, final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
        // Fully parameterized (no raw types) so the compiler checks the key/value pairing end to end.
        final Observable<KeyValue<K, R>> keyval = source.map(new Func1<T, KeyValue<K, R>>(){

            @Override
            public KeyValue<K, R> call(T t) {
                // Key selector runs before the element selector, as in the original code.
                K key = keySelector.call(t);
                R value = elementSelector.call(t);
                return new KeyValue<K, R>(key, value);
            }
        });
        return new GroupBy<K, R>(keyval);
    }

    /**
     * Key-only overload: delegates to the three-argument {@code groupBy}, passing
     * {@link Functions#identity()} as the element selector.
     *
     * @param source      the sequence whose items are to be grouped
     * @param keySelector computes the grouping key of an item
     * @param <K>         key type
     * @param <T>         source item type, which is also the element type of each group
     * @return a subscribe function emitting one {@link GroupedObservable} per group
     */
    public static <K, T> Observable.OnSubscribeFunc<GroupedObservable<K, T>> groupBy(Observable<? extends T> source, Func1<? super T, ? extends K> keySelector) {
        final Func1<? super T, ? extends T> sameElement = Functions.<T>identity();
        return OperationGroupBy.<K, T, T>groupBy(source, keySelector, sameElement);
    }

    /**
     * Returns a fresh {@link Observer} whose three callbacks have empty bodies, i.e. every
     * notification delivered to it is silently discarded.
     *
     * @param <T> item type the observer accepts
     * @return a new do-nothing observer
     */
    private static <T> Observer<T> emptyObserver() {
        final Observer<T> discardAll = new Observer<T>(){

            @Override
            public void onNext(T t) {
                // intentionally empty
            }

            @Override
            public void onError(Throwable e) {
                // intentionally empty
            }

            @Override
            public void onCompleted() {
                // intentionally empty
            }
        };
        return discardAll;
    }

    /**
     * Immutable pair holding the grouping key computed for a source item together with the
     * element to emit inside that key's group.
     *
     * @param <K> key type
     * @param <V> element type
     */
    private static class KeyValue<K, V> {
        /** Grouping key of the item. */
        private final K key;
        /** Element emitted to the group identified by {@link #key}. */
        private final V value;

        private KeyValue(K k, V v) {
            key = k;
            value = v;
        }
    }

    /**
     * A single group: handed out to subscribers as a {@link GroupedObservable} and fed through its
     * {@link Observer} side.
     *
     * <p>Incoming notifications are forwarded to the observer currently stored in
     * {@code subscribedObserver}. The reference starts out as a do-nothing observer and is reset to
     * one on unsubscribe, so notifications arriving while nobody is subscribed are dropped.
     *
     * @param <K> key type
     * @param <T> element type of the group
     */
    private static class GroupedSubject<K, T>
    extends GroupedObservable<K, T>
    implements Observer<T> {
        private final AtomicReference<Observer<? super T>> subscribedObserver;

        public GroupedSubject(K key, Observable.OnSubscribeFunc<T> onSubscribe, AtomicReference<Observer<? super T>> subscribedObserver) {
            super(key, onSubscribe);
            this.subscribedObserver = subscribedObserver;
        }

        /**
         * Builds the group for {@code key}.
         *
         * <p>The subscribe function and the returned subject share one observer reference: subscribing
         * stores the new observer and calls {@code parent.subscribeKey}; unsubscribing swaps a
         * do-nothing observer back in and calls {@code parent.unsubscribeKey}.
         *
         * @param key    key identifying the group
         * @param parent the operator instance that is notified of group (un)subscriptions
         * @return the new group
         */
        static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> parent) {
            final AtomicReference<Observer<? super T>> target = new AtomicReference<Observer<? super T>>(OperationGroupBy.<T>emptyObserver());
            final Observable.OnSubscribeFunc<T> subscribeFunc = new Observable.OnSubscribeFunc<T>(){
                private final SafeObservableSubscription subscription = new SafeObservableSubscription();

                @Override
                public Subscription onSubscribe(Observer<? super T> observer) {
                    // Install the observer first, then tell the parent about the subscription.
                    target.set(observer);
                    parent.subscribeKey(key);
                    final Subscription detach = new Subscription(){

                        @Override
                        public void unsubscribe() {
                            target.set(OperationGroupBy.<T>emptyObserver());
                            parent.unsubscribeKey(key);
                        }
                    };
                    return subscription.wrap(detach);
                }
            };
            return new GroupedSubject<K, T>(key, subscribeFunc, target);
        }

        @Override
        public void onNext(T v) {
            final Observer<? super T> current = subscribedObserver.get();
            current.onNext(v);
        }

        @Override
        public void onError(Throwable e) {
            final Observer<? super T> current = subscribedObserver.get();
            current.onError(e);
        }

        @Override
        public void onCompleted() {
            final Observer<? super T> current = subscribedObserver.get();
            current.onCompleted();
        }
    }

    /**
     * Subscribe function that splits a sequence of {@link KeyValue} pairs into one
     * {@link GroupedSubject} per distinct key and emits each group to the subscribing observer
     * the first time its key is seen.
     *
     * <p>Unsubscription is coordinated between the outer subscriber and the group subscribers:
     * <ul>
     *   <li>when the outer subscriber unsubscribes, no further new groups are created or emitted;</li>
     *   <li>the source subscription is released right away only if no group is currently
     *       subscribed, otherwise it is released when the last group unsubscribes.</li>
     * </ul>
     *
     * <p>NOTE(review): all state (group map, counters, parent subscription) lives in instance fields,
     * so subscribing the same instance more than once would share that state -- confirm callers
     * subscribe a given instance only once.
     *
     * @param <K> key type
     * @param <V> element type emitted inside the groups
     */
    private static class GroupBy<K, V>
    implements Observable.OnSubscribeFunc<GroupedObservable<K, V>> {
        private final Observable<KeyValue<K, V>> source;
        /** Groups created so far, by key. */
        private final ConcurrentHashMap<K, GroupedSubject<K, V>> groupedObservables = new ConcurrentHashMap<K, GroupedSubject<K, V>>();
        /** Holds the subscription to {@link #source}. */
        private final SafeObservableSubscription actualParentSubscription = new SafeObservableSubscription();
        /** Number of currently active subscriptions to individual groups. */
        private final AtomicInteger numGroupSubscriptions = new AtomicInteger();
        /** Set once the outer subscriber has unsubscribed; stops creation of new groups. */
        private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false);

        private GroupBy(Observable<KeyValue<K, V>> source) {
            this.source = source;
        }

        /**
         * Subscribes to the source and routes every pair to the group of its key.
         *
         * @param observer receives each newly created group, plus the terminal notification
         * @return a subscription that stops emission of new groups and, when no group is
         *         subscribed, also unsubscribes from the source
         */
        @Override
        public Subscription onSubscribe(final Observer<? super GroupedObservable<K, V>> observer) {
            this.actualParentSubscription.wrap(this.source.subscribe(new Observer<KeyValue<K, V>>(){

                @Override
                public void onCompleted() {
                    // Terminate every group first, then the outer observer.
                    for (GroupedSubject<K, V> o : GroupBy.this.groupedObservables.values()) {
                        o.onCompleted();
                    }
                    observer.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    for (GroupedSubject<K, V> o : GroupBy.this.groupedObservables.values()) {
                        o.onError(e);
                    }
                    observer.onError(e);
                }

                @Override
                public void onNext(KeyValue<K, V> value) {
                    GroupedSubject<K, V> gs = GroupBy.this.groupedObservables.get(value.key);
                    if (gs == null) {
                        if (GroupBy.this.unsubscribeRequested.get()) {
                            // Outer subscriber is gone: drop items whose key has no group yet.
                            return;
                        }
                        GroupedSubject<K, V> newGs = GroupedSubject.create(value.key, GroupBy.this);
                        GroupedSubject<K, V> existing = GroupBy.this.groupedObservables.putIfAbsent(value.key, newGs);
                        if (existing == null) {
                            // We won the race to register the key: announce the group before feeding it.
                            gs = newGs;
                            observer.onNext(gs);
                        } else {
                            gs = existing;
                        }
                    }
                    gs.onNext(value.value);
                }
            }));
            return new Subscription(){

                @Override
                public void unsubscribe() {
                    // Always record the request. Previously the flag was only set when no group was
                    // subscribed, so with live group subscriptions new groups kept being created and
                    // pushed to an observer that had already unsubscribed.
                    GroupBy.this.unsubscribeRequested.set(true);
                    if (GroupBy.this.numGroupSubscriptions.get() == 0) {
                        // Nobody is listening to any group: release the source immediately.
                        GroupBy.this.actualParentSubscription.unsubscribe();
                    }
                    // Otherwise unsubscribeKey releases the source once the last group unsubscribes.
                }
            };
        }

        /**
         * Records that a group gained a subscriber.
         *
         * @param key key of the group (currently unused; only the total count is tracked)
         */
        private void subscribeKey(K key) {
            this.numGroupSubscriptions.incrementAndGet();
        }

        /**
         * Records that a group lost a subscriber and unsubscribes from the source when the count
         * of group subscriptions reaches zero.
         *
         * <p>NOTE(review): this releases the source even if the outer subscriber has not
         * unsubscribed -- confirm that is intended.
         *
         * @param key key of the group (currently unused; only the total count is tracked)
         */
        private void unsubscribeKey(K key) {
            int c = this.numGroupSubscriptions.decrementAndGet();
            if (c == 0) {
                this.actualParentSubscription.unsubscribe();
            }
        }
    }
}

