/*
 * Decompiled with CFR 0.152.
 */
package com.atlassian.jira.cluster.distribution.localq;

import com.atlassian.annotations.ExperimentalApi;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.cluster.ClusterNodes;
import com.atlassian.jira.cluster.Node;
import com.atlassian.jira.cluster.NodeChangedEvent;
import com.atlassian.jira.cluster.NodeJoinedClusterEvent;
import com.atlassian.jira.cluster.NodeRemovedFromClusterEvent;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOp;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueue;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueueFactory;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueueWithStats;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpReader;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpSender;
import com.atlassian.jira.cluster.distribution.localq.LocalQConfig;
import com.atlassian.jira.cluster.distribution.localq.LocalQCriticalHandler;
import com.atlassian.jira.cluster.distribution.localq.LocalQCriticalHandlerFactory;
import com.atlassian.jira.cluster.distribution.localq.LocalQStatsUtil;
import com.atlassian.plugin.event.events.PluginFrameworkShutdownEvent;
import com.atlassian.plugin.event.events.PluginFrameworkStartingEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the local cache replication queues of this node.
 *
 * <p>For every other clustered node whose state is {@code ACTIVATING} or {@code ACTIVE} the manager keeps
 * {@link #QUEUES_PER_NODE} queues. Each queue is created through the {@link LocalQCacheOpQueueFactory} and is
 * paired with a {@link LocalQCacheOpReader} task submitted to a cached daemon thread pool. Queues are created
 * lazily (on {@link #init()}, on node events and on {@link #addToAllQueues(LocalQCacheOp)}) and closed when a
 * node is reported as removed or no longer in an active state.
 *
 * <p>The manager registers itself with the {@link EventPublisher} in its constructor and reacts to plugin
 * framework start/shutdown events as well as cluster node events.
 */
public class LocalQCacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(LocalQCacheManager.class);

    /**
     * Number of queues kept per remote node; queue numbers run from 0 (inclusive) to this value (exclusive).
     * NOTE(review): presumably this has to cover every value returned by
     * LocalQCacheOpQueue.nodeQueueNumberForCurrentThread() - confirm against that method.
     */
    private static final int QUEUES_PER_NODE = 10;

    /** Statuses in which {@link #addToAllQueues(LocalQCacheOp)} refuses to enqueue operations. */
    private static final ImmutableSet<Status> STATUS_SHUTTING_DOWN = ImmutableSet.of(Status.STOPPING, Status.STOPPED);

    /** Node states for which replication queues are kept. */
    private static final Set<Node.NodeState> NODE_STATES_WITH_ACTIVE_QUEUES =
            ImmutableSet.of(Node.NodeState.ACTIVATING, Node.NodeState.ACTIVE);

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CREATED);
    private final ConcurrentHashMap<LocalQCacheOpQueue.QueueId, LocalQCacheOpQueueWithStats> queuesByQueueId =
            new ConcurrentHashMap<>();
    private final ConcurrentHashMap<LocalQCacheOpQueue.QueueId, Future<?>> queueReaderByQueueId =
            new ConcurrentHashMap<>();
    private final ClusterNodes clusterNodes;
    private final LocalQCacheOpSender localQCacheOpSender;
    private final LocalQCacheOpQueueFactory localQCacheOpQueueFactory;
    private final LocalQCriticalHandler criticalHandler;
    private final EventPublisher eventPublisher;
    private final ExecutorService executorReaders = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("localq-reader-%d").build());
    private final ScheduledExecutorService executorStats = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("localq-stats-%d").build());

    /**
     * Creates the manager in status {@code CREATED} and registers it as a listener on the event publisher.
     *
     * @param clusterNodes source of the current node and of all known nodes
     * @param localQCacheOpSender sender handed to every queue reader
     * @param localQCacheOpQueueFactory factory used to create the per-node queues
     * @param localQCriticalHandlerFactory factory of the handler notified about critical queue failures
     * @param eventPublisher publisher this instance registers with (and unregisters from in {@link #destroy()})
     */
    public LocalQCacheManager(ClusterNodes clusterNodes, LocalQCacheOpSender localQCacheOpSender, LocalQCacheOpQueueFactory localQCacheOpQueueFactory, LocalQCriticalHandlerFactory localQCriticalHandlerFactory, EventPublisher eventPublisher) {
        this.clusterNodes = clusterNodes;
        this.localQCacheOpSender = localQCacheOpSender;
        this.localQCacheOpQueueFactory = localQCacheOpQueueFactory;
        this.criticalHandler = localQCriticalHandlerFactory.create();
        this.eventPublisher = eventPublisher;
        // Registered last so that all fields are assigned before any event can be delivered.
        this.eventPublisher.register(this);
    }

    /** Stops the manager when the plugin framework shuts down. */
    @EventListener
    public void onPluginFrameworkShutdownEvent(PluginFrameworkShutdownEvent event) {
        this.stop();
    }

    /** Starts the manager when the plugin framework is starting. */
    @EventListener
    public void onPluginFrameworkStartingEvent(PluginFrameworkStartingEvent event) {
        this.start();
    }

    /**
     * Moves the manager through {@code STARTING} to {@code STARTED}, running {@link #init()} in between,
     * and logs queue statistics once at the end.
     */
    public void start() {
        this.status.set(Status.STARTING);
        LOG.info("Starting {}...", LocalQCacheManager.class.getSimpleName());
        this.init();
        LOG.info("Done starting {}.", LocalQCacheManager.class.getSimpleName());
        this.status.set(Status.STARTED);
        this.logQStats("onStart");
    }

    /**
     * Logs queue statistics, then moves the manager through {@code STOPPING} to {@code STOPPED},
     * running {@link #destroy()} in between.
     */
    public void stop() {
        this.logQStats("onStop");
        this.status.set(Status.STOPPING);
        LOG.info("Stopping {}...", LocalQCacheManager.class.getSimpleName());
        this.destroy();
        LOG.info("Done stopping {}.", LocalQCacheManager.class.getSimpleName());
        this.status.set(Status.STOPPED);
    }

    /**
     * Creates the queues for all eligible nodes and schedules the periodic statistics logging.
     * Does nothing except logging a warning when the current node is not clustered.
     */
    void init() {
        if (!this.clusterNodes.current().isClustered()) {
            LOG.warn("This node is not a cluster node. Not initializing: {}", LocalQCacheManager.class.getSimpleName());
            return;
        }
        for (Node node : this.getNodesWhichShouldHaveActiveQueues()) {
            this.initOrGetAllQueues(node);
        }
        if (this.queuesByQueueId.isEmpty()) {
            LOG.info("Currently there are no other nodes in cluster. Not creating any cache replication queues.");
        }
        long intervalSeconds = LocalQConfig.statsLoggingIntervalSeconds();
        this.executorStats.scheduleAtFixedRate(() -> this.logQStats("scheduled"), intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * @return the total statistics of every queue currently held by this manager
     */
    @ExperimentalApi
    public Set<LocalQCacheOpQueueWithStats.QueueStats> queueStatsTotal() {
        return this.queuesByQueueId.values().stream()
                .map(LocalQCacheOpQueueWithStats::statsTotal)
                .collect(Collectors.toSet());
    }

    /**
     * @return the total queue statistics merged per node id: one entry for each node id that has at least one queue
     */
    @ExperimentalApi
    public Set<LocalQCacheOpQueueWithStats.QueueStats> queueStatsTotalMerged() {
        Map<String, List<LocalQCacheOpQueueWithStats.QueueStats>> totalStatsByNodeId = this.queueStatsTotal().stream()
                .collect(Collectors.groupingBy(queueStats -> queueStats.nodeId));
        Set<LocalQCacheOpQueueWithStats.QueueStats> merged = new HashSet<>();
        // One merged entry per node; iterate the grouped entries directly instead of re-looking-up each key.
        for (Map.Entry<String, List<LocalQCacheOpQueueWithStats.QueueStats>> entry : totalStatsByNodeId.entrySet()) {
            merged.add(LocalQCacheOpQueueWithStats.QueueStats.merge(entry.getKey(), entry.getValue()));
        }
        return merged;
    }

    /**
     * Logs statistics for a snapshot of the current queues. Never throws: any failure is logged instead,
     * so that a failing run does not cancel the fixed-rate schedule set up in {@link #init()}.
     *
     * @param context short label identifying the caller, included in the log lines
     */
    private synchronized void logQStats(String context) {
        try {
            Set<LocalQCacheOpQueueWithStats> queuesSnapshot = ImmutableSet.copyOf(this.queuesByQueueId.values());
            // Report the size of the snapshot that is actually logged, not the (possibly changed) live map size.
            int queueCount = queuesSnapshot.size();
            LOG.info("[{}] Running cache replication queue stats for: {} queues...", context, queueCount);
            LocalQStatsUtil.logStats(LOG, queuesSnapshot);
            LOG.info("[{}] ... done running cache replication queue stats for: {} queues.", context, queueCount);
        }
        catch (Throwable t) {
            LOG.error("Error occurred in cache replication queue stats job: {}, error: {}", Thread.currentThread().getName(), t.getMessage(), t);
        }
    }

    /**
     * Unregisters from the event publisher, shuts down both executors (waiting up to one second for each)
     * and closes every queue. A queue that fails to close is logged and does not prevent the remaining
     * queues from being closed.
     */
    void destroy() {
        this.eventPublisher.unregister(this);
        this.shutdownAndAwait(this.executorStats);
        this.shutdownAndAwait(this.executorReaders);
        for (LocalQCacheOpQueue localQCacheOpQueue : this.queuesByQueueId.values()) {
            try {
                localQCacheOpQueue.close();
            }
            catch (RuntimeException e) {
                LOG.error("Error when closing cache replication queue on shutdown, error: {}", e.getMessage(), e);
            }
        }
    }

    /** Interrupts the executor's tasks and waits up to one second for termination, preserving the interrupt flag. */
    private void shutdownAndAwait(ExecutorService executor) {
        executor.shutdownNow();
        try {
            executor.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** @return all known nodes matching {@link #nodeShouldHaveActiveQueue()} */
    private Set<Node> getNodesWhichShouldHaveActiveQueues() {
        return this.clusterNodes.all().stream().filter(this.nodeShouldHaveActiveQueue()).collect(Collectors.toSet());
    }

    /** @return predicate accepting clustered nodes, other than the current one, that are in an active-queue state */
    private Predicate<Node> nodeShouldHaveActiveQueue() {
        return node -> node.isClustered()
                && !this.isNodeCurrentNode(node)
                && NODE_STATES_WITH_ACTIVE_QUEUES.contains(node.getState());
    }

    /** @return {@code true} when the given node has the same node id as the current node */
    private boolean isNodeCurrentNode(Node node) {
        return Objects.equals(this.clusterNodes.current().getNodeId(), node.getNodeId());
    }

    /**
     * Treats a node whose new state is {@code ACTIVATING} or {@code ACTIVE} as added and any other
     * state as removed. Events without a new state are ignored.
     */
    @EventListener
    public void onNodeChangedEvent(NodeChangedEvent nodeChangedEvent) {
        Node node = nodeChangedEvent.getNewState();
        if (node == null) {
            return;
        }
        if (NODE_STATES_WITH_ACTIVE_QUEUES.contains(node.getState())) {
            this.onNodeAdded(node);
        } else {
            this.onNodeRemoved(node);
        }
    }

    /** Creates the queues for the node that joined the cluster. */
    @EventListener
    public void onNodeJoinedClusterEvent(NodeJoinedClusterEvent nodeJoinedClusterEvent) {
        this.onNodeAdded(nodeJoinedClusterEvent.getNode());
    }

    /** Closes the queues of the node that was removed from the cluster. */
    @EventListener
    public void onNodeRemovedFromClusterEvent(NodeRemovedFromClusterEvent nodeRemovedFromClusterEvent) {
        this.onNodeRemoved(nodeRemovedFromClusterEvent.getNode());
    }

    /**
     * Ensures all queues exist for the node. Ignores {@code null}, non-clustered nodes, nodes without an id
     * and the current node.
     */
    void onNodeAdded(Node node) {
        if (node != null && node.isClustered() && node.getNodeId() != null && !this.isNodeCurrentNode(node)) {
            this.initOrGetAllQueues(node);
        }
    }

    /**
     * Closes all queues of the node. Ignores {@code null}, nodes without an id and the current node.
     */
    void onNodeRemoved(Node node) {
        if (node != null && node.getNodeId() != null && !this.isNodeCurrentNode(node)) {
            this.closeAllQueues(node.getNodeId());
        }
    }

    /**
     * @throws NullPointerException if the node or its id is {@code null}
     * @throws IllegalArgumentException if the node is not clustered
     */
    private void validateClusteredNode(Node node) {
        Preconditions.checkNotNull(node);
        Preconditions.checkArgument(node.isClustered());
        Preconditions.checkNotNull(node.getNodeId());
    }

    /** Invokes the action once for every queue number in {@code [0, QUEUES_PER_NODE)}, in ascending order. */
    private void forEachNodeQueueNumber(Consumer<Integer> doForNodeNumber) {
        for (int i = 0; i < QUEUES_PER_NODE; ++i) {
            doForNodeNumber.accept(i);
        }
    }

    /** Ensures every queue number has a queue for the given node. */
    private void initOrGetAllQueues(Node node) {
        this.forEachNodeQueueNumber(nodeQueueNumber -> this.initOrGetQueue(node, nodeQueueNumber));
    }

    /** @return the node's queue for the queue number assigned to the calling thread, or {@code null} if creation failed */
    private LocalQCacheOpQueueWithStats initOrGetQueueForCurrentThread(Node node) {
        return this.initOrGetQueue(node, LocalQCacheOpQueue.nodeQueueNumberForCurrentThread());
    }

    /**
     * Returns the queue identified by the node id and queue number, creating it (and submitting its reader)
     * if it does not exist yet.
     *
     * @return the queue, or {@code null} when the queue could not be created because of an {@link IOException};
     *         in that case nothing is recorded, so a later call tries again
     * @throws NullPointerException if the node or its id is {@code null}
     * @throws IllegalArgumentException if the node is not clustered or is the current node
     */
    private LocalQCacheOpQueueWithStats initOrGetQueue(Node node, int nodeQueueNumber) {
        this.validateClusteredNode(node);
        // Guava templates only substitute "%s"; a "{}" placeholder would be left in the message verbatim.
        Preconditions.checkArgument(!this.isNodeCurrentNode(node), "Node cannot create cache replication queue for itself: %s", node.getNodeId());
        LocalQCacheOpQueue.QueueId queueId = LocalQCacheOpQueue.QueueId.create(node.getNodeId(), nodeQueueNumber);
        return this.queuesByQueueId.computeIfAbsent(queueId, id -> {
            try {
                LocalQCacheOpQueueWithStats queue = new LocalQCacheOpQueueWithStats(this.localQCacheOpQueueFactory.create(node, nodeQueueNumber));
                LocalQCacheOpReader reader = LocalQCacheOpReader.create(queue, this.localQCacheOpSender, this.criticalHandler, this.clusterNodes);
                Future<?> readerTask = this.executorReaders.submit(reader);
                this.queueReaderByQueueId.put(queueId, readerTask);
                LOG.info("Created cache replication queue: {} with queue reader running: {}", queue.name(), !readerTask.isDone());
                return queue;
            }
            catch (IOException e) {
                LOG.error("Error when creating cache replication queue: {} for node: {}. This node will be inconsistent. Error: {}", queueId, node.getNodeId(), e.getMessage(), e);
                // Returning null makes computeIfAbsent record no mapping.
                return null;
            }
        });
    }

    /** Closes the queue of every queue number for the given node id. */
    private void closeAllQueues(String nodeId) {
        this.forEachNodeQueueNumber(nodeQueueNumber -> this.closeQueue(nodeId, nodeQueueNumber));
    }

    /**
     * Cancels the reader of the identified queue (interrupting it) and then closes and forgets the queue.
     * Does nothing for a {@code null} node id or when no such reader/queue is registered.
     */
    private void closeQueue(String nodeId, int nodeQueueNumber) {
        if (nodeId == null) {
            return;
        }
        LocalQCacheOpQueue.QueueId queueId = LocalQCacheOpQueue.QueueId.create(nodeId, nodeQueueNumber);
        // Atomic remove: the future taken out of the map is exactly the one that gets cancelled.
        Future<?> queueReaderFuture = this.queueReaderByQueueId.remove(queueId);
        if (queueReaderFuture != null) {
            LOG.info("Closing cache replication queue sender: {}", queueId);
            queueReaderFuture.cancel(true);
        }
        LocalQCacheOpQueueWithStats localQCacheOpQueue = this.queuesByQueueId.get(queueId);
        if (localQCacheOpQueue != null) {
            LOG.info("Closing cache replication queue: {}", queueId);
            // Close before removing so a replacement queue for the same id is only created after this one is closed.
            ((LocalQCacheOpQueue)localQCacheOpQueue).close();
            // Conditional remove: never evict a mapping other than the instance that was just closed.
            this.queuesByQueueId.remove(queueId, localQCacheOpQueue);
        }
    }

    /** @return {@code true} when the status is {@code STOPPING} or {@code STOPPED} */
    private boolean isShuttingDown() {
        return STATUS_SHUTTING_DOWN.contains(this.status.get());
    }

    /**
     * Adds the operation to the calling thread's queue of every node that should have active queues.
     * Nodes whose queue could not be created, is closed, or rejects the operation are skipped.
     *
     * @param localQCacheOp the cache operation to replicate
     * @return the number of queues the operation was added to; {@code 0} when the manager is stopping/stopped
     *         or there are no eligible nodes
     */
    public int addToAllQueues(LocalQCacheOp localQCacheOp) {
        if (this.isShuttingDown()) {
            LOG.info("Service: {} is in status: {}. Not replicating: {}", LocalQCacheManager.class.getSimpleName(), this.status.get(), localQCacheOp);
            return 0;
        }
        Set<Node> nodes = this.getNodesWhichShouldHaveActiveQueues();
        if (nodes.isEmpty()) {
            LOG.debug("No nodes in cluster to replicate: {}", localQCacheOp);
            return 0;
        }
        LOG.debug("About to add localQCacheOp: {} to cache replication queues: {}...", localQCacheOp, nodes.size());
        int numberOfQueues = 0;
        for (Node node : nodes) {
            LocalQCacheOpQueueWithStats q = this.initOrGetQueueForCurrentThread(node);
            if (q == null) {
                LOG.warn("Queue is null. Not replicating: {}", localQCacheOp);
                continue;
            }
            if (q.isClosed()) {
                LOG.warn("Queue: {} is closed. Not replicating: {}", q.name(), localQCacheOp);
                continue;
            }
            if (this.addToQueue(q, localQCacheOp)) {
                ++numberOfQueues;
            }
        }
        LOG.debug("Done adding localQCacheOp: {} to cache replication queues: {}.", localQCacheOp, numberOfQueues);
        return numberOfQueues;
    }

    /**
     * Adds the operation to the queue. Any failure is logged, reported to the queue and to the critical
     * handler, and turned into a {@code false} result instead of being propagated.
     *
     * @return the result of the queue's {@code add}, or {@code false} when adding threw
     */
    private boolean addToQueue(LocalQCacheOpQueueWithStats localQCacheOpQueue, LocalQCacheOp localQCacheOp) {
        try {
            return localQCacheOpQueue.add(localQCacheOp);
        }
        catch (Throwable t) {
            LOG.error("Critical state of local cache replication queue - cannot add: {} to queue: {}, error: {}", localQCacheOp, localQCacheOpQueue.name(), t.getMessage(), t);
            localQCacheOpQueue.notifyCriticalAdd();
            this.criticalHandler.handleCriticalAdd(localQCacheOpQueue, localQCacheOp, t);
            return false;
        }
    }

    /** @return a mutable snapshot of the ids of all queues currently held by this manager */
    Set<LocalQCacheOpQueue.QueueId> getQueueIdsForExistingQueues() {
        return new HashSet<>(this.queuesByQueueId.keySet());
    }

    /** Lifecycle status of the manager, in the order it normally progresses. */
    private enum Status {
        CREATED,
        STARTING,
        STARTED,
        STOPPING,
        STOPPED
    }
}

