/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQMessageConverter;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * A {@link Flow.Processor} that publishes every incoming {@link Message} to a RabbitMQ exchange
 * and forwards the message downstream once it has been acked or nacked.
 *
 * <p>The processor is also the {@link Flow.Subscription} handed to its single downstream
 * subscriber. The downstream must request {@code Long.MAX_VALUE}; the processor then requests
 * {@code max-inflight-messages} items from upstream and asks for one more item each time a
 * message has been dealt with (unless the in-flight limit itself is {@code Long.MAX_VALUE}).
 */
public class RabbitMQMessageSender
        implements Flow.Processor<Message<?>, Message<?>>, Flow.Subscription {
    private final Uni<RabbitMQPublisher> retrieveSender;
    private final RabbitMQConnectorOutgoingConfiguration configuration;
    /** Upstream subscription; {@code null} until subscribed, {@code Subscriptions.CANCELLED} once terminated. */
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    /** The single downstream subscriber, or {@code null} if nobody subscribed yet. */
    private final AtomicReference<Flow.Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
    private final String configuredExchange;
    private final boolean isTracingEnabled;
    private final long inflights;
    private final Optional<Long> defaultTtl;
    private final boolean publishConfirms;
    /** Tracing instrumenter; {@code null} when tracing is disabled. */
    private final RabbitMQOpenTelemetryInstrumenter instrumenter;

    /**
     * Creates the sender from the outgoing channel configuration.
     *
     * @param oc the outgoing channel configuration
     * @param retrieveSender a {@link Uni} providing the publisher used to send each message
     * @param openTelemetryInstance the OpenTelemetry instance, only used when tracing is enabled
     * @throws RuntimeException (as produced by {@code RabbitMQExceptions}) if the configured
     *         max in-flight message count is not strictly positive, or if a default TTL is
     *         configured with a negative value
     */
    public RabbitMQMessageSender(RabbitMQConnectorOutgoingConfiguration oc, Uni<RabbitMQPublisher> retrieveSender, Instance<OpenTelemetry> openTelemetryInstance) {
        this.retrieveSender = retrieveSender;
        this.configuration = oc;
        this.configuredExchange = RabbitMQClientHelper.getExchangeName(oc);
        this.isTracingEnabled = oc.getTracingEnabled();
        this.inflights = oc.getMaxInflightMessages();
        this.defaultTtl = oc.getDefaultTtl();
        this.publishConfirms = oc.getPublishConfirms();
        if (this.inflights <= 0L) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidMaxInflightMessages();
        }
        if (this.defaultTtl.isPresent() && this.defaultTtl.get() < 0L) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidDefaultTtl();
        }
        this.instrumenter = this.isTracingEnabled
                ? RabbitMQOpenTelemetryInstrumenter.createForSender(openTelemetryInstance)
                : null;
    }

    /**
     * Registers the single downstream subscriber. A second subscriber is failed immediately.
     * If the upstream subscription is already known, the subscriber receives this processor
     * as its subscription right away; otherwise that happens in {@link #onSubscribe}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, RabbitMQExceptions.ex.illegalStateOnlyOneSubscriberAllowed());
        } else if (this.upstream.get() != null) {
            subscriber.onSubscribe(this);
        }
    }

    /**
     * Stores the upstream subscription. The first subscription is propagated downstream as
     * {@code this}; for any later one the downstream (if present) is handed the cancelled
     * subscription instead.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        boolean accepted = this.upstream.compareAndSet(null, subscription);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (subscriber == null) {
            return;
        }
        if (accepted) {
            subscriber.onSubscribe(this);
        } else {
            subscriber.onSubscribe(Subscriptions.CANCELLED);
        }
    }

    /**
     * Publishes the message and, once it has been handled, forwards it downstream and asks
     * upstream for the next item. Items received before subscription or after termination
     * are dropped.
     */
    @Override
    public void onNext(Message<?> message) {
        if (this.isCancelled()) {
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        this.retrieveSender
                .onItem().transformToUni(sender -> this.sendOrSkip(sender, message))
                .subscribe().with(sent -> {
                    // A null item means the message was nacked and skipped: nothing to forward.
                    if (sent != null) {
                        subscriber.onNext(sent);
                    }
                    // Replenish demand for skipped messages as well; otherwise every skipped
                    // message would permanently consume one in-flight slot and the stream
                    // would stall after 'inflights' such failures.
                    if (this.inflights != Long.MAX_VALUE) {
                        this.upstream.get().request(1L);
                    }
                }, subscriber::onError);
    }

    /**
     * Starts sending the message with the given publisher. If preparing the send throws
     * (for instance while converting the message), the failure is logged, the message is
     * nacked and the returned {@link Uni} emits {@code null} to signal that it was skipped.
     */
    private Uni<Message<?>> sendOrSkip(RabbitMQPublisher sender, Message<?> message) {
        try {
            return this.send(sender, message, this.configuredExchange, this.configuration);
        } catch (Exception e) {
            RabbitMQLogging.log.serializationFailure(this.configuration.getChannel(), e);
            return Uni.createFrom().completionStage(message.nack(e)).map(unused -> null);
        }
    }

    /**
     * Marks the upstream as terminated and forwards the failure downstream, unless the
     * processor was never subscribed, was already terminated, or has no subscriber.
     */
    @Override
    public void onError(Throwable t) {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onError(t);
        }
    }

    /**
     * Marks the upstream as terminated and forwards completion downstream, unless the
     * processor was never subscribed, was already terminated, or has no subscriber.
     */
    @Override
    public void onComplete() {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (sub != null && sub != Subscriptions.CANCELLED && subscriber != null) {
            subscriber.onComplete();
        }
    }

    /**
     * Handles the downstream demand. Only an unbounded request is accepted; it is translated
     * into an upstream request for the configured number of in-flight messages.
     *
     * @throws RuntimeException (as produced by {@code RabbitMQExceptions}) if {@code l} is not
     *         {@code Long.MAX_VALUE}
     */
    @Override
    public void request(long l) {
        if (l != Long.MAX_VALUE) {
            throw RabbitMQExceptions.ex.illegalStateConsumeWithoutBackPressure();
        }
        this.upstream.get().request(this.inflights);
    }

    /** Cancels the upstream subscription if one is active; otherwise does nothing. */
    @Override
    public void cancel() {
        Flow.Subscription sub = this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (sub != null && sub != Subscriptions.CANCELLED) {
            sub.cancel();
        }
    }

    /**
     * Converts the message and publishes it to the exchange, with or without publisher
     * confirms depending on the configuration. With confirms enabled, the value emitted by
     * the publisher is recorded on the message through {@code OutgoingMessageMetadata}.
     *
     * <p>A failed publication is retried with a back-off starting at one second and capped at
     * the configured reconnect interval (in seconds), at most "reconnect attempts" times.
     * The message is then acked on success or nacked with the failure.
     *
     * @return a {@link Uni} emitting {@code msg} once the ack or nack has completed
     */
    private Uni<Message<?>> send(RabbitMQPublisher publisher, Message<?> msg, String exchange, RabbitMQConnectorOutgoingConfiguration configuration) {
        int retryAttempts = configuration.getReconnectAttempts();
        int retryInterval = configuration.getReconnectInterval();
        String defaultRoutingKey = configuration.getDefaultRoutingKey();
        RabbitMQMessageConverter.OutgoingRabbitMQMessage outgoing = RabbitMQMessageConverter.convert(this.instrumenter, msg, exchange, defaultRoutingKey, this.defaultTtl, this.isTracingEnabled);
        RabbitMQLogging.log.sendingMessageToExchange(exchange, outgoing.getRoutingKey());

        Uni<Void> published;
        if (this.publishConfirms) {
            published = publisher.publishConfirm(exchange, outgoing.getRoutingKey(), outgoing.getProperties(), outgoing.getBody())
                    .onItem().invoke(deliveryTag -> OutgoingMessageMetadata.setResultOnMessage(msg, deliveryTag))
                    .replaceWithVoid();
        } else {
            published = publisher.publish(exchange, outgoing.getRoutingKey(), outgoing.getProperties(), outgoing.getBody());
        }

        return published
                .onFailure().retry()
                .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(retryInterval))
                .atMost(retryAttempts)
                .onItemOrFailure().transformToUni((success, failure) -> {
                    if (failure != null) {
                        return Uni.createFrom().completionStage(msg.nack(failure));
                    }
                    return Uni.createFrom().completionStage(msg.ack());
                })
                .onItem().transform(x -> msg);
    }

    /** Tells whether the processor has no usable upstream: never subscribed, or terminated. */
    private boolean isCancelled() {
        Flow.Subscription subscription = this.upstream.get();
        return subscription == Subscriptions.CANCELLED || subscription == null;
    }
}

