/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.blocks;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

public abstract class BasicConnectionTable {
    // Peer address -> Connection. remove(), retainAll(), toString() and the Reaper access it under
    // synchronized (conns); addConnection() and getNumConnections() do not lock themselves.
    final HashMap conns = new HashMap();
    // Callback that receive() forwards incoming data to; stays null until setReceiver() is called.
    Receiver receiver = null;
    // true: Connection.send() copies the payload onto a per-connection queue drained by a Sender thread.
    // false: Connection.send() writes on the calling thread.
    boolean use_send_queues = true;
    // Combined with srv_port by getLocalAddress() to lazily build local_addr.
    InetAddress bind_addr = null;
    Address local_addr = null;
    int srv_port = 7800;
    // Only exposed through their getters/setters here; not applied to any socket in this class
    // (presumably consumed by subclasses - confirm).
    int recv_buf_size = 120000;
    int send_buf_size = 60000;
    // Listeners informed by notifyConnectionOpened()/notifyConnectionClosed().
    final Vector conn_listeners = new Vector();
    // Held while the Receiver callback runs, so deliveries from different connections are serialized.
    final Object recv_mutex = new Object();
    // Idle-connection reaper; addConnection() starts it when it is non-null and not running.
    Reaper reaper = null;
    // Pause between reaper sweeps, passed to Util.sleep() (presumably milliseconds - confirm).
    long reaper_interval = 60000L;
    // Idle time in milliseconds (compared against System.currentTimeMillis()) after which the reaper
    // destroys a connection.
    long conn_expire_time = 300000L;
    // Only exposed through getSocketConnectionTimeout()/setSocketConnectionTimeout() in this class.
    int sock_conn_timeout = 1000;
    // Thread group for the reaper, receiver and sender threads; may be null.
    ThreadGroup thread_group = null;
    protected final Log log = LogFactory.getLog(this.getClass());
    // Handshake magic (ASCII "bela"): written by sendLocalAddress(), verified by readPeerAddress().
    final byte[] cookie = new byte[]{98, 101, 108, 97};
    // The fields below (except running and MAX_JOIN_TIMEOUT) are not referenced by any method of this
    // class; presumably they are used by subclasses - confirm.
    boolean use_reaper = false;
    static final int backlog = 20;
    ServerSocket srv_sock = null;
    boolean reuse_addr = false;
    boolean tcp_nodelay = false;
    int linger = -1;
    InetAddress external_addr = null;
    int max_port = 0;
    Thread acceptor = null;
    // Set by start()/stop(); send() discards messages while this is false.
    boolean running = false;
    // Equals the 10000 ms literal used by the thread joins in this file.
    static final long MAX_JOIN_TIMEOUT = 10000L;

    /**
     * Installs the callback that {@link #receive} forwards incoming data to.
     *
     * @param r the receiver; if {@code null}, {@link #receive} logs an error instead of delivering
     */
    public final void setReceiver(Receiver r) {
        receiver = r;
    }

    /**
     * Registers a listener for connection open/close notifications.
     * {@code null} and already-registered listeners are ignored.
     *
     * @param l the listener to add
     */
    public void addConnectionListener(ConnectionListener l) {
        if (l == null || conn_listeners.contains(l)) {
            return;
        }
        conn_listeners.addElement(l);
    }

    /**
     * Unregisters a previously added listener; a {@code null} argument is ignored.
     *
     * @param l the listener to remove
     */
    public void removeConnectionListener(ConnectionListener l) {
        if (l == null) {
            return;
        }
        conn_listeners.removeElement(l);
    }

    /**
     * Returns the local address, building it from {@code bind_addr} and {@code srv_port}
     * on first use.
     *
     * @return the cached or newly built address, or {@code null} while no bind address is known
     */
    public Address getLocalAddress() {
        if (local_addr == null && bind_addr != null) {
            local_addr = new IpAddress(bind_addr, srv_port);
        }
        return local_addr;
    }

    /** @return the configured send buffer size */
    public int getSendBufferSize() {
        return send_buf_size;
    }

    /**
     * Stores the send buffer size; this class only records the value.
     *
     * @param send_buf_size the new send buffer size
     */
    public void setSendBufferSize(int send_buf_size) {
        BasicConnectionTable.this.send_buf_size = send_buf_size;
    }

    /** @return the configured receive buffer size */
    public int getReceiveBufferSize() {
        return recv_buf_size;
    }

    /**
     * Stores the receive buffer size; this class only records the value.
     *
     * @param recv_buf_size the new receive buffer size
     */
    public void setReceiveBufferSize(int recv_buf_size) {
        BasicConnectionTable.this.recv_buf_size = recv_buf_size;
    }

