/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.file;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOnCompletion;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.GenericFileProcessStrategy;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class GenericFileConsumer<T>
extends ScheduledBatchPollingConsumer {
    protected final transient Logger log = LoggerFactory.getLogger(this.getClass());
    protected GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    protected volatile boolean loggedIn;
    protected String fileExpressionResult;
    protected volatile ShutdownRunningTask shutdownRunningTask;
    protected volatile int pendingExchanges;
    protected Processor customProcessor;
    protected boolean eagerLimitMaxMessagesPerPoll = true;
    protected volatile boolean prepareOnStartup;

    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.operations = operations;
    }

    public Processor getCustomProcessor() {
        return this.customProcessor;
    }

    public void setCustomProcessor(Processor processor) {
        this.customProcessor = processor;
    }

    public boolean isEagerLimitMaxMessagesPerPoll() {
        return this.eagerLimitMaxMessagesPerPoll;
    }

    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
    }

    @Override
    protected int poll() throws Exception {
        int n;
        if (!this.prepareOnStartup) {
            this.endpoint.getGenericFileProcessStrategy().prepareOnStartup(this.operations, this.endpoint);
            this.prepareOnStartup = true;
        }
        this.fileExpressionResult = null;
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        if (!this.prePollCheck()) {
            this.log.debug("Skipping poll as pre poll check returned false");
            return 0;
        }
        ArrayList<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
        String name = this.endpoint.getConfiguration().getDirectory();
        StopWatch stop = new StopWatch();
        boolean limitHit = !this.pollDirectory(name, files, 0);
        long delta = stop.stop();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Took {} to poll: {}", (Object)TimeUtils.printDuration(delta), (Object)name);
        }
        if (limitHit) {
            this.log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", (Object)this.maxMessagesPerPoll);
        }
        if (this.endpoint.getSorter() != null) {
            Collections.sort(files, this.endpoint.getSorter());
        }
        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
        for (GenericFile genericFile : files) {
            Exchange exchange = this.endpoint.createExchange(genericFile);
            this.endpoint.configureExchange(exchange);
            this.endpoint.configureMessage(genericFile, exchange.getIn());
            exchanges.add(exchange);
        }
        if (this.endpoint.getSortBy() != null) {
            Collections.sort(exchanges, this.endpoint.getSortBy());
        }
        LinkedList<Exchange> q = exchanges;
        if (!this.eagerLimitMaxMessagesPerPoll && this.maxMessagesPerPoll > 0 && files.size() > this.maxMessagesPerPoll) {
            this.log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", (Object)this.maxMessagesPerPoll);
            this.removeExcessiveInProgressFiles(q, this.maxMessagesPerPoll);
        }
        if ((n = exchanges.size()) > 0) {
            this.log.debug("Total {} files to consume", (Object)n);
        }
        int polledMessages = this.processBatch(CastUtils.cast(q));
        this.postPollCheck();
        return polledMessages;
    }

    @Override
    public int processBatch(Queue<Object> exchanges) {
        int total = exchanges.size();
        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
            this.log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", (Object)this.maxMessagesPerPoll, (Object)total);
            total = this.maxMessagesPerPoll;
        }
        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
            Exchange exchange = (Exchange)exchanges.poll();
            exchange.setProperty("CamelBatchIndex", index);
            exchange.setProperty("CamelBatchSize", total);
            exchange.setProperty("CamelBatchComplete", index == total - 1);
            this.pendingExchanges = total - index - 1;
            if (this.customProcessor != null) {
                this.customProcessExchange(exchange, this.customProcessor);
                continue;
            }
            this.processExchange(exchange);
        }
        this.removeExcessiveInProgressFiles(CastUtils.cast((Deque)exchanges, Exchange.class), 0);
        return total;
    }

    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
        while (exchanges.size() > limit) {
            Exchange exchange = exchanges.removeLast();
            GenericFile file = exchange.getProperty("CamelFileExchangeFile", GenericFile.class);
            String key = file.getAbsoluteFilePath();
            this.endpoint.getInProgressRepository().remove(key);
        }
    }

    public boolean canPollMoreFiles(List<?> fileList) {
        if (!this.eagerLimitMaxMessagesPerPoll) {
            return true;
        }
        if (this.maxMessagesPerPoll <= 0) {
            return true;
        }
        return fileList.size() < this.maxMessagesPerPoll;
    }

    protected boolean prePollCheck() throws Exception {
        return true;
    }

    protected void postPollCheck() {
    }

    protected abstract boolean pollDirectory(String var1, List<GenericFile<T>> var2, int var3);

    public void setOperations(GenericFileOperations<T> operations) {
        this.operations = operations;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processExchange(Exchange exchange) {
        String absoluteFileName;
        GenericFile<T> file;
        block10: {
            file = this.getExchangeFileProperty(exchange);
            this.log.trace("Processing file: {}", file);
            absoluteFileName = file.getAbsoluteFilePath();
            try {
                GenericFileProcessStrategy<T> processStrategy = this.endpoint.getGenericFileProcessStrategy();
                boolean begin = processStrategy.begin(this.operations, this.endpoint, exchange, file);
                if (begin) break block10;
                this.log.debug("{} cannot begin processing file: {}", this.endpoint, file);
                try {
                    processStrategy.abort(this.operations, this.endpoint, exchange, file);
                }
                finally {
                    this.endpoint.getInProgressRepository().remove(absoluteFileName);
                }
                return;
            }
            catch (Exception e) {
                this.endpoint.getInProgressRepository().remove(absoluteFileName);
                String msg = this.endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
                this.handleException(msg, e);
                return;
            }
        }
        final GenericFile<T> target = this.getExchangeFileProperty(exchange);
        String name = target.getAbsoluteFilePath();
        try {
            if (this.isRetrieveFile()) {
                this.log.trace("Retrieving file: {} from: {}", (Object)name, this.endpoint);
                boolean retrieved = this.operations.retrieveFile(name, exchange);
                if (!retrieved) {
                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + this.endpoint);
                }
                this.log.trace("Retrieved file: {} from: {}", (Object)name, this.endpoint);
            } else {
                this.log.trace("Skipped retrieval of file: {} from: {}", (Object)name, this.endpoint);
                exchange.getIn().setBody(null);
            }
            exchange.addOnCompletion(new GenericFileOnCompletion<T>(this.endpoint, this.operations, target, absoluteFileName));
            this.log.debug("About to process file: {} using exchange: {}", target, (Object)exchange);
            this.getAsyncProcessor().process(exchange, new AsyncCallback(){

                @Override
                public void done(boolean doneSync) {
                    if (GenericFileConsumer.this.log.isTraceEnabled()) {
                        GenericFileConsumer.this.log.trace("Done processing file: {} {}", (Object)target, (Object)(doneSync ? "synchronously" : "asynchronously"));
                    }
                }
            });
        }
        catch (Exception e) {
            this.endpoint.getInProgressRepository().remove(absoluteFileName);
            String msg = "Error processing file " + file + " due to " + e.getMessage();
            this.handleException(msg, e);
        }
    }

    protected boolean isRetrieveFile() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void customProcessExchange(Exchange exchange, Processor processor) {
        GenericFile<T> file = this.getExchangeFileProperty(exchange);
        this.log.trace("Custom processing file: {}", file);
        String absoluteFileName = file.getAbsoluteFilePath();
        try {
            processor.process(exchange);
        }
        catch (Exception e) {
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", (Throwable)e);
            }
            this.handleException(e);
        }
        finally {
            this.endpoint.getInProgressRepository().remove(absoluteFileName);
        }
    }

    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String absoluteFilePath = file.getAbsoluteFilePath();
        if (!this.isMatched(file, isDirectory, files)) {
            this.log.trace("File did not match. Will skip this file: {}", file);
            return false;
        }
        if (this.endpoint.getInProgressRepository().contains(absoluteFilePath)) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Skipping as file is already in progress: {}", (Object)file.getFileName());
            }
            return false;
        }
        if (this.endpoint.isIdempotent().booleanValue()) {
            String key = file.getAbsoluteFilePath();
            if (this.endpoint.getIdempotentKey() != null) {
                Exchange dummy = this.endpoint.createExchange(file);
                key = this.endpoint.getIdempotentKey().evaluate(dummy, String.class);
            }
            if (key != null && this.endpoint.getIdempotentRepository().contains(key)) {
                this.log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", (Object)key, file);
                return false;
            }
        }
        if (!isDirectory) {
            return this.endpoint.getInProgressRepository().add(absoluteFilePath);
        }
        return true;
    }

    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String name = file.getFileNameOnly();
        if (name.startsWith(".")) {
            return false;
        }
        if (name.endsWith(".camelLock")) {
            return false;
        }
        if (this.endpoint.getFilter() != null && !this.endpoint.getFilter().accept(file)) {
            return false;
        }
        if (this.endpoint.getAntFilter() != null && !this.endpoint.getAntFilter().accept(file)) {
            return false;
        }
        if (isDirectory) {
            return true;
        }
        if (ObjectHelper.isNotEmpty(this.endpoint.getExclude()) && name.matches(this.endpoint.getExclude())) {
            return false;
        }
        if (ObjectHelper.isNotEmpty(this.endpoint.getInclude()) && !name.matches(this.endpoint.getInclude())) {
            return false;
        }
        if (this.endpoint.getFileName() != null) {
            this.evaluateFileExpression();
            if (this.fileExpressionResult != null && !name.equals(this.fileExpressionResult)) {
                return false;
            }
        }
        if (this.endpoint.getDoneFileName() != null) {
            String doneFileName = this.endpoint.createDoneFileName(file.getAbsoluteFilePath());
            ObjectHelper.notEmpty(doneFileName, "doneFileName", this.endpoint);
            if (this.endpoint.isDoneFile(file.getFileNameOnly())) {
                this.log.trace("Skipping done file: {}", file);
                return false;
            }
            if (!this.isMatched(file, doneFileName, files)) {
                return false;
            }
        }
        return true;
    }

    protected abstract boolean isMatched(GenericFile<T> var1, String var2, List<T> var3);

    @Deprecated
    protected boolean isInProgress(GenericFile<T> file) {
        String key = file.getAbsoluteFilePath();
        return !this.endpoint.getInProgressRepository().add(key);
    }

    private void evaluateFileExpression() {
        if (this.fileExpressionResult == null) {
            Exchange dummy = this.endpoint.createExchange();
            this.fileExpressionResult = this.endpoint.getFileName().evaluate(dummy, String.class);
        }
    }

    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
        return (GenericFile)exchange.getProperty("CamelFileExchangeFile");
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        this.prepareOnStartup = false;
        super.doStop();
    }
}

