/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.BasePubSub;
import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.HostAndPort;
import com.sproutsocial.nsq.MessageHandler;
import com.sproutsocial.nsq.SubConnection;
import com.sproutsocial.nsq.Subscriber;
import com.sproutsocial.nsq.Util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Subscription
extends BasePubSub {
    private final String topic;
    private final String channel;
    private final MessageHandler handler;
    private final Subscriber subscriber;
    private final Map<HostAndPort, SubConnection> connectionMap = Collections.synchronizedMap(new HashMap());
    private int maxInFlight;
    private ScheduledFuture lowFlightRotateTask;
    private static final Logger logger = LoggerFactory.getLogger(Subscription.class);

    public Subscription(Client client, String topic, String channel, MessageHandler handler, Subscriber subscriber, int maxInFlight) {
        super(client);
        this.topic = topic;
        this.channel = channel;
        this.handler = handler;
        this.subscriber = subscriber;
        this.maxInFlight = maxInFlight;
    }

    public synchronized int getMaxInFlight() {
        return this.maxInFlight;
    }

    public synchronized void setMaxInFlight(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.distributeMaxInFlight();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void checkConnections(Set<HostAndPort> activeHosts) {
        SubConnection con;
        Map<HostAndPort, SubConnection> map = this.connectionMap;
        synchronized (map) {
            Iterator<SubConnection> iter = this.connectionMap.values().iterator();
            while (iter.hasNext()) {
                con = iter.next();
                if (activeHosts.contains(con.getHost()) || Util.clock() - con.getLastActionFlush() <= (long)(con.getMsgTimeout() * 100)) continue;
                logger.info("closing inactive connection:{} topic:{}", (Object)con.getHost(), (Object)this.topic);
                iter.remove();
                con.close();
            }
        }
        for (HostAndPort activeHost : activeHosts) {
            if (this.connectionMap.containsKey(activeHost)) continue;
            con = null;
            try {
                logger.info("adding new connection:{} topic:{}", (Object)activeHost, (Object)this.topic);
                con = new SubConnection(this.client, activeHost, this);
                con.connect(this.subscriber.getConfig());
                this.connectionMap.put(activeHost, con);
            }
            catch (Exception e) {
                if (con != null) {
                    con.close();
                }
                logger.error("error connecting to:{}", (Object)activeHost, (Object)e);
            }
        }
        this.distributeMaxInFlight();
    }

    private void distributeMaxInFlight() {
        if (this.checkLowFlight() || this.connectionMap.isEmpty()) {
            return;
        }
        ArrayList<SubConnection> activeCons = new ArrayList<SubConnection>();
        ArrayList<SubConnection> inactiveCons = new ArrayList<SubConnection>();
        long minActiveTime = Util.clock() - (long)((this.subscriber.getLookupIntervalSecs() + 40) * 1000);
        for (SubConnection con : Util.copy(this.connectionMap.values())) {
            if (con.lastActionFlush < minActiveTime) {
                inactiveCons.add(con);
                continue;
            }
            activeCons.add(con);
        }
        if (activeCons.isEmpty()) {
            activeCons.addAll(inactiveCons);
            inactiveCons.clear();
        }
        for (SubConnection con : inactiveCons) {
            con.setMaxInFlight(1, false);
        }
        int f = this.maxInFlight - inactiveCons.size();
        int perCon = f / activeCons.size();
        int extra = f % activeCons.size();
        for (SubConnection con : activeCons) {
            int c = perCon;
            if (extra > 0) {
                ++c;
                --extra;
            }
            con.setMaxInFlight(Math.min(c, con.getMaxRdyCount()));
        }
    }

    private boolean checkLowFlight() {
        if (this.maxInFlight < this.connectionMap.size()) {
            if (this.lowFlightRotateTask == null) {
                this.lowFlightRotateTask = this.client.scheduleAtFixedRate(new Runnable(){

                    @Override
                    public void run() {
                        Subscription.this.rotateLowFlight();
                    }
                }, 10000, 10000, false);
            }
            List<SubConnection> cons = Util.copy(this.connectionMap.values());
            for (SubConnection con : cons.subList(0, this.maxInFlight)) {
                con.setMaxInFlight(1);
            }
            for (SubConnection con : cons.subList(this.maxInFlight, cons.size())) {
                con.setMaxInFlight(0);
            }
            return true;
        }
        Util.cancel(this.lowFlightRotateTask);
        this.lowFlightRotateTask = null;
        return false;
    }

    private synchronized void rotateLowFlight() {
        SubConnection paused = null;
        SubConnection ready = null;
        for (SubConnection con : Util.copy(this.connectionMap.values())) {
            if (con.getMaxInFlight() == 0 && (paused == null || con.getLastActionFlush() < paused.getLastActionFlush())) {
                paused = con;
                continue;
            }
            if (con.getMaxInFlight() != 1 || ready != null && con.getLastActionFlush() >= ready.getLastActionFlush()) continue;
            ready = con;
        }
        if (ready != null && paused != null) {
            ready.setMaxInFlight(0);
            paused.setMaxInFlight(1);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        super.stop();
        Subscription subscription = this;
        synchronized (subscription) {
            Util.cancel(this.lowFlightRotateTask);
            this.lowFlightRotateTask = null;
        }
        for (SubConnection con : Util.copy(this.connectionMap.values())) {
            con.stop();
        }
    }

    public synchronized void connectionClosed(SubConnection closedCon) {
        if (this.connectionMap.get(closedCon.getHost()) == closedCon) {
            this.connectionMap.remove(closedCon.getHost());
            logger.debug("removed:{} from subscription:{}", (Object)closedCon.getHost(), (Object)this.topic);
        }
    }

    public synchronized boolean isLowFlight() {
        return this.lowFlightRotateTask != null;
    }

    public Subscriber getSubscriber() {
        return this.subscriber;
    }

    public MessageHandler getHandler() {
        return this.handler;
    }

    public String getTopic() {
        return this.topic;
    }

    public String getChannel() {
        return this.channel;
    }

    public String toString() {
        return String.format("subscription %s.%s connections:%s", this.topic, this.channel, this.connectionMap.size());
    }

    public int getConnectionCount() {
        return this.connectionMap.size();
    }
}