    /** @return the configured socket connect timeout */
    public int getSocketConnectionTimeout() {
        return sock_conn_timeout;
    }

    /**
     * Stores the socket connect timeout; this class only records the value.
     *
     * @param sock_conn_timeout the new timeout
     */
    public void setSocketConnectionTimeout(int sock_conn_timeout) {
        BasicConnectionTable.this.sock_conn_timeout = sock_conn_timeout;
    }

    /**
     * Returns the number of entries currently in the connection map. The size is read
     * without taking the map's monitor, so the value is only a snapshot.
     *
     * @return the current number of connections
     */
    public int getNumConnections() {
        int count = conns.size();
        return count;
    }

    /** @return the stored {@code tcp_nodelay} flag */
    public boolean getTcpNodelay() {
        return tcp_nodelay;
    }

    /**
     * Stores the {@code tcp_nodelay} flag; this class only records the value.
     *
     * @param tcp_nodelay the new flag value
     */
    public void setTcpNodelay(boolean tcp_nodelay) {
        BasicConnectionTable.this.tcp_nodelay = tcp_nodelay;
    }

    /** @return the stored linger value (initially -1) */
    public int getLinger() {
        return linger;
    }

    /**
     * Stores the linger value; this class only records the value.
     *
     * @param linger the new linger value
     */
    public void setLinger(int linger) {
        BasicConnectionTable.this.linger = linger;
    }

    /** @return {@code true} if connections queue outgoing data for a sender thread */
    public boolean getUseSendQueues() {
        return use_send_queues;
    }

    /**
     * Chooses between queued sending (a sender thread per connection) and writing on the
     * caller's thread. The flag is consulted on every {@code Connection.send()} call.
     *
     * @param flag {@code true} to queue outgoing data
     */
    public void setUseSendQueues(boolean flag) {
        use_send_queues = flag;
    }

    /**
     * Marks the table as running; {@link #send} discards messages until this has been called.
     *
     * @throws Exception never thrown here; declared so that overriding methods may throw
     */
    public void start() throws Exception {
        running = true;
    }

    /**
     * Marks the table as stopped; {@link #send} discards messages from now on.
     * Existing connections are not touched by this method.
     */
    public void stop() {
        running = false;
    }

    /**
     * Removes the connection registered for {@code addr}, if any, and destroys it.
     * The map is only locked for the removal itself; the teardown, which may block while
     * joining threads, runs outside the lock.
     *
     * @param addr the peer whose connection should be dropped
     */
    public void remove(Address addr) {
        Connection conn;
        synchronized (conns) {
            conn = (Connection) conns.remove(addr);
        }
        if (conn != null) {
            try {
                conn.destroy();
            }
            catch (Exception ex) {
                // Teardown is best effort, but leave a trace instead of hiding the failure.
                if (log.isTraceEnabled()) {
                    log.trace("failed destroying connection to " + addr, ex);
                }
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("removed " + addr + ", connections are " + toString());
        }
    }

    /**
     * Hands data read from a peer to the registered {@link Receiver}. Deliveries are
     * serialized on {@code recv_mutex}. If no receiver is set, an error is logged and the
     * data is dropped.
     *
     * @param sender the peer the data came from
     * @param data   buffer holding the payload
     * @param offset index of the first payload byte in {@code data}
     * @param length number of payload bytes
     */
    public void receive(Address sender, byte[] data, int offset, int length) {
        if (receiver == null) {
            if (log.isErrorEnabled()) {
                log.error("receiver is null (not set) !");
            }
            return;
        }
        synchronized (recv_mutex) {
            receiver.receive(sender, data, offset, length);
        }
    }

    /**
     * Renders the connection table as text: a header with the number of connections followed
     * by one {@code key: <address>: <connection>} line per entry. Works on a snapshot taken
     * under the map lock, so formatting never holds the lock.
     *
     * @return the textual dump of the table
     */
    public String toString() {
        HashMap copy;
        synchronized (conns) {
            copy = new HashMap(conns);
        }
        StringBuffer ret = new StringBuffer();
        ret.append("connections (" + copy.size() + "):\n");
        for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
            // Raw iterator: next() yields Object, so the cast is required for this to compile.
            Map.Entry entry = (Map.Entry) it.next();
            Address key = (Address) entry.getKey();
            Connection val = (Connection) entry.getValue();
            ret.append("key: " + key + ": " + val + '\n');
        }
        ret.append('\n');
        return ret.toString();
    }

