/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Event;
import org.apache.activemq.artemis.shaded.org.jgroups.Header;
import org.apache.activemq.artemis.shaded.org.jgroups.LongMessage;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.View;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.MBean;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedOperation;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.conf.AttributeType;
import org.apache.activemq.artemis.shaded.org.jgroups.protocols.FcHeader;
import org.apache.activemq.artemis.shaded.org.jgroups.stack.Protocol;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Credit;
import org.apache.activemq.artemis.shaded.org.jgroups.util.MessageBatch;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

@MBean(description="Simple flow control protocol based on a credit system")
public abstract class FlowControl
extends Protocol {
    @Property(description="Max number of bytes to send per receiver until an ack must be received to proceed", type=AttributeType.BYTES)
    protected long max_credits = 5000000L;
    @Property(description="Max time (in ms) to block", type=AttributeType.TIME)
    protected long max_block_time = 500L;
    @Property(description="The threshold (as a percentage of max_credits) at which a receiver sends more credits to a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P)")
    protected double min_threshold = 0.4;
    @Property(description="Computed as max_credits x min_theshold unless explicitly set", type=AttributeType.BYTES)
    protected long min_credits;
    @ManagedAttribute(description="Number of credit requests received", type=AttributeType.SCALAR)
    protected long num_credit_requests_received;
    @ManagedAttribute(description="Number of credit requests sent", type=AttributeType.SCALAR)
    protected long num_credit_requests_sent;
    @ManagedAttribute(description="Number of credit responses received", type=AttributeType.SCALAR)
    protected long num_credit_responses_received;
    @ManagedAttribute(description="Number of credit responses sent", type=AttributeType.SCALAR)
    protected long num_credit_responses_sent;
    @ManagedAttribute(description="Number of messages dropped due to DONT_BLOCK flag", type=AttributeType.SCALAR)
    protected long num_msgs_dropped;
    protected final Map<Address, Credit> received = Util.createConcurrentMap();
    protected volatile boolean running = true;
    protected int frag_size;

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_credit_requests_sent = 0L;
        this.num_credit_requests_received = 0L;
        this.num_credit_responses_received = 0L;
        this.num_credit_responses_sent = 0L;
        this.num_msgs_dropped = 0L;
    }

    public abstract int getNumberOfBlockings();

    public abstract double getAverageTimeBlocked();

    public long getMaxCredits() {
        return this.max_credits;
    }

    public <T extends FlowControl> T setMaxCredits(long m) {
        this.max_credits = m;
        return (T)this;
    }

    public double getMinThreshold() {
        return this.min_threshold;
    }

    public <T extends FlowControl> T setMinThreshold(double m) {
        this.min_threshold = m;
        return (T)this;
    }

    public long getMinCredits() {
        return this.min_credits;
    }

    public <T extends FlowControl> T setMinCredits(long m) {
        this.min_credits = m;
        return (T)this;
    }

    public long getMaxBlockTime() {
        return this.max_block_time;
    }

    public <T extends FlowControl> T setMaxBlockTime(long t) {
        this.max_block_time = t;
        return (T)this;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumberOfCreditRequestsReceived() {
        return this.num_credit_requests_received;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumberOfCreditRequestsSent() {
        return this.num_credit_requests_sent;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumberOfCreditResponsesReceived() {
        return this.num_credit_responses_received;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumberOfCreditResponsesSent() {
        return this.num_credit_responses_sent;
    }

    public abstract String printSenderCredits();

    @ManagedOperation(description="Print receiver credits")
    public String printReceiverCredits() {
        return FlowControl.printMap(this.received);
    }

    public long getReceiverCreditsFor(Address mbr) {
        Credit credits = this.received.get(mbr);
        return credits == null ? 0L : credits.get();
    }

    public String printCredits() {
        return String.format("receivers:\n%s", FlowControl.printMap(this.received));
    }

    protected abstract boolean handleMulticastMessage();

    protected abstract void handleCredit(Address var1, long var2);

    protected abstract Header getReplenishHeader();

    protected abstract Header getCreditRequestHeader();

    @ManagedOperation(description="Unblocks all senders")
    public void unblock() {
    }

    @Override
    public void init() throws Exception {
        boolean min_credits_set;
        boolean bl = min_credits_set = this.min_credits != 0L;
        if (!min_credits_set) {
            this.min_credits = (long)((double)this.max_credits * this.min_threshold);
        }
    }

    @Override
    public void start() throws Exception {
        super.start();
        boolean is_udp_transport = this.getTransport().isMulticastCapable();
        if (is_udp_transport && this.frag_size <= 0) {
            this.log.warn("No fragmentation protocol was found. When flow control is used, we recommend a fragmentation protocol, due to https://issues.redhat.com/browse/JGRP-590");
        }
        if (this.frag_size > 0 && (long)this.frag_size >= this.min_credits) {
            this.log.warn("The fragmentation size of the fragmentation protocol is %d, which is greater than min_credits (%d). This can lead to blockings (https://issues.redhat.com/browse/JGRP-1659)", this.frag_size, this.min_credits);
        }
        this.running = true;
    }

    @Override
    public void stop() {
        super.stop();
        this.running = false;
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 56: {
                this.handleConfigEvent((Map)evt.getArg());
                break;
            }
            case 6: {
                this.handleViewChange(((View)evt.getArg()).getMembers());
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object down(Message msg) {
        long new_credits;
        boolean process;
        if (msg.isFlagSet(Message.Flag.NO_FC)) {
            return this.down_prot.down(msg);
        }
        Address dest = msg.getDest();
        boolean multicast = dest == null || dest.isMulticast();
        boolean handle_multicasts = this.handleMulticastMessage();
        boolean bl = process = multicast && handle_multicasts || !multicast && !handle_multicasts && (!msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) || !Objects.equals(dest, this.local_addr));
        if (!process) {
            return this.down_prot.down(msg);
        }
        int length = msg.getLength();
        if (length == 0) {
            return this.down_prot.down(msg);
        }
        Object retval = this.handleDownMessage(msg, length);
        if (multicast && msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) && (new_credits = this.adjustCredit(this.received, this.local_addr, length)) > 0L) {
            this.sendCredit(this.local_addr, new_credits);
        }
        return retval;
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 6: {
                this.handleViewChange(((View)evt.getArg()).getMembers());
            }
        }
        return this.up_prot.up(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object up(Message msg) {
        boolean process;
        if (msg.isFlagSet(Message.Flag.NO_FC)) {
            return this.up_prot.up(msg);
        }
        Address dest = msg.getDest();
        boolean multicast = dest == null;
        boolean handle_multicasts = this.handleMulticastMessage();
        FcHeader hdr = (FcHeader)msg.getHeader(this.id);
        boolean bl = process = handle_multicasts && multicast || !handle_multicasts && !multicast || hdr != null;
        if (!process) {
            return this.up_prot.up(msg);
        }
        if (hdr != null) {
            this.handleUpEvent(msg, hdr);
            return null;
        }
        try {
            Object object = this.up_prot.up(msg);
            return object;
        }
        finally {
            Address sender;
            long new_credits;
            int length = msg.getLength();
            if (length > 0 && (new_credits = this.adjustCredit(this.received, sender = msg.getSrc(), length)) > 0L) {
                this.sendCredit(sender, new_credits);
            }
        }
    }

    protected void handleUpEvent(Message msg, FcHeader hdr) {
        switch (hdr.type) {
            case 1: {
                ++this.num_credit_responses_received;
                this.handleCredit(msg.getSrc(), ((LongMessage)msg).getValue());
                break;
            }
            case 2: {
                ++this.num_credit_requests_received;
                Address sender = msg.getSrc();
                long requested_credits = ((LongMessage)msg).getValue();
                if (requested_credits <= 0L) break;
                this.handleCreditRequest(this.received, sender, requested_credits);
                break;
            }
            default: {
                this.log.error(Util.getMessage("HeaderTypeNotKnown"), this.local_addr, hdr.type);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void up(MessageBatch batch) {
        int length = 0;
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            Message msg = it.next();
            if (msg.isFlagSet(Message.Flag.NO_FC)) continue;
            Address dest = msg.getDest();
            boolean multicast = dest == null;
            boolean handle_multicasts = this.handleMulticastMessage();
            FcHeader hdr = (FcHeader)msg.getHeader(this.id);
            boolean process = handle_multicasts && multicast || !handle_multicasts && !multicast || hdr != null;
            if (!process) continue;
            if (hdr != null) {
                it.remove();
                this.handleUpEvent(msg, hdr);
                continue;
            }
            length += msg.getLength();
        }
        if (!batch.isEmpty()) {
            try {
                this.up_prot.up(batch);
            }
            finally {
                Address sender;
                long new_credits;
                if (length > 0 && (new_credits = this.adjustCredit(this.received, sender = batch.sender(), length)) > 0L) {
                    this.sendCredit(sender, new_credits);
                }
            }
        }
    }

    protected void handleConfigEvent(Map<String, Object> info) {
        Integer tmp;
        if (info != null && (tmp = (Integer)info.get("frag_size")) != null) {
            this.frag_size = tmp;
        }
    }

    protected abstract Object handleDownMessage(Message var1, int var2);

    protected long adjustCredit(Map<Address, Credit> map, Address sender, int length) {
        Credit cred;
        if (sender == null || length == 0 || (cred = map.get(sender)) == null) {
            return 0L;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("%s used %d credits, %d remaining", sender, length, cred.get() - (long)length);
        }
        return cred.decrementAndGet(length, this.min_credits, this.max_credits);
    }

    protected void handleCreditRequest(Map<Address, Credit> map, Address sender, long requested_credits) {
        if (requested_credits > 0L && sender != null) {
            Credit cred = map.get(sender);
            if (cred == null) {
                return;
            }
            if (this.log.isTraceEnabled()) {
                this.log.trace("received credit request from %s: sending %d credits", sender, requested_credits);
            }
            cred.increment(requested_credits, this.max_credits);
            this.sendCredit(sender, requested_credits);
        }
    }

    protected void sendCredit(Address dest, long credits) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending %d credits to %s", credits, dest);
        }
        Message msg = new LongMessage(dest, credits).putHeader(this.id, this.getReplenishHeader()).setFlag(Message.Flag.OOB, Message.Flag.DONT_BUNDLE).setFlag(Message.TransientFlag.DONT_BLOCK);
        this.down_prot.down(msg);
        ++this.num_credit_responses_sent;
    }

    protected void sendCreditRequest(Address dest, long credits_needed) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending request for %d credits to %s", credits_needed, dest);
        }
        Message msg = new LongMessage(dest, credits_needed).putHeader(this.id, this.getCreditRequestHeader()).setFlag(Message.Flag.OOB, Message.Flag.DONT_BUNDLE).setFlag(Message.TransientFlag.DONT_BLOCK);
        this.down_prot.down(msg);
        ++this.num_credit_requests_sent;
    }

    protected void handleViewChange(List<Address> mbrs) {
        if (mbrs == null) {
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("new membership: %s", mbrs);
        }
        mbrs.stream().filter(addr -> !this.received.containsKey(addr)).forEach(addr -> this.received.put((Address)addr, new Credit(this.max_credits)));
        this.received.keySet().retainAll(mbrs);
    }

    protected static String printMap(Map<Address, ? extends Credit> m) {
        return m.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
    }
}

