/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen.dispatcher;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;

public class DatagramChannelDispatcher<E extends Event<DatagramChannel>>
implements ChannelDispatcher {
    private final EventFactory<E> eventFactory;
    private final BlockingQueue<ByteBuffer> bufferPool;
    private final EventQueue<E> events;
    private final ComponentLog logger;
    private final String sendingHost;
    private final Integer sendingPort;
    private Selector selector;
    private DatagramChannel datagramChannel;
    private volatile boolean stopped = false;

    public DatagramChannelDispatcher(EventFactory<E> eventFactory, BlockingQueue<ByteBuffer> bufferPool, BlockingQueue<E> events, ComponentLog logger) {
        this(eventFactory, bufferPool, events, logger, null, null);
    }

    public DatagramChannelDispatcher(EventFactory<E> eventFactory, BlockingQueue<ByteBuffer> bufferPool, BlockingQueue<E> events, ComponentLog logger, String sendingHost, Integer sendingPort) {
        this.eventFactory = eventFactory;
        this.bufferPool = bufferPool;
        this.logger = logger;
        this.sendingHost = sendingHost;
        this.sendingPort = sendingPort;
        this.events = new EventQueue<E>(events, logger);
        if (bufferPool == null || bufferPool.size() == 0) {
            throw new IllegalArgumentException("A pool of available ByteBuffers is required");
        }
    }

    @Override
    public void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException {
        this.stopped = false;
        this.datagramChannel = DatagramChannel.open();
        this.datagramChannel.configureBlocking(false);
        if (maxBufferSize > 0) {
            this.datagramChannel.setOption((SocketOption)StandardSocketOptions.SO_RCVBUF, (Object)maxBufferSize);
            int actualReceiveBufSize = this.datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            if (actualReceiveBufSize < maxBufferSize) {
                this.logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's maximum receive buffer");
            }
        }
        this.datagramChannel.setOption((SocketOption)StandardSocketOptions.SO_REUSEADDR, (Object)true);
        this.datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));
        if (this.sendingHost != null && this.sendingPort != null) {
            this.datagramChannel.connect(new InetSocketAddress(this.sendingHost, (int)this.sendingPort));
        }
        this.selector = Selector.open();
        this.datagramChannel.register(this.selector, 1);
    }

    @Override
    public void run() {
        ByteBuffer buffer = (ByteBuffer)this.bufferPool.poll();
        while (!this.stopped) {
            try {
                int selected = this.selector.select();
                if (selected <= 0 || this.stopped) continue;
                Iterator<SelectionKey> selectorKeys = this.selector.selectedKeys().iterator();
                while (selectorKeys.hasNext() && !this.stopped) {
                    SocketAddress socketAddress;
                    SelectionKey key = selectorKeys.next();
                    selectorKeys.remove();
                    if (!key.isValid()) continue;
                    DatagramChannel channel = (DatagramChannel)key.channel();
                    buffer.clear();
                    while (!this.stopped && (socketAddress = channel.receive(buffer)) != null) {
                        String sender = "";
                        if (socketAddress instanceof InetSocketAddress) {
                            sender = ((InetSocketAddress)socketAddress).getAddress().toString();
                        }
                        buffer.flip();
                        byte[] bytes = new byte[buffer.limit()];
                        buffer.get(bytes, 0, buffer.limit());
                        Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
                        E event = this.eventFactory.create(bytes, metadata, null);
                        this.events.offer(event);
                        buffer.clear();
                    }
                }
            }
            catch (InterruptedException e) {
                this.stopped = true;
                Thread.currentThread().interrupt();
            }
            catch (IOException e) {
                this.logger.error("Error reading from DatagramChannel", (Throwable)e);
            }
        }
        if (buffer != null) {
            try {
                this.bufferPool.put(buffer);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public int getPort() {
        return this.datagramChannel == null ? 0 : this.datagramChannel.socket().getLocalPort();
    }

    @Override
    public void close() {
        this.stopped = true;
        if (this.selector != null) {
            this.selector.wakeup();
        }
        IOUtils.closeQuietly((Selector)this.selector);
        IOUtils.closeQuietly((Closeable)this.datagramChannel);
    }
}