    /**
     * Tells every registered listener that a connection to {@code peer} was opened.
     * A {@code null} peer is ignored.
     *
     * @param peer the address of the newly connected peer
     */
    void notifyConnectionOpened(Address peer) {
        if (peer != null) {
            int idx = 0;
            // The size is re-read on every pass, exactly like an index-based for loop.
            while (idx < conn_listeners.size()) {
                ConnectionListener listener = (ConnectionListener) conn_listeners.elementAt(idx);
                listener.connectionOpened(peer);
                ++idx;
            }
        }
    }

    /**
     * Tells every registered listener that the connection to {@code peer} was closed.
     * A {@code null} peer is ignored.
     *
     * @param peer the address of the peer whose connection went away
     */
    void notifyConnectionClosed(Address peer) {
        if (peer != null) {
            int idx = 0;
            // The size is re-read on every pass, exactly like an index-based for loop.
            while (idx < conn_listeners.size()) {
                ConnectionListener listener = (ConnectionListener) conn_listeners.elementAt(idx);
                listener.connectionClosed(peer);
                ++idx;
            }
        }
    }

    /**
     * Registers {@code c} as the connection for {@code peer} and starts the reaper if one is
     * configured and not yet running.
     * <p>
     * This method does not lock the map itself.
     * NOTE(review): callers presumably hold the {@code conns} monitor - confirm.
     *
     * @param peer the remote address used as map key
     * @param c    the connection to register
     */
    void addConnection(Address peer, Connection c) {
        conns.put(peer, c);
        if (reaper == null || reaper.isRunning()) {
            return;
        }
        reaper.start();
    }

    /**
     * Sends {@code length} bytes of {@code data}, starting at {@code offset}, to {@code dest}.
     * <p>
     * The message is dropped (with a log entry) when the destination or the data is
     * {@code null} or when the table is not running, and dropped silently when
     * {@link #getConnection} returns {@code null}. If the write itself fails, the failure is
     * logged at trace level and the connection is removed from the table; no exception is
     * propagated in that case.
     *
     * @param dest   the peer to send to
     * @param data   buffer holding the payload
     * @param offset index of the first payload byte
     * @param length number of payload bytes
     * @throws Exception if the connection could not be obtained; the original failure is the cause
     */
    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
        if (dest == null) {
            if (log.isErrorEnabled()) {
                log.error("destination is null");
            }
            return;
        }
        if (data == null) {
            log.warn("data is null; discarding packet");
            return;
        }
        if (!running) {
            if (log.isWarnEnabled()) {
                log.warn("connection table is not running, discarding message to " + dest);
            }
            return;
        }

        Connection target;
        try {
            target = getConnection(dest);
        }
        catch (Throwable failure) {
            throw new Exception("connection to " + dest + " could not be established", failure);
        }
        if (target == null) {
            return;
        }

