/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.put;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.ssl.SSLContextService;

public abstract class AbstractPutEventProcessor
extends AbstractSessionFactoryProcessor {
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder().name("Hostname").description("The ip address or hostname of the destination.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("localhost").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("The port on the destination.").required(true).addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Max Size of Socket Send Buffer").description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor.Builder().name("Idle Connection Expiration").description("The amount of time a connection should be held open without being used before closing the connection.").required(true).defaultValue("5 seconds").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor.Builder().name("Protocol").description("The protocol for communication.").required(true).allowableValues(new AllowableValue[]{TCP_VALUE, UDP_VALUE}).defaultValue(TCP_VALUE.getValue()).build();
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder().name("Message Delimiter").description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' relationship.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("Character Set").description("Specifies the character set of the data being sent.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder().name("Timeout").description("The timeout for connecting to and communicating with the destination. Does not apply to UDP").required(false).defaultValue("10 seconds").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder().name("Outgoing Message Delimiter").description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder().name("Connection Per FlowFile").description("Specifies whether to send each FlowFile's content on an individual connection.").required(true).defaultValue("false").allowableValues(new String[]{"true", "false"}).build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are sent successfully to the destination are sent out this relationship.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the destination are sent out this relationship.").build();
    private Set<Relationship> relationships;
    private List<PropertyDescriptor> descriptors;
    protected volatile String transitUri;
    protected volatile BlockingQueue<ChannelSender> senderPool;
    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<FlowFileMessageBatch>();
    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet());

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(HOSTNAME);
        descriptors.add(PORT);
        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
        descriptors.add(IDLE_EXPIRATION);
        descriptors.addAll(this.getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(descriptors);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.addAll(this.getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    protected List<Relationship> getAdditionalRelationships() {
        return Collections.EMPTY_LIST;
    }

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.EMPTY_LIST;
    }

    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        this.senderPool = new LinkedBlockingQueue<ChannelSender>(context.getMaxConcurrentTasks());
        this.transitUri = this.createTransitUri(context);
    }

    @OnStopped
    public void closeSenders() {
        if (this.senderPool != null) {
            ChannelSender sender = (ChannelSender)this.senderPool.poll();
            while (sender != null) {
                sender.close();
                sender = (ChannelSender)this.senderPool.poll();
            }
        }
    }

    protected abstract String createTransitUri(ProcessContext var1);

    protected abstract ChannelSender createSender(ProcessContext var1) throws IOException;

    protected PruneResult pruneIdleSenders(long idleThreshold) {
        ChannelSender sender;
        int numClosed = 0;
        int numConsidered = 0;
        long currentTime = System.currentTimeMillis();
        ArrayList<ChannelSender> putBack = new ArrayList<ChannelSender>();
        while ((sender = (ChannelSender)this.senderPool.poll()) != null) {
            ++numConsidered;
            if (currentTime > sender.getLastUsed() + idleThreshold) {
                this.getLogger().debug("Closing idle connection...");
                sender.close();
                ++numClosed;
                continue;
            }
            putBack.add(sender);
        }
        for (ChannelSender putBackSender : putBack) {
            boolean returned = this.senderPool.offer(putBackSender);
            if (returned) continue;
            putBackSender.close();
        }
        return new PruneResult(numClosed, numConsidered);
    }

    protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
        ChannelSender sender = protocol.equals(UDP_VALUE.getValue()) ? new DatagramChannelSender(host, port, maxSendBufferSize, this.getLogger()) : (sslContext != null ? new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, this.getLogger()) : new SocketChannelSender(host, port, maxSendBufferSize, this.getLogger()));
        sender.setTimeout(timeout);
        sender.open();
        return sender;
    }

    protected ChannelSender acquireSender(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        ChannelSender sender = (ChannelSender)this.senderPool.poll();
        if (sender == null) {
            try {
                this.getLogger().debug("No available connections, creating a new one...");
                sender = this.createSender(context);
            }
            catch (IOException e) {
                this.getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", new Object[]{flowFile}, (Throwable)e);
                session.transfer(flowFile, REL_FAILURE);
                session.commit();
                context.yield();
                sender = null;
            }
        }
        return sender;
    }

    protected void relinquishSender(ChannelSender sender) {
        if (sender != null) {
            if (sender.isConnected()) {
                boolean returned = this.senderPool.offer(sender);
                if (!returned) {
                    this.getLogger().debug("Sender wasn't returned because queue was full, closing sender");
                    sender.close();
                }
            } else {
                this.getLogger().debug("Sender is not connected, closing sender");
                sender.close();
            }
        }
    }

    protected String getOutgoingMessageDelimiter(ProcessContext context, FlowFile flowFile) {
        String delimiter = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
        if (delimiter != null) {
            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        }
        return delimiter;
    }

    protected class FlowFileMessageBatch {
        private final ProcessSession session;
        private final FlowFile flowFile;
        private final long startTime = System.nanoTime();
        private final List<Range> successfulRanges = new ArrayList<Range>();
        private final List<Range> failedRanges = new ArrayList<Range>();
        private Exception lastFailureReason;
        private long numMessages = -1L;
        private long completeTime = 0L;
        private boolean canceled = false;

        public FlowFileMessageBatch(ProcessSession session, FlowFile flowFile) {
            this.session = session;
            this.flowFile = flowFile;
        }

        public synchronized void cancelOrComplete() {
            if (this.isComplete()) {
                this.completeSession();
                return;
            }
            this.canceled = true;
            this.session.rollback();
            this.successfulRanges.clear();
            this.failedRanges.clear();
        }

        public synchronized void addSuccessfulRange(long start, long end) {
            if (this.canceled) {
                return;
            }
            this.successfulRanges.add(new Range(start, end));
            if (this.isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        public synchronized void addFailedRange(long start, long end, Exception e) {
            if (this.canceled) {
                return;
            }
            this.failedRanges.add(new Range(start, end));
            this.lastFailureReason = e;
            if (this.isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        private boolean isComplete() {
            return !this.canceled && this.numMessages > -1L && (long)(this.successfulRanges.size() + this.failedRanges.size()) >= this.numMessages;
        }

        public synchronized void setNumMessages(long msgCount) {
            this.numMessages = msgCount;
            if (this.isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        private void transferRanges(List<Range> ranges, Relationship relationship) {
            Collections.sort(ranges, new Comparator<Range>(){

                @Override
                public int compare(Range o1, Range o2) {
                    return Long.compare(o1.getStart(), o2.getStart());
                }
            });
            for (int i = 0; i < ranges.size(); ++i) {
                Range nextRange;
                Range range = ranges.get(i);
                int count = 1;
                while (i + 1 < ranges.size() && (nextRange = ranges.get(i + 1)).getStart() == range.getEnd()) {
                    range = new Range(range.getStart(), nextRange.getEnd());
                    ++count;
                    ++i;
                }
                FlowFile child = this.session.clone(this.flowFile, range.getStart(), range.getEnd() - range.getStart());
                if (relationship == REL_SUCCESS) {
                    this.session.getProvenanceReporter().send(child, AbstractPutEventProcessor.this.transitUri, "Sent " + count + " messages");
                    this.session.transfer(child, relationship);
                    continue;
                }
                child = this.session.penalize(child);
                this.session.transfer(child, relationship);
            }
        }

        public synchronized void completeSession() {
            if (this.canceled) {
                return;
            }
            if (this.successfulRanges.isEmpty() && this.failedRanges.isEmpty()) {
                AbstractPutEventProcessor.this.getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[]{this.flowFile});
                this.session.transfer(this.flowFile, REL_SUCCESS);
                this.session.commit();
                return;
            }
            if (this.successfulRanges.isEmpty()) {
                AbstractPutEventProcessor.this.getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[]{this.flowFile, this.lastFailureReason});
                FlowFile penalizedFlowFile = this.session.penalize(this.flowFile);
                this.session.transfer(penalizedFlowFile, REL_FAILURE);
                this.session.commit();
                return;
            }
            if (this.failedRanges.isEmpty()) {
                long transferMillis = TimeUnit.NANOSECONDS.toMillis(this.completeTime - this.startTime);
                this.session.getProvenanceReporter().send(this.flowFile, AbstractPutEventProcessor.this.transitUri, "Sent " + this.successfulRanges.size() + " messages;", transferMillis);
                this.session.transfer(this.flowFile, REL_SUCCESS);
                AbstractPutEventProcessor.this.getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[]{this.successfulRanges.size(), this.flowFile, transferMillis});
                this.session.commit();
                return;
            }
            this.transferRanges(this.successfulRanges, REL_SUCCESS);
            this.transferRanges(this.failedRanges, REL_FAILURE);
            this.session.remove(this.flowFile);
            AbstractPutEventProcessor.this.getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}", new Object[]{this.successfulRanges.size(), this.failedRanges.size(), this.lastFailureReason});
            this.session.commit();
        }
    }

    protected static class Range {
        private final long start;
        private final long end;

        public Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getStart() {
            return this.start;
        }

        public long getEnd() {
            return this.end;
        }

        public String toString() {
            return "Range[" + this.start + "-" + this.end + "]";
        }
    }

    protected static class PruneResult {
        private final int numClosed;
        private final int numConsidered;

        public PruneResult(int numClosed, int numConsidered) {
            this.numClosed = numClosed;
            this.numConsidered = numConsidered;
        }

        public int getNumClosed() {
            return this.numClosed;
        }

        public int getNumConsidered() {
            return this.numConsidered;
        }
    }
}

