/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.pulsar.ConfigResolver;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.SchemaResolver;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;

public class PulsarIncomingChannel<T> {
    private final Consumer<T> consumer;
    private final Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> publisher;
    private final String channel;
    private final PulsarAckHandler ackHandler;
    private final PulsarFailureHandler failureHandler;
    private final ContextInternal context;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<Throwable> failures = new ArrayList<Throwable>();
    private final boolean healthEnabled;
    private final boolean tracingEnabled;
    private final PulsarOpenTelemetryInstrumenter instrumenter;

    public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema, PulsarAckHandler.Factory ackHandlerFactory, PulsarFailureHandler.Factory failureHandlerFactory, PulsarConnectorIncomingConfiguration ic, ConfigResolver configResolver, Instance<OpenTelemetry> openTelemetryInstance) throws PulsarClientException {
        this.channel = ic.getChannel();
        this.healthEnabled = ic.getHealthEnabled();
        this.tracingEnabled = ic.getTracingEnabled();
        ConsumerConfigurationData<?> conf = configResolver.getConsumerConf(ic);
        if (conf.getSubscriptionName() == null) {
            String s = UUID.randomUUID().toString();
            PulsarLogging.log.noSubscriptionName(s);
            conf.setSubscriptionName(s);
        }
        if (!PulsarIncomingChannel.hasTopicConfig(conf)) {
            conf.setTopicNames(Arrays.stream(ic.getTopic().orElse(this.channel).split(",")).collect(Collectors.toSet()));
        }
        if (conf.getConsumerName() == null) {
            conf.setConsumerName(this.channel);
        }
        ConsumerBuilder builder = configResolver.configure(client.newConsumer(schema), ic, conf);
        this.consumer = builder.subscribe();
        PulsarLogging.log.createdConsumerWithConfig(this.channel, SchemaResolver.getSchemaName(schema), conf);
        this.ackHandler = ackHandlerFactory.create(this.consumer, ic);
        this.failureHandler = failureHandlerFactory.create(this.consumer, ic, this::reportFailure);
        this.context = ((VertxInternal)vertx.getDelegate()).createEventLoopContext();
        if (!ic.getBatchReceive().booleanValue()) {
            Multi receiveMulti = Multi.createBy().repeating().completionStage(() -> this.consumer.receiveAsync()).until(m -> this.closed.get()).plug(msgMulti -> {
                if (PulsarIncomingChannel.schemaRequiresBlockingFetch(schema)) {
                    return msgMulti.onItem().invoke(Message::getValue);
                }
                return msgMulti;
            }).emitOn(command -> this.context.runOnContext(event -> command.run())).onItem().transform(message -> new PulsarIncomingMessage(message, this.ackHandler, this.failureHandler)).onFailure(throwable -> this.isEndOfStream(client, (Throwable)throwable)).recoverWithCompletion().onFailure().invoke(failure -> {
                PulsarLogging.log.failedToReceiveFromConsumer(this.channel, (Throwable)failure);
                this.reportFailure((Throwable)failure, false);
            });
            if (this.tracingEnabled) {
                receiveMulti = receiveMulti.onItem().invoke(this::incomingTrace);
            }
            this.publisher = receiveMulti;
        } else {
            Multi batchReceiveMulti = Multi.createBy().repeating().completionStage(() -> this.consumer.batchReceiveAsync()).until(m -> this.closed.get()).filter(m -> m.size() > 0).plug(msgMulti -> {
                if (PulsarIncomingChannel.schemaRequiresBlockingFetch(schema)) {
                    return msgMulti.onItem().invoke(msg -> msg.forEach(Message::getValue));
                }
                return msgMulti;
            }).emitOn(command -> this.context.runOnContext(event -> command.run())).onItem().transform(m -> new PulsarIncomingBatchMessage(m, this.ackHandler, this.failureHandler)).onFailure(throwable -> this.isEndOfStream(client, (Throwable)throwable)).recoverWithCompletion().onFailure().invoke(failure -> {
                PulsarLogging.log.failedToReceiveFromConsumer(this.channel, (Throwable)failure);
                this.reportFailure((Throwable)failure, false);
            });
            if (this.tracingEnabled) {
                batchReceiveMulti = batchReceiveMulti.onItem().invoke(this::incomingBatchTrace);
            }
            this.publisher = batchReceiveMulti;
        }
        this.instrumenter = this.tracingEnabled ? PulsarOpenTelemetryInstrumenter.createForSource(openTelemetryInstance) : null;
    }

    private static <T> boolean schemaRequiresBlockingFetch(Schema<T> schema) {
        return schema.requireFetchingSchemaInfo() || schema instanceof AvroSchema || schema instanceof GenericAvroSchema || schema instanceof GenericJsonSchema || schema instanceof GenericProtobufNativeSchema;
    }

    public void incomingTrace(PulsarMessage<T> pulsarMessage) {
        PulsarIncomingMessageMetadata metadata = (PulsarIncomingMessageMetadata)pulsarMessage.getMetadata(PulsarIncomingMessageMetadata.class).get();
        this.instrumenter.traceIncoming((org.eclipse.microprofile.reactive.messaging.Message<?>)pulsarMessage, new PulsarTrace.Builder().withConsumerName(this.consumer.getConsumerName()).withMessage(metadata.getMessage()).build());
    }

    public void incomingBatchTrace(PulsarIncomingBatchMessage<T> pulsarMessage) {
        for (PulsarMessage<T> message : pulsarMessage.getMessages()) {
            this.incomingTrace(message);
        }
    }

    private boolean isEndOfStream(PulsarClient client, Throwable throwable) {
        if (this.closed.get()) {
            return true;
        }
        if (this.consumer.hasReachedEndOfTopic()) {
            PulsarLogging.log.consumerReachedEndOfTopic(this.channel);
            return true;
        }
        if (client.isClosed()) {
            PulsarLogging.log.clientClosed(this.channel, throwable);
            return true;
        }
        return false;
    }

    static boolean hasTopicConfig(ConsumerConfigurationData<?> conf) {
        return conf.getTopicsPattern() != null || conf.getTopicNames() != null && !conf.getTopicNames().isEmpty();
    }

    public Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getPublisher() {
        return this.publisher;
    }

    public String getChannel() {
        return this.channel;
    }

    public Consumer<T> getConsumer() {
        return this.consumer;
    }

    public void close() {
        this.closed.set(true);
        try {
            this.consumer.close();
        }
        catch (PulsarClientException e) {
            PulsarLogging.log.unableToCloseConsumer(e);
        }
    }

    public synchronized void reportFailure(Throwable failure, boolean fatal) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
        if (fatal) {
            this.close();
        }
    }

    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.consumer.isConnected());
        }
    }

    public void isReady(HealthReport.HealthReportBuilder builder) {
        this.isStarted(builder);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            ArrayList<Throwable> actualFailures;
            PulsarIncomingChannel pulsarIncomingChannel = this;
            synchronized (pulsarIncomingChannel) {
                actualFailures = new ArrayList<Throwable>(this.failures);
            }
            if (!actualFailures.isEmpty()) {
                builder.add(this.channel, false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
            } else {
                builder.add(this.channel, true);
            }
        }
    }
}