        try {
            target.send(data, offset, length);
        }
        catch (Throwable failure) {
            if (log.isTraceEnabled()) {
                log.trace("sending msg to " + dest + " failed (" + failure.getClass().getName() + "); removing from connection table", failure);
            }
            remove(dest);
        }
    }

    /**
     * Looks up, or establishes, the connection to the given peer. Implemented by subclasses.
     * {@link #send} silently drops the message when this returns {@code null} and wraps anything
     * thrown here in an {@code Exception} whose message names the destination.
     *
     * @param var1 the peer address to connect to
     * @return the connection to use, or {@code null} if none is available
     * @throws Exception if the connection cannot be obtained
     */
    abstract Connection getConnection(Address var1) throws Exception;

    /**
     * Drops every connection whose peer is not contained in {@code current_mbrs}.
     * The map is pruned under the lock; the orphaned connections are destroyed afterwards,
     * outside the lock, because {@code destroy()} may block while joining threads.
     *
     * @param current_mbrs the peers to keep; {@code null} leaves the table untouched
     */
    public void retainAll(Collection current_mbrs) {
        if (current_mbrs == null) {
            return;
        }
        HashMap copy;
        synchronized (conns) {
            copy = new HashMap(conns);
            conns.keySet().retainAll(current_mbrs);
        }
        for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
            // Raw iterator: next() yields Object, so the cast is required for this to compile.
            Map.Entry entry = (Map.Entry) it.next();
            if (current_mbrs.contains(entry.getKey())) {
                continue;
            }
            Connection conn = (Connection) entry.getValue();
            if (conn == null) {
                continue;
            }
            if (log.isTraceEnabled()) {
                log.trace("Destroy this orphaned connection: " + conn);
            }
            conn.destroy();
        }
        copy.clear();
    }

    /**
     * Background task that periodically destroys connections that have not been used for more
     * than {@code conn_expire_time} milliseconds. It runs while the table contains connections
     * and terminates on its own once the table is empty or {@link #stop()} is called.
     */
    class Reaper
    implements Runnable {
        /**
         * The thread currently acting as reaper, or {@code null} when none is active.
         * Volatile because it is written by callers of start()/stop() and read by the reaper thread.
         */
        volatile Thread t = null;

        Reaper() {
        }

        /**
         * Starts the reaper thread unless the table is empty or a live reaper thread exists.
         */
        public void start() {
            if (BasicConnectionTable.this.conns.size() == 0) {
                return;
            }
            Thread current = this.t;
            if (current != null && !current.isAlive()) {
                this.t = null;
                current = null;
            }
            if (current == null) {
                Thread created = new Thread(BasicConnectionTable.this.thread_group, this, "ConnectionTable.ReaperThread");
                created.setDaemon(true);
                // Publish before starting: run() compares the current thread against t.
                this.t = created;
                created.start();
            }
        }

        /**
         * Asks the reaper thread to stop and waits up to {@code MAX_JOIN_TIMEOUT} ms for it to exit.
         * If the calling thread is interrupted while waiting, its interrupt status is restored.
         */
        public void stop() {
            Thread tmp = this.t;
            this.t = null;
            if (tmp == null) {
                return;
            }
            tmp.interrupt();
            try {
                tmp.join(BasicConnectionTable.MAX_JOIN_TIMEOUT);
            }
            catch (InterruptedException interrupted) {
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            if (tmp.isAlive() && BasicConnectionTable.this.log.isWarnEnabled()) {
                BasicConnectionTable.this.log.warn("reaper thread was interrupted, but is still alive: " + tmp);
            }
        }

        /** @return {@code true} while a reaper thread is registered */
        public boolean isRunning() {
            return this.t != null;
        }

        /**
         * Sweep loop: sleeps for {@code reaper_interval}, then removes every connection whose
         * {@code last_access} is older than {@code conn_expire_time} and destroys it.
         */
        public void run() {
            final Thread self = Thread.currentThread();
            if (BasicConnectionTable.this.log.isInfoEnabled()) {
                BasicConnectionTable.this.log.info("connection reaper thread was started. Number of connections=" + BasicConnectionTable.this.conns.size() + ", reaper_interval=" + BasicConnectionTable.this.reaper_interval + ", conn_expire_time=" + BasicConnectionTable.this.conn_expire_time);
            }
            // t is read once per check, so a concurrent stop() setting it to null cannot cause an NPE.
            while (BasicConnectionTable.this.conns.size() > 0 && self == this.t) {
                Util.sleep(BasicConnectionTable.this.reaper_interval);
                if (self != this.t) {
                    break;
                }
                Vector expired = new Vector();
                synchronized (BasicConnectionTable.this.conns) {
                    long curr_time = System.currentTimeMillis();
                    Iterator it = BasicConnectionTable.this.conns.entrySet().iterator();
                    while (it.hasNext()) {
                        Map.Entry entry = (Map.Entry) it.next();
                        Connection value = (Connection) entry.getValue();
                        if (BasicConnectionTable.this.log.isInfoEnabled()) {
                            BasicConnectionTable.this.log.info("connection is " + (curr_time - value.last_access) / 1000L + " seconds old (curr-time=" + curr_time + ", last_access=" + value.last_access + ')');
                        }
                        if (value.last_access + BasicConnectionTable.this.conn_expire_time >= curr_time) {
                            continue;
                        }
                        if (BasicConnectionTable.this.log.isInfoEnabled()) {
                            BasicConnectionTable.this.log.info("connection " + value + " has been idle for too long (conn_expire_time=" + BasicConnectionTable.this.conn_expire_time + "), will be removed");
                        }
                        it.remove();
                        expired.addElement(value);
                    }
                }
                // destroy() joins the connection's threads and may block for seconds, so it runs
                // after the map lock has been released (the same pattern retainAll() uses).
                for (int i = 0; i < expired.size(); ++i) {
                    ((Connection) expired.elementAt(i)).destroy();
                }
            }
            if (BasicConnectionTable.this.log.isInfoEnabled()) {
                BasicConnectionTable.this.log.info("reaper terminated");
            }
            // Only clear the slot if it still refers to this thread; a newer reaper may own it.
            if (self == this.t) {
                this.t = null;
            }
        }
    }

    class Connection
    implements Runnable {
        // Socket to the peer; closeSocket() closes it and sets this field to null.
        Socket sock;
        // Cached "local:port - remote:port" text built lazily by getSockAddress().
        String sock_addr;
        // Buffered streams over the socket; left null if the constructor failed to open them.
        DataOutputStream out;
        DataInputStream in;
        // Thread executing run(); null before init() and after the connection stops.
        Thread receiverThread;
        // Address of the remote peer; passed as sender to BasicConnectionTable.receive().
        Address peer_addr;
        // Serializes writes performed by _send().
        final Object send_mutex;
        // System.currentTimeMillis() of the last successful send/receive; read by the Reaper.
        long last_access;
        // Outgoing payloads waiting for the Sender thread (only used when use_send_queues is true).
        Queue send_queue;
        // Drains send_queue on its own thread.
        Sender sender;
        // Set by init(), cleared by destroy(); send() discards messages while it is false.
        boolean is_running;

        /**
         * Returns a {@code "localIp:localPort - remoteIp:remotePort"} description of the socket,
         * computing and caching it on first use.
         *
         * @return the cached description, or {@code null} if there is no socket and nothing was cached
         */
        private String getSockAddress() {
            if (this.sock_addr != null) {
                return this.sock_addr;
            }
            // closeSocket() may set the field to null concurrently, so work on a snapshot.
            Socket snapshot = this.sock;
            if (snapshot != null) {
                StringBuffer sb = new StringBuffer();
                sb.append(snapshot.getLocalAddress().getHostAddress()).append(':').append(snapshot.getLocalPort());
                sb.append(" - ").append(snapshot.getInetAddress().getHostAddress()).append(':').append(snapshot.getPort());
                this.sock_addr = sb.toString();
            }
            return this.sock_addr;
        }

        /**
         * Wraps an already created socket. The buffered data streams are opened here; if that
         * fails, the error is logged and {@code in}/{@code out} stay {@code null}.
         *
         * @param s         the socket to the peer
         * @param peer_addr the peer's address
         */
        Connection(Socket s, Address peer_addr) {
            // Reference fields not assigned here keep their default of null.
            send_mutex = new Object();
            last_access = System.currentTimeMillis();
            send_queue = new Queue();
            sender = new Sender();
            is_running = false;
            sock = s;
            this.peer_addr = peer_addr;
            try {
                out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
                in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            }
            catch (Exception failure) {
                if (BasicConnectionTable.this.log.isErrorEnabled()) {
                    BasicConnectionTable.this.log.error("exception is " + failure);
                }
            }
        }

        /** @return {@code true} while a receiver thread is registered for this connection */
        boolean established() {
            return receiverThread != null;
        }

        /**
         * Records the address of the remote peer.
         *
         * @param peer_addr the peer's address
         */
        void setPeerAddress(Address peer_addr) {
            Connection.this.peer_addr = peer_addr;
        }

        /** @return the address of the remote peer, possibly {@code null} */
        Address getPeerAddress() {
            return peer_addr;
        }

        /** Stamps the connection as used right now, which postpones its expiry by the reaper. */
        void updateLastAccessed() {
            last_access = System.currentTimeMillis();
        }

        /**
         * Marks the connection as running and starts the daemon receiver thread, unless a live
         * receiver thread already exists.
         */
        void init() {
            is_running = true;
            if (receiverThread != null && receiverThread.isAlive()) {
                return;
            }
            String threadName = "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]";
            receiverThread = new Thread(BasicConnectionTable.this.thread_group, this, threadName);
            receiverThread.setDaemon(true);
            receiverThread.start();
            if (BasicConnectionTable.this.log.isTraceEnabled()) {
                BasicConnectionTable.this.log.trace("ConnectionTable.Connection.Receiver started");
            }
        }

        /**
         * Shuts the connection down: closes the socket and streams, stops the sender and waits up
         * to {@code MAX_JOIN_TIMEOUT} ms for the receiver thread to exit.
         * <p>
         * May be called from the receiver thread itself; in that case the thread is neither
         * interrupted nor joined, since it cannot wait for its own termination.
         */
        void destroy() {
            this.is_running = false;
            this.closeSocket();
            this.sender.stop();
            Thread tmp = this.receiverThread;
            this.receiverThread = null;
            if (tmp == null || tmp == Thread.currentThread()) {
                // Nothing to wait for; run() sees is_running == false and exits by itself.
                return;
            }
            tmp.interrupt();
            try {
                tmp.join(BasicConnectionTable.MAX_JOIN_TIMEOUT);
            }
            catch (InterruptedException interrupted) {
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            if (tmp.isAlive() && BasicConnectionTable.this.log.isWarnEnabled()) {
                BasicConnectionTable.this.log.warn("stopped receiver thread, but thread (" + tmp + ") is still alive !");
            }
        }

        /**
         * Sends a payload to the peer. With send queues enabled, the bytes are copied and queued
         * for the sender thread (which is started on demand); otherwise they are written on the
         * calling thread. Messages are discarded while the connection is not running.
         *
         * @param data   buffer holding the payload
         * @param offset index of the first payload byte
         * @param length number of payload bytes
         */
        void send(byte[] data, int offset, int length) {
            if (!is_running) {
                if (BasicConnectionTable.this.log.isWarnEnabled()) {
                    BasicConnectionTable.this.log.warn("Connection is not running, discarding message");
                }
                return;
            }
            if (!BasicConnectionTable.this.use_send_queues) {
                _send(data, offset, length);
                return;
            }
            try {
                // Copy the payload: the queue is drained by another thread after this call returns.
                byte[] copy = new byte[length];
                System.arraycopy(data, offset, copy, 0, length);
                send_queue.add(copy);
                if (!sender.isRunning()) {
                    sender.start();
                }
            }
            catch (QueueClosedException closed) {
                BasicConnectionTable.this.log.error("failed adding message to send_queue", closed);
            }
        }

        /**
         * Writes one message under {@code send_mutex}. After an {@code IOException} the write is
         * attempted once more. All failures are logged and swallowed; nothing is propagated to the
         * caller, except an {@code Error} raised by the second attempt.
         *
         * @param data   buffer holding the payload
         * @param offset index of the first payload byte
         * @param length number of payload bytes
         */
        private void _send(byte[] data, int offset, int length) {
            final Log logger = BasicConnectionTable.this.log;
            synchronized (send_mutex) {
                try {
                    doSend(data, offset, length);
                    updateLastAccessed();
                }
                catch (IOException firstFailure) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("peer closed connection, trying to re-send msg");
                    }
                    try {
                        doSend(data, offset, length);
                        updateLastAccessed();
                    }
                    catch (IOException secondFailure) {
                        if (logger.isErrorEnabled()) {
                            logger.error("2nd attempt to send data failed too");
                        }
                    }
                    catch (Exception other) {
                        if (logger.isErrorEnabled()) {
                            logger.error("exception is " + other);
                        }
                    }
                }
                catch (InterruptedException interrupted) {
                    // deliberately ignored, as before
                }
                catch (Throwable unexpected) {
                    if (logger.isErrorEnabled()) {
                        logger.error("exception is " + unexpected);
                    }
                }
            }
        }

        /**
         * Writes the length prefix and the payload, then flushes. Does nothing when there is no
         * output stream. On any failure the connection is removed from the table before the
         * exception is rethrown.
         *
         * @param data   buffer holding the payload
         * @param offset index of the first payload byte
         * @param length number of payload bytes
         * @throws Exception whatever the write raised
         */
        void doSend(byte[] data, int offset, int length) throws Exception {
            if (out == null) {
                return;
            }
            try {
                out.writeInt(length);
                Util.doubleWrite(data, offset, length, out);
                out.flush();
            }
            catch (Exception failure) {
                BasicConnectionTable.this.remove(peer_addr);
                throw failure;
            }
        }

        /**
         * Reads the handshake a peer sends after connecting: the cookie, a version short and the
         * peer's address. A version mismatch is only logged as a warning.
         *
         * @param client_sock the accepted socket, used only to name the remote end in log and error
         *                    messages; may be {@code null}
         * @return the address announced by the peer, or {@code null} if this connection has no
         *         input stream
         * @throws SocketException if the received cookie does not match ours (including the case
         *                         where the stream ends before a full cookie arrived)
         * @throws Exception       if reading from the stream fails
         */
        Address readPeerAddress(Socket client_sock) throws Exception {
            IpAddress client_peer_addr = null;
            byte[] input_cookie = new byte[BasicConnectionTable.this.cookie.length];
            int client_port = client_sock != null ? client_sock.getPort() : 0;
            InetAddress client_addr = client_sock != null ? client_sock.getInetAddress() : null;
            if (this.in != null) {
                this.initCookie(input_cookie);
                // A single read() may return fewer bytes than requested, so keep reading until the
                // cookie is complete or the stream ends. A short cookie stays zero-padded and is
                // rejected by matchCookie(), which keeps the SocketException behaviour.
                int filled = 0;
                while (filled < input_cookie.length) {
                    int n = this.in.read(input_cookie, filled, input_cookie.length - filled);
                    if (n < 0) {
                        break;
                    }
                    filled += n;
                }
                if (!this.matchCookie(input_cookie)) {
                    // The peer's announced address has not been read yet, so name the socket's remote end.
                    throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " + client_addr + ':' + client_port + " does not match own cookie; terminating connection");
                }
                short version = this.in.readShort();
                if (!Version.compareTo(version) && BasicConnectionTable.this.log.isWarnEnabled()) {
                    BasicConnectionTable.this.log.warn(new StringBuffer("packet from ").append(client_addr).append(':').append(client_port).append(" has different version (").append(version).append(") from ours (").append(240).append("). This may cause problems"));
                }
                client_peer_addr = new IpAddress();
                client_peer_addr.readFrom(this.in);
                this.updateLastAccessed();
            }
            return client_peer_addr;
        }

        /**
         * Sends our side of the handshake: the cookie, the version short (240) and the local
         * address. Failures are logged, not propagated. Nothing is sent when {@code local_addr}
         * is {@code null} or there is no output stream.
         *
         * @param local_addr the address to announce to the peer
         */
        void sendLocalAddress(Address local_addr) {
            final Log logger = BasicConnectionTable.this.log;
            if (local_addr == null) {
                if (logger.isWarnEnabled()) {
                    logger.warn("local_addr is null");
                }
                return;
            }
            if (out == null) {
                return;
            }
            try {
                byte[] magic = BasicConnectionTable.this.cookie;
                out.write(magic, 0, magic.length);
                out.writeShort(240);
                local_addr.writeTo(out);
                out.flush();
                updateLastAccessed();
            }
            catch (Throwable failure) {
                if (logger.isErrorEnabled()) {
                    logger.error("exception is " + failure);
                }
            }
        }

        /**
         * Overwrites every byte of {@code c} with zero; a {@code null} array is ignored.
         *
         * @param c the buffer to clear
         */
        void initCookie(byte[] c) {
            if (c == null) {
                return;
            }
            int pos = c.length;
            while (pos > 0) {
                c[--pos] = 0;
            }
        }

        /**
         * Checks whether {@code input} starts with this table's cookie.
         *
         * @param input the bytes received from the peer
         * @return {@code true} if the leading bytes equal the cookie; {@code false} if they differ
         *         or {@code input} is {@code null} or shorter than the cookie
         */
        boolean matchCookie(byte[] input) {
            final byte[] expected = BasicConnectionTable.this.cookie;
            if (input == null || input.length < expected.length) {
                return false;
            }
            int pos = 0;
            while (pos < expected.length && expected[pos] == input[pos]) {
                ++pos;
            }
            return pos == expected.length;
        }

        /**
         * Renders cookie bytes as text using the platform default charset.
         *
         * @param c the cookie bytes
         * @return the decoded text, or the empty string for {@code null}
         */
        String printCookie(byte[] c) {
            return c == null ? "" : new String(c);
        }

        /**
         * Receiver loop: reads length-prefixed messages from the peer and hands each one to
         * {@code BasicConnectionTable.receive()}. The loop ends when the connection is stopped,
         * the stream fails or ends (listeners are then told the connection closed), or the peer
         * sends an invalid length. On exit the socket and streams are closed.
         */
        public void run() {
            final Thread self = Thread.currentThread();
            byte[] buf = new byte[256];
            // receiverThread is read once per check, so destroy() setting it to null concurrently
            // cannot cause a NullPointerException here.
            while (self == this.receiverThread && this.is_running) {
                try {
                    if (this.in == null) {
                        if (BasicConnectionTable.this.log.isErrorEnabled()) {
                            BasicConnectionTable.this.log.error("input stream is null !");
                        }
                        break;
                    }
                    int len = this.in.readInt();
                    if (len < 0) {
                        // A negative length means the framing is corrupt; continuing would interpret
                        // arbitrary payload bytes as length prefixes, so give the connection up.
                        if (BasicConnectionTable.this.log.isWarnEnabled()) {
                            BasicConnectionTable.this.log.warn("received invalid message length " + len + ", closing connection");
                        }
                        break;
                    }
                    if (len > buf.length) {
                        // An absurdly large length surfaces as OutOfMemoryError and is handled below.
                        buf = new byte[len];
                    }
                    this.in.readFully(buf, 0, len);
                    this.updateLastAccessed();
                    BasicConnectionTable.this.receive(this.peer_addr, buf, 0, len);
                }
                catch (OutOfMemoryError mem_ex) {
                    if (BasicConnectionTable.this.log.isWarnEnabled()) {
                        BasicConnectionTable.this.log.warn("dropped invalid message, closing connection");
                    }
                    break;
                }
                catch (EOFException eof_ex) {
                    if (BasicConnectionTable.this.log.isTraceEnabled()) {
                        BasicConnectionTable.this.log.trace("exception is " + eof_ex);
                    }
                    BasicConnectionTable.this.notifyConnectionClosed(this.peer_addr);
                    break;
                }
                catch (IOException io_ex) {
                    if (BasicConnectionTable.this.log.isTraceEnabled()) {
                        BasicConnectionTable.this.log.trace("exception is " + io_ex);
                    }
                    BasicConnectionTable.this.notifyConnectionClosed(this.peer_addr);
                    break;
                }
                catch (Throwable e) {
                    // Typically a failure inside the receiver callback: log it and keep reading.
                    if (BasicConnectionTable.this.log.isWarnEnabled()) {
                        BasicConnectionTable.this.log.warn("exception is " + e);
                    }
                }
            }
            if (BasicConnectionTable.this.log.isTraceEnabled()) {
                BasicConnectionTable.this.log.trace("ConnectionTable.Connection.Receiver terminated");
            }
            // Only clear the slot if it still refers to this thread.
            if (self == this.receiverThread) {
                this.receiverThread = null;
            }
            this.closeSocket();
        }

        /**
         * Describes the connection as {@code <local:port --> remote:port> (N secs old)}, or
         * {@code <null socket>} once the socket has been closed.
         *
         * @return the textual description
         */
        public String toString() {
            StringBuffer ret = new StringBuffer();
            // closeSocket() may set the field to null at any time, so take the snapshot BEFORE the
            // null check; checking the field and then re-reading it could still hit null.
            Socket tmp_sock = this.sock;
            if (tmp_sock == null) {
                ret.append("<null socket>");
            } else {
                InetAddress local = tmp_sock.getLocalAddress();
                InetAddress remote = tmp_sock.getInetAddress();
                String local_str = local != null ? Util.shortName(local) : "<null>";
                String remote_str = remote != null ? Util.shortName(remote) : "<null>";
                ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() + " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" + (System.currentTimeMillis() - this.last_access) / 1000L + " secs old)");
            }
            return ret.toString();
        }

        /**
         * Closes the socket, forgets it, and then closes both data streams.
         * NOTE(review): presumably {@code Util.close} tolerates {@code null} and swallows errors,
         * since this is also called after the socket was already closed - confirm.
         */
        void closeSocket() {
            Util.close(sock);
            sock = null;
            Util.close(out);
            Util.close(in);
        }

        class Sender
        implements Runnable {
            Thread senderThread;
            private boolean is_it_running = false;

            Sender() {
            }

            void start() {
                if (this.senderThread == null || !this.senderThread.isAlive()) {
                    this.senderThread = new Thread(((Connection)Connection.this).BasicConnectionTable.this.thread_group, this, "ConnectionTable.Connection.Sender [" + Connection.this.getSockAddress() + "]");
                    this.senderThread.setDaemon(true);
                    this.senderThread.start();
                    this.is_it_running = true;
                    if (((Connection)Connection.this).BasicConnectionTable.this.log.isTraceEnabled()) {
                        ((Connection)Connection.this).BasicConnectionTable.this.log.trace((Object)"ConnectionTable.Connection.Sender thread started");
                    }
                }
            }

            void stop() {
                this.is_it_running = false;
                if (Connection.this.send_queue != null) {
                    Connection.this.send_queue.close(false);
                }
                if (this.senderThread != null) {
                    Thread tmp = this.senderThread;
                    this.senderThread = null;
                    tmp.interrupt();
                    try {
                        tmp.join(10000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    if (tmp.isAlive() && ((Connection)Connection.this).BasicConnectionTable.this.log.isWarnEnabled()) {
                        ((Connection)Connection.this).BasicConnectionTable.this.log.warn((Object)("sender thread was interrupted, but is still alive: " + tmp));
                    }
                }
            }

            boolean isRunning() {
                return this.is_it_running && this.senderThread != null;
            }

            public void run() {
                while (this.senderThread != null && this.senderThread.equals(Thread.currentThread()) && this.is_it_running) {
                    try {
                        byte[] data = (byte[])Connection.this.send_queue.remove();
                        if (data == null) continue;
                        Connection.this._send(data, 0, data.length);
                    }
                    catch (QueueClosedException e) {
                        // empty catch block
                        break;
                    }
                }
                this.is_it_running = false;
                if (((Connection)Connection.this).BasicConnectionTable.this.log.isTraceEnabled()) {
                    ((Connection)Connection.this).BasicConnectionTable.this.log.trace((Object)"ConnectionTable.Connection.Sender thread terminated");
                }
            }
        }
    }

    /**
     * Callback for connection lifecycle events, registered via
     * {@code addConnectionListener()}. Neither method is ever invoked with a {@code null} address.
     */
    public interface ConnectionListener {
        /**
         * Invoked by {@code notifyConnectionOpened()}.
         *
         * @param var1 the address of the peer
         */
        void connectionOpened(Address var1);

        /**
         * Invoked by {@code notifyConnectionClosed()}, e.g. when a receiver thread hits
         * end-of-stream or an I/O error.
         *
         * @param var1 the address of the peer
         */
        void connectionClosed(Address var1);
    }

    public static interface Receiver {
        public void receive(Address var1, byte[] var2, int var3, int var4);
    }
}

