/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.Subscription;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsMessage;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class NatsSubscription
extends NatsConsumer
implements Subscription {
    private String subject;
    private final String queueName;
    private String sid;
    private NatsDispatcher dispatcher;
    private MessageQueue incoming;
    private final AtomicLong unSubMessageLimit;
    private Function<NatsMessage, Boolean> beforeQueueProcessor;

    NatsSubscription(String sid, String subject, String queueName, NatsConnection connection, NatsDispatcher dispatcher) {
        super(connection);
        this.subject = subject;
        this.queueName = queueName;
        this.sid = sid;
        this.dispatcher = dispatcher;
        this.unSubMessageLimit = new AtomicLong(-1L);
        if (this.dispatcher == null) {
            this.incoming = new MessageQueue(false, connection.getOptions().getRequestCleanupInterval());
        }
        this.setBeforeQueueProcessor(null);
    }

    void reSubscribe(String newDeliverSubject) {
        this.connection.sendUnsub(this, 0);
        if (this.dispatcher == null) {
            this.connection.remove(this);
            this.sid = this.connection.reSubscribe(this, newDeliverSubject, this.queueName);
        } else {
            MessageHandler handler = this.dispatcher.getNonDefaultHandlerBySid(this.sid);
            this.dispatcher.remove(this);
            this.sid = this.dispatcher.reSubscribe(this, newDeliverSubject, this.queueName, handler);
        }
        this.subject = newDeliverSubject;
    }

    @Override
    public boolean isActive() {
        return this.dispatcher != null || this.incoming != null;
    }

    void setBeforeQueueProcessor(Function<NatsMessage, Boolean> beforeQueueProcessor) {
        this.beforeQueueProcessor = beforeQueueProcessor == null ? m -> true : beforeQueueProcessor;
    }

    public Function<NatsMessage, Boolean> getBeforeQueueProcessor() {
        return this.beforeQueueProcessor;
    }

    void invalidate() {
        if (this.incoming != null) {
            this.incoming.pause();
        }
        this.dispatcher = null;
        this.incoming = null;
    }

    void setUnsubLimit(long cd) {
        this.unSubMessageLimit.set(cd);
    }

    boolean reachedUnsubLimit() {
        long max = this.unSubMessageLimit.get();
        long recv = this.getDeliveredCount();
        return max > 0L && max <= recv;
    }

    String getSID() {
        return this.sid;
    }

    NatsDispatcher getNatsDispatcher() {
        return this.dispatcher;
    }

    @Override
    MessageQueue getMessageQueue() {
        return this.incoming;
    }

    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    public String getSubject() {
        return this.subject;
    }

    @Override
    public String getQueueName() {
        return this.queueName;
    }

    @Override
    public Message nextMessage(long timeoutMillis) throws InterruptedException, IllegalStateException {
        return this.nextMessageInternal(Duration.ofMillis(timeoutMillis));
    }

    @Override
    public Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException {
        return this.nextMessageInternal(timeout);
    }

    protected NatsMessage nextMessageInternal(Duration timeout) throws InterruptedException {
        if (this.dispatcher != null) {
            throw new IllegalStateException("Subscriptions that belong to a dispatcher cannot respond to nextMessage directly.");
        }
        if (this.incoming == null) {
            throw new IllegalStateException("This subscription is inactive.");
        }
        NatsMessage msg = this.incoming.pop(timeout);
        if (this.incoming == null || !this.incoming.isRunning()) {
            throw new IllegalStateException("This subscription became inactive.");
        }
        if (msg != null) {
            this.incrementDeliveredCount();
        }
        if (this.reachedUnsubLimit()) {
            this.connection.invalidate(this);
        }
        return msg;
    }

    @Override
    public void unsubscribe() {
        if (this.dispatcher != null) {
            throw new IllegalStateException("Subscriptions that belong to a dispatcher cannot respond to unsubscribe directly.");
        }
        if (this.incoming == null) {
            throw new IllegalStateException("This subscription is inactive.");
        }
        if (this.isDraining()) {
            return;
        }
        this.connection.unsubscribe(this, -1);
    }

    @Override
    public Subscription unsubscribe(int after) {
        if (this.dispatcher != null) {
            throw new IllegalStateException("Subscriptions that belong to a dispatcher cannot respond to unsubscribe directly.");
        }
        if (this.incoming == null) {
            throw new IllegalStateException("This subscription is inactive.");
        }
        if (this.isDraining()) {
            return this;
        }
        this.connection.unsubscribe(this, after);
        return this;
    }

    @Override
    void sendUnsubForDrain() {
        this.connection.sendUnsub(this, -1);
    }

    @Override
    void cleanUpAfterDrain() {
        this.connection.invalidate(this);
    }
}

