/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientState {
    private static final Logger LOG = LoggerFactory.getLogger(ClientState.class);
    public static final Comparator<TopicPartition> TOPIC_PARTITION_COMPARATOR = Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);
    private final Set<TaskId> activeTasks = new TreeSet<TaskId>();
    private final Set<TaskId> standbyTasks = new TreeSet<TaskId>();
    private final Set<TaskId> prevActiveTasks;
    private final Set<TaskId> prevStandbyTasks;
    private final Map<TaskId, Long> taskOffsetSums;
    private final Map<TaskId, Long> taskLagTotals;
    private final Map<TopicPartition, String> ownedPartitions = new TreeMap<TopicPartition, String>(TOPIC_PARTITION_COMPARATOR);
    private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<String, Set<TaskId>>();
    private final Map<String, Set<TaskId>> consumerToPreviousActiveTaskIds = new TreeMap<String, Set<TaskId>>();
    private final Map<String, Set<TaskId>> consumerToAssignedActiveTaskIds = new TreeMap<String, Set<TaskId>>();
    private final Map<String, Set<TaskId>> consumerToAssignedStandbyTaskIds = new TreeMap<String, Set<TaskId>>();
    private final Map<String, Set<TaskId>> consumerToRevokingActiveTaskIds = new TreeMap<String, Set<TaskId>>();
    private int capacity;

    public ClientState() {
        this(0);
    }

    ClientState(int capacity) {
        this.prevActiveTasks = new TreeSet<TaskId>();
        this.prevStandbyTasks = new TreeSet<TaskId>();
        this.taskOffsetSums = new TreeMap<TaskId, Long>();
        this.taskLagTotals = new TreeMap<TaskId, Long>();
        this.capacity = capacity;
    }

    public ClientState(Set<TaskId> previousActiveTasks, Set<TaskId> previousStandbyTasks, Map<TaskId, Long> taskLagTotals, int capacity) {
        this.prevActiveTasks = Collections.unmodifiableSet(new TreeSet<TaskId>(previousActiveTasks));
        this.prevStandbyTasks = Collections.unmodifiableSet(new TreeSet<TaskId>(previousStandbyTasks));
        this.taskOffsetSums = Collections.emptyMap();
        this.taskLagTotals = Collections.unmodifiableMap(taskLagTotals);
        this.capacity = capacity;
    }

    int capacity() {
        return this.capacity;
    }

    public void incrementCapacity() {
        ++this.capacity;
    }

    boolean reachedCapacity() {
        return this.assignedTaskCount() >= this.capacity;
    }

    public Set<TaskId> activeTasks() {
        return Collections.unmodifiableSet(this.activeTasks);
    }

    public int activeTaskCount() {
        return this.activeTasks.size();
    }

    double activeTaskLoad() {
        return (double)this.activeTaskCount() / (double)this.capacity;
    }

    public void assignActiveTasks(Collection<TaskId> tasks) {
        this.activeTasks.addAll(tasks);
    }

    public void assignActiveToConsumer(TaskId task, String consumer) {
        this.consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet()).add(task);
    }

    public void assignStandbyToConsumer(TaskId task, String consumer) {
        this.consumerToAssignedStandbyTaskIds.computeIfAbsent(consumer, k -> new HashSet()).add(task);
    }

    public void revokeActiveFromConsumer(TaskId task, String consumer) {
        this.consumerToRevokingActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet()).add(task);
    }

    public Map<String, Set<TaskId>> prevOwnedActiveTasksByConsumer() {
        return this.consumerToPreviousActiveTaskIds;
    }

    public Map<String, Set<TaskId>> prevOwnedStandbyByConsumer() {
        TreeMap<String, Set<TaskId>> consumerToPreviousStandbyTaskIds = new TreeMap<String, Set<TaskId>>();
        for (Map.Entry<String, Set<TaskId>> entry : this.consumerToPreviousStatefulTaskIds.entrySet()) {
            HashSet standbyTaskIds = new HashSet(entry.getValue());
            if (this.consumerToPreviousActiveTaskIds.containsKey(entry.getKey())) {
                standbyTaskIds.removeAll((Collection)this.consumerToPreviousActiveTaskIds.get(entry.getKey()));
            }
            consumerToPreviousStandbyTaskIds.put(entry.getKey(), standbyTaskIds);
        }
        return consumerToPreviousStandbyTaskIds;
    }

    public Set<TaskId> prevOwnedStatefulTasksByConsumer(String memberId) {
        return this.consumerToPreviousStatefulTaskIds.get(memberId);
    }

    public Map<String, Set<TaskId>> assignedActiveTasksByConsumer() {
        return this.consumerToAssignedActiveTaskIds;
    }

    public Map<String, Set<TaskId>> revokingActiveTasksByConsumer() {
        return this.consumerToRevokingActiveTaskIds;
    }

    public Map<String, Set<TaskId>> assignedStandbyTasksByConsumer() {
        return this.consumerToAssignedStandbyTaskIds;
    }

    public void assignActive(TaskId task) {
        this.assertNotAssigned(task);
        this.activeTasks.add(task);
    }

    public void unassignActive(TaskId task) {
        if (!this.activeTasks.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign active task " + task + ", but it is not currently assigned: " + this);
        }
        this.activeTasks.remove(task);
    }

    public Set<TaskId> standbyTasks() {
        return Collections.unmodifiableSet(this.standbyTasks);
    }

    boolean hasStandbyTask(TaskId taskId) {
        return this.standbyTasks.contains(taskId);
    }

    int standbyTaskCount() {
        return this.standbyTasks.size();
    }

    public void assignStandby(TaskId task) {
        this.assertNotAssigned(task);
        this.standbyTasks.add(task);
    }

    void unassignStandby(TaskId task) {
        if (!this.standbyTasks.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign standby task " + task + ", but it is not currently assigned: " + this);
        }
        this.standbyTasks.remove(task);
    }

    Set<TaskId> assignedTasks() {
        return Collections.unmodifiableSet(Utils.union(() -> new HashSet(this.activeTasks.size() + this.standbyTasks.size()), (Set[])new Set[]{this.activeTasks, this.standbyTasks}));
    }

    public int assignedTaskCount() {
        return this.activeTaskCount() + this.standbyTaskCount();
    }

    double assignedTaskLoad() {
        return (double)this.assignedTaskCount() / (double)this.capacity;
    }

    boolean hasAssignedTask(TaskId taskId) {
        return this.activeTasks.contains(taskId) || this.standbyTasks.contains(taskId);
    }

    Set<TaskId> prevActiveTasks() {
        return Collections.unmodifiableSet(this.prevActiveTasks);
    }

    private void addPreviousActiveTask(TaskId task) {
        this.prevActiveTasks.add(task);
    }

    void addPreviousActiveTasks(Set<TaskId> prevTasks) {
        this.prevActiveTasks.addAll(prevTasks);
    }

    Set<TaskId> prevStandbyTasks() {
        return Collections.unmodifiableSet(this.prevStandbyTasks);
    }

    private void addPreviousStandbyTask(TaskId task) {
        this.prevStandbyTasks.add(task);
    }

    void addPreviousStandbyTasks(Set<TaskId> standbyTasks) {
        this.prevStandbyTasks.addAll(standbyTasks);
    }

    Set<TaskId> previousAssignedTasks() {
        return Utils.union(() -> new HashSet(this.prevActiveTasks.size() + this.prevStandbyTasks.size()), (Set[])new Set[]{this.prevActiveTasks, this.prevStandbyTasks});
    }

    public String previousOwnerForPartition(TopicPartition partition) {
        return this.ownedPartitions.get(partition);
    }

    public void addOwnedPartitions(Collection<TopicPartition> ownedPartitions, String consumer) {
        for (TopicPartition tp : ownedPartitions) {
            this.ownedPartitions.put(tp, consumer);
        }
    }

    public void addPreviousTasksAndOffsetSums(String consumerId, Map<TaskId, Long> taskOffsetSums) {
        this.taskOffsetSums.putAll(taskOffsetSums);
        this.consumerToPreviousStatefulTaskIds.put(consumerId, taskOffsetSums.keySet());
    }

    public void initializePrevTasks(Map<TopicPartition, TaskId> taskForPartitionMap) {
        if (!this.prevActiveTasks.isEmpty() || !this.prevStandbyTasks.isEmpty()) {
            throw new IllegalStateException("Already added previous tasks to this client state.");
        }
        this.initializePrevActiveTasksFromOwnedPartitions(taskForPartitionMap);
        this.initializeRemainingPrevTasksFromTaskOffsetSums();
    }

    public void computeTaskLags(UUID uuid, Map<TaskId, Long> allTaskEndOffsetSums) {
        if (!this.taskLagTotals.isEmpty()) {
            throw new IllegalStateException("Already computed task lags for this client.");
        }
        for (Map.Entry<TaskId, Long> taskEntry : allTaskEndOffsetSums.entrySet()) {
            TaskId task = taskEntry.getKey();
            Long endOffsetSum = taskEntry.getValue();
            Long offsetSum = this.taskOffsetSums.getOrDefault(task, 0L);
            if (offsetSum == -2L) {
                this.taskLagTotals.put(task, -2L);
                continue;
            }
            if (offsetSum == -3L) {
                this.taskLagTotals.put(task, -3L);
                continue;
            }
            if (endOffsetSum < offsetSum) {
                LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + offsetSum + " on member " + uuid + ". This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state.");
                this.taskLagTotals.put(task, endOffsetSum);
                continue;
            }
            this.taskLagTotals.put(task, endOffsetSum - offsetSum);
        }
    }

    public long lagFor(TaskId task) {
        Long totalLag = this.taskLagTotals.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }

    public Set<TaskId> statefulActiveTasks() {
        return this.activeTasks.stream().filter(this::isStateful).collect(Collectors.toSet());
    }

    public Set<TaskId> statelessActiveTasks() {
        return this.activeTasks.stream().filter(task -> !this.isStateful((TaskId)task)).collect(Collectors.toSet());
    }

    boolean hasUnfulfilledQuota(int tasksPerThread) {
        return this.activeTasks.size() < this.capacity * tasksPerThread;
    }

    boolean hasMoreAvailableCapacityThan(ClientState other) {
        if (this.capacity <= 0) {
            throw new IllegalStateException("Capacity of this ClientState must be greater than 0.");
        }
        if (other.capacity <= 0) {
            throw new IllegalStateException("Capacity of other ClientState must be greater than 0");
        }
        double otherLoad = (double)other.assignedTaskCount() / (double)other.capacity;
        double thisLoad = (double)this.assignedTaskCount() / (double)this.capacity;
        if (thisLoad < otherLoad) {
            return true;
        }
        if (thisLoad > otherLoad) {
            return false;
        }
        return this.capacity > other.capacity;
    }

    public String currentAssignment() {
        return "[activeTasks: (" + this.activeTasks + ") standbyTasks: (" + this.standbyTasks + ")]";
    }

    public String toString() {
        return "[activeTasks: (" + this.activeTasks + ") standbyTasks: (" + this.standbyTasks + ") prevActiveTasks: (" + this.prevActiveTasks + ") prevStandbyTasks: (" + this.prevStandbyTasks + ") changelogOffsetTotalsByTask: (" + this.taskOffsetSums.entrySet() + ") taskLagTotals: (" + this.taskLagTotals.entrySet() + ") capacity: " + this.capacity + " assigned: " + this.assignedTaskCount() + "]";
    }

    private boolean isStateful(TaskId task) {
        return this.taskLagTotals.containsKey(task);
    }

    private void initializePrevActiveTasksFromOwnedPartitions(Map<TopicPartition, TaskId> taskForPartitionMap) {
        for (Map.Entry<TopicPartition, String> partitionEntry : this.ownedPartitions.entrySet()) {
            TopicPartition tp = partitionEntry.getKey();
            TaskId task = taskForPartitionMap.get(tp);
            if (task != null) {
                this.addPreviousActiveTask(task);
                this.consumerToPreviousActiveTaskIds.computeIfAbsent(partitionEntry.getValue(), k -> new HashSet()).add(task);
                continue;
            }
            LOG.error("No task found for topic partition {}", (Object)tp);
        }
    }

    private void initializeRemainingPrevTasksFromTaskOffsetSums() {
        if (this.prevActiveTasks.isEmpty() && !this.ownedPartitions.isEmpty()) {
            LOG.error("Tried to process tasks in offset sum map before processing tasks from ownedPartitions = {}", this.ownedPartitions);
            throw new IllegalStateException("Must initialize prevActiveTasks from ownedPartitions before initializing remaining tasks.");
        }
        for (Map.Entry<TaskId, Long> taskEntry : this.taskOffsetSums.entrySet()) {
            TaskId task = taskEntry.getKey();
            if (this.prevActiveTasks.contains(task)) continue;
            long offsetSum = taskEntry.getValue();
            if (offsetSum == -2L) {
                this.addPreviousActiveTask(task);
                continue;
            }
            this.addPreviousStandbyTask(task);
        }
    }

    private void assertNotAssigned(TaskId task) {
        if (this.standbyTasks.contains(task) || this.activeTasks.contains(task)) {
            throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
        }
    }
}

