/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp.connection;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnection;
import org.springframework.integration.ip.tcp.connection.NoListenerException;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.util.Assert;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TcpNioConnection
extends AbstractTcpConnection {
    private static final long DEFAULT_PIPE_TIMEOUT = 60000L;
    private final SocketChannel socketChannel;
    private final ChannelOutputStream channelOutputStream;
    private volatile PipedOutputStream pipedOutputStream;
    private volatile PipedInputStream pipedInputStream;
    private volatile boolean usingDirectBuffers;
    private volatile Executor taskExecutor;
    private volatile ByteBuffer rawBuffer;
    private volatile int maxMessageSize = 61440;
    private volatile long lastRead;
    private AtomicInteger executionControl = new AtomicInteger();
    private volatile boolean writingToPipe;
    private volatile long pipeTimeout = 60000L;

    public TcpNioConnection(SocketChannel socketChannel, boolean server, boolean lookupHost) throws Exception {
        super(socketChannel.socket(), server, lookupHost);
        this.socketChannel = socketChannel;
        int receiveBufferSize = socketChannel.socket().getReceiveBufferSize();
        if (receiveBufferSize <= 0) {
            receiveBufferSize = this.maxMessageSize;
        }
        this.pipedInputStream = new PipedInputStream(receiveBufferSize);
        this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
        this.channelOutputStream = new ChannelOutputStream();
    }

    public void setPipeTimeout(long pipeTimeout) {
        this.pipeTimeout = pipeTimeout;
    }

    @Override
    public void close() {
        this.doClose();
    }

    private void doClose() {
        if (this.pipedOutputStream != null) {
            try {
                this.pipedOutputStream.close();
            }
            catch (IOException e) {
                // empty catch block
            }
        }
        try {
            this.socketChannel.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        super.close();
    }

    @Override
    public boolean isOpen() {
        return this.socketChannel.isOpen();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(Message<?> message) throws Exception {
        TcpMessageMapper tcpMessageMapper = this.getMapper();
        synchronized (tcpMessageMapper) {
            Object object = this.getMapper().fromMessage(message);
            this.getSerializer().serialize(object, (OutputStream)this.getChannelOutputStream());
            this.afterSend(message);
        }
    }

    @Override
    public Object getPayload() throws Exception {
        return this.getDeserializer().deserialize((InputStream)this.pipedInputStream);
    }

    @Override
    public int getPort() {
        return this.socketChannel.socket().getPort();
    }

    protected ByteBuffer allocate(int length) {
        ByteBuffer buffer = this.usingDirectBuffers ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
        return buffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " Nio message assembler running..."));
        }
        try {
            if (this.getListener() == null && !this.isSingleUse()) {
                this.logger.debug((Object)"TcpListener exiting - no listener and not single use");
                return;
            }
            try {
                if (this.dataAvailable()) {
                    Message<?> message = this.convert();
                    if (this.dataAvailable()) {
                        this.executionControl.incrementAndGet();
                        this.taskExecutor.execute(this);
                    }
                    this.executionControl.decrementAndGet();
                    if (message != null) {
                        this.sendToChannel(message);
                    }
                } else {
                    this.executionControl.decrementAndGet();
                }
            }
            catch (Exception e) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.error((Object)("Read exception " + this.getConnectionId()), (Throwable)e);
                } else {
                    this.logger.error((Object)("Read exception " + this.getConnectionId() + " " + e.getClass().getSimpleName() + ":" + e.getCause() + ":" + e.getMessage()));
                }
                this.closeConnection();
                this.logger.trace((Object)"Nio message assembler exiting...");
                try {
                    if (this.isOpen() && this.dataAvailable()) {
                        this.checkForAssembler();
                    }
                }
                catch (IOException e2) {
                    this.logger.error((Object)"Exception when checking for assembler", (Throwable)e2);
                }
                return;
            }
        }
        finally {
            this.logger.trace((Object)"Nio message assembler exiting...");
            try {
                if (this.isOpen() && this.dataAvailable()) {
                    this.checkForAssembler();
                }
            }
            catch (IOException e) {
                this.logger.error((Object)"Exception when checking for assembler", (Throwable)e);
            }
        }
    }

    private boolean dataAvailable() throws IOException {
        return this.pipedInputStream.available() > 0 || this.writingToPipe;
    }

    private synchronized Message<?> convert() throws Exception {
        if (!this.dataAvailable()) {
            return null;
        }
        Message<Object> message = null;
        try {
            message = this.getMapper().toMessage(this);
        }
        catch (Exception e) {
            this.closeConnection();
            if (e instanceof SocketTimeoutException && this.isSingleUse()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Closing single use socket after timeout " + this.getConnectionId()));
                }
            } else if (!(e instanceof SoftEndOfStreamException)) {
                throw e;
            }
            return null;
        }
        return message;
    }

    private void sendToChannel(Message<?> message) {
        boolean intercepted = false;
        try {
            if (message != null) {
                intercepted = this.getListener().onMessage(message);
            }
        }
        catch (Exception e) {
            if (e instanceof NoListenerException) {
                if (this.isSingleUse()) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Closing single use channel after inbound message " + this.getConnectionId()));
                    }
                    this.closeConnection();
                }
            }
            this.logger.error((Object)("Exception sending meeeage: " + message), (Throwable)e);
        }
        if (this.isSingleUse() && (!this.isServer() && !intercepted || this.isServer() && this.getSender() == null)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Closing single use cbannel after inbound message " + this.getConnectionId()));
            }
            this.closeConnection();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRead() throws Exception {
        if (this.rawBuffer == null) {
            this.rawBuffer = this.allocate(this.maxMessageSize);
        }
        this.writingToPipe = true;
        try {
            int len;
            if (this.taskExecutor == null) {
                this.taskExecutor = Executors.newCachedThreadPool();
            }
            this.checkForAssembler();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("Before read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            if ((len = this.socketChannel.read(this.rawBuffer)) < 0) {
                this.writingToPipe = false;
                this.closeConnection();
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("After read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            this.rawBuffer.flip();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace((Object)("After flip:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit()));
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Read " + this.rawBuffer.limit() + " into raw buffer"));
            }
            final CountDownLatch latch = new CountDownLatch(1);
            this.taskExecutor.execute(new Runnable(){

                public void run() {
                    try {
                        TcpNioConnection.this.sendToPipe(TcpNioConnection.this.rawBuffer);
                        latch.countDown();
                    }
                    catch (Exception e) {
                        TcpNioConnection.this.logger.error((Object)(TcpNioConnection.this.getConnectionId() + " Failed to write to pipe"), (Throwable)e);
                    }
                }
            });
            if (!latch.await(this.pipeTimeout, TimeUnit.MILLISECONDS)) {
                throw new MessagingException("Timed out writing to pipe, probably due to insufficient threads in a fixed thread pool; consider increasing this task executor pool size");
            }
        }
        finally {
            this.writingToPipe = false;
        }
    }

    protected void sendToPipe(ByteBuffer rawBuffer) throws IOException {
        Assert.notNull((Object)rawBuffer, (String)"rawBuffer cannot be null");
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)("Sending " + rawBuffer.limit() + " to pipe"));
        }
        this.pipedOutputStream.write(rawBuffer.array(), 0, rawBuffer.limit());
        this.pipedOutputStream.flush();
        rawBuffer.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkForAssembler() {
        AtomicInteger atomicInteger = this.executionControl;
        synchronized (atomicInteger) {
            if (this.executionControl.incrementAndGet() <= 1) {
                this.executionControl.set(1);
                this.taskExecutor.execute(this);
            } else {
                this.executionControl.decrementAndGet();
            }
        }
    }

    public void readPacket() {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)(this.getConnectionId() + " Reading..."));
        }
        try {
            this.doRead();
        }
        catch (ClosedChannelException cce) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)(this.getConnectionId() + " Channel is closed"));
            }
            this.closeConnection();
        }
        catch (Exception e) {
            this.logger.error((Object)("Exception on Read " + this.getConnectionId() + " " + e.getMessage()), (Throwable)e);
            this.closeConnection();
        }
    }

    void timeout() {
        this.closeConnection();
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setUsingDirectBuffers(boolean usingDirectBuffers) {
        this.usingDirectBuffers = usingDirectBuffers;
    }

    protected boolean isUsingDirectBuffers() {
        return this.usingDirectBuffers;
    }

    protected ChannelOutputStream getChannelOutputStream() {
        return this.channelOutputStream;
    }

    public long getLastRead() {
        return this.lastRead;
    }

    public void setLastRead(long lastRead) {
        this.lastRead = lastRead;
    }

    class ChannelOutputStream
    extends OutputStream {
        private Selector selector;
        private int soTimeout;

        ChannelOutputStream() {
        }

        public void write(int b) throws IOException {
            byte[] bytes = new byte[]{(byte)b};
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            this.doWrite(buffer);
        }

        public void close() throws IOException {
            TcpNioConnection.this.doClose();
        }

        public void flush() throws IOException {
        }

        public void write(byte[] b, int off, int len) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
            this.doWrite(buffer);
        }

        public void write(byte[] b) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(b);
            this.doWrite(buffer);
        }

        protected synchronized void doWrite(ByteBuffer buffer) throws IOException {
            if (TcpNioConnection.this.logger.isDebugEnabled()) {
                TcpNioConnection.this.logger.debug((Object)(TcpNioConnection.this.getConnectionId() + " writing " + buffer.remaining()));
            }
            TcpNioConnection.this.socketChannel.write(buffer);
            int remaining = buffer.remaining();
            if (remaining == 0) {
                return;
            }
            if (this.selector == null) {
                this.selector = Selector.open();
                this.soTimeout = TcpNioConnection.this.socketChannel.socket().getSoTimeout();
            }
            TcpNioConnection.this.socketChannel.register(this.selector, 4);
            while (remaining > 0) {
                int selectionCount = this.selector.select(this.soTimeout);
                if (selectionCount == 0) {
                    throw new SocketTimeoutException("Timeout on write");
                }
                this.selector.selectedKeys().clear();
                TcpNioConnection.this.socketChannel.write(buffer);
                remaining = buffer.remaining();
            }
        }
    }
}

