/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.error.transaction.AttemptExpiredException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.transaction.AccessorUtil;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Stability.Internal
public class ReactiveWaitGroup {
    private final CoreTransactionAttemptContext ctx;
    private final ArrayList<Waiter> waiting = new ArrayList();
    private final boolean debugMode;

    public ReactiveWaitGroup(CoreTransactionAttemptContext ctx, boolean debugMode) {
        this.debugMode = debugMode;
        this.ctx = Objects.requireNonNull(ctx);
    }

    public synchronized int waitingCount() {
        return this.waiting.size();
    }

    public Mono<Waiter> add(String dbg) {
        return Mono.defer(() -> {
            Waiter waiter = new Waiter(dbg);
            ReactiveWaitGroup reactiveWaitGroup = this;
            synchronized (reactiveWaitGroup) {
                this.waiting.add(waiter);
                if (this.debugMode) {
                    this.ctx.logger().info(this.ctx.attemptId(), "WG: adding [{}], {} now in waiting", dbg, this.waiting.size());
                }
            }
            return Mono.just((Object)waiter);
        });
    }

    public Mono<Void> done(Waiter waiter) {
        return Mono.defer(() -> {
            Sinks.One<Void> notifier = null;
            ReactiveWaitGroup reactiveWaitGroup = this;
            synchronized (reactiveWaitGroup) {
                if (!this.waiting.remove(waiter)) {
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "WG: wanted to remove [{}] from waiters but it's not in there", waiter.dbg);
                    }
                } else {
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), "WG: [{}] is done, {} now in waiting", waiter.dbg, this.waiting.size());
                    }
                    notifier = waiter.notifier;
                }
            }
            if (notifier != null) {
                notifier.tryEmitEmpty().orThrow();
            }
            return Mono.empty();
        });
    }

    public Mono<Void> await(Duration timeout) {
        return Mono.defer(() -> {
            ArrayList<Waiter> waiters;
            ReactiveWaitGroup reactiveWaitGroup = this;
            synchronized (reactiveWaitGroup) {
                waiters = new ArrayList<Waiter>(this.waiting);
            }
            return Flux.merge((Iterable)waiters.stream().map(v -> v.notifier.asMono()).collect(Collectors.toList())).timeout(timeout).publishOn(this.ctx.scheduler()).onErrorResume(err -> {
                if (err instanceof TimeoutException) {
                    String msg = String.format("Attempt expired while waiting for %d - %s", waiters.size(), waiters.stream().map(v -> v.dbg).collect(Collectors.joining(",")));
                    if (this.debugMode) {
                        this.ctx.logger().info(this.ctx.attemptId(), msg);
                    }
                    return Mono.error((Throwable)AccessorUtil.operationFailed(this.ctx, TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpiredException(msg, (Throwable)err)).build()));
                }
                return Mono.error((Throwable)err);
            }).then();
        });
    }

    public static class Waiter {
        public final Sinks.One<Void> notifier = Sinks.one();
        public final String dbg;

        public Waiter(String dbg) {
            this.dbg = Objects.requireNonNull(dbg);
        }
    }
}

