/*
 * Decompiled with CFR 0.152.
 */
package com.atlassian.jira.cluster.distribution.localq;

import com.atlassian.jira.cluster.ClusterNodes;
import com.atlassian.jira.cluster.Node;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOp;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueueWithStats;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpSender;
import com.atlassian.jira.cluster.distribution.localq.LocalQConfig;
import com.atlassian.jira.cluster.distribution.localq.LocalQCriticalHandler;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import java.rmi.NotBoundException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LocalQCacheOpReader
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LocalQCacheOpReader.class);
    private static final int LOG_CHECKED_EXCEPTIONS_FREQUENCY = 100;
    private static final int LOG_RUNTIME_EXCEPTIONS_FREQUENCY = 50;
    private final LocalQCacheOpQueueWithStats queue;
    private final LocalQCacheOpSender sender;
    private final LocalQCriticalHandler criticalHandler;
    private final ClusterNodes clusterNodes;
    private final Set<Node.NodeState> nodeStatesWithActiveSenders = ImmutableSet.of((Object)((Object)Node.NodeState.ACTIVE));
    private final long nodeTimeSynchronisationToleranceMillis;

    private LocalQCacheOpReader(LocalQCacheOpQueueWithStats queue, LocalQCacheOpSender sender, LocalQCriticalHandler criticalHandler, ClusterNodes clusterNodes, long nodeTimeSynchronisationToleranceMillis) {
        this.queue = queue;
        this.sender = sender;
        this.criticalHandler = criticalHandler;
        this.clusterNodes = clusterNodes;
        this.nodeTimeSynchronisationToleranceMillis = nodeTimeSynchronisationToleranceMillis;
    }

    public static LocalQCacheOpReader create(LocalQCacheOpQueueWithStats queue, LocalQCacheOpSender sender, LocalQCriticalHandler localQCriticalHandler, ClusterNodes clusterNodes) {
        return new LocalQCacheOpReader(queue, sender, localQCriticalHandler, clusterNodes, LocalQConfig.nodeTimeSynchronisationToleranceMillis());
    }

    @Override
    public void run() {
        LOG.info("Started listening for cache replication queue: {} ", (Object)this.queue.name());
        AtomicLong failuresCount = new AtomicLong(0L);
        while (!Thread.currentThread().isInterrupted() && !this.queue.isClosed()) {
            try {
                Node node = this.clusterNodes.node(this.queue.id().nodeId);
                if (node == null) {
                    LOG.warn("Trying to send a LocalQCacheOp to non-existing node: {}", (Object)this.queue.id().nodeId);
                    TimeUnit.SECONDS.sleep(5L);
                    continue;
                }
                if (!this.nodeStatesWithActiveSenders.contains((Object)node.getState())) {
                    LOG.debug("Not sending to node in state: {}, only sending to nodes in following states: {}", (Object)node.getState(), this.nodeStatesWithActiveSenders);
                    TimeUnit.SECONDS.sleep(5L);
                    continue;
                }
                LocalQCacheOp localQCacheOp = this.peekOrBlock();
                if (localQCacheOp == null) continue;
                if (this.isStaleFor(localQCacheOp, node)) {
                    LOG.debug("Skipping sending stale: {} to node: {}. Removing from cache replication queue: {}.", new Object[]{localQCacheOp, node, this.queue.name()});
                    this.remove(localQCacheOp);
                    this.queue.notifyStale();
                    continue;
                }
                boolean sendOK = false;
                try {
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    this.sender.send(node, localQCacheOp);
                    stopwatch.stop();
                    sendOK = true;
                    failuresCount.set(0L);
                    this.queue.notifySendWithTime(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                }
                catch (RuntimeException e) {
                    this.handleRuntimeException(e, localQCacheOp, failuresCount);
                }
                catch (LocalQCacheOpSender.RecoverableFailure e) {
                    this.handleCheckedException(e, localQCacheOp, failuresCount);
                }
                if (!sendOK) continue;
                this.remove(localQCacheOp);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Finished listening on cache replication queue: {} , queue isClosed: {}, thread isInterrupted: {}", new Object[]{this.queue.name(), this.queue.isClosed(), Thread.currentThread().isInterrupted()});
    }

    private boolean isStaleFor(LocalQCacheOp localQCacheOp, Node nodeWithActiveSender) {
        Preconditions.checkArgument((boolean)this.nodeStatesWithActiveSenders.contains((Object)nodeWithActiveSender.getState()));
        long cacheOpCreationTimeMillis = localQCacheOp.getCreationTimeInMillis();
        long destinationNodeTimeMillis = nodeWithActiveSender.getTimestamp();
        return cacheOpCreationTimeMillis != 0L && destinationNodeTimeMillis != 0L && cacheOpCreationTimeMillis < destinationNodeTimeMillis - this.nodeTimeSynchronisationToleranceMillis;
    }

    boolean remove(LocalQCacheOp localQCacheOp) {
        try {
            if (this.queue.isClosed()) {
                LOG.warn("Will not remove cache replication event: {} from closed cache replication queue: {}", (Object)localQCacheOp, (Object)this.queue.name());
                return false;
            }
            this.queue.remove();
            return true;
        }
        catch (NoSuchElementException e) {
            LOG.warn("Tried to remove: {} from an empty cache replication queue: {}. This could happen if queue.remove was called without queue.peek or there was a queue.backup between queue.peek and queue.remove. Ignoring and continue...", (Object)localQCacheOp, (Object)this.queue.name());
            return false;
        }
        catch (Throwable t) {
            LOG.error("Critical state of local cache replication queue - cannot remove localQCacheOp: {} from queue: {}, error: {}. ", new Object[]{localQCacheOp, this.queue.name(), t.getMessage(), t});
            this.queue.notifyCriticalRemove();
            return this.criticalHandler.handleCriticalRemove(this.queue, localQCacheOp, t);
        }
    }

    LocalQCacheOp peekOrBlock() throws InterruptedException, IllegalStateException {
        try {
            if (this.queue.isClosed()) {
                LOG.warn("Will not peekOrBlock from closed cache replication queue: {}", (Object)this.queue.name());
                return null;
            }
            return this.queue.peekOrBlock();
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (Throwable t) {
            LOG.error("Critical state of local cache replication queue - cannot peek from queue: {}, error: {}", new Object[]{this.queue.name(), t.getMessage(), t});
            this.queue.notifyCriticalPeek();
            this.criticalHandler.handleCriticalPeek(this.queue, t);
            return null;
        }
    }

    private static int numberOfRetriesFor(RuntimeException e) {
        return e instanceof LocalQCacheOpSender.UnrecoverableFailure ? ((LocalQCacheOpSender.UnrecoverableFailure)e).retry : 10;
    }

    private void handleRuntimeException(RuntimeException e, LocalQCacheOp localQCacheOp, AtomicLong failuresCount) throws InterruptedException {
        boolean isNotBoundException = e.getCause() instanceof NotBoundException;
        if (isNotBoundException) {
            this.queue.notifyNotBoundException();
        } else {
            this.queue.notifyRuntimeException();
        }
        failuresCount.incrementAndGet();
        int retries = LocalQCacheOpReader.numberOfRetriesFor(e);
        if (failuresCount.get() == 1L || failuresCount.get() % 50L == 0L) {
            LOG.warn("Runtime exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}/{}, error: {}", new Object[]{e.getClass().getSimpleName(), localQCacheOp, this.queue.name(), failuresCount.get(), retries, e.getMessage()});
        }
        if (failuresCount.get() >= (long)retries) {
            if (isNotBoundException) {
                LOG.warn("Abandoning sending because cache does not exist on destination node: {} from cache replication queue: {}, failuresCount: {}/{}. Removing from queue. Error: {}", new Object[]{localQCacheOp, this.queue.name(), failuresCount, retries, e.getCause().getMessage(), e.getCause()});
            } else {
                LOG.error("Abandoning sending: {} from cache replication queue: {}, failuresCount: {}/{}. Removing from queue. Error: {}", new Object[]{localQCacheOp, this.queue.name(), failuresCount, retries, e.getMessage(), e});
                this.queue.notifyDroppedOnSend();
            }
            failuresCount.set(0L);
            this.remove(localQCacheOp);
        } else {
            this.sleepBeforeNextOpRetry(failuresCount.get(), 10L, 100L, 500L, 500L, 1000L);
        }
    }

    private void handleCheckedException(LocalQCacheOpSender.RecoverableFailure recoverableFailure, LocalQCacheOp localQCacheOp, AtomicLong failuresCount) throws InterruptedException {
        this.queue.notifyCheckedException();
        failuresCount.incrementAndGet();
        if (failuresCount.get() == 1L || failuresCount.get() % 100L == 0L) {
            LOG.info("Checked exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}. Will retry indefinitely.", new Object[]{recoverableFailure.getClass().getSimpleName(), localQCacheOp, this.queue.name(), failuresCount.get(), recoverableFailure});
        } else {
            LOG.trace("Checked exception: {} occurred when processing: {} from cache replication queue: {}, failuresCount: {}. Will retry indefinitely.", new Object[]{recoverableFailure.getClass().getSimpleName(), localQCacheOp, this.queue.name(), failuresCount.get(), recoverableFailure});
        }
        this.sleepBeforeNextOpRetry(failuresCount.get(), 100L, 500L, 1000L, 1000L, 1000L, 1000L, 2000L, 3000L, 5000L);
    }

    void sleepBeforeNextOpRetry(long failureCount, long ... sleepTimesInMillis) throws InterruptedException {
        if (failureCount < 1L) {
            TimeUnit.MILLISECONDS.sleep(sleepTimesInMillis[0]);
        } else {
            int idx = Math.min((int)failureCount, sleepTimesInMillis.length) - 1;
            TimeUnit.MILLISECONDS.sleep(sleepTimesInMillis[idx]);
        }
    }
}

