/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp.connection;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.Message;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnection;
import org.springframework.integration.ip.tcp.connection.NoListenerException;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TcpNioConnection
extends AbstractTcpConnection {
    private final SocketChannel socketChannel;
    private volatile OutputStream channelOutputStream;
    private volatile PipedOutputStream pipedOutputStream;
    private volatile PipedInputStream pipedInputStream;
    private volatile boolean usingDirectBuffers;
    private volatile Executor taskExecutor;
    private volatile ByteBuffer rawBuffer;
    private volatile int maxMessageSize = 61440;
    private volatile long lastRead;
    private AtomicInteger executionControl = new AtomicInteger();
    private volatile boolean writingToPipe;

    public TcpNioConnection(SocketChannel socketChannel, boolean server, boolean lookupHost) throws Exception {
        super(socketChannel.socket(), server, lookupHost);
        this.socketChannel = socketChannel;
        this.pipedInputStream = new PipedInputStream();
        this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
        this.channelOutputStream = new ChannelOutputStream();
    }

    @Override
    public void close() {
        this.doClose();
    }

    private void doClose() {
        if (this.pipedOutputStream != null) {
            try {
                this.pipedOutputStream.close();
            }
            catch (IOException e) {
                // empty catch block
            }
        }
        try {
            this.socketChannel.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        super.close();
    }

    @Override
    public boolean isOpen() {
        return this.socketChannel.isOpen();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(Message<?> message) throws Exception {
        TcpMessageMapper tcpMessageMapper = this.mapper;
        synchronized (tcpMessageMapper) {
            Object object = this.mapper.fromMessage(message);
            this.serializer.serialize(object, this.channelOutputStream);
            this.afterSend(message);
        }
    }

    @Override
    public Object getPayload() throws Exception {
        return this.deserializer.deserialize((InputStream)this.pipedInputStream);
    }

    @Override
    public int getPort() {
        return this.socketChannel.socket().getPort();
    }

    protected ByteBuffer allocate(int length) {
        ByteBuffer buffer = this.usingDirectBuffers ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
        return buffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)(this.getConnectionId() + " Nio message assembler running..."));
        }
        try {
            if (this.listener == null && !this.singleUse) {
                this.logger.debug((Object)"TcpListener exiting - no listener and not single use");
                return;
            }
            try {
                if (this.dataAvailable()) {
                    Message<?> message = this.convert();
                    if (this.dataAvailable()) {
                        this.executionControl.incrementAndGet();
                        this.taskExecutor.execute(this);
                    }
                    this.executionControl.decrementAndGet();
                    if (message != null) {
                        this.sendToChannel(message);
                    }
                } else {
                    this.executionControl.decrementAndGet();
                }
            }
            catch (Exception e) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.error((Object)("Read exception " + this.getConnectionId()), (Throwable)e);
                } else {
                    this.logger.error((Object)("Read exception " + this.getConnectionId() + " " + e.getClass().getSimpleName() + ":" + e.getCause() + ":" + e.getMessage()));
                }
                this.closeConnection();
                this.logger.trace((Object)"Nio message assembler exiting...");
                try {
                    if (this.isOpen() && this.dataAvailable()) {
                        this.checkForAssembler();
                    }
                }
                catch (IOException e2) {
                    this.logger.error((Object)"Exception when checking for assembler", (Throwable)e2);
                }
                return;
            }
        }
        finally {
            this.logger.trace((Object)"Nio message assembler exiting...");
            try {
                if (this.isOpen() && this.dataAvailable()) {
                    this.checkForAssembler();
                }
            }
            catch (IOException e) {
                this.logger.error((Object)"Exception when checking for assembler", (Throwable)e);
            }
        }
    }

    private boolean dataAvailable() throws IOException {
        return this.pipedInputStream.available() > 0 || this.writingToPipe;
    }

    private synchronized Message<?> convert() throws Exception {
        if (!this.dataAvailable()) {
            return null;
        }
        Message<Object> message = null;
        try {
            message = this.mapper.toMessage(this);
        }
        catch (Exception e) {
            this.closeConnection();
            if (e instanceof SocketTimeoutException && this.singleUse) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Closing single use socket after timeout " + this.connectionId));
                }
            } else if (!(e instanceof SoftEndOfStreamException)) {
                throw e;
            }
            return null;
        }
        return message;
    }

    private void sendToChannel(Message<?> message) {
        boolean intercepted = false;
        try {
            if (message != null) {
                intercepted = this.listener.onMessage(message);
            }
        }
        catch (Exception e) {
            if (e instanceof NoListenerException) {
                if (this.singleUse) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Closing single use channel after inbound message " + this.connectionId));
                    }
                    this.closeConnection();
                }
            }
            this.logger.error((Object)("Exception sending meeeage: " + message), (Throwable)e);
        }
        if (this.singleUse && (!this.server && !intercepted || this.server && this.sender == null)) {
            this.logger.debug((Object)("Closing single use cbannel after inbound message " + this.connectionId));
            this.closeConnection();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRead() throws Exception {
        if (this.rawBuffer == null) {
            this.rawBuffer = this.allocate(this.maxMessageSize);
        }
        this.writingToPipe = true;
        try {
            if (this.taskExecutor == null) {
                this.taskExecutor = Executors.newSingleThreadExecutor();
            }
            this.checkForAssembler();
            this.rawBuffer.clear();
            int len = this.socketChannel.read(this.rawBuffer);
            if (len < 0) {
                this.writingToPipe = false;
                this.closeConnection();
            }
            this.rawBuffer.flip();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Read " + this.rawBuffer.limit() + " into raw buffer"));
            }
            this.pipedOutputStream.write(this.rawBuffer.array(), 0, this.rawBuffer.limit());
            this.pipedOutputStream.flush();
        }
        finally {
            this.writingToPipe = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkForAssembler() {
        AtomicInteger atomicInteger = this.executionControl;
        synchronized (atomicInteger) {
            if (this.executionControl.incrementAndGet() <= 1) {
                this.executionControl.set(1);
                this.taskExecutor.execute(this);
            } else {
                this.executionControl.decrementAndGet();
            }
        }
    }

    public void readPacket() {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)(this.getConnectionId() + " Reading..."));
        }
        try {
            this.doRead();
        }
        catch (ClosedChannelException cce) {
            this.closeConnection();
        }
        catch (Exception e) {
            this.logger.error((Object)("Exception on Read " + this.getConnectionId() + " " + e.getMessage()));
            this.closeConnection();
        }
    }

    void timeout() {
        this.closeConnection();
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void setUsingDirectBuffers(boolean usingDirectBuffers) {
        this.usingDirectBuffers = usingDirectBuffers;
    }

    public long getLastRead() {
        return this.lastRead;
    }

    public void setLastRead(long lastRead) {
        this.lastRead = lastRead;
    }

    class ChannelOutputStream
    extends OutputStream {
        private Selector selector;
        private int soTimeout;

        ChannelOutputStream() {
        }

        public void write(int b) throws IOException {
            byte[] bytes = new byte[]{(byte)b};
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            this.doWrite(buffer);
        }

        public void close() throws IOException {
            TcpNioConnection.this.doClose();
        }

        public void flush() throws IOException {
        }

        public void write(byte[] b, int off, int len) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
            this.doWrite(buffer);
        }

        public void write(byte[] b) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(b);
            this.doWrite(buffer);
        }

        private synchronized void doWrite(ByteBuffer buffer) throws IOException {
            TcpNioConnection.this.socketChannel.write(buffer);
            int remaining = buffer.remaining();
            if (remaining == 0) {
                return;
            }
            if (this.selector == null) {
                this.selector = Selector.open();
                this.soTimeout = TcpNioConnection.this.socketChannel.socket().getSoTimeout();
            }
            TcpNioConnection.this.socketChannel.register(this.selector, 4);
            while (remaining > 0) {
                int selectionCount = this.selector.select(this.soTimeout);
                if (selectionCount == 0) {
                    throw new SocketTimeoutException("Timeout on write");
                }
                this.selector.selectedKeys().clear();
                TcpNioConnection.this.socketChannel.write(buffer);
                remaining = buffer.remaining();
            }
        }
    }
}

