/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers.helpers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class PausablePollingStream<P, T> {
    private static final int STATE_NEW = 0;
    private static final int STATE_POLLING = 1;
    private static final int STATE_PAUSED = 2;
    private static final int STATE_CANCELLED = 3;
    private final AtomicInteger state = new AtomicInteger(0);
    private final Queue<T> queue;
    private final ScheduledExecutorService pollerExecutor;
    private final String channel;
    private final int maxQueueSize;
    private final Uni<P> pollUni;
    private final UnicastProcessor<T> processor;
    private final Multi<T> stream;
    private final int halfMaxQueueSize;
    private final boolean pauseResumeEnabled;

    public PausablePollingStream(String channel, Uni<P> pollUni, BiConsumer<P, Flow.Processor<T, T>> emitFunction, ScheduledExecutorService pollerExecutor, int maxQueueSize, boolean pauseResumeEnabled) {
        this.channel = channel;
        this.maxQueueSize = maxQueueSize;
        this.halfMaxQueueSize = maxQueueSize / 2;
        this.pollerExecutor = pollerExecutor;
        this.pauseResumeEnabled = pauseResumeEnabled;
        this.queue = Queues.createSpscUnboundedArrayQueue((int)maxQueueSize);
        this.processor = UnicastProcessor.create(this.queue, null);
        this.pollUni = Uni.createFrom().deferred(() -> {
            if (this.state.get() != 1) {
                return Uni.createFrom().nullItem();
            }
            return pollUni.onItem().invoke(p -> emitFunction.accept((P)p, (Flow.Processor<T, T>)this.processor));
        });
        this.stream = this.processor.onRequest().invoke(n -> {
            if (this.state.compareAndSet(0, 1)) {
                this.poll();
            }
        });
    }

    public Multi<T> getStream() {
        return this.stream;
    }

    private void poll() {
        int state = this.state.get();
        if (state == 3 || state == 0) {
            return;
        }
        if (this.pauseResumeEnabled) {
            this.pauseResume();
        }
        this.pollUni.subscribe().with(messages -> {
            if (messages == null) {
                this.executeWithDelay(this::poll, Duration.ofMillis(2L)).subscribe().with(this::emptyConsumer, this::report);
            } else {
                this.runOnRequestThread(this::poll).subscribe().with(this::emptyConsumer, this::report);
            }
        }, this::report);
    }

    private <I> void emptyConsumer(I ignored) {
    }

    private void report(Throwable fail) {
        int state;
        while ((state = this.state.get()) != 3) {
            if (!this.state.compareAndSet(state, 3)) continue;
            this.processor.onError(fail);
            break;
        }
    }

    private Uni<Void> runOnRequestThread(Runnable action) {
        return Uni.createFrom().voidItem().invoke(action).runSubscriptionOn((Executor)this.pollerExecutor);
    }

    private Uni<Void> executeWithDelay(Runnable action, Duration delay) {
        return Uni.createFrom().emitter(e -> this.pollerExecutor.schedule(() -> {
            try {
                action.run();
            }
            catch (Exception ex) {
                e.fail((Throwable)ex);
                return;
            }
            e.complete(null);
        }, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    private void pauseResume() {
        int size = this.queue.size();
        if (size >= this.maxQueueSize && this.state.compareAndSet(1, 2)) {
            ProviderLogging.log.pausingRequestingMessages(this.channel, size, this.maxQueueSize);
        } else if (size <= this.halfMaxQueueSize && this.state.compareAndSet(2, 1)) {
            ProviderLogging.log.resumingRequestingMessages(this.channel, size, this.halfMaxQueueSize);
        }
    }
}

