/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.benchmark;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Consumer;
import io.nats.client.ErrorListener;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.examples.benchmark.Benchmark;
import io.nats.examples.benchmark.Sample;
import io.nats.examples.benchmark.Utils;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.security.Security;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class NatsBench2 {
    final BlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<Throwable>();
    private int numMsgs = 5000000;
    private int numPubs = 1;
    private int numSubs = 0;
    private int size = 128;
    private String urls = "nats://localhost:4222";
    private String subject;
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong received = new AtomicLong();
    private boolean csv = false;
    private boolean stats = false;
    private boolean conscrypt = false;
    private Thread shutdownHook;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private boolean secure = false;
    private Benchmark bench;
    private long succPubMsgCount;
    private long succSubMsgCount;
    static final String usageString = "\nUsage: java -cp <classpath> NatsBench [-s server] [-tls] [-np num] [-ns num] [-n num] [-ms size] [-csv file] <subject>\n\nOptions:\n    -s  <urls>                     The nats server URLs (comma-separated), use tls:// or opentls:// to require tls\n    -np <int>                       Number of concurrent publishers (1)\n    -ns <int>                       Number of concurrent subscribers (0)\n    -n  <int>                       Number of messages to publish (100,000)\n    -ms <int>                       Size of the message (128)\n    -csv                            Print results to stdout as csv (false)\n    -tls                            Set the secure flag on the SSL context to true (false)\n    -stats                          Track and print out internal statistics (false)\n";

    public NatsBench2(String[] args) throws Exception {
        if (args == null || args.length < 1) {
            this.usage();
            return;
        }
        this.parseArgs(args);
    }

    public NatsBench2(Properties properties) throws NoSuchAlgorithmException {
        this.urls = properties.getProperty("bench.nats.servers", this.urls);
        this.secure = Boolean.parseBoolean(properties.getProperty("bench.nats.secure", Boolean.toString(this.secure)));
        this.numMsgs = Integer.parseInt(properties.getProperty("bench.nats.msg.count", Integer.toString(this.numMsgs)));
        this.size = Integer.parseInt(properties.getProperty("bench.nats.msg.size", Integer.toString(this.numSubs)));
        this.numPubs = Integer.parseInt(properties.getProperty("bench.nats.pubs", Integer.toString(this.numPubs)));
        this.numSubs = Integer.parseInt(properties.getProperty("bench.nats.subs", Integer.toString(this.numSubs)));
        this.csv = Boolean.parseBoolean(properties.getProperty("bench.nats.csv", Boolean.toString(this.csv)));
        this.subject = properties.getProperty("bench.nats.subject", NUID.nextGlobal());
    }

    Options prepareOptions(boolean secure) throws NoSuchAlgorithmException {
        String[] servers = this.urls.split(",");
        Options.Builder builder = new Options.Builder();
        builder.maxReconnects(-1);
        builder.connectionName("NatsBench");
        builder.servers(servers);
        builder.connectionListener(new ConnectionListener(){

            @Override
            public void connectionEvent(Connection conn, ConnectionListener.Events type) {
                System.out.println("Connection Event:" + (Object)((Object)type));
                if (type == ConnectionListener.Events.DISCOVERED_SERVERS) {
                    conn.getServers().forEach(System.out::println);
                }
                if (type == ConnectionListener.Events.RECONNECTED) {
                    System.out.println("Reconnected to:" + conn.getConnectedUrl());
                }
            }
        });
        builder.errorListener(new ErrorListener(){

            @Override
            public void errorOccurred(Connection conn, String error) {
                System.out.printf("An error occurred %s\n", error);
            }

            @Override
            public void exceptionOccurred(Connection conn, Exception exp) {
                System.out.println("An exception occurred...");
                exp.printStackTrace();
            }

            @Override
            public void slowConsumerDetected(Connection conn, Consumer consumer) {
                System.out.println("Slow consumer detected");
            }
        });
        if (this.stats) {
            builder.turnOnAdvancedStats();
        }
        if (this.conscrypt) {
            try {
                Provider provider = null;
                provider = (Provider)Class.forName("org.conscrypt.OpenSSLProvider").newInstance();
                Security.insertProviderAt(provider, 1);
            }
            catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
        if (secure) {
            builder.secure();
        }
        return builder.build();
    }

    public void start() throws Exception {
        this.installShutdownHook();
        System.out.println();
        System.out.printf("Starting benchmark(s) [msgs=%d, msgsize=%d, pubs=%d, subs=%d]\n", this.numMsgs, this.size, this.numPubs, this.numSubs);
        System.out.printf("Current memory usage is %s / %s / %s free/total/max\n", Utils.humanBytes(Runtime.getRuntime().freeMemory()), Utils.humanBytes(Runtime.getRuntime().totalMemory()), Utils.humanBytes(Runtime.getRuntime().maxMemory()));
        System.out.println("Use ctrl-C to cancel.");
        System.out.println();
        if (this.numPubs > 0 && this.numSubs > 0) {
            this.runTest("Pub/Sub", this.numPubs, this.numSubs);
        } else if (this.numPubs > 0) {
            this.runTest("Pub Only", this.numPubs, 0);
        } else {
            this.runTest("Sub Only", 0, this.numSubs);
        }
        System.out.println();
        System.out.println("Successfully Published messages: " + this.getSuccPubMsgCount());
        System.out.println("Successfully Received messages: " + this.getSuccResMsgCount());
        System.out.printf("Final memory usage is %s / %s / %s free/total/max\n", Utils.humanBytes(Runtime.getRuntime().freeMemory()), Utils.humanBytes(Runtime.getRuntime().totalMemory()), Utils.humanBytes(Runtime.getRuntime().maxMemory()));
        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
    }

    public void runTest(String title, int pubCount, int subCount) throws Exception {
        Phaser subReady = new Phaser();
        Phaser finisher = new Phaser();
        CompletableFuture<Boolean> starter = new CompletableFuture<Boolean>();
        subReady.register();
        finisher.register();
        this.sent.set(0L);
        this.received.set(0L);
        this.bench = new Benchmark(title);
        for (int i = 0; i < subCount; ++i) {
            subReady.register();
            finisher.register();
            new Thread((Runnable)new SyncSubWorker(starter, subReady, finisher, this.numMsgs, this.size, this.secure), "Sub-" + i).start();
        }
        subReady.arriveAndAwaitAdvance();
        if (!this.errorQueue.isEmpty()) {
            Throwable error = this.errorQueue.take();
            System.err.printf(error.getMessage(), new Object[0]);
            error.printStackTrace();
            throw new RuntimeException(error);
        }
        if (pubCount != 0) {
            int remaining = this.numMsgs;
            int perPubMsgs = this.numMsgs / pubCount;
            for (int i = 0; i < pubCount; ++i) {
                finisher.register();
                if (i == this.numPubs - 1) {
                    perPubMsgs = remaining;
                }
                if (subCount == 0) {
                    new Thread((Runnable)new PubWorker(starter, finisher, perPubMsgs, this.size, this.secure), "Pub-" + i).start();
                } else {
                    new Thread((Runnable)new PubWorker(starter, finisher, perPubMsgs, this.size, this.secure), "Pub-" + i).start();
                }
                remaining -= perPubMsgs;
            }
        } else {
            System.out.println("Starting subscribers, time to run the publishers somewhere ...");
        }
        starter.complete(Boolean.TRUE);
        finisher.arriveAndAwaitAdvance();
        if (!this.errorQueue.isEmpty()) {
            Throwable error = this.errorQueue.take();
            System.err.printf("Error running test [%s]\n", error.getMessage());
            System.err.printf("Latest test sent = %d\n", this.sent.get());
            System.err.printf("Latest test received = %d\n", this.received.get());
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
            throw new RuntimeException(error);
        }
        if (subCount == 1 && pubCount > 0 && this.sent.get() != this.received.get()) {
            System.out.println("#### Error - sent and received are not equal " + this.sent.get() + " != " + this.received.get());
        }
        this.bench.close();
        if (this.csv) {
            System.out.println(this.bench.csv());
        } else {
            System.out.println(this.bench.report());
        }
    }

    void installShutdownHook() {
        this.shutdownHook = new Thread(new Runnable(){

            @Override
            public void run() {
                System.err.println("\nCaught CTRL-C, shutting down gracefully...\n");
                NatsBench2.this.shutdown.set(true);
                System.err.printf("Sent=%d\n", NatsBench2.this.sent.get());
                System.err.printf("Received=%d\n", NatsBench2.this.received.get());
            }
        });
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    void usage() {
        System.err.println(usageString);
        System.exit(-1);
    }

    private void parseArgs(String[] args) {
        ArrayList<String> argList = new ArrayList<String>(Arrays.asList(args));
        this.subject = (String)argList.get(argList.size() - 1);
        argList.remove(argList.size() - 1);
        Iterator it = argList.iterator();
        block22: while (it.hasNext()) {
            String arg;
            switch (arg = (String)it.next()) {
                case "-s": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.urls = (String)it.next();
                    it.remove();
                    continue block22;
                }
                case "-tls": {
                    it.remove();
                    this.secure = true;
                    continue block22;
                }
                case "-np": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numPubs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block22;
                }
                case "-ns": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numSubs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block22;
                }
                case "-n": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numMsgs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block22;
                }
                case "-ms": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.size = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block22;
                }
                case "-csv": {
                    it.remove();
                    this.csv = true;
                    continue block22;
                }
                case "-stats": {
                    it.remove();
                    this.stats = true;
                    continue block22;
                }
                case "-conscrypt": {
                    it.remove();
                    this.conscrypt = true;
                    continue block22;
                }
            }
            System.err.printf("Unexpected token: '%s'\n", arg);
            this.usage();
        }
    }

    private static Properties loadProperties(String configPath) {
        try {
            FileInputStream is = new FileInputStream(configPath);
            Properties prop = new Properties();
            prop.load(is);
            return prop;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Properties properties = null;
        try {
            if (args.length == 1 && args[0].endsWith(".properties")) {
                properties = NatsBench2.loadProperties(args[0]);
                new NatsBench2(properties).start();
            } else {
                new NatsBench2(args).start();
            }
        }
        catch (Exception e) {
            System.err.printf("Exiting due to exception [%s]\n", e.getMessage());
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }

    public long getSuccPubMsgCount() {
        return this.succPubMsgCount;
    }

    public long getSuccResMsgCount() {
        return this.succSubMsgCount;
    }

    class PubWorker
    extends Worker {
        private AtomicLong start;

        PubWorker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure) {
            super(starter, finisher, numMsgs, size, secure);
            this.start = new AtomicLong();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Options opts = NatsBench2.this.prepareOptions(this.secure);
                Connection nc = Nats.connect(opts);
                byte[] payload = null;
                if (this.size > 0) {
                    payload = new byte[this.size];
                }
                this.starter.get(60L, TimeUnit.SECONDS);
                this.start.set(System.nanoTime());
                for (int i = 0; i < this.numMsgs; ++i) {
                    boolean success = false;
                    for (int idx = 5; idx < 10 && !success; ++idx) {
                        try {
                            nc.publish(NatsBench2.this.subject, payload);
                            success = true;
                            continue;
                        }
                        catch (IllegalStateException ex) {
                            if (ex.getMessage().contains("Output queue is full ")) {
                                success = false;
                                Thread.sleep(1000L);
                                continue;
                            }
                            throw ex;
                        }
                    }
                    NatsBench2.this.sent.incrementAndGet();
                }
                nc.flush(Duration.ofSeconds(15L));
                long end = System.nanoTime();
                NatsBench2.this.bench.addPubSample(new Sample(this.numMsgs, this.size, this.start.get(), end, nc.getStatistics()));
                if (NatsBench2.this.stats) {
                    System.out.println(nc.getStatistics());
                }
                nc.close();
            }
            catch (Exception e) {
                NatsBench2.this.errorQueue.add(e);
            }
            finally {
                this.finisher.arrive();
            }
        }
    }

    class SyncSubWorker
    extends Worker {
        final Phaser subReady;
        private AtomicLong start;

        SyncSubWorker(Future<Boolean> starter, Phaser subReady, Phaser finisher, int numMsgs, int size, boolean secure) {
            super(starter, finisher, numMsgs, size, secure);
            this.subReady = subReady;
            this.start = new AtomicLong();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Options opts = NatsBench2.this.prepareOptions(this.secure);
                Connection nc = Nats.connect(opts);
                Subscription sub = nc.subscribe(NatsBench2.this.subject);
                nc.flush(null);
                this.subReady.arrive();
                this.starter.get(60L, TimeUnit.SECONDS);
                Duration timeout = Duration.ofMillis(1000L);
                int receivedCount = 0;
                while (receivedCount < this.numMsgs) {
                    if (sub.nextMessage(timeout) == null) continue;
                    if (receivedCount == 0) {
                        this.start.set(System.nanoTime());
                    }
                    NatsBench2.this.received.incrementAndGet();
                    ++receivedCount;
                }
                long end = System.nanoTime();
                if (this.start.get() <= 0L) {
                    throw new Exception("start time was never set");
                }
                NatsBench2.this.bench.addSubSample(new Sample(this.numMsgs, this.size, this.start.get(), end, nc.getStatistics()));
                if (NatsBench2.this.stats) {
                    System.out.println(nc.getStatistics());
                }
                sub.unsubscribe();
                NatsBench2.this.succSubMsgCount = receivedCount;
                nc.close();
            }
            catch (Exception e) {
                NatsBench2.this.errorQueue.add(e);
            }
            finally {
                this.subReady.arrive();
                this.finisher.arrive();
            }
        }
    }

    class Worker
    implements Runnable {
        final Future<Boolean> starter;
        final Phaser finisher;
        final int numMsgs;
        final int size;
        final boolean secure;

        Worker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure) {
            this.starter = starter;
            this.finisher = finisher;
            this.numMsgs = numMsgs;
            this.size = size;
            this.secure = secure;
        }

        @Override
        public void run() {
        }
    }
}

