/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsKeyValue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A watch on a key (or key pattern) of a key-value bucket.
 *
 * <p>Constructing an instance subscribes an ordered, no-ack push consumer to the key
 * subject and forwards each received entry to the supplied {@link KeyValueWatcher}.
 * {@link KeyValueWatcher#endOfData()} is signalled at most once per instance: either
 * up front when there is nothing to replay, or after the first delivered entry whose
 * delta is zero.
 *
 * <p>All instances share one static dispatcher, guarded by {@code dispatcherLock}. It is
 * created lazily from the connection of whichever JetStream context needs it first and
 * is closed again when its last subscription handler is removed.
 * NOTE(review): because the dispatcher is static, a watch created for a second
 * connection reuses the dispatcher of the first one - confirm this is intended.
 */
public class NatsKeyValueWatchSubscription implements AutoCloseable {
    private static final Object dispatcherLock = new Object();
    private static NatsDispatcher dispatcher;

    private final JetStreamSubscription sub;
    private final AtomicBoolean endOfDataSent;
    // Flipped by the first unsubscribe()/close() so that repeated calls are no-ops.
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    /**
     * Creates the watch and starts delivering entries to {@code watcher}.
     *
     * @param kv           the key-value context that owns the bucket being watched
     * @param keyPattern   the key or key pattern to watch
     * @param watcher      receives the entries and the end-of-data signal
     * @param watchOptions optional behaviour switches; a {@code null} array and
     *                     {@code null} elements are ignored
     * @throws IOException           propagated from the underlying key-value / JetStream calls
     * @throws JetStreamApiException propagated from the underlying key-value / JetStream calls
     */
    public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException {
        String keySubject = kv.defaultKeySubject(keyPattern);

        boolean headersOnly = false;
        boolean ignoreDeletes = false;
        DeliverPolicy deliverPolicy = DeliverPolicy.LastPerSubject;
        // A caller may pass an explicit null array to a varargs parameter; treat it as "no options".
        if (watchOptions != null) {
            for (KeyValueWatchOption wo : watchOptions) {
                if (wo == null) {
                    continue;
                }
                switch (wo) {
                    case META_ONLY:
                        headersOnly = true;
                        break;
                    case IGNORE_DELETE:
                        ignoreDeletes = true;
                        break;
                    case UPDATES_ONLY:
                        deliverPolicy = DeliverPolicy.New;
                        break;
                    case INCLUDE_HISTORY:
                        deliverPolicy = DeliverPolicy.All;
                        break;
                    default:
                        break;
                }
            }
        }

        // With DeliverPolicy.New nothing is replayed, and without a last message there is
        // nothing to replay either: signal end-of-data before subscribing. The short-circuit
        // keeps the last-message lookup from running for DeliverPolicy.New.
        boolean nothingToReplay = deliverPolicy == DeliverPolicy.New || kv.getLastMessage(keyPattern) == null;
        if (nothingToReplay) {
            watcher.endOfData();
        }
        this.endOfDataSent = new AtomicBoolean(nothingToReplay);

        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
                .ackPolicy(AckPolicy.None)
                .deliverPolicy(deliverPolicy)
                .headersOnly(headersOnly)
                .filterSubject(keySubject)
                .build();
        PushSubscribeOptions pso = ((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().stream(kv.getStreamName())).ordered(true).configuration(consumerConfig)).build();

        boolean includeDeletes = !ignoreDeletes;
        MessageHandler handler = m -> {
            KeyValueEntry kve = new KeyValueEntry(m);
            if (includeDeletes || kve.getOperation() == KeyValueOperation.PUT) {
                watcher.watch(kve);
            }
            // compareAndSet makes the check and the update one atomic step, so
            // end-of-data is signalled at most once.
            if (kve.getDelta() == 0L && this.endOfDataSent.compareAndSet(false, true)) {
                watcher.endOfData();
            }
        };
        this.sub = kv.js.subscribe(keySubject, NatsKeyValueWatchSubscription.getDispatcher(kv.js), handler, false, pso);
    }

    /**
     * Returns the shared dispatcher, creating it from the connection behind {@code js}
     * when none exists yet.
     *
     * @param js the JetStream context; cast to {@link NatsJetStream} to reach its connection
     * @return the shared dispatcher
     */
    private static Dispatcher getDispatcher(JetStream js) {
        synchronized (dispatcherLock) {
            if (dispatcher == null) {
                dispatcher = (NatsDispatcher)((NatsJetStream)js).conn.createDispatcher();
            }
            return dispatcher;
        }
    }

    /**
     * Removes this watch's subscription from the shared dispatcher and closes the
     * dispatcher once it has no subscription handlers left.
     *
     * <p>Only the first call on an instance has an effect; later calls (including the
     * implicit one from try-with-resources after a manual call) return immediately.
     */
    public void unsubscribe() {
        if (!this.unsubscribed.compareAndSet(false, true)) {
            return;
        }
        synchronized (dispatcherLock) {
            if (dispatcher == null) {
                // The shared dispatcher has already been closed and cleared; nothing to undo.
                return;
            }
            dispatcher.unsubscribe(this.sub);
            if (dispatcher.getSubscriptionHandlers().size() == 0) {
                dispatcher.connection.closeDispatcher(dispatcher);
                dispatcher = null;
            }
        }
    }

    /**
     * Same as {@link #unsubscribe()}; lets the watch be used in try-with-resources.
     */
    @Override
    public void close() throws Exception {
        this.unsubscribe();
    }
}

