/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.autobench;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.examples.autobench.AutoBenchmark;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class ReqReplyBenchmark
extends AutoBenchmark {
    public ReqReplyBenchmark(String name, long messageCount, long messageSize) {
        super(name, messageCount, messageSize);
    }

    @Override
    public void execute(Options connectOptions) throws InterruptedException {
        byte[] payload = this.createPayload();
        String subject = this.getSubject();
        CompletableFuture<Object> go = new CompletableFuture<Object>();
        CompletableFuture<Void> replyReady = new CompletableFuture<Void>();
        CompletableFuture<Void> requestReady = new CompletableFuture<Void>();
        CompletableFuture<Void> requestDone = new CompletableFuture<Void>();
        CompletableFuture<Void> replyDone = new CompletableFuture<Void>();
        Thread replyThread = new Thread(() -> {
            try {
                Connection replyConnect = Nats.connect(connectOptions);
                if (replyConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    Subscription sub = replyConnect.subscribe(subject);
                    this.defaultFlush(replyConnect);
                    replyReady.complete(null);
                    int count = 0;
                    while ((long)count < this.getMessageCount()) {
                        Message msg = sub.nextMessage(Duration.ofSeconds(5L));
                        if (msg == null) continue;
                        replyConnect.publish(msg.getReplyTo(), payload);
                        ++count;
                    }
                    replyDone.complete(null);
                    replyConnect.flush(Duration.ofSeconds(5L));
                }
                catch (Exception exp) {
                    this.setException(exp);
                }
                finally {
                    replyConnect.close();
                }
            }
            catch (Exception ex) {
                replyReady.cancel(true);
                this.setException(ex);
            }
            finally {
                replyDone.complete(null);
            }
        }, "ReqReply Test - Reply");
        replyThread.start();
        Thread requestThread = new Thread(() -> {
            try {
                Connection requestConnect = Nats.connect(connectOptions);
                if (requestConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    requestReady.complete(null);
                    go.get();
                    int i = 0;
                    while ((long)i < this.getMessageCount()) {
                        CompletableFuture<Message> incoming = requestConnect.request(subject, payload);
                        incoming.get();
                        ++i;
                    }
                    requestDone.complete(null);
                }
                finally {
                    requestConnect.close();
                }
            }
            catch (Exception ex) {
                requestReady.cancel(true);
                this.setException(ex);
            }
            finally {
                requestDone.complete(null);
            }
        }, "ReqReply Test - Request");
        requestThread.start();
        this.getFutureSafely(replyReady);
        this.getFutureSafely(requestReady);
        if (this.getException() != null) {
            go.complete(null);
            return;
        }
        this.startTiming();
        go.complete(null);
        this.getFutureSafely(requestDone);
        this.getFutureSafely(replyDone);
        this.endTiming();
    }
}

