/*
 * Decompiled with CFR 0.152.
 */
package com.atlassian.jira.cluster.dbr;

import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.cluster.dbr.DBRMessageFactory;
import com.atlassian.jira.cluster.dbr.DBRSender;
import com.atlassian.jira.cluster.dbr.DefaultDBRSender;
import com.atlassian.jira.cluster.dbr.transport.DBRTransportManager;
import com.atlassian.jira.config.FeatureManager;
import com.atlassian.jira.config.properties.JiraSystemProperties;
import com.atlassian.jira.issue.index.DefaultIssueIndexer;
import com.atlassian.jira.issue.index.IndexDirectoryFactory;
import com.atlassian.plugin.event.events.PluginFrameworkShutdownEvent;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncDBRSender
implements DBRSender {
    private static final String SYSTEM_PROPERTY_DBR_ASYNC_THREADS = "com.atlassian.jira.cluster.dbr.async.threads";
    private static final String SYSTEM_PROPERTY_DBR_ASYNC_QUEUE_SIZE = "com.atlassian.jira.cluster.dbr.async.queue.size";
    private static final Logger log = LoggerFactory.getLogger(AsyncDBRSender.class);
    private final DBRSender delegate;
    private final ExecutorService dbrExecutor;
    private final AtomicInteger dropCounter = new AtomicInteger();

    public AsyncDBRSender(DBRMessageFactory documentMessageFactory, FeatureManager featureManager, EventPublisher eventPublisher, DBRTransportManager dbrTransportManager) {
        this(new DefaultDBRSender(documentMessageFactory, featureManager, dbrTransportManager), eventPublisher);
    }

    AsyncDBRSender(DBRSender delegate, EventPublisher eventPublisher) {
        this.delegate = delegate;
        int numberOfThreads = JiraSystemProperties.getInstance().getInteger(SYSTEM_PROPERTY_DBR_ASYNC_THREADS, Integer.valueOf(50));
        int boundedQueueSize = JiraSystemProperties.getInstance().getInteger(SYSTEM_PROPERTY_DBR_ASYNC_QUEUE_SIZE, Integer.valueOf(1000));
        this.dbrExecutor = AsyncDBRSender.createThreadPool(numberOfThreads, boundedQueueSize);
        eventPublisher.register((Object)this);
    }

    @EventListener
    public void onPluginFrameworkShutdown(PluginFrameworkShutdownEvent event) {
        MoreExecutors.shutdownAndAwaitTermination((ExecutorService)this.dbrExecutor, (long)5L, (TimeUnit)TimeUnit.SECONDS);
    }

    int dropCounter() {
        return this.dropCounter.get();
    }

    @Override
    public void sendUpdateWithRelated(DefaultIssueIndexer.Documents documents, long cost) {
        try {
            this.dbrExecutor.submit(() -> this.delegate.sendUpdateWithRelated(documents, cost));
        }
        catch (RejectedExecutionException e) {
            this.incrementDropped();
        }
    }

    @Override
    public void sendUpdate(IndexDirectoryFactory.Name index, Document document, long cost) {
        try {
            this.dbrExecutor.submit(() -> this.delegate.sendUpdate(index, document, cost));
        }
        catch (RejectedExecutionException e) {
            this.incrementDropped();
        }
    }

    @Override
    public Map totalStats() {
        return this.delegate.totalStats();
    }

    private void incrementDropped() {
        this.dropCounter.incrementAndGet();
        String message = "[DBR] [SENDER] Could not create DBR message - too many DBR messages being processed. Number of dropped messages: " + this.dropCounter.get();
        if (this.dropCounter.get() % 100 == 0) {
            log.info(message);
        } else {
            log.trace(message);
        }
    }

    private static ExecutorService createThreadPool(int numberOfThreads, int boundedQueueSize) {
        log.info("[DBR] [SENDER] creating thread pool with: {} threads and bounded queue of size: {} for creating DBR messages.", (Object)numberOfThreads, (Object)boundedQueueSize);
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(boundedQueueSize);
        return new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder().setNameFormat("dbr-async-%d").setDaemon(true).build());
    }
}

