/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.tasks;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutorCreator;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;
import org.slf4j.Logger;

public class DefaultTaskManager
implements TaskManager {
    private final Time time;
    private final Logger log;
    private final TasksRegistry tasks;
    private final Lock tasksLock = new ReentrantLock();
    private final List<TaskId> lockedTasks = new ArrayList<TaskId>();
    private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<TaskId, TaskExecutor>();
    private final List<TaskExecutor> taskExecutors;

    public DefaultTaskManager(Time time, String clientId, TasksRegistry tasks, StreamsConfig config, TaskExecutorCreator executorCreator) {
        String logPrefix = String.format("%s ", clientId);
        LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(DefaultTaskManager.class);
        this.time = time;
        this.tasks = tasks;
        int numExecutors = config.getInt("num.stream.threads");
        this.taskExecutors = new ArrayList<TaskExecutor>(numExecutors);
        for (int i = 1; i <= numExecutors; ++i) {
            String name = clientId + "-TaskExecutor-" + i;
            this.taskExecutors.add(executorCreator.create(this, name, time));
        }
    }

    @Override
    public StreamTask assignNextTask(TaskExecutor executor) {
        return this.returnWithTasksLocked(() -> {
            if (!this.taskExecutors.contains(executor)) {
                throw new IllegalArgumentException("The requested executor for getting next task to assign is unrecognized");
            }
            for (Task task : this.tasks.activeTasks()) {
                if (this.assignedTasks.containsKey(task.id()) || this.lockedTasks.contains(task.id()) || !((StreamTask)task).isProcessable(this.time.milliseconds())) continue;
                this.assignedTasks.put(task.id(), executor);
                this.log.info("Assigned {} to executor {}", (Object)task.id(), (Object)executor.name());
                return (StreamTask)task;
            }
            return null;
        });
    }

    @Override
    public void unassignTask(StreamTask task, TaskExecutor executor) {
        this.executeWithTasksLocked(() -> {
            if (!this.taskExecutors.contains(executor)) {
                throw new IllegalArgumentException("The requested executor for unassign task is unrecognized");
            }
            TaskExecutor lockedExecutor = this.assignedTasks.get(task.id());
            if (lockedExecutor == null || lockedExecutor != executor) {
                throw new IllegalArgumentException("Task " + task.id() + " is not locked by the executor");
            }
            this.assignedTasks.remove(task.id());
            this.log.info("Unassigned {} from executor {}", (Object)task.id(), (Object)executor.name());
        });
    }

    @Override
    public KafkaFuture<Void> lockTasks(Set<TaskId> taskIds) {
        return (KafkaFuture)this.returnWithTasksLocked(() -> {
            this.lockedTasks.addAll(taskIds);
            KafkaFutureImpl result = new KafkaFutureImpl();
            ConcurrentSkipListSet remainingTaskIds = new ConcurrentSkipListSet(taskIds);
            for (TaskId taskId : taskIds) {
                Task task = this.tasks.task(taskId);
                if (task == null) {
                    throw new IllegalArgumentException("Trying to lock task " + taskId + " but it's not owned");
                }
                if (!task.isActive()) {
                    throw new IllegalArgumentException("The locking task " + taskId + " is not an active task");
                }
                if (this.assignedTasks.containsKey(taskId)) {
                    KafkaFuture<StreamTask> future = this.assignedTasks.get(taskId).unassign();
                    future.whenComplete((streamTask, throwable) -> {
                        if (throwable != null) {
                            result.completeExceptionally(throwable);
                        } else {
                            remainingTaskIds.remove(streamTask.id());
                            if (remainingTaskIds.isEmpty()) {
                                result.complete(null);
                            }
                        }
                    });
                    continue;
                }
                remainingTaskIds.remove(taskId);
                if (!remainingTaskIds.isEmpty()) continue;
                result.complete(null);
            }
            return result;
        });
    }

    @Override
    public KafkaFuture<Void> lockAllTasks() {
        return this.returnWithTasksLocked(() -> this.lockTasks(this.tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
    }

    @Override
    public void unlockTasks(Set<TaskId> taskIds) {
        this.executeWithTasksLocked(() -> this.lockedTasks.removeAll(taskIds));
    }

    @Override
    public void unlockAllTasks() {
        this.executeWithTasksLocked(() -> this.unlockTasks(this.tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
    }

    @Override
    public void add(Set<StreamTask> tasksToAdd) {
        this.executeWithTasksLocked(() -> {
            for (StreamTask task : tasksToAdd) {
                this.tasks.addTask(task);
            }
        });
        this.log.info("Added tasks {} to the task manager to process", tasksToAdd);
    }

    @Override
    public void remove(TaskId taskId) {
        this.executeWithTasksLocked(() -> {
            if (this.assignedTasks.containsKey(taskId)) {
                throw new IllegalArgumentException("The task to remove is still assigned to executors");
            }
            if (!this.lockedTasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not locked yet by the task manager");
            }
            if (!this.tasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not owned by the task manager");
            }
            this.tasks.removeTask(this.tasks.task(taskId));
        });
        this.log.info("Removed task {} from the task manager", (Object)taskId);
    }

    @Override
    public Set<ReadOnlyTask> getTasks() {
        return this.returnWithTasksLocked(() -> this.tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
    }

    private void executeWithTasksLocked(Runnable action) {
        this.tasksLock.lock();
        try {
            action.run();
        }
        finally {
            this.tasksLock.unlock();
        }
    }

    private <T> T returnWithTasksLocked(Supplier<T> action) {
        this.tasksLock.lock();
        try {
            T t = action.get();
            return t;
        }
        finally {
            this.tasksLock.unlock();
        }
    }

    static class DefaultTaskExecutorCreator
    implements TaskExecutorCreator {
        DefaultTaskExecutorCreator() {
        }

        @Override
        public TaskExecutor create(TaskManager taskManager, String name, Time time) {
            return new DefaultTaskExecutor(taskManager, name, time);
        }
    }
}

