/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.api.reactive.client.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import org.infinispan.api.reactive.EntryStatus;
import org.infinispan.api.reactive.KeyValueEntry;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class ContinuousQueryPublisherImpl<K, V>
implements Publisher<KeyValueEntry<K, V>> {
    private static final Log log = (Log)LogFactory.getLog(ContinuousQueryPublisherImpl.class, Log.class);
    private final ContinuousQuery<K, V> continuousQuery;
    private final boolean created;
    private final boolean updated;
    private final boolean deleted;
    private final Query<?> query;
    private Flowable<KeyValueEntry<K, V>> flowable;
    private ContinuousQueryListener<K, V> continuousQueryListener;

    public ContinuousQueryPublisherImpl(Query<?> query, ContinuousQuery<K, V> continuousQuery, boolean created, boolean updated, boolean deleted) {
        this.query = query;
        this.continuousQuery = continuousQuery;
        this.created = created;
        this.updated = updated;
        this.deleted = deleted;
    }

    public void subscribe(Subscriber<? super KeyValueEntry<K, V>> subscriber) {
        this.createContinuousQueryFlowable();
        this.flowable.subscribe(subscriber);
    }

    private synchronized void createContinuousQueryFlowable() {
        if (this.flowable == null) {
            UnicastProcessor unicastProcessor = UnicastProcessor.create();
            this.continuousQueryListener = this.createContinuousQueryListener(unicastProcessor);
            this.continuousQuery.addContinuousQueryListener(this.query, this.continuousQueryListener);
            unicastProcessor.doOnError(e -> {
                log.error(e);
                this.continuousQuery.removeContinuousQueryListener(this.continuousQueryListener);
            }).doOnCancel(() -> this.continuousQuery.removeContinuousQueryListener(this.continuousQueryListener));
            this.flowable = unicastProcessor;
        }
    }

    private ContinuousQueryListener<K, V> createContinuousQueryListener(final UnicastProcessor<KeyValueEntry<K, V>> processor) {
        return new ContinuousQueryListener<K, V>(){

            public void resultJoining(K key, V value) {
                if (ContinuousQueryPublisherImpl.this.created) {
                    processor.onNext((Object)new KeyValueEntry(key, value, EntryStatus.CREATED));
                }
            }

            public void resultUpdated(K key, V value) {
                if (ContinuousQueryPublisherImpl.this.updated) {
                    processor.onNext((Object)new KeyValueEntry(key, value, EntryStatus.UPDATED));
                }
            }

            public void resultLeaving(K key) {
                if (ContinuousQueryPublisherImpl.this.deleted) {
                    processor.onNext((Object)new KeyValueEntry(key, null, EntryStatus.DELETED));
                }
            }
        };
    }
}

