/*
 * Decompiled with CFR 0.152.
 */
package reactor.test.subscriber;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.subscriber.TestSubscriberBuilder;
import reactor.util.context.Context;

class DefaultTestSubscriber<T>
implements TestSubscriber<T> {
    final long initialRequest;
    final Context context;
    final TestSubscriber.FusionRequirement fusionRequirement;
    final int requestedFusionMode;
    final int expectedFusionMode;
    Subscription s;
    // Could not load outer class - annotation placement on inner may be incorrect
    @Nullable Fuseable.QueueSubscription<T> qs;
    int fusionMode = -1;
    final AtomicBoolean cancelled;
    final List<T> receivedOnNext;
    final List<T> receivedPostCancellation;
    final List<Signal<T>> protocolErrors;
    final CountDownLatch doneLatch;
    final AtomicReference<@Nullable AssertionError> subscriptionFailure;
    volatile @Nullable Signal<T> terminalSignal;
    volatile int state;
    static final AtomicIntegerFieldUpdater<DefaultTestSubscriber> STATE = AtomicIntegerFieldUpdater.newUpdater(DefaultTestSubscriber.class, "state");
    volatile long requestedTotal;
    static final AtomicLongFieldUpdater<DefaultTestSubscriber> REQUESTED_TOTAL = AtomicLongFieldUpdater.newUpdater(DefaultTestSubscriber.class, "requestedTotal");
    volatile long requestedPreSubscription;
    static final AtomicLongFieldUpdater<DefaultTestSubscriber> REQUESTED_PRE_SUBSCRIPTION = AtomicLongFieldUpdater.newUpdater(DefaultTestSubscriber.class, "requestedPreSubscription");
    static final int MASK_TERMINATED = 8;
    static final int MASK_TERMINATING = 4;
    static final int MASK_ON_NEXT = 1;

    DefaultTestSubscriber(TestSubscriberBuilder options) {
        this.initialRequest = options.initialRequest;
        this.context = options.context;
        this.fusionRequirement = options.fusionRequirement;
        this.requestedFusionMode = options.requestedFusionMode;
        this.expectedFusionMode = options.expectedFusionMode;
        this.cancelled = new AtomicBoolean();
        this.receivedOnNext = new CopyOnWriteArrayList<T>();
        this.receivedPostCancellation = new CopyOnWriteArrayList<T>();
        this.protocolErrors = new CopyOnWriteArrayList<Signal<T>>();
        this.state = 0;
        this.doneLatch = new CountDownLatch(1);
        this.subscriptionFailure = new AtomicReference();
        REQUESTED_PRE_SUBSCRIPTION.lazySet(this, this.initialRequest);
    }

    public Context currentContext() {
        return this.context;
    }

    void internalCancel() {
        Subscription s = this.s;
        if (this.cancelled.compareAndSet(false, true) && s != null) {
            s.cancel();
            this.safeClearQueue(s);
        }
    }

    void safeClearQueue(@Nullable Subscription s) {
        if (s instanceof Fuseable.QueueSubscription) {
            ((Fuseable.QueueSubscription)s).clear();
        }
    }

    void subscriptionFail(String message) {
        if (this.subscriptionFailure.compareAndSet(null, new AssertionError((Object)message))) {
            this.internalCancel();
            this.notifyDone();
        }
    }

    final void notifyDone() {
        this.doneLatch.countDown();
    }

    public void onSubscribe(Subscription s) {
        block14: {
            long rPre;
            if (this.cancelled.get()) {
                s.cancel();
                this.safeClearQueue(s);
                return;
            }
            if (!Operators.validate((Subscription)this.s, (Subscription)s)) {
                this.safeClearQueue(s);
                this.subscriptionFail("TestSubscriber must not be reused, but Subscription has already been set.");
                return;
            }
            this.s = s;
            this.fusionMode = -1;
            if (s instanceof Fuseable.QueueSubscription) {
                Fuseable.QueueSubscription converted;
                if (this.fusionRequirement == TestSubscriber.FusionRequirement.NOT_FUSEABLE) {
                    this.subscriptionFail("TestSubscriber configured to reject QueueSubscription, got " + s);
                    return;
                }
                this.qs = converted = (Fuseable.QueueSubscription)s;
                int negotiatedMode = this.qs.requestFusion(this.requestedFusionMode);
                if (this.expectedFusionMode != negotiatedMode && this.expectedFusionMode != 3) {
                    this.subscriptionFail("TestSubscriber negotiated fusion mode inconsistent, expected " + Fuseable.fusionModeName((int)this.expectedFusionMode) + " got " + Fuseable.fusionModeName((int)negotiatedMode));
                    return;
                }
                this.fusionMode = negotiatedMode;
                if (negotiatedMode == 1) {
                    while (true) {
                        if (this.cancelled.get()) {
                            this.safeClearQueue((Subscription)this.qs);
                            break block14;
                        }
                        Object v = this.qs.poll();
                        if (v == null) {
                            this.onComplete();
                            break block14;
                        }
                        this.onNext(v);
                    }
                }
                long rPre2 = REQUESTED_PRE_SUBSCRIPTION.getAndSet(this, -1L);
                if (rPre2 > 0L) {
                    this.upstreamRequest(s, rPre2);
                }
            } else if (this.fusionRequirement == TestSubscriber.FusionRequirement.FUSEABLE) {
                this.subscriptionFail("TestSubscriber configured to require QueueSubscription, got " + s);
            } else if (this.initialRequest > 0L && (rPre = REQUESTED_PRE_SUBSCRIPTION.getAndSet(this, -1L)) > 0L) {
                this.upstreamRequest(s, rPre);
            }
        }
    }

    public void onNext(@Nullable T t) {
        int previousState = this.markOnNextStart();
        boolean wasTerminated = DefaultTestSubscriber.isMarkedTerminated(previousState);
        boolean wasOnNext = DefaultTestSubscriber.isMarkedOnNext(previousState);
        if (wasTerminated || wasOnNext) {
            if (t != null) {
                this.protocolErrors.add(Signal.next(t));
            } else if (wasTerminated) {
                this.protocolErrors.add(Signal.error((Throwable)((Object)new AssertionError((Object)"onNext(null) received despite SYNC fusion (which has already completed)"))));
            } else {
                this.protocolErrors.add(Signal.error((Throwable)((Object)new AssertionError((Object)"onNext(null) received despite SYNC fusion (with concurrent onNext)"))));
            }
            return;
        }
        if (t == null) {
            if (this.fusionMode == 2) {
                this.drainAsync(false);
                return;
            }
            this.subscriptionFail("onNext(null) received while ASYNC fusion not established");
        }
        this.receivedOnNext.add(t);
        if (this.cancelled.get()) {
            this.receivedPostCancellation.add(t);
        }
        this.checkTerminatedAfterOnNext();
    }

    public void onComplete() {
        Signal sig = Signal.complete();
        int previousState = this.markTerminated();
        if (DefaultTestSubscriber.isMarkedTerminated(previousState) || DefaultTestSubscriber.isMarkedTerminating(previousState)) {
            this.protocolErrors.add(sig);
            return;
        }
        if (DefaultTestSubscriber.isMarkedOnNext(previousState)) {
            this.protocolErrors.add(sig);
            this.terminalSignal = sig;
            return;
        }
        this.terminalSignal = sig;
        if (this.fusionMode == 2) {
            this.drainAsync(true);
            return;
        }
        this.notifyDone();
    }

    public void onError(Throwable t) {
        Signal sig = Signal.error((Throwable)t);
        int previousState = this.markTerminated();
        if (DefaultTestSubscriber.isMarkedTerminated(previousState) || DefaultTestSubscriber.isMarkedTerminating(previousState)) {
            this.protocolErrors.add(sig);
            return;
        }
        if (DefaultTestSubscriber.isMarkedOnNext(previousState)) {
            this.protocolErrors.add(sig);
            this.terminalSignal = sig;
            return;
        }
        this.terminalSignal = sig;
        if (this.fusionMode == 2) {
            this.drainAsync(true);
            return;
        }
        this.notifyDone();
    }

    void drainAsync(boolean isTerminal) {
        assert (this.qs != null);
        int previousState = this.state;
        if (isTerminal && DefaultTestSubscriber.isMarkedOnNext(previousState)) {
            return;
        }
        if (DefaultTestSubscriber.isMarkedTerminated(previousState)) {
            this.safeClearQueue((Subscription)this.qs);
            this.notifyDone();
            return;
        }
        while (true) {
            if (this.cancelled.get()) {
                this.safeClearQueue((Subscription)this.qs);
                this.notifyDone();
                return;
            }
            long r = REQUESTED_TOTAL.get(this);
            if (r != Long.MAX_VALUE && r - (long)this.receivedOnNext.size() < 1L) {
                if (this.checkTerminatedAfterOnNext()) {
                    this.safeClearQueue((Subscription)this.qs);
                }
                return;
            }
            Object t = this.qs.poll();
            if (t == null) {
                if (this.checkTerminatedAfterOnNext()) {
                    this.safeClearQueue((Subscription)this.qs);
                }
                return;
            }
            this.receivedOnNext.add(t);
        }
    }

    public @Nullable Object scanUnsafe(Scannable.Attr key) {
        if (key == Scannable.Attr.TERMINATED) {
            return this.terminalSignal != null || this.subscriptionFailure.get() != null;
        }
        if (key == Scannable.Attr.CANCELLED) {
            return this.cancelled.get();
        }
        if (key == Scannable.Attr.ERROR) {
            Throwable subFailure = (Throwable)((Object)this.subscriptionFailure.get());
            Signal<T> sig = this.terminalSignal;
            if (sig != null && sig.getThrowable() != null) {
                return sig.getThrowable();
            }
            return subFailure;
        }
        if (key == Scannable.Attr.PARENT) {
            return this.s;
        }
        if (key == Scannable.Attr.RUN_STYLE) {
            return Scannable.Attr.RunStyle.SYNC;
        }
        if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            return REQUESTED_TOTAL.get(this);
        }
        return null;
    }

    void upstreamRequest(Subscription s, long n) {
        long prev = Operators.addCap(REQUESTED_TOTAL, (Object)this, (long)n);
        if (prev != Long.MAX_VALUE) {
            s.request(n);
        }
    }

    boolean checkTerminatedAfterOnNext() {
        int donePreviousState = this.markOnNextDone();
        if (DefaultTestSubscriber.isMarkedTerminating(donePreviousState)) {
            this.notifyDone();
            return true;
        }
        return false;
    }

    static boolean isMarkedTerminated(int state) {
        return (state & 8) == 8;
    }

    static boolean isMarkedOnNext(int state) {
        return (state & 1) == 1;
    }

    static boolean isMarkedTerminating(int state) {
        return (state & 4) == 4 && (state & 8) != 8;
    }

    int markTerminated() {
        int newState;
        int state;
        do {
            if (!DefaultTestSubscriber.isMarkedTerminated(state = this.state) && !DefaultTestSubscriber.isMarkedTerminating(state)) continue;
            return state;
        } while (!STATE.compareAndSet(this, state, newState = DefaultTestSubscriber.isMarkedOnNext(state) ? state | 4 : 8));
        return state;
    }

    int markOnNextStart() {
        int state;
        do {
            if ((state = this.state) == 0) continue;
            return state;
        } while (!STATE.compareAndSet(this, state, 1));
        return state;
    }

    int markOnNextDone() {
        int nextState;
        int state;
        while (!STATE.compareAndSet(this, state = this.state, nextState = state & 0xFFFFFFFE)) {
        }
        return state;
    }

    @Override
    public void cancel() {
        if (this.cancelled.compareAndSet(false, true)) {
            if (this.s != null) {
                this.s.cancel();
            }
            if (this.requestedFusionMode == 2) {
                int st = this.state;
                Fuseable.QueueSubscription<T> q = this.qs;
                if (!DefaultTestSubscriber.isMarkedOnNext(st) && q != null) {
                    q.clear();
                }
            }
            this.notifyDone();
        }
    }

    @Override
    public void request(long n) {
        if (this.s == null) {
            long newReq;
            long prevReq;
            do {
                if ((prevReq = REQUESTED_PRE_SUBSCRIPTION.get(this)) != -1L) continue;
                this.request(n);
                return;
            } while (!REQUESTED_PRE_SUBSCRIPTION.compareAndSet(this, prevReq, newReq = Operators.addCap((long)prevReq, (long)n)));
            return;
        }
        if (Operators.validate((long)n)) {
            if (this.fusionMode == 1) {
                this.internalCancel();
                throw new IllegalStateException("Request is short circuited in SYNC fusion mode, and should not be explicitly used");
            }
            this.upstreamRequest(this.s, n);
        }
    }

    void checkSubscriptionFailure() {
        AssertionError subscriptionFailure = this.subscriptionFailure.get();
        if (subscriptionFailure != null) {
            throw subscriptionFailure;
        }
    }

    @Override
    public boolean isTerminatedOrCancelled() {
        this.checkSubscriptionFailure();
        return this.doneLatch.getCount() == 0L;
    }

    @Override
    public boolean isTerminated() {
        this.checkSubscriptionFailure();
        return this.terminalSignal != null;
    }

    @Override
    public boolean isTerminatedComplete() {
        this.checkSubscriptionFailure();
        Signal<T> ts = this.terminalSignal;
        return ts != null && ts.isOnComplete();
    }

    @Override
    public boolean isTerminatedError() {
        this.checkSubscriptionFailure();
        Signal<T> ts = this.terminalSignal;
        return ts != null && ts.isOnError();
    }

    @Override
    public boolean isCancelled() {
        this.checkSubscriptionFailure();
        return this.cancelled.get();
    }

    @Override
    public @Nullable Signal<T> getTerminalSignal() {
        this.checkSubscriptionFailure();
        return this.terminalSignal;
    }

    @Override
    public Signal<T> expectTerminalSignal() {
        this.checkSubscriptionFailure();
        Signal<T> sig = this.terminalSignal;
        if (sig == null || !sig.isOnError() && !sig.isOnComplete()) {
            this.cancel();
            throw new AssertionError((Object)"Expected subscriber to be terminated, but it has not been terminated yet: cancelling subscription.");
        }
        return sig;
    }

    @Override
    public Throwable expectTerminalError() {
        this.checkSubscriptionFailure();
        Signal<T> sig = this.terminalSignal;
        if (sig == null) {
            this.cancel();
            throw new AssertionError((Object)"Expected subscriber to have errored, but it has not been terminated yet.");
        }
        if (sig.isOnComplete()) {
            throw new AssertionError((Object)"Expected subscriber to have errored, but it has completed instead.");
        }
        Throwable terminal = sig.getThrowable();
        if (terminal == null) {
            this.cancel();
            throw new AssertionError((Object)("Expected subscriber to have errored, got unexpected terminal signal <" + sig + ">."));
        }
        return terminal;
    }

    @Override
    public List<T> getReceivedOnNext() {
        this.checkSubscriptionFailure();
        return new ArrayList<T>(this.receivedOnNext);
    }

    @Override
    public List<T> getReceivedOnNextAfterCancellation() {
        this.checkSubscriptionFailure();
        return new ArrayList<T>(this.receivedPostCancellation);
    }

    @Override
    public List<Signal<T>> getProtocolErrors() {
        this.checkSubscriptionFailure();
        return new ArrayList<Signal<T>>(this.protocolErrors);
    }

    @Override
    public int getFusionMode() {
        this.checkSubscriptionFailure();
        return this.fusionMode;
    }

    @Override
    public void block() {
        try {
            this.doneLatch.await();
            this.checkSubscriptionFailure();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Block() interrupted", e);
        }
    }

    @Override
    public void block(Duration timeout) {
        long timeoutMs = timeout.toMillis();
        try {
            boolean done = this.doneLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            this.checkSubscriptionFailure();
            if (!done) {
                throw new AssertionError((Object)("TestSubscriber timed out, not terminated after " + timeout + " (" + timeoutMs + "ms)"));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Block(" + timeout + ") interrupted", e);
        }
    }
}

