/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueOptions;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PurgeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsJetStreamManagement;
import io.nats.client.impl.NatsKeyValueWatchSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link KeyValue} implementation that stores a bucket's entries in a JetStream stream.
 *
 * <p>Reads and purges go through a {@link JetStreamManagement} instance, writes are published
 * through a {@link NatsJetStream} instance, and bulk reads (keys, history, purgeDeletes) use a
 * short-lived push subscription, see {@link #visitSubject}.
 */
public class NatsKeyValue
implements KeyValue {
    /** API error code for which {@link #getLastMessage(String)} returns {@code null} (JetStream "no message found"). */
    private static final int JS_NO_MESSAGE_FOUND_ERR = 10037;

    /** API error code that makes {@link #create(String, byte[])} inspect the last entry (JetStream "wrong last sequence"). */
    private static final int JS_WRONG_LAST_SEQUENCE = 10071;

    /** How long {@link #visitSubject} waits for the first message of a visit. */
    private static final Duration VISIT_FIRST_MESSAGE_WAIT = Duration.ofMillis(5000L);

    /** How long {@link #visitSubject} waits for each message after the first. */
    private static final Duration VISIT_NEXT_MESSAGE_WAIT = Duration.ofMillis(100L);

    final NatsJetStream js;
    final JetStreamManagement jsm;
    private final String bucketName;
    private final String streamName;
    private final String streamSubject;
    private final String defaultKeyPrefix;
    private final String publishKeyPrefix;

    /**
     * Creates a key-value view over the given bucket.
     *
     * @param connection the connection used to build the JetStream and management contexts
     * @param bucketName the bucket name; validated with {@code Validator.validateKvBucketNameRequired}
     * @param kvo optional options; when {@code null}, both contexts are built without JetStream
     *            options and the publish prefix equals the default key prefix. When non-null, a
     *            non-null feature prefix replaces the default prefix for publishing only.
     * @throws IOException propagated from constructing the JetStream contexts
     */
    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
        this.bucketName = Validator.validateKvBucketNameRequired(bucketName);
        this.streamName = NatsKeyValueUtil.toStreamName(bucketName);
        this.streamSubject = NatsKeyValueUtil.toStreamSubject(bucketName);
        this.defaultKeyPrefix = NatsKeyValueUtil.toKeyPrefix(bucketName);
        if (kvo == null) {
            this.js = new NatsJetStream(connection, null);
            this.jsm = new NatsJetStreamManagement(connection, null);
            this.publishKeyPrefix = this.defaultKeyPrefix;
        } else {
            this.js = new NatsJetStream(connection, kvo.getJetStreamOptions());
            this.jsm = new NatsJetStreamManagement(connection, kvo.getJetStreamOptions());
            String featurePrefix = kvo.getFeaturePrefix();
            this.publishKeyPrefix = featurePrefix == null ? this.defaultKeyPrefix : featurePrefix;
        }
    }

    /**
     * Builds the subject used for reading, subscribing and purging a key.
     *
     * @param key the key (or wildcard) to append to the default key prefix
     * @return the default key prefix followed by {@code key}
     */
    String defaultKeySubject(String key) {
        return this.defaultKeyPrefix + key;
    }

    /**
     * Builds the subject used for publishing a key.
     *
     * @param key the key to append to the publish key prefix
     * @return the publish key prefix followed by {@code key}
     */
    String publishKeySubject(String key) {
        return this.publishKeyPrefix + key;
    }

    /**
     * @return the validated bucket name this instance was created with
     */
    @Override
    public String getBucketName() {
        return this.bucketName;
    }

    /**
     * @return the stream name derived from the bucket name
     */
    String getStreamName() {
        return this.streamName;
    }

    /**
     * @return the stream subject derived from the bucket name
     */
    String getStreamSubject() {
        return this.streamSubject;
    }

    /**
     * Returns the latest entry recorded for a key.
     *
     * @param key the key; must pass {@code Validator.validateNonWildcardKvKeyRequired}
     * @return the latest entry, or {@code null} when the server reports no message for the key
     * @throws IOException propagated from the management call
     * @throws JetStreamApiException propagated from the management call or raised for any other API error
     */
    @Override
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
        return this.getLastMessage(Validator.validateNonWildcardKvKeyRequired(key));
    }

    /**
     * Fetches the last stream message on the key's subject and wraps it as an entry.
     * The key is not validated here.
     *
     * @param key the key whose last message is requested
     * @return the entry built from the last message, or {@code null} when the response carries
     *         API error code {@code 10037}
     * @throws IOException propagated from the management call
     * @throws JetStreamApiException propagated from the management call, or raised through
     *         {@code throwOnHasError()} for any other error response
     */
    KeyValueEntry getLastMessage(String key) throws IOException, JetStreamApiException {
        MessageInfo mi = this.jsm.getLastMessage(this.streamName, this.defaultKeySubject(key));
        if (mi.hasError()) {
            if (mi.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) {
                return null;
            }
            mi.throwOnHasError();
        }
        return new KeyValueEntry(mi);
    }

    /**
     * Publishes a value for a key without any expected-revision check.
     *
     * @param key the key; wildcards are rejected
     * @param value the raw value bytes
     * @return the sequence number from the publish acknowledgement
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
        return this._publishWithNonWildcardKey(key, value, null).getSeqno();
    }

    /**
     * Publishes a string value encoded as UTF-8.
     *
     * @param key the key; wildcards are rejected
     * @param value the value; must not be {@code null}
     * @return the sequence number from the publish acknowledgement
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public long put(String key, String value) throws IOException, JetStreamApiException {
        return this.put(key, value.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes a number as the US-ASCII bytes of its {@code toString()} form.
     *
     * @param key the key; wildcards are rejected
     * @param value the value; must not be {@code null}
     * @return the sequence number from the publish acknowledgement
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public long put(String key, Number value) throws IOException, JetStreamApiException {
        return this.put(key, value.toString().getBytes(StandardCharsets.US_ASCII));
    }

    /**
     * Publishes a value only if the key has no live value.
     *
     * <p>First attempts an update expecting revision 0. If that fails with API error code
     * {@code 10071}, the last entry is looked up; when it exists and its operation is not
     * {@code PUT}, the update is retried against that entry's revision. In every other case
     * the original exception is rethrown.
     *
     * @param key the key; wildcards are rejected
     * @param value the raw value bytes
     * @return the sequence number from the publish acknowledgement
     * @throws IOException propagated from publishing or from the last-message lookup
     * @throws JetStreamApiException when the key already holds a value or another API error occurs
     */
    @Override
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
        try {
            return this.update(key, value, 0L);
        }
        catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == JS_WRONG_LAST_SEQUENCE) {
                // The subject already has messages; only a non-PUT last entry allows the retry.
                KeyValueEntry kve = this.getLastMessage(key);
                if (kve != null && kve.getOperation() != KeyValueOperation.PUT) {
                    return this.update(key, value, kve.getRevision());
                }
            }
            throw e;
        }
    }

    /**
     * Publishes a value with the {@code Nats-Expected-Last-Subject-Sequence} header set to
     * {@code expectedRevision}.
     *
     * @param key the key; wildcards are rejected
     * @param value the raw value bytes
     * @param expectedRevision the revision sent in the expected-last-subject-sequence header
     * @return the sequence number from the publish acknowledgement
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
        Headers h = new Headers().add("Nats-Expected-Last-Subject-Sequence", Long.toString(expectedRevision));
        return this._publishWithNonWildcardKey(key, value, h).getSeqno();
    }

    /**
     * Publishes a message with no data and {@code NatsKeyValueUtil.DELETE_HEADERS} for the key.
     *
     * @param key the key; wildcards are rejected
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public void delete(String key) throws IOException, JetStreamApiException {
        this._publishWithNonWildcardKey(key, null, NatsKeyValueUtil.DELETE_HEADERS);
    }

    /**
     * Publishes a message with no data and {@code NatsKeyValueUtil.PURGE_HEADERS} for the key.
     *
     * @param key the key; wildcards are rejected
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    @Override
    public void purge(String key) throws IOException, JetStreamApiException {
        this._publishWithNonWildcardKey(key, null, NatsKeyValueUtil.PURGE_HEADERS);
    }

    /**
     * Validates the key and publishes a message on its publish subject.
     *
     * @param key the key; wildcards are rejected
     * @param data the message payload, may be {@code null}
     * @param h the message headers, may be {@code null}
     * @return the publish acknowledgement
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    private PublishAck _publishWithNonWildcardKey(String key, byte[] data, Headers h) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(key);
        NatsMessage message = NatsMessage.builder()
            .subject(this.publishKeySubject(key))
            .data(data)
            .headers(h)
            .build();
        return this.js.publish(message);
    }

    /**
     * Creates a watch subscription for a key, which may contain wildcards.
     *
     * @param key the key or wildcard pattern to watch
     * @param watcher the watcher to notify; required
     * @param watchOptions options passed through to the watch subscription
     * @return the new watch subscription
     * @throws IOException propagated from creating the subscription
     * @throws JetStreamApiException propagated from creating the subscription
     * @throws InterruptedException propagated from creating the subscription
     */
    @Override
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateKvKeyWildcardAllowedRequired(key);
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, key, watcher, watchOptions);
    }

    /**
     * Creates a watch subscription using the {@code ">"} wildcard as the key.
     *
     * @param watcher the watcher to notify; required
     * @param watchOptions options passed through to the watch subscription
     * @return the new watch subscription
     * @throws IOException propagated from creating the subscription
     * @throws JetStreamApiException propagated from creating the subscription
     * @throws InterruptedException propagated from creating the subscription
     */
    @Override
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption ... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateNotNull(watcher, "Watcher is required");
        return new NatsKeyValueWatchSubscription(this, ">", watcher, watchOptions);
    }

    /**
     * Lists the keys whose most recent message is a {@code PUT}.
     *
     * @return a new mutable list of keys, in the order the messages were delivered
     * @throws IOException propagated from the subscription
     * @throws JetStreamApiException propagated from the subscription
     * @throws InterruptedException propagated from waiting for messages
     */
    @Override
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
        List<String> list = new ArrayList<>();
        // Headers-only, last message per subject: the operation header is all that is needed.
        this.visitSubject(this.defaultKeySubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> {
            KeyValueOperation op = NatsKeyValueUtil.getOperation(m.getHeaders());
            if (op == KeyValueOperation.PUT) {
                list.add(new NatsKeyValueUtil.BucketAndKey(m).key);
            }
        });
        return list;
    }

    /**
     * Collects every message on the key's subject as an entry, using an ordered subscription
     * with the {@code All} deliver policy.
     *
     * @param key the key; wildcards are rejected
     * @return a new mutable list of entries, in the order the messages were delivered
     * @throws IOException propagated from the subscription
     * @throws JetStreamApiException propagated from the subscription
     * @throws InterruptedException propagated from waiting for messages
     */
    @Override
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
        Validator.validateNonWildcardKvKeyRequired(key);
        List<KeyValueEntry> list = new ArrayList<>();
        this.visitSubject(this.defaultKeySubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m)));
        return list;
    }

    /**
     * Purges the stream subject of every key whose most recent message is not a {@code PUT}.
     *
     * @throws IOException propagated from the subscription or the purge calls
     * @throws JetStreamApiException propagated from the subscription or the purge calls
     * @throws InterruptedException propagated from waiting for messages
     */
    @Override
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
        // Typed list: a raw ArrayList cannot be iterated as String in the loop below.
        List<String> list = new ArrayList<>();
        this.visitSubject(this.streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> {
            KeyValueOperation op = NatsKeyValueUtil.getOperation(m.getHeaders());
            if (op != KeyValueOperation.PUT) {
                list.add(new NatsKeyValueUtil.BucketAndKey(m).key);
            }
        });
        for (String key : list) {
            this.jsm.purgeStream(this.streamName, PurgeOptions.subject(this.defaultKeySubject(key)));
        }
    }

    /**
     * @return a status object built from the stream info of this bucket's stream
     * @throws IOException propagated from the management call
     * @throws JetStreamApiException propagated from the management call
     * @throws InterruptedException declared by the interface; not raised by the visible code path
     */
    @Override
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
        return new KeyValueStatus(this.jsm.getStreamInfo(this.streamName));
    }

    /**
     * Subscribes to a subject, feeds each received message to the handler on the calling
     * thread, and unsubscribes when done.
     *
     * <p>The subscription uses {@code AckPolicy.None} with the given deliver policy,
     * headers-only flag and ordered flag. The first message is awaited for up to 5000 ms and
     * each later one for up to 100 ms. The visit ends when {@code nextMessage} returns
     * {@code null} or when a handled message reports a pending count of zero.
     *
     * @param subject the subject to subscribe to
     * @param deliverPolicy the consumer deliver policy
     * @param headersOnly whether the consumer is configured as headers-only
     * @param ordered whether the push subscription is ordered
     * @param handler invoked once per received message
     * @throws IOException propagated from subscribing
     * @throws JetStreamApiException propagated from subscribing
     * @throws InterruptedException propagated from waiting for messages or from the handler
     */
    private void visitSubject(String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
        ConsumerConfiguration cc = ConsumerConfiguration.builder()
            .ackPolicy(AckPolicy.None)
            .deliverPolicy(deliverPolicy)
            .headersOnly(headersOnly)
            .build();
        PushSubscribeOptions pso = ((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().ordered(ordered).configuration(cc)).build();
        JetStreamSubscription sub = this.js.subscribe(subject, pso);
        try {
            Message m = sub.nextMessage(VISIT_FIRST_MESSAGE_WAIT);
            while (m != null) {
                handler.onMessage(m);
                if (m.metaData().pendingCount() == 0L) {
                    // Nothing further is pending, so skip the extra wait.
                    break;
                }
                m = sub.nextMessage(VISIT_NEXT_MESSAGE_WAIT);
            }
        }
        finally {
            // Always release the subscription, including when the handler throws.
            sub.unsubscribe();
        }
    }
}

