/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.nsqj;

import com.github.yingzhuo.nsqj.BasePubSub;
import com.github.yingzhuo.nsqj.Client;
import com.github.yingzhuo.nsqj.HostAndPort;
import com.github.yingzhuo.nsqj.MessageHandler;
import com.github.yingzhuo.nsqj.SubConnection;
import com.github.yingzhuo.nsqj.Subscriber;
import com.github.yingzhuo.nsqj.Util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the {@link SubConnection}s that serve one topic/channel pair and splits the
 * subscription-wide max-in-flight budget between them.
 *
 * <p>Every method that adds or removes connections, changes the budget, or touches the
 * low-flight rotation task runs while holding this object's monitor. The connection map is
 * additionally wrapped with {@link Collections#synchronizedMap(Map)} so the unsynchronized
 * readers ({@link #toString()}, {@link #getConnectionCount()}, {@link #stop()}) can query it.
 */
class Subscription
extends BasePubSub {
    private static final Logger log = LoggerFactory.getLogger(Subscription.class);

    /**
     * Multiplier applied to a connection's message timeout; a connection whose host is no longer
     * listed is closed once it has been idle for longer than the product.
     */
    private static final long INACTIVE_TIMEOUT_FACTOR = 100L;

    /** Seconds added to the subscriber's lookup interval when computing the "recently active" cutoff. */
    private static final long ACTIVE_WINDOW_EXTRA_SECS = 40L;

    /**
     * Initial delay and period handed to {@code client.scheduleAtFixedRate} for the low-flight
     * rotation task (presumably milliseconds; confirm against {@code Client}).
     */
    private static final int LOW_FLIGHT_ROTATE_INTERVAL = 10000;

    private final String topic;
    private final String channel;
    private final MessageHandler handler;
    private final Subscriber subscriber;
    private final Map<HostAndPort, SubConnection> connectionMap = Collections.synchronizedMap(new HashMap<HostAndPort, SubConnection>());
    private int maxInFlight;
    /** Non-null exactly while the subscription is in low-flight mode; guarded by {@code this}. */
    private ScheduledFuture<?> lowFlightRotateTask;

    /**
     * Creates a subscription with no connections; connections are added by
     * {@link #checkConnections(Set)}.
     *
     * @param client         passed to {@link BasePubSub} and to every new {@link SubConnection}
     * @param string         the topic name
     * @param string2        the channel name
     * @param messageHandler handler returned by {@link #getHandler()}
     * @param subscriber     owning subscriber; supplies the connection config and lookup interval
     * @param n              initial max-in-flight budget for the whole subscription
     */
    public Subscription(Client client, String string, String string2, MessageHandler messageHandler, Subscriber subscriber, int n) {
        super(client);
        this.topic = string;
        this.channel = string2;
        this.handler = messageHandler;
        this.subscriber = subscriber;
        this.maxInFlight = n;
    }

    /**
     * @return the max-in-flight budget currently configured for the whole subscription
     */
    public synchronized int getMaxInFlight() {
        return this.maxInFlight;
    }

    /**
     * Stores a new subscription-wide max-in-flight budget and immediately redistributes it over
     * the current connections.
     *
     * @param n the new budget; values below zero are treated as zero when distributing
     */
    public synchronized void setMaxInFlight(int n) {
        this.maxInFlight = n;
        this.distributeMaxInFlight();
    }

    /**
     * Reconciles the open connections with the given set of hosts.
     *
     * <p>Connections whose host is not in {@code set} are closed and removed once they have been
     * idle for longer than {@code getMsgTimeout() * 100}. A connection is then opened for every
     * host in {@code set} that has none; a failure to connect is logged and does not stop the
     * remaining hosts from being processed. Finally the max-in-flight budget is redistributed.
     *
     * @param set the hosts that should currently have a connection
     */
    public synchronized void checkConnections(Set<HostAndPort> set) {
        // Iterating a synchronizedMap view requires holding the map's own lock.
        synchronized (this.connectionMap) {
            Iterator<SubConnection> iterator = this.connectionMap.values().iterator();
            while (iterator.hasNext()) {
                SubConnection connection = iterator.next();
                if (set.contains(connection.getHost())) {
                    continue;
                }
                long idle = Util.clock() - connection.getLastActionFlush();
                // Widen before multiplying so a large timeout cannot overflow int arithmetic.
                long idleLimit = (long) connection.getMsgTimeout() * INACTIVE_TIMEOUT_FACTOR;
                if (idle <= idleLimit) {
                    continue;
                }
                log.info("closing inactive connection:{} topic:{}", connection.getHost(), this.topic);
                iterator.remove();
                connection.close();
            }
        }
        for (HostAndPort hostAndPort : set) {
            if (this.connectionMap.containsKey(hostAndPort)) {
                continue;
            }
            try {
                log.info("adding new connection:{} topic:{}", hostAndPort, this.topic);
                SubConnection connection = new SubConnection(this.client, hostAndPort, this);
                connection.connect(this.subscriber.getConfig());
                // Only registered after a successful connect.
                this.connectionMap.put(hostAndPort, connection);
            }
            catch (Exception exception) {
                log.error("error connecting to:{}", hostAndPort, exception);
            }
        }
        this.distributeMaxInFlight();
    }

    /**
     * Splits {@link #maxInFlight} over the current connections. Callers hold this object's monitor.
     *
     * <p>Does nothing further when low-flight mode applies (see {@link #checkLowFlight()}) or when
     * there are no connections. Otherwise connections are divided into "active" (last action/flush
     * at or after the cutoff) and "inactive"; each inactive connection gets 1, and the remainder is
     * divided evenly over the active ones, with the first few receiving one extra to absorb the
     * remainder. Each active share is capped at the connection's {@code getMaxRdyCount()}.
     */
    private void distributeMaxInFlight() {
        if (this.checkLowFlight() || this.connectionMap.isEmpty()) {
            return;
        }
        List<SubConnection> active = new ArrayList<SubConnection>();
        List<SubConnection> inactive = new ArrayList<SubConnection>();
        // Widen before multiplying so a large lookup interval cannot overflow int arithmetic.
        long cutoff = Util.clock() - ((long) this.subscriber.getLookupIntervalSecs() + ACTIVE_WINDOW_EXTRA_SECS) * 1000L;
        for (SubConnection connection : Util.copy(this.connectionMap.values())) {
            if (connection.getLastActionFlush() < cutoff) {
                inactive.add(connection);
            } else {
                active.add(connection);
            }
        }
        if (active.isEmpty()) {
            // Nothing was recently active: treat every connection as active so the budget is still used.
            active.addAll(inactive);
            inactive.clear();
        }
        for (SubConnection connection : inactive) {
            // NOTE(review): meaning of the second argument is defined by SubConnection - confirm there.
            connection.setMaxInFlight(1, false);
        }
        int remaining = this.maxInFlight - inactive.size();
        int perConnection = remaining / active.size();
        int extra = remaining % active.size();
        for (SubConnection connection : active) {
            int share = perConnection;
            if (extra > 0) {
                ++share;
                --extra;
            }
            connection.setMaxInFlight(Math.min(share, connection.getMaxRdyCount()));
        }
    }

    /**
     * Applies low-flight mode when the budget is smaller than the number of connections.
     * Callers hold this object's monitor.
     *
     * <p>In low-flight mode the first {@code maxInFlight} connections (in the order returned by
     * {@code Util.copy}) get a max-in-flight of 1 and the rest get 0, and a periodic task is
     * scheduled (once) to rotate which connections are enabled. When low-flight mode does not
     * apply, any existing rotation task is cancelled.
     *
     * @return {@code true} if low-flight mode was applied, {@code false} otherwise
     */
    private boolean checkLowFlight() {
        if (this.maxInFlight < this.connectionMap.size()) {
            if (this.lowFlightRotateTask == null) {
                this.lowFlightRotateTask = this.client.scheduleAtFixedRate(new Runnable(){

                    @Override
                    public void run() {
                        Subscription.this.rotateLowFlight();
                    }
                }, LOW_FLIGHT_ROTATE_INTERVAL, LOW_FLIGHT_ROTATE_INTERVAL, false);
            }
            List<SubConnection> connections = Util.copy(this.connectionMap.values());
            // Clamp so a negative budget pauses every connection instead of making subList throw.
            int enabledCount = Math.min(Math.max(this.maxInFlight, 0), connections.size());
            for (SubConnection connection : connections.subList(0, enabledCount)) {
                connection.setMaxInFlight(1);
            }
            for (SubConnection connection : connections.subList(enabledCount, connections.size())) {
                connection.setMaxInFlight(0);
            }
            return true;
        }
        Util.cancel(this.lowFlightRotateTask);
        this.lowFlightRotateTask = null;
        return false;
    }

    /**
     * Periodic low-flight rotation: swaps the budget between one enabled and one paused connection.
     *
     * <p>Among connections with max-in-flight 0 and among those with max-in-flight 1, the one with
     * the smallest {@code getLastActionFlush()} is picked from each group. If both exist, the
     * enabled one is set to 0 and the paused one to 1.
     */
    private synchronized void rotateLowFlight() {
        SubConnection pausedCandidate = null;
        SubConnection enabledCandidate = null;
        for (SubConnection connection : Util.copy(this.connectionMap.values())) {
            int inFlight = connection.getMaxInFlight();
            if (inFlight == 0) {
                if (pausedCandidate == null || connection.getLastActionFlush() < pausedCandidate.getLastActionFlush()) {
                    pausedCandidate = connection;
                }
            } else if (inFlight == 1) {
                if (enabledCandidate == null || connection.getLastActionFlush() < enabledCandidate.getLastActionFlush()) {
                    enabledCandidate = connection;
                }
            }
        }
        if (enabledCandidate != null && pausedCandidate != null) {
            enabledCandidate.setMaxInFlight(0);
            pausedCandidate.setMaxInFlight(1);
        }
    }

    /**
     * Stops the subscription: delegates to {@link BasePubSub#stop()}, cancels the low-flight
     * rotation task, and stops a snapshot of the current connections.
     */
    @Override
    public void stop() {
        super.stop();
        synchronized (this) {
            Util.cancel(this.lowFlightRotateTask);
            this.lowFlightRotateTask = null;
        }
        // Connections are stopped outside the monitor, on a copy of the map's values.
        for (SubConnection connection : Util.copy(this.connectionMap.values())) {
            connection.stop();
        }
    }

    /**
     * Removes the given connection from the map, but only if it is still the instance registered
     * for its host (a newer connection for the same host is left in place).
     *
     * @param subConnection the connection that was closed
     */
    public synchronized void connectionClosed(SubConnection subConnection) {
        if (this.connectionMap.get(subConnection.getHost()) == subConnection) {
            this.connectionMap.remove(subConnection.getHost());
            log.debug("removed:{} from subscription:{}", subConnection.getHost(), this.topic);
        }
    }

    /**
     * @return {@code true} while the low-flight rotation task is scheduled
     */
    public synchronized boolean isLowFlight() {
        return this.lowFlightRotateTask != null;
    }

    /**
     * @return the subscriber this subscription was created with
     */
    public Subscriber getSubscriber() {
        return this.subscriber;
    }

    /**
     * @return the message handler this subscription was created with
     */
    public MessageHandler getHandler() {
        return this.handler;
    }

    /**
     * @return the topic name
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * @return the channel name
     */
    public String getChannel() {
        return this.channel;
    }

    /**
     * @return a description containing the topic, channel and current number of connections
     */
    @Override
    public String toString() {
        return String.format("subscription %s.%s connections:%s", this.topic, this.channel, this.connectionMap.size());
    }

    /**
     * @return the number of connections currently registered
     */
    public int getConnectionCount() {
        return this.connectionMap.size();
    }
}

