/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.BasePubSub;
import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.Config;
import com.sproutsocial.nsq.HostAndPort;
import com.sproutsocial.nsq.NSQException;
import com.sproutsocial.nsq.ServerConfig;
import com.sproutsocial.nsq.Util;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.net.ssl.SSLSocketFactory;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for one TCP connection to an nsqd host.
 *
 * <p>{@link #connect(Config)} performs the handshake (protocol magic, IDENTIFY,
 * optional TLS / deflate / snappy stream wrapping, optional AUTH), schedules a
 * periodic heartbeat check and starts a dedicated read thread. The read thread
 * answers {@code _heartbeat_} responses with {@code NOP}, hands message frames
 * to {@link #onMessage(long, int, String, byte[])} and offers every other
 * response to {@link #respQueue}.
 *
 * <p>Methods annotated {@code @GuardedBy("this")} write to the shared output
 * stream; callers are expected to hold this object's monitor.
 */
abstract class Connection
extends BasePubSub
implements Closeable {
    /** Timeout, in milliseconds, for the TCP connect and for socket reads during the handshake. */
    private static final int CONNECT_TIMEOUT_MILLIS = 30000;
    /** Fallback used when the IDENTIFY response carries no message timeout. */
    private static final int DEFAULT_MSG_TIMEOUT = 60000;
    /** Fallback used when the IDENTIFY response carries no heartbeat interval (milliseconds). */
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 30000;
    /** Fallback used when the IDENTIFY response carries no max RDY count. */
    private static final int DEFAULT_MAX_RDY_COUNT = 2500;
    /** Buffer size handed to the inflater/deflater streams. */
    private static final int COMPRESSION_BUFFER_SIZE = 32768;

    private static final int FRAME_TYPE_RESPONSE = 0;
    private static final int FRAME_TYPE_ERROR = 1;
    private static final int FRAME_TYPE_MESSAGE = 2;
    /** The frame size field counts the 4-byte frame type plus the payload. */
    private static final int FRAME_TYPE_SIZE = 4;
    private static final int MESSAGE_ID_SIZE = 16;
    /** Frame type (4) + timestamp (8) + attempts (2) + message id (16). */
    private static final int MESSAGE_HEADER_SIZE = 30;

    protected final HostAndPort host;
    protected DataOutputStream out;
    protected DataInputStream in;
    /** Cleared by {@link #close()} so the read loop stops and ignores the resulting I/O error. */
    private volatile boolean isReading = true;
    protected int msgTimeout = DEFAULT_MSG_TIMEOUT;
    protected int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
    protected int maxRdyCount = DEFAULT_MAX_RDY_COUNT;
    protected long lastActionFlush;
    protected int unflushedCount;
    protected long lastHeartbeat;
    /** Holds at most one pending response; the read thread offers into it without blocking. */
    protected final BlockingQueue<String> respQueue = new ArrayBlockingQueue<String>(1);
    protected final ExecutorService handlerExecutor;
    private static final ThreadFactory readThreadFactory = Util.threadFactory("nsq-read");
    /** Error codes that are logged and skipped instead of being raised as exceptions. */
    private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));
    private static final Logger logger = LoggerFactory.getLogger(Connection.class);

    /**
     * Creates an unconnected connection; call {@link #connect(Config)} to open it.
     *
     * @param client owning client, also the source of the handler executor
     * @param host   nsqd address to connect to
     */
    public Connection(Client client, HostAndPort host) {
        super(client);
        this.host = host;
        this.handlerExecutor = client.getExecutor();
    }

    /**
     * Opens the socket, negotiates features with nsqd and starts the heartbeat
     * check and the read thread.
     *
     * <p>If any handshake step fails the socket is closed before the exception
     * propagates, so a failed connect does not leak a file descriptor.
     *
     * @param config client configuration sent with IDENTIFY; its hostname is
     *               filled in when unset and feature negotiation is enabled
     * @throws IOException  on socket or stream failure
     * @throws NSQException on protocol errors or unsupported/misconfigured features
     */
    public synchronized void connect(Config config) throws IOException {
        this.addClientConfig(config);
        Socket sock = new Socket();
        boolean handshakeDone = false;
        try {
            sock.setSoTimeout(CONNECT_TIMEOUT_MILLIS);
            sock.connect(new InetSocketAddress(this.host.getHost(), this.host.getPort()), CONNECT_TIMEOUT_MILLIS);
            StreamPair streams = this.setStreams(sock.getInputStream(), sock.getOutputStream(), new StreamPair());
            this.out.write("  V2".getBytes(Util.US_ASCII));
            String response = this.connectCommand("IDENTIFY", this.client.getGson().toJson(config).getBytes(Util.UTF_8));
            ServerConfig serverConfig = this.client.getGson().fromJson(response, ServerConfig.class);
            logger.debug("serverConfig:{}", response);
            this.setConfig(serverConfig);
            this.msgTimeout = Util.firstNonNull(serverConfig.getMsgTimeout(), DEFAULT_MSG_TIMEOUT);
            this.heartbeatInterval = Util.firstNonNull(serverConfig.getHeartbeatInterval(), DEFAULT_HEARTBEAT_INTERVAL);
            this.maxRdyCount = Util.firstNonNull(serverConfig.getMaxRdyCount(), DEFAULT_MAX_RDY_COUNT);
            logger.info("connected {} msgTimeout:{} heartbeatInterval:{} maxRdyCount:{}", new Object[]{this.host, this.msgTimeout, this.heartbeatInterval, this.maxRdyCount});
            // A blocked read should outlast one heartbeat period before timing out.
            sock.setSoTimeout(this.heartbeatInterval + 5000);
            this.wrapEncryption(serverConfig, sock, streams);
            this.wrapCompression(serverConfig, streams);
            if (!streams.isBuffered) {
                this.in = new DataInputStream(new BufferedInputStream(streams.baseIn));
                this.out = new DataOutputStream(new BufferedOutputStream(streams.baseOut));
            }
            this.sendAuthorization(serverConfig);
            handshakeDone = true;
        }
        finally {
            if (!handshakeDone) {
                closeSocketQuietly(sock);
            }
        }
        this.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                Connection.this.checkHeartbeat();
            }
        }, this.heartbeatInterval + 2000, this.heartbeatInterval, false);
        this.lastHeartbeat = Util.clock();
        readThreadFactory.newThread(new Runnable(){

            @Override
            public void run() {
                Connection.this.read();
            }
        }).start();
    }

    /** Closes a socket whose handshake failed; a close failure is only logged. */
    private static void closeSocketQuietly(Socket sock) {
        try {
            sock.close();
        }
        catch (IOException e) {
            logger.debug("error closing socket after failed connect", e);
        }
    }

    /** Returns {@code true} only for a non-null {@code Boolean.TRUE}; a missing server flag counts as disabled. */
    private static boolean isTrue(Boolean flag) {
        return flag != null && flag.booleanValue();
    }

    /**
     * Sends a handshake command followed by a length-prefixed body, flushes and
     * reads the reply synchronously. Only used before the read thread is started.
     */
    private String connectCommand(String command, byte[] data) throws IOException {
        this.out.write((command + "\n").getBytes(Util.US_ASCII));
        this.write(data);
        this.out.flush();
        return this.readResponse();
    }

    /**
     * Fills in the hostname from the runtime MXBean name (the part after
     * {@code '@'}) when the config has none, and turns on feature negotiation.
     */
    private void addClientConfig(Config config) {
        if (config.getHostname() == null) {
            String pidHost = ManagementFactory.getRuntimeMXBean().getName();
            int pos = pidHost.indexOf('@');
            if (pos > 0) {
                config.setHostname(pidHost.substring(pos + 1));
            }
        }
        config.setFeatureNegotiation(true);
    }

    /**
     * Layers TLS over the base socket when the server config enables it, then
     * reads one response from the wrapped stream.
     */
    private void wrapEncryption(ServerConfig serverConfig, Socket baseSocket, StreamPair streams) throws IOException {
        if (!isTrue(serverConfig.getTlsV1())) {
            return;
        }
        logger.debug("adding tls");
        SSLSocketFactory sockFactory = this.client.getSSLSocketFactory();
        if (sockFactory == null) {
            sockFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
        }
        // autoClose=true: closing the TLS socket also closes the underlying socket.
        Socket sslSocket = sockFactory.createSocket(baseSocket, baseSocket.getInetAddress().getHostAddress(), baseSocket.getPort(), true);
        this.setStreams(sslSocket.getInputStream(), sslSocket.getOutputStream(), streams);
        this.readResponse();
    }

    /**
     * Wraps the current streams in deflate and/or snappy streams when the server
     * config enables them, reading one response after each wrap.
     *
     * @throws NSQException if a compression stream cannot be set up; the
     *                      underlying failure is attached as the cause
     */
    private void wrapCompression(ServerConfig serverConfig, StreamPair streams) throws IOException {
        if (isTrue(serverConfig.getDeflate())) {
            logger.debug("adding deflate compression");
            try {
                InflaterInputStream inflateIn = new InflaterInputStream(streams.baseIn, new Inflater(true), COMPRESSION_BUFFER_SIZE);
                Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
                // The syncFlush constructor is looked up reflectively because it does not exist before Java 7.
                Constructor<DeflaterOutputStream> constr = DeflaterOutputStream.class.getConstructor(OutputStream.class, Deflater.class, Integer.TYPE, Boolean.TYPE);
                DeflaterOutputStream deflateOut = constr.newInstance(streams.baseOut, deflater, COMPRESSION_BUFFER_SIZE, true);
                this.setStreams(inflateIn, deflateOut, streams);
                streams.isBuffered = true;
                this.readResponse();
            }
            catch (Exception e) {
                // Keep the cause: this also catches I/O and protocol errors, not just a missing constructor.
                throw new NSQException("deflate compression only supported on java7 and up", e);
            }
        }
        if (isTrue(serverConfig.getSnappy())) {
            logger.debug("adding snappy compression");
            if (serverConfig.getVersion().startsWith("0.")) {
                throw new NSQException("snappy compression only supported on nsqd 1.0 and up");
            }
            try {
                // Loaded reflectively so snappy-java stays an optional dependency.
                Constructor<?> snappyInConstr = Class.forName("org.xerial.snappy.SnappyFramedInputStream").getConstructor(InputStream.class);
                Constructor<?> snappyOutConstr = Class.forName("org.xerial.snappy.SnappyFramedOutputStream").getConstructor(OutputStream.class);
                this.setStreams((InputStream)snappyInConstr.newInstance(streams.baseIn), (OutputStream)snappyOutConstr.newInstance(streams.baseOut), streams);
                this.readResponse();
            }
            catch (Exception e) {
                throw new NSQException("snappy compression failed, is org.xerial.snappy:snappy-java available?", e);
            }
        }
    }

    /**
     * Sends AUTH with the client's secret when the server requires authorization.
     *
     * @throws NSQException if authorization is required but no secret is configured
     */
    private void sendAuthorization(ServerConfig serverConfig) throws IOException {
        if (!isTrue(serverConfig.getAuthRequired())) {
            return;
        }
        if (this.client.getAuthSecret() == null) {
            throw new NSQException("nsqd requires authorization, call client.setAuthSecret before connecting");
        }
        if (!isTrue(serverConfig.getTlsV1())) {
            logger.warn("authorization used without encryption. authSecret sent in plain text");
        }
        String authResponse = this.connectCommand("AUTH", this.client.getAuthSecret());
        logger.info("authorization response:{}", authResponse);
    }

    /**
     * Closes the connection when no heartbeat has been recorded for more than
     * two heartbeat intervals. Runs on the scheduler; never throws.
     */
    private void checkHeartbeat() {
        try {
            long now = Util.clock();
            boolean isDead;
            synchronized (this) {
                // 2L keeps the multiplication in long arithmetic.
                isDead = now - this.lastHeartbeat > 2L * this.heartbeatInterval;
            }
            if (isDead) {
                logger.info("heartbeat failed, closing connection:{}", this);
                this.close();
            }
        }
        catch (Exception e) {
            logger.error("problem checking heartbeat, will probably time out soon. {}", this, e);
        }
    }

    /** Writes {@code "cmd param1 param2\n"} as ASCII without flushing. */
    @GuardedBy(value="this")
    protected void writeCommand(String cmd, Object param1, Object param2) throws IOException {
        this.out.write((cmd + " " + param1 + " " + param2 + "\n").getBytes(Util.US_ASCII));
    }

    /** Writes {@code "cmd param\n"} as ASCII without flushing. */
    @GuardedBy(value="this")
    protected void writeCommand(String cmd, Object param) throws IOException {
        this.out.write((cmd + " " + param + "\n").getBytes(Util.US_ASCII));
    }

    /** Writes a 4-byte big-endian length followed by the bytes, without flushing. */
    @GuardedBy(value="this")
    protected void write(byte[] data) throws IOException {
        this.out.writeInt(data.length);
        this.out.write(data);
    }

    /** Flushes the output stream and resets the flush bookkeeping. */
    @GuardedBy(value="this")
    protected void flush() throws IOException {
        this.out.flush();
        this.lastActionFlush = Util.clock();
        this.unflushedCount = 0;
    }

    /**
     * Reads one frame from the input stream.
     *
     * <ul>
     *   <li>type 0 (response): returns the payload as an ASCII string;</li>
     *   <li>type 1 (error): returns {@code null} after logging if the error code is
     *       one of {@link #nonFatalErrors}, otherwise throws;</li>
     *   <li>type 2 (message): decodes the message, passes it to
     *       {@link #onMessage(long, int, String, byte[])} and returns {@code null}.</li>
     * </ul>
     *
     * @throws NSQException on a fatal error frame, an unknown frame type, or a
     *                      frame size too small for its type
     */
    private String readResponse() throws IOException {
        int size = this.in.readInt();
        int frameType = this.in.readInt();
        if (size < FRAME_TYPE_SIZE) {
            // Fail with a protocol error instead of a NegativeArraySizeException.
            throw new NSQException("bad frame size:" + size);
        }
        switch (frameType) {
            case FRAME_TYPE_RESPONSE: {
                return this.readAscii(size - FRAME_TYPE_SIZE);
            }
            case FRAME_TYPE_ERROR: {
                String error = this.readAscii(size - FRAME_TYPE_SIZE);
                int index = error.indexOf(' ');
                String errorCode = index == -1 ? error : error.substring(0, index);
                if (!nonFatalErrors.contains(errorCode)) {
                    throw new NSQException("error from nsqd:" + error);
                }
                logger.warn("non fatal nsqd error:{} probably due to message timeout", error);
                return null;
            }
            case FRAME_TYPE_MESSAGE: {
                if (size < MESSAGE_HEADER_SIZE) {
                    throw new NSQException("bad frame size:" + size);
                }
                // Fields must be read in wire order.
                long timestamp = this.in.readLong();
                int attempts = this.in.readUnsignedShort();
                String id = this.readAscii(MESSAGE_ID_SIZE);
                byte[] body = this.readBytes(size - MESSAGE_HEADER_SIZE);
                this.onMessage(timestamp, attempts, id, body);
                return null;
            }
            default: {
                throw new NSQException("bad frame type:" + frameType);
            }
        }
    }

    /**
     * Read-thread loop. Heartbeats are answered on the client's scheduled
     * executor; other non-null responses are offered to {@link #respQueue}.
     * On an unexpected failure the exception text is offered to the queue and
     * the connection is closed; failures after {@link #close()} are ignored.
     */
    private void read() {
        try {
            while (this.isReading) {
                String response = this.readResponse();
                if ("_heartbeat_".equals(response)) {
                    this.client.getSchedExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            Connection.this.receivedHeartbeat();
                        }
                    });
                    continue;
                }
                if (response != null) {
                    this.respQueue.offer(response);
                }
            }
        }
        catch (Exception e) {
            // isReading == false means close() caused the failure; nothing to report.
            if (this.isReading) {
                this.respQueue.offer(e.toString());
                this.close();
                logger.error("read thread exception. con:{}", this, e);
            }
        }
        logger.debug("read loop done {}", this);
    }

    /** Replies to a heartbeat with {@code NOP} and records when it was received. */
    private synchronized void receivedHeartbeat() {
        try {
            this.out.write("NOP\n".getBytes(Util.US_ASCII));
            this.out.flush();
            this.lastHeartbeat = Util.clock();
        }
        catch (Throwable t) {
            logger.error("receivedHeartbeat error", t);
        }
    }

    /**
     * Called from the read thread for every message frame. This base
     * implementation rejects messages; subclasses that expect them override it.
     *
     * @param timestamp 8-byte timestamp from the frame
     * @param attempts  unsigned 16-bit attempts counter from the frame
     * @param id        16-character ASCII message id
     * @param data      message body
     */
    protected void onMessage(long timestamp, int attempts, String id, byte[] data) {
        throw new NSQException("unexpected frame type 2 - message");
    }

    /** Reads exactly {@code size} bytes from the input stream. */
    private byte[] readBytes(int size) throws IOException {
        byte[] data = new byte[size];
        this.in.readFully(data);
        return data;
    }

    /** Reads exactly {@code size} bytes and decodes them as US-ASCII. */
    private String readAscii(int size) throws IOException {
        return new String(this.readBytes(size), Util.US_ASCII);
    }

    /**
     * Flushes pending output and waits up to one heartbeat interval for an
     * {@code OK} response from the read thread.
     *
     * <p>NOTE(review): this calls the {@code @GuardedBy("this")} {@link #flush()}
     * without synchronizing itself; presumably callers already hold the lock.
     *
     * @throws NSQException if the response is not {@code OK}, none arrives in
     *                      time, or the wait is interrupted
     */
    protected void flushAndReadOK() throws IOException {
        this.flush();
        try {
            String resp = this.respQueue.poll(this.heartbeatInterval, TimeUnit.MILLISECONDS);
            if (!"OK".equals(resp)) {
                throw new NSQException("bad response:" + (resp != null ? resp : "timeout"));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NSQException("read interrupted");
        }
    }

    /** Flushes pending output (logging any failure) and then closes the connection. */
    public synchronized void flushAndClose() {
        try {
            this.flush();
        }
        catch (IOException e) {
            logger.error("flushAndClose error", e);
        }
        this.close();
    }

    /** Stops the read loop, closes both streams and cancels scheduled tasks. */
    @Override
    public void close() {
        // Clear the flag first so the read thread treats the resulting I/O error as a normal shutdown.
        this.isReading = false;
        Util.closeQuietly(this.out);
        Util.closeQuietly(this.in);
        this.cancelTasks();
        logger.debug("connection closed:{}", this);
    }

    public HostAndPort getHost() {
        return this.host;
    }

    /**
     * Returns a one-line description with the time since the last flush and the
     * last heartbeat (clock difference divided by 1000) and the unflushed count.
     */
    public synchronized String stateDesc() {
        long now = Util.clock();
        float sinceFlush = (float)(now - this.lastActionFlush) / 1000.0f;
        float sinceHeartbeat = (float)(now - this.lastHeartbeat) / 1000.0f;
        return String.format("%s lastFlush:%.1f lastHeartbeat:%.1f unflushedCount:%d", this.toString(), Float.valueOf(sinceFlush), Float.valueOf(sinceHeartbeat), this.unflushedCount);
    }

    public synchronized int getMsgTimeout() {
        return this.msgTimeout;
    }

    public synchronized long getLastActionFlush() {
        return this.lastActionFlush;
    }

    public synchronized int getMaxRdyCount() {
        return this.maxRdyCount;
    }

    /**
     * Records the given streams as the current base pair and points
     * {@link #in}/{@link #out} at unbuffered data-stream wrappers around them.
     *
     * @return the same {@code streams} instance, for chaining
     */
    private StreamPair setStreams(InputStream baseIn, OutputStream baseOut, StreamPair streams) {
        streams.baseIn = baseIn;
        streams.baseOut = baseOut;
        this.in = new DataInputStream(baseIn);
        this.out = new DataOutputStream(baseOut);
        return streams;
    }

    /** Mutable holder for the innermost streams while handshake layers are stacked on top. */
    private static class StreamPair {
        private InputStream baseIn;
        private OutputStream baseOut;
        /** Set once a layer that does its own buffering (deflate) has been added. */
        boolean isBuffered = false;

        private StreamPair() {
        }
    }
}

