/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ErrorListener;
import io.nats.client.JetStreamStatusException;
import io.nats.client.Message;
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.impl.MessageManager;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.Status;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

class PushStatusMessageManager
extends MessageManager {
    private static final int THRESHOLD = 3;
    private final NatsConnection conn;
    private final boolean syncMode;
    private final boolean queueMode;
    private final boolean hb;
    private final boolean fc;
    private final long idleHeartbeatSetting;
    private final long alarmPeriodSetting;
    private String lastFcSubject;
    private long lastStreamSeq;
    private long lastConsumerSeq;
    private final AtomicLong lastMsgReceived;
    private HeartbeatTimer heartbeatTimer;

    PushStatusMessageManager(NatsConnection conn, SubscribeOptions so, ConsumerConfiguration cc, boolean queueMode, boolean syncMode) {
        this.conn = conn;
        this.syncMode = syncMode;
        this.queueMode = queueMode;
        this.lastStreamSeq = -1L;
        this.lastConsumerSeq = -1L;
        this.lastMsgReceived = new AtomicLong();
        if (queueMode) {
            this.hb = false;
            this.fc = false;
            this.idleHeartbeatSetting = 0L;
            this.alarmPeriodSetting = 0L;
        } else {
            long l = this.idleHeartbeatSetting = cc.getIdleHeartbeat() == null ? 0L : cc.getIdleHeartbeat().toMillis();
            if (this.idleHeartbeatSetting <= 0L) {
                this.alarmPeriodSetting = 0L;
                this.hb = false;
            } else {
                long mat = so.getMessageAlarmTime();
                this.alarmPeriodSetting = mat < this.idleHeartbeatSetting ? this.idleHeartbeatSetting * 3L : mat;
                this.hb = true;
            }
            this.fc = this.hb && cc.isFlowControl();
        }
    }

    @Override
    void setSub(NatsJetStreamSubscription sub) {
        super.setSub(sub);
        if (this.hb) {
            sub.setBeforeQueueProcessor(this::beforeQueueProcessor);
            this.heartbeatTimer = new HeartbeatTimer();
        }
    }

    @Override
    void shutdown() {
        if (this.heartbeatTimer != null) {
            this.heartbeatTimer.shutdown();
        }
    }

    boolean isSyncMode() {
        return this.syncMode;
    }

    boolean isQueueMode() {
        return this.queueMode;
    }

    boolean isFc() {
        return this.fc;
    }

    boolean isHb() {
        return this.hb;
    }

    long getIdleHeartbeatSetting() {
        return this.idleHeartbeatSetting;
    }

    long getAlarmPeriodSetting() {
        return this.alarmPeriodSetting;
    }

    String getLastFcSubject() {
        return this.lastFcSubject;
    }

    long getLastStreamSequence() {
        return this.lastStreamSeq;
    }

    long getLastConsumerSequence() {
        return this.lastConsumerSeq;
    }

    long getLastMsgReceived() {
        return this.lastMsgReceived.get();
    }

    NatsMessage beforeQueueProcessor(NatsMessage msg) {
        this.lastMsgReceived.set(System.currentTimeMillis());
        if (msg.isStatusMessage() && msg.getStatus().isHeartbeat() && this.extractFcSubject(msg) == null) {
            return null;
        }
        return msg;
    }

    @Override
    boolean manage(Message msg) {
        if (msg.isStatusMessage()) {
            Status status = msg.getStatus();
            if (status.isFlowControl()) {
                if (this.fc) {
                    this._processFlowControl(msg.getReplyTo(), ErrorListener.FlowControlSource.FLOW_CONTROL);
                }
                return true;
            }
            if (status.isHeartbeat()) {
                if (this.fc) {
                    this._processFlowControl(this.extractFcSubject(msg), ErrorListener.FlowControlSource.HEARTBEAT);
                }
                return true;
            }
            this.conn.getOptions().getErrorListener().unhandledStatus(this.conn, this.sub, status);
            if (this.syncMode) {
                throw new JetStreamStatusException(this.sub, status);
            }
            return true;
        }
        this.lastStreamSeq = msg.metaData().streamSequence();
        this.lastConsumerSeq = msg.metaData().consumerSequence();
        return false;
    }

    String extractFcSubject(Message msg) {
        return msg.getHeaders() == null ? null : msg.getHeaders().getFirst("Nats-Consumer-Stalled");
    }

    private void _processFlowControl(String fcSubject, ErrorListener.FlowControlSource source) {
        if (fcSubject != null && !fcSubject.equals(this.lastFcSubject)) {
            this.conn.publishInternal(fcSubject, null, null, null, false);
            this.lastFcSubject = fcSubject;
            this.conn.getOptions().getErrorListener().flowControlProcessed(this.conn, this.sub, fcSubject, source);
        }
    }

    class HeartbeatTimer {
        Timer timer;
        boolean alive = true;

        public HeartbeatTimer() {
            this.restart();
        }

        synchronized void restart() {
            this.cancel();
            if (this.alive) {
                this.timer = new Timer();
                this.timer.schedule((TimerTask)new HeartbeatTimerTask(), PushStatusMessageManager.this.alarmPeriodSetting);
            }
        }

        public synchronized void shutdown() {
            this.alive = false;
            this.cancel();
        }

        private void cancel() {
            if (this.timer != null) {
                this.timer.cancel();
                this.timer.purge();
                this.timer = null;
            }
        }

        class HeartbeatTimerTask
        extends TimerTask {
            HeartbeatTimerTask() {
            }

            @Override
            public void run() {
                long sinceLast = System.currentTimeMillis() - PushStatusMessageManager.this.lastMsgReceived.get();
                if (sinceLast > PushStatusMessageManager.this.alarmPeriodSetting) {
                    PushStatusMessageManager.this.conn.getOptions().getErrorListener().heartbeatAlarm(PushStatusMessageManager.this.conn, PushStatusMessageManager.this.sub, PushStatusMessageManager.this.lastStreamSeq, PushStatusMessageManager.this.lastConsumerSeq);
                }
                HeartbeatTimer.this.restart();
            }
        }
    }
}

