/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.nsqj;

import com.github.yingzhuo.nsqj.Client;
import com.github.yingzhuo.nsqj.Config;
import com.github.yingzhuo.nsqj.Connection;
import com.github.yingzhuo.nsqj.FailedMessageHandler;
import com.github.yingzhuo.nsqj.HostAndPort;
import com.github.yingzhuo.nsqj.MessageHandler;
import com.github.yingzhuo.nsqj.NSQMessage;
import com.github.yingzhuo.nsqj.Subscriber;
import com.github.yingzhuo.nsqj.Subscription;
import com.github.yingzhuo.nsqj.Util;
import java.io.IOException;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubConnection
extends Connection {
    private static final Logger log = LoggerFactory.getLogger(SubConnection.class);
    private final MessageHandler handler;
    private final FailedMessageHandler failedMessageHandler;
    private final Subscription subscription;
    private final String topic;
    private final int maxAttempts;
    private final int maxFlushDelayMillis;
    private int inFlight = 0;
    private int maxInFlight = 0;
    private int maxUnflushed = 0;
    private long finishedCount = 0L;
    private long requeuedCount = 0L;

    public SubConnection(Client client, HostAndPort hostAndPort, Subscription subscription) {
        super(client, hostAndPort);
        Subscriber subscriber = subscription.getSubscriber();
        this.handler = subscription.getHandler();
        this.failedMessageHandler = subscriber.getFailedMessageHandler();
        this.subscription = subscription;
        this.topic = subscription.getTopic();
        this.maxAttempts = subscriber.getMaxAttempts();
        this.maxFlushDelayMillis = subscriber.getMaxFlushDelayMillis();
        this.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                SubConnection.this.delayedFlush();
            }
        }, this.maxFlushDelayMillis / 2, this.maxFlushDelayMillis / 2, false);
    }

    public synchronized void finish(String string) {
        try {
            this.writeCommand("FIN", string);
            ++this.finishedCount;
            this.messageDone();
        }
        catch (IOException iOException) {
            log.error("finish error. {}", (Object)this.stateDesc(), (Object)iOException);
            this.close();
        }
    }

    public synchronized void requeue(String string) {
        try {
            this.writeCommand("REQ", string, 0);
            ++this.requeuedCount;
            this.messageDone();
        }
        catch (IOException iOException) {
            log.error("requeue error. {}", (Object)this.stateDesc(), (Object)iOException);
            this.close();
        }
    }

    @GuardedBy(value="this")
    private void messageDone() throws IOException {
        this.inFlight = Math.max(this.inFlight - 1, 0);
        if (this.inFlight == 0 && this.isStopping) {
            this.flushAndClose();
        } else {
            this.checkFlush();
        }
    }

    public synchronized void touch(String string) {
        try {
            this.writeCommand("TOUCH", string);
            this.checkFlush();
        }
        catch (IOException iOException) {
            log.error("touch error. {}", (Object)this.stateDesc(), (Object)iOException);
            this.close();
        }
    }

    private synchronized void delayedFlush() {
        try {
            if (this.unflushedCount > 0 && Util.clock() - this.lastActionFlush > (long)(this.maxFlushDelayMillis / 2 + 10)) {
                this.flush();
            }
        }
        catch (Exception exception) {
            log.error("delayedFlush error. {}", (Object)this.stateDesc(), (Object)exception);
            this.close();
        }
    }

    @GuardedBy(value="this")
    private void checkFlush() throws IOException {
        if (this.unflushedCount >= this.maxUnflushed) {
            this.flush();
        } else {
            ++this.unflushedCount;
        }
    }

    public synchronized void setMaxInFlight(int n) {
        this.setMaxInFlight(n, true);
    }

    public synchronized void setMaxInFlight(int n, boolean bl) {
        try {
            if (this.maxInFlight == n) {
                return;
            }
            this.maxInFlight = n;
            this.maxUnflushed = Math.min(n / 3, 150);
            log.debug("RDY:{} {}", (Object)n, (Object)this.toString());
            this.writeCommand("RDY", n);
            if (bl) {
                this.flush();
            } else {
                this.out.flush();
            }
        }
        catch (IOException iOException) {
            log.error("setMaxInFlight failed. con:{}", (Object)this.stateDesc(), (Object)iOException);
            this.close();
        }
    }

    public synchronized int getMaxInFlight() {
        return this.maxInFlight;
    }

    @Override
    public synchronized void connect(Config config) throws IOException {
        this.client.addSubConnection(this);
        super.connect(config);
        this.writeCommand("SUB", this.subscription.getTopic(), this.subscription.getChannel());
        this.flushAndReadOK();
    }

    private void failMessage(final NSQMessage nSQMessage) {
        if (this.failedMessageHandler != null) {
            this.handlerExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        SubConnection.this.failedMessageHandler.failed(SubConnection.this.subscription.getTopic(), SubConnection.this.subscription.getChannel(), nSQMessage);
                    }
                    catch (Throwable throwable) {
                        log.error("failed message error", throwable);
                    }
                }
            });
        }
        this.finish(nSQMessage.getId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onMessage(long l, int n, String string, byte[] byArray) {
        final NSQMessage nSQMessage = new NSQMessage(l, n, string, byArray, this.topic, this);
        SubConnection subConnection = this;
        synchronized (subConnection) {
            ++this.inFlight;
        }
        if (nSQMessage.getAttempts() >= this.maxAttempts) {
            this.failMessage(nSQMessage);
        } else {
            this.handlerExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        SubConnection.this.handler.accept(nSQMessage);
                    }
                    catch (Throwable throwable) {
                        log.error("message error", throwable);
                    }
                }
            });
        }
    }

    @Override
    public void close() {
        super.close();
        this.client.getSchedExecutor().execute(new Runnable(){

            @Override
            public void run() {
                SubConnection.this.subscription.connectionClosed(SubConnection.this);
                SubConnection.this.client.connectionClosed(SubConnection.this);
            }
        });
    }

    @Override
    public synchronized void stop() {
        super.stop();
        if (this.inFlight == 0) {
            this.flushAndClose();
        } else {
            this.setMaxInFlight(0);
        }
    }

    public String toString() {
        return String.format("SubCon:%s %s.%s", this.host.getHost(), this.subscription.getTopic(), this.subscription.getChannel());
    }

    @Override
    public synchronized String stateDesc() {
        return String.format("%s inFlight:%d maxInFlight:%d fin:%d req:%d", super.stateDesc(), this.inFlight, this.maxInFlight, this.finishedCount, this.requeuedCount);
    }
}

