/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.nsqj;

import com.github.yingzhuo.nsqj.BasePubSub;
import com.github.yingzhuo.nsqj.Client;
import com.github.yingzhuo.nsqj.Config;
import com.github.yingzhuo.nsqj.HostAndPort;
import com.github.yingzhuo.nsqj.NSQException;
import com.github.yingzhuo.nsqj.ServerConfig;
import com.github.yingzhuo.nsqj.Util;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.net.ssl.SSLSocketFactory;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class Connection
extends BasePubSub
implements Closeable {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(Connection.class);
    // Address of the nsqd instance this connection talks to; assigned once in the constructor.
    protected final HostAndPort host;
    // Outgoing stream; replaced by setStreams() and re-wrapped in connect() as TLS/compression/buffering is layered on.
    protected DataOutputStream out;
    // Incoming stream; replaced alongside `out`.
    protected DataInputStream in;
    // Read-loop run flag: read() loops while true, close() sets it to false.
    private volatile boolean isReading = true;
    // Defaults below are overwritten in connect() with the values negotiated via IDENTIFY, when the server supplies them.
    protected int msgTimeout = 60000;
    protected int heartbeatInterval = 30000;
    protected int maxRdyCount = 2500;
    // Util.clock() value recorded by the most recent flush().
    protected long lastActionFlush;
    // Reset to 0 by flush(); it is not incremented anywhere in this class (presumably subclasses do — confirm).
    protected int unflushedCount;
    // Util.clock() value recorded on connect and after each heartbeat reply; compared in checkHeartbeat().
    protected long lastHeartbeat;
    // Single-slot hand-off: read() offers responses (or an exception's text), flushAndReadOK() polls them.
    protected final BlockingQueue<String> respQueue = new ArrayBlockingQueue<String>(1);
    // Executor obtained from client.getExecutor(); not used within this class.
    protected final ExecutorService handlerExecutor;
    // Creates the thread that runs read(); shared by all connections.
    private static final ThreadFactory readThreadFactory = Util.threadFactory("nsq-read");
    // Error responses that readResponse() logs as warnings instead of throwing.
    private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));

    /**
     * Creates an unconnected connection object; no I/O happens until {@link #connect(Config)}.
     *
     * @param client      owning client, passed to the superclass and queried for its executor
     * @param hostAndPort address this connection will connect to
     */
    public Connection(Client client, HostAndPort hostAndPort) {
        super(client);
        handlerExecutor = client.getExecutor();
        host = hostAndPort;
    }

    /**
     * Opens the TCP connection and performs the full handshake.
     *
     * <p>Steps, in order: connect the socket (30 s connect and read timeout), send the
     * {@code "  V2"} magic, send {@code IDENTIFY} with the JSON-serialized {@code config},
     * apply the negotiated server settings, optionally upgrade to TLS and compression,
     * authorize if required, schedule the heartbeat check and start the read thread.
     *
     * <p>If any step fails the socket is closed before the exception propagates, so a
     * failed connect attempt does not leak a file descriptor.
     *
     * @param config client configuration sent with IDENTIFY; its hostname may be filled in
     *               and feature negotiation is always switched on
     * @throws IOException  on socket or stream failure
     * @throws NSQException if the server reports an error or a negotiated feature cannot be set up
     */
    public synchronized void connect(Config config) throws IOException {
        this.addClientConfig(config);
        Socket socket = new Socket();
        boolean connected = false;
        try {
            socket.setSoTimeout(30000);
            socket.connect(new InetSocketAddress(this.host.getHost(), this.host.getPort()), 30000);
            StreamPair streamPair = this.setStreams(socket.getInputStream(), socket.getOutputStream(), new StreamPair());
            this.out.write("  V2".getBytes(Util.US_ASCII));
            String identifyResponse = this.connectCommand("IDENTIFY", this.client.getGson().toJson((Object)config).getBytes(Util.UTF_8));
            ServerConfig serverConfig = (ServerConfig)this.client.getGson().fromJson(identifyResponse, ServerConfig.class);
            log.debug("serverConfig:{}", (Object)identifyResponse);
            this.setConfig(serverConfig);
            this.msgTimeout = Util.firstNonNull(serverConfig.getMsgTimeout(), 60000);
            this.heartbeatInterval = Util.firstNonNull(serverConfig.getHeartbeatInterval(), 30000);
            this.maxRdyCount = Util.firstNonNull(serverConfig.getMaxRdyCount(), 2500);
            log.info("connected {} msgTimeout:{} heartbeatInterval:{} maxRdyCount:{}", new Object[]{this.host, this.msgTimeout, this.heartbeatInterval, this.maxRdyCount});
            // Reads must outlast one heartbeat period, otherwise an idle connection would time out.
            socket.setSoTimeout(this.heartbeatInterval + 5000);
            this.wrapEncryption(serverConfig, socket, streamPair);
            this.wrapCompression(serverConfig, streamPair);
            if (!streamPair.isBuffered) {
                // No layer added its own buffering, so buffer the current base streams.
                this.in = new DataInputStream(new BufferedInputStream(streamPair.baseIn));
                this.out = new DataOutputStream(new BufferedOutputStream(streamPair.baseOut));
            }
            this.sendAuthorization(serverConfig);
            this.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    Connection.this.checkHeartbeat();
                }
            }, this.heartbeatInterval + 2000, this.heartbeatInterval, false);
            this.lastHeartbeat = Util.clock();
            readThreadFactory.newThread(new Runnable(){

                @Override
                public void run() {
                    Connection.this.read();
                }
            }).start();
            connected = true;
        }
        finally {
            if (!connected) {
                // Handshake failed part-way: release the socket (this also tears down any TLS layer on top of it).
                try {
                    socket.close();
                }
                catch (IOException ignored) {
                    // Best effort; the original failure is the one the caller needs to see.
                }
            }
        }
    }

    /**
     * Sends a handshake command followed by a length-prefixed body, flushes, and
     * synchronously reads the server's reply.
     *
     * @param string  command name; a newline is appended before sending
     * @param byArray command body, written via {@link #write(byte[])}
     * @return the value returned by {@code readResponse()}
     * @throws IOException on stream failure
     */
    private String connectCommand(String string, byte[] byArray) throws IOException {
        final byte[] commandLine = (string + "\n").getBytes(Util.US_ASCII);
        out.write(commandLine);
        write(byArray);
        out.flush();
        final String reply = readResponse();
        return reply;
    }

    /**
     * Fills in client-side defaults on {@code config} before it is sent.
     *
     * <p>If no hostname is set, the text after the first {@code '@'} of the runtime MXBean
     * name is used (that name is conventionally {@code pid@hostname}, but the JVM does not
     * guarantee the format). Feature negotiation is always enabled.
     *
     * @param config configuration to update in place
     */
    private void addClientConfig(Config config) {
        if (config.getHostname() == null) {
            final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
            final int atIndex = runtimeName.indexOf('@');
            if (atIndex > 0) {
                config.setHostname(runtimeName.substring(atIndex + 1));
            }
        }
        config.setFeatureNegotiation(true);
    }

    /**
     * Upgrades the connection to TLS when the server negotiated it.
     *
     * <p>Does nothing unless {@code serverConfig.getTlsV1()} is {@code TRUE}; a missing
     * ({@code null}) flag is treated as "TLS not negotiated" rather than failing with a
     * {@code NullPointerException}. After the streams are switched to the TLS socket one
     * response frame is read from the server.
     *
     * @param serverConfig negotiated server settings
     * @param socket       the already-connected plain socket to layer TLS over
     * @param streamPair   holder updated with the TLS streams
     * @throws IOException on TLS or stream failure
     */
    private void wrapEncryption(ServerConfig serverConfig, Socket socket, StreamPair streamPair) throws IOException {
        if (!Boolean.TRUE.equals(serverConfig.getTlsV1())) {
            return;
        }
        log.debug("adding tls");
        SSLSocketFactory sSLSocketFactory = this.client.getSSLSocketFactory();
        if (sSLSocketFactory == null) {
            sSLSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
        }
        // autoClose=true: closing the TLS socket also closes the underlying plain socket.
        Socket tlsSocket = sSLSocketFactory.createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true);
        this.setStreams(tlsSocket.getInputStream(), tlsSocket.getOutputStream(), streamPair);
        // Consume the server's reply sent over the upgraded channel.
        this.readResponse();
    }

    /**
     * Layers deflate and/or snappy compression over the current base streams when the
     * server negotiated them, reading one response frame after each upgrade.
     *
     * <p>A {@code null} compression flag is treated as "not negotiated". For deflate, only
     * a failure to reflectively obtain the sync-flush {@code DeflaterOutputStream}
     * constructor is reported as a Java-version problem; I/O failures and server errors
     * while reading the confirmation keep their own message and cause.
     *
     * @param serverConfig negotiated server settings
     * @param streamPair   holder of the current base streams; updated in place
     * @throws NSQException if a compression layer cannot be set up or the server rejects it
     */
    private void wrapCompression(ServerConfig serverConfig, StreamPair streamPair) {
        if (Boolean.TRUE.equals(serverConfig.getDeflate())) {
            log.debug("adding deflate compression");
            DeflaterOutputStream deflaterOutputStream;
            try {
                // The (OutputStream, Deflater, int, boolean syncFlush) constructor is looked up
                // reflectively so the class still loads on runtimes that lack it.
                Constructor<DeflaterOutputStream> constructor = DeflaterOutputStream.class.getConstructor(OutputStream.class, Deflater.class, Integer.TYPE, Boolean.TYPE);
                deflaterOutputStream = constructor.newInstance(streamPair.baseOut, new Deflater(Deflater.DEFAULT_COMPRESSION, true), 32768, true);
            }
            catch (Exception exception) {
                throw new NSQException("deflate compression only supported on java7 and up", exception);
            }
            InflaterInputStream inflaterInputStream = new InflaterInputStream(streamPair.baseIn, new Inflater(true), 32768);
            this.setStreams(inflaterInputStream, deflaterOutputStream, streamPair);
            // Both deflate streams carry their own 32 KiB buffer, so connect() must not add another.
            streamPair.isBuffered = true;
            try {
                this.readResponse();
            }
            catch (IOException exception) {
                throw new NSQException("deflate compression negotiation failed", exception);
            }
        }
        if (Boolean.TRUE.equals(serverConfig.getSnappy())) {
            log.debug("adding snappy compression");
            String version = serverConfig.getVersion();
            if (version != null && version.startsWith("0.")) {
                throw new NSQException("snappy compression only supported on nsqd 1.0 and up");
            }
            try {
                // snappy-java is optional, hence loaded by name.
                Constructor<?> inputConstructor = Class.forName("org.xerial.snappy.SnappyFramedInputStream").getConstructor(InputStream.class);
                Constructor<?> outputConstructor = Class.forName("org.xerial.snappy.SnappyFramedOutputStream").getConstructor(OutputStream.class);
                this.setStreams((InputStream)inputConstructor.newInstance(streamPair.baseIn), (OutputStream)outputConstructor.newInstance(streamPair.baseOut), streamPair);
                this.readResponse();
            }
            catch (Exception exception) {
                throw new NSQException("snappy compression failed, is org.xerial.snappy:snappy-java available?", exception);
            }
        }
    }

    /**
     * Sends the {@code AUTH} command when the server requires authorization.
     *
     * <p>Both server flags are read null-safely: a missing {@code authRequired} means no
     * authorization, and a missing {@code tlsV1} is treated as "not encrypted" (so the
     * plain-text warning is logged) instead of throwing a {@code NullPointerException}.
     *
     * @param serverConfig negotiated server settings
     * @throws IOException  on stream failure
     * @throws NSQException if authorization is required but no secret is configured on the client
     */
    private void sendAuthorization(ServerConfig serverConfig) throws IOException {
        if (!Boolean.TRUE.equals(serverConfig.getAuthRequired())) {
            return;
        }
        if (this.client.getAuthSecret() == null) {
            throw new NSQException("nsqd requires authorization, call client.setAuthSecret before connecting");
        }
        if (!Boolean.TRUE.equals(serverConfig.getTlsV1())) {
            log.warn("authorization used without encryption. authSecret sent in plain text");
        }
        String authResponse = this.connectCommand("AUTH", this.client.getAuthSecret());
        log.info("authorization response:{}", (Object)authResponse);
    }

    /**
     * Periodic liveness check: closes the connection when more than two heartbeat
     * intervals have passed since {@code lastHeartbeat} was last updated.
     *
     * <p>Any exception is logged and suppressed so the scheduled task is not cancelled
     * by a failure here.
     */
    private void checkHeartbeat() {
        try {
            final long now = Util.clock();
            final boolean expired;
            // Take the monitor only for the comparison; close() is called outside it.
            synchronized (this) {
                expired = now - lastHeartbeat > (long)(2 * heartbeatInterval);
            }
            if (!expired) {
                return;
            }
            log.info("heartbeat failed, closing connection:{}", (Object)toString());
            close();
        }
        catch (Exception exception) {
            log.error("problem checking heartbeat, will probably time out soon. {}", (Object)toString(), (Object)exception);
        }
    }

    /**
     * Writes {@code "<command> <arg1> <arg2>\n"} to the output stream as US-ASCII.
     * Does not flush.
     *
     * @param string  command name
     * @param object  first argument, rendered with its string form
     * @param object2 second argument, rendered with its string form
     * @throws IOException on stream failure
     */
    @GuardedBy("this")
    protected void writeCommand(String string, Object object, Object object2) throws IOException {
        final StringBuilder line = new StringBuilder();
        line.append(string).append(" ").append(object).append(" ").append(object2).append("\n");
        out.write(line.toString().getBytes(Util.US_ASCII));
    }

    /**
     * Writes {@code "<command> <arg>\n"} to the output stream as US-ASCII.
     * Does not flush.
     *
     * @param string command name
     * @param object single argument, rendered with its string form
     * @throws IOException on stream failure
     */
    @GuardedBy("this")
    protected void writeCommand(String string, Object object) throws IOException {
        final StringBuilder line = new StringBuilder();
        line.append(string).append(" ").append(object).append("\n");
        out.write(line.toString().getBytes(Util.US_ASCII));
    }

    /**
     * Writes a length-prefixed byte block: a 4-byte big-endian length followed by the bytes.
     * Does not flush.
     *
     * @param byArray payload to send
     * @throws IOException on stream failure
     */
    @GuardedBy("this")
    protected void write(byte[] byArray) throws IOException {
        final int length = byArray.length;
        out.writeInt(length);
        out.write(byArray, 0, length);
    }

    /**
     * Flushes the output stream, then resets the unflushed counter and records the
     * flush time. Neither field is touched if the flush itself throws.
     *
     * @throws IOException on stream failure
     */
    @GuardedBy("this")
    protected void flush() throws IOException {
        out.flush();
        unflushedCount = 0;
        lastActionFlush = Util.clock();
    }

    /**
     * Reads one frame from the input stream and dispatches on its type.
     *
     * <p>A frame is a 4-byte size, a 4-byte frame type, and {@code size - 4} bytes of data:
     * <ul>
     *   <li>type 0 (response): the data is returned as an ASCII string;</li>
     *   <li>type 1 (error): thrown as {@link NSQException} unless its error code is one of
     *       {@code nonFatalErrors}, in which case it is logged and {@code null} is returned.
     *       The code is the text before the first space, because nsqd sends errors as
     *       {@code "E_CODE description"}; a bare code still matches;</li>
     *   <li>type 2 (message): timestamp, attempts, 16-byte id and body are read and handed
     *       to {@code onMessage}; {@code null} is returned.</li>
     * </ul>
     *
     * @return the response text for type 0 frames, otherwise {@code null}
     * @throws IOException  on stream failure
     * @throws NSQException on a fatal server error, an unknown frame type, or a frame size
     *                      too small for its type
     */
    private String readResponse() throws IOException {
        int frameSize = this.in.readInt();
        int frameType = this.in.readInt();
        // The size counts the 4-byte frame type, so anything smaller is corrupt; reject it
        // here instead of failing later with NegativeArraySizeException.
        if (frameSize < 4) {
            throw new NSQException("bad frame size:" + frameSize);
        }
        switch (frameType) {
            case 0: {
                return this.readAscii(frameSize - 4);
            }
            case 1: {
                String error = this.readAscii(frameSize - 4);
                int space = error.indexOf(' ');
                String errorCode = space < 0 ? error : error.substring(0, space);
                if (!nonFatalErrors.contains(errorCode)) {
                    throw new NSQException("error from nsqd:" + error);
                }
                log.warn("non fatal nsqd error:{} probably due to message timeout", (Object)error);
                return null;
            }
            case 2: {
                // Fixed part: 4 (type) + 8 (timestamp) + 2 (attempts) + 16 (id) = 30 bytes.
                if (frameSize < 30) {
                    throw new NSQException("bad frame size:" + frameSize);
                }
                long timestamp = this.in.readLong();
                int attempts = this.in.readUnsignedShort();
                String id = this.readAscii(16);
                byte[] body = this.readBytes(frameSize - 30);
                this.onMessage(timestamp, attempts, id, body);
                return null;
            }
            default: {
                throw new NSQException("bad frame type:" + frameType);
            }
        }
    }

    /**
     * Body of the read thread: reads frames until {@code isReading} is cleared.
     *
     * <p>Heartbeat responses are answered on the client's scheduled executor; other
     * non-null responses are offered to {@code respQueue}. If reading fails while the
     * connection is still meant to be open, the exception text is offered to the queue,
     * the connection is closed and the error is logged. A failure after {@code close()}
     * has cleared the flag is ignored.
     */
    private void read() {
        try {
            while (isReading) {
                final String response = readResponse();
                if (response == null) {
                    continue;
                }
                if ("_heartbeat_".equals(response)) {
                    client.getSchedExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            Connection.this.receivedHeartbeat();
                        }
                    });
                } else {
                    respQueue.offer(response);
                }
            }
        }
        catch (Exception exception) {
            if (isReading) {
                respQueue.offer(exception.toString());
                close();
                log.error("read thread exception. con:{}", (Object)toString(), (Object)exception);
            }
        }
        log.debug("read loop done {}", (Object)toString());
    }

    /**
     * Answers a server heartbeat with {@code NOP}, flushes, and records the time in
     * {@code lastHeartbeat}. The timestamp is updated only when the reply was written
     * successfully; any failure is logged and suppressed.
     */
    private synchronized void receivedHeartbeat() {
        try {
            final byte[] nop = "NOP\n".getBytes(Util.US_ASCII);
            out.write(nop);
            out.flush();
            lastHeartbeat = Util.clock();
        }
        catch (Throwable throwable) {
            log.error("receivedHeartbeat error", throwable);
        }
    }

    /**
     * Hook invoked by {@code readResponse()} for message frames (type 2). This base
     * implementation always rejects them.
     *
     * @param l       first 8-byte field of the frame (timestamp in the NSQ protocol)
     * @param n       following unsigned 16-bit field (attempt count in the NSQ protocol)
     * @param string  16-character ASCII message id
     * @param byArray remaining frame bytes (message body)
     * @throws NSQException always
     */
    protected void onMessage(long l, int n, String string, byte[] byArray) {
        final String reason = "unexpected frame type 2 - message";
        throw new NSQException(reason);
    }

    /**
     * Reads exactly {@code n} bytes from the input stream, blocking until all are available.
     *
     * @param n number of bytes to read
     * @return a new array of length {@code n}
     * @throws IOException if the stream ends early or fails
     */
    private byte[] readBytes(int n) throws IOException {
        final byte[] buffer = new byte[n];
        in.readFully(buffer, 0, buffer.length);
        return buffer;
    }

    /**
     * Reads exactly {@code n} bytes and decodes them as US-ASCII.
     *
     * @param n number of bytes to read
     * @return the decoded string
     * @throws IOException if the stream ends early or fails
     */
    private String readAscii(int n) throws IOException {
        final byte[] raw = readBytes(n);
        return new String(raw, Util.US_ASCII);
    }

    /**
     * Flushes pending output and waits up to one heartbeat interval for the read thread
     * to deliver an {@code "OK"} response through {@code respQueue}.
     *
     * @throws IOException  if the flush fails
     * @throws NSQException if the response is not {@code "OK"}, if none arrives in time
     *                      (reported as {@code "timeout"}), or if the wait is interrupted;
     *                      in the last case the interrupt flag is restored and the
     *                      {@link InterruptedException} is attached as the cause
     */
    protected void flushAndReadOK() throws IOException {
        this.flush();
        String response;
        try {
            response = this.respQueue.poll(this.heartbeatInterval, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
            throw new NSQException("read interrupted", interruptedException);
        }
        if (!"OK".equals(response)) {
            throw new NSQException("bad response:" + (response != null ? response : "timeout"));
        }
    }

    /**
     * Attempts a final flush and then closes the connection.
     *
     * <p>An {@link IOException} from the flush is logged and suppressed. {@code close()}
     * runs in a {@code finally} block so it also happens when the flush fails with an
     * unchecked exception (for example when the connection was never established);
     * that exception still propagates afterwards.
     */
    public synchronized void flushAndClose() {
        try {
            this.flush();
        }
        catch (IOException iOException) {
            log.error("flushAndClose error", (Throwable)iOException);
        }
        finally {
            this.close();
        }
    }

    /**
     * Stops the read loop, closes both streams quietly and cancels this connection's
     * scheduled tasks. Pending output is not explicitly flushed here; use
     * {@code flushAndClose()} for that.
     */
    @Override
    public void close() {
        // Must be cleared before the streams are closed: read() checks this flag in its
        // catch block to tell an intentional shutdown from a real read failure.
        this.isReading = false;
        Util.closeQuietly(this.out);
        Util.closeQuietly(this.in);
        this.cancelTasks();
        log.debug("connection closed:{}", (Object)this.toString());
    }

    /**
     * @return the address this connection was created for
     */
    public HostAndPort getHost() {
        return host;
    }

    /**
     * Builds a one-line diagnostic summary: this object's string form, seconds since the
     * last flush, seconds since the last heartbeat, and the unflushed count.
     *
     * @return the formatted state description
     */
    public synchronized String stateDesc() {
        final long now = Util.clock();
        final float secondsSinceFlush = (float)(now - lastActionFlush) / 1000.0f;
        final float secondsSinceHeartbeat = (float)(now - lastHeartbeat) / 1000.0f;
        return String.format(
                "%s lastFlush:%.1f lastHeartbeat:%.1f unflushedCount:%d",
                toString(),
                Float.valueOf(secondsSinceFlush),
                Float.valueOf(secondsSinceHeartbeat),
                unflushedCount);
    }

    /**
     * @return the message timeout currently in effect (60000 until {@code connect} applies
     *         a server-supplied value)
     */
    public synchronized int getMsgTimeout() {
        return msgTimeout;
    }

    /**
     * @return the {@code Util.clock()} value recorded by the most recent {@code flush()}
     */
    public synchronized long getLastActionFlush() {
        return lastActionFlush;
    }

    /**
     * @return the maximum RDY count currently in effect (2500 until {@code connect} applies
     *         a server-supplied value)
     */
    public synchronized int getMaxRdyCount() {
        return maxRdyCount;
    }

    /**
     * Installs a new pair of base streams: records them in {@code streamPair} and points
     * this connection's {@code in}/{@code out} at unbuffered data-stream wrappers around them.
     *
     * @param inputStream  new base input stream
     * @param outputStream new base output stream
     * @param streamPair   holder to update
     * @return the same {@code streamPair} instance, for chaining
     */
    private StreamPair setStreams(InputStream inputStream, OutputStream outputStream, StreamPair streamPair) {
        streamPair.baseOut = outputStream;
        streamPair.baseIn = inputStream;
        out = new DataOutputStream(outputStream);
        in = new DataInputStream(inputStream);
        return streamPair;
    }

    /**
     * Mutable holder for the current outermost raw streams during connection setup, plus
     * a flag telling {@code connect} whether a layer already provides buffering.
     */
    private static class StreamPair {
        /** Set by {@code wrapCompression} for deflate; defaults to {@code false}. */
        boolean isBuffered;
        private OutputStream baseOut;
        private InputStream baseIn;

        private StreamPair() {
        }
    }
}

