/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class SocketTextStreamFunction
implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
    private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
    private static final int CONNECTION_TIMEOUT_TIME = 0;
    private final String hostname;
    private final int port;
    private final char delimiter;
    private final long maxNumRetries;
    private final long delayBetweenRetries;
    private transient Socket currentSocket;
    private volatile boolean isRunning = true;

    public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries) {
        this(hostname, port, delimiter, maxNumRetries, 500L);
    }

    public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
        Preconditions.checkArgument((port > 0 && port < 65536 ? 1 : 0) != 0, (Object)"port is out of range");
        Preconditions.checkArgument((maxNumRetries >= -1L ? 1 : 0) != 0, (Object)"maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
        Preconditions.checkArgument((delayBetweenRetries >= 0L ? 1 : 0) != 0, (Object)"delayBetweenRetries must be zero or positive");
        this.hostname = (String)Preconditions.checkNotNull((Object)hostname, (String)"hostname must not be null");
        this.port = port;
        this.delimiter = delimiter;
        this.maxNumRetries = maxNumRetries;
        this.delayBetweenRetries = delayBetweenRetries;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        StringBuilder buffer = new StringBuilder();
        long attempt = 0L;
        while (this.isRunning) {
            try (Socket socket = new Socket();){
                int data;
                this.currentSocket = socket;
                LOG.info("Connecting to server socket " + this.hostname + ':' + this.port);
                socket.connect(new InetSocketAddress(this.hostname, this.port), 0);
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                while (this.isRunning && (data = reader.read()) != -1) {
                    if (data != this.delimiter) {
                        buffer.append((char)data);
                        continue;
                    }
                    if (this.delimiter == '\n' && buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
                        buffer.setLength(buffer.length() - 1);
                    }
                    ctx.collect(buffer.toString());
                    buffer.setLength(0);
                }
            }
            if (!this.isRunning) continue;
            if (this.maxNumRetries != -1L && ++attempt >= this.maxNumRetries) break;
            LOG.warn("Lost connection to server socket. Retrying in " + this.delayBetweenRetries + " msecs...");
            Thread.sleep(this.delayBetweenRetries);
        }
        if (buffer.length() > 0) {
            ctx.collect(buffer.toString());
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
        Socket theSocket = this.currentSocket;
        if (theSocket != null) {
            IOUtils.closeSocket((Socket)theSocket);
        }
    }
}

