/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.nsqj;

import com.github.yingzhuo.nsqj.Message;
import com.github.yingzhuo.nsqj.MessageHandler;
import com.github.yingzhuo.nsqj.Subscription;
import com.github.yingzhuo.nsqj.Util;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class BackoffHandler
implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(BackoffHandler.class);
    private volatile boolean isBackoff = false;
    private Subscription subscription;
    private final MessageHandler handler;
    private final int initDelay;
    private final int maxDelay;
    private long lastAttempt;
    private int delay;
    private int failCount;
    private int fullSpeedMaxInFlight;
    private static final int DEFAULT_INIT_DELAY_MILLIS = 1000;
    private static final int DEFAULT_MAX_DELAY_MILLIS = 60000;

    public BackoffHandler(MessageHandler messageHandler, int n, int n2) {
        this.handler = messageHandler;
        this.initDelay = n;
        this.maxDelay = n2;
    }

    public BackoffHandler(MessageHandler messageHandler) {
        this(messageHandler, 1000, 60000);
    }

    @Override
    public void accept(Message message) {
        boolean bl = this.isBackoff;
        if (bl) {
            this.attemptDuringBackoff();
        }
        try {
            this.handler.accept(message);
            if (bl) {
                this.successDuringBackoff();
            }
            message.finish();
        }
        catch (Exception exception) {
            this.failure(message, exception);
        }
    }

    synchronized void setSubscription(Subscription subscription) {
        this.subscription = subscription;
    }

    private synchronized void failure(Message message, Exception exception) {
        this.isBackoff = true;
        ++this.failCount;
        log.error("message error. failures:{}", (Object)this.failCount, (Object)exception);
        if (this.failCount == 1) {
            this.delay = this.initDelay;
            this.fullSpeedMaxInFlight = this.subscription.getMaxInFlight();
            this.lastAttempt = Util.clock();
        } else {
            this.delay = Math.min(this.delay * 2, this.maxDelay);
            this.pauseSubscription();
        }
        message.requeue();
    }

    private synchronized void pauseSubscription() {
        this.subscription.setMaxInFlight(0);
        this.subscription.getClient().schedule(new Runnable(){

            @Override
            public void run() {
                if (!((BackoffHandler)BackoffHandler.this).subscription.isStopping) {
                    BackoffHandler.this.subscription.setMaxInFlight(1);
                }
            }
        }, this.delay);
    }

    private synchronized void attemptDuringBackoff() {
        long l = Util.clock();
        int n = (int)(l - this.lastAttempt);
        if (n < this.delay) {
            Util.sleepQuietly(this.delay - n);
            this.lastAttempt = Util.clock();
        } else {
            this.lastAttempt = l;
        }
    }

    private synchronized void successDuringBackoff() {
        this.delay /= 2;
        if (this.delay < this.initDelay) {
            this.isBackoff = false;
            this.failCount = 0;
            this.delay = 0;
            this.subscription.setMaxInFlight(this.fullSpeedMaxInFlight);
        } else {
            this.pauseSubscription();
        }
    }
}

