/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.analytics.api.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.exoplatform.analytics.api.processor.StatisticDataProcessorPlugin;
import org.exoplatform.analytics.model.StatisticDataQueueEntry;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.picocontainer.Startable;

public class StatisticDataProcessorService
implements Startable {
    private static final Log LOG = ExoLogger.getLogger(StatisticDataProcessorService.class);
    private static final short MAX_PROCESS_ATTEMPTS_COUNT = 5;
    private ArrayList<StatisticDataProcessorPlugin> dataProcessorPlugins = new ArrayList();

    public void start() {
        for (StatisticDataProcessorPlugin statisticDataProcessorPlugin : this.dataProcessorPlugins) {
            try {
                statisticDataProcessorPlugin.init();
            }
            catch (Exception e) {
                LOG.error("Error initializing processor with id {}", new Object[]{statisticDataProcessorPlugin.getId(), e});
            }
        }
    }

    public void stop() {
    }

    public void addProcessor(StatisticDataProcessorPlugin dataProcessorPlugin) {
        this.dataProcessorPlugins.add(dataProcessorPlugin);
    }

    public boolean pauseProcessor(String id) {
        StatisticDataProcessorPlugin processor = this.getProcessor(id);
        if (processor == null || processor.isPaused()) {
            return false;
        }
        processor.setPaused(true);
        return true;
    }

    public boolean unpauseProcessor(String id) {
        StatisticDataProcessorPlugin processor = this.getProcessor(id);
        if (processor == null || !processor.isPaused()) {
            return false;
        }
        processor.setPaused(false);
        return true;
    }

    public boolean isProcessorInitialized(String id) {
        StatisticDataProcessorPlugin processor = this.getProcessor(id);
        return processor != null && processor.isInitialized();
    }

    public List<StatisticDataProcessorPlugin> getProcessors() {
        return this.dataProcessorPlugins;
    }

    public StatisticDataProcessorPlugin getProcessor(String id) {
        return this.dataProcessorPlugins.stream().filter(processor -> StringUtils.equals((CharSequence)id, (CharSequence)processor.getId())).findFirst().orElse(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(List<? extends StatisticDataQueueEntry> queueEntries) {
        if (queueEntries == null || queueEntries.isEmpty()) {
            return;
        }
        List queueEntriesToProcess = queueEntries.stream().filter(queueEntry -> !queueEntry.isProcessed() && (!queueEntry.isError() || queueEntry.getAttemptCount() < 5)).collect(Collectors.toList());
        if (queueEntriesToProcess.isEmpty()) {
            return;
        }
        if (queueEntriesToProcess.size() > 100) {
            List subList;
            LOG.debug("Processing queue having {} documents with chunk of {}", new Object[]{queueEntriesToProcess.size(), (short)100});
            int index = 0;
            do {
                subList = queueEntriesToProcess.stream().skip(index).limit(100L).collect(Collectors.toList());
                this.process(subList);
            } while ((index += subList.size()) < queueEntriesToProcess.size());
            return;
        }
        for (StatisticDataProcessorPlugin statisticDataProcessorPlugin : this.dataProcessorPlugins) {
            if (statisticDataProcessorPlugin.isPaused()) {
                LOG.debug("Statistic Data Processor with id '{}' is paused, ignore processing queue items", new Object[]{statisticDataProcessorPlugin.getId()});
                continue;
            }
            if (!statisticDataProcessorPlugin.isInitialized()) {
                LOG.info("Statistic Data Processor with id '{}' is not initialized yet, ignore processing queue items", new Object[]{statisticDataProcessorPlugin.getId()});
                continue;
            }
            String processorId = statisticDataProcessorPlugin.getId();
            List<StatisticDataQueueEntry> processorQueueEntries = queueEntriesToProcess.stream().filter(queueEntry -> !this.isProcessorRun((StatisticDataQueueEntry)queueEntry, processorId)).collect(Collectors.toList());
            if (processorQueueEntries.isEmpty()) continue;
            try {
                statisticDataProcessorPlugin.process(processorQueueEntries);
                processorQueueEntries.forEach(queueEntry -> this.markProcessorAsSuccess((StatisticDataQueueEntry)queueEntry, processorId, (List<StatisticDataProcessorPlugin>)this.dataProcessorPlugins));
            }
            catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.warn("Error processing queue entries: {}, try to process entries one by one.", new Object[]{processorQueueEntries, e});
                } else {
                    LOG.warn((Object)"Error processing queue entries, try to process entries one by one.", (Throwable)e);
                }
                processorQueueEntries.forEach(queueEntry -> {
                    try {
                        statisticDataProcessorPlugin.process(Collections.singletonList(queueEntry));
                        this.markProcessorAsSuccess((StatisticDataQueueEntry)queueEntry, processorId, (List<StatisticDataProcessorPlugin>)this.dataProcessorPlugins);
                    }
                    catch (Exception exception) {
                        LOG.warn("Error processing queue entry {}", new Object[]{queueEntry, exception});
                        this.markProcessorAsError((StatisticDataQueueEntry)queueEntry, processorId);
                    }
                });
            }
            finally {
                processorQueueEntries.forEach(queueEntry -> queueEntry.setAttemptCount((short)(queueEntry.getAttemptCount() + 1)));
            }
        }
    }

    private void markProcessorAsSuccess(StatisticDataQueueEntry dataQueueEntry, String processorId, List<StatisticDataProcessorPlugin> processors) {
        if (dataQueueEntry.getProcessingStatus() == null) {
            dataQueueEntry.setProcessingStatus(new HashMap<String, Boolean>());
        }
        dataQueueEntry.getProcessingStatus().put(processorId, true);
        boolean processedForAll = this.isProcessedForAll(dataQueueEntry, processors);
        dataQueueEntry.setProcessed(processedForAll);
    }

    private void markProcessorAsError(StatisticDataQueueEntry dataQueueEntry, String processorId) {
        if (dataQueueEntry.getProcessingStatus() == null) {
            dataQueueEntry.setProcessingStatus(new HashMap<String, Boolean>());
        }
        dataQueueEntry.getProcessingStatus().put(processorId, false);
        dataQueueEntry.setError(true);
        if (dataQueueEntry.getAttemptCount() >= 5) {
            LOG.warn("Delete from queue an entry that didn't suceeded to be injected for {} times: {}", new Object[]{(short)5, dataQueueEntry});
            dataQueueEntry.setProcessed(true);
        }
    }

    private boolean isProcessedForAll(StatisticDataQueueEntry dataQueueEntry, List<StatisticDataProcessorPlugin> processors) {
        if (processors == null || processors.isEmpty()) {
            return true;
        }
        for (StatisticDataProcessorPlugin statisticDataProcessorPlugin : processors) {
            if (this.isProcessorRun(dataQueueEntry, statisticDataProcessorPlugin.getId())) continue;
            return false;
        }
        return true;
    }

    private boolean isProcessorRun(StatisticDataQueueEntry dataQueueEntry, String processorId) {
        Boolean processorStatus = dataQueueEntry.getProcessingStatus() == null ? null : dataQueueEntry.getProcessingStatus().get(processorId);
        return processorStatus != null && processorStatus != false;
    }
}

