/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.Consumer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
 * Failure handler registered under the {@value #STRATEGY_NAME} strategy name.
 *
 * <p>For a failed message it calls {@code reconsumeLaterAsync} on the Pulsar consumer with a delay,
 * then acknowledges the incoming message when that call succeeds, or negatively acknowledges it
 * when any step fails.
 */
public class PulsarReconsumeLater implements PulsarFailureHandler {
    public static final String STRATEGY_NAME = "reconsume-later";

    private final Consumer<?> consumer;
    private final String channel;
    private final Duration defaultDelay;

    /**
     * @param consumer     consumer on which {@code reconsumeLaterAsync} is invoked
     * @param channel      channel name, used only in log output
     * @param defaultDelay delay applied when the failure metadata does not provide one
     */
    public PulsarReconsumeLater(Consumer<?> consumer, String channel, Duration defaultDelay) {
        this.consumer = consumer;
        this.channel = channel;
        this.defaultDelay = defaultDelay;
    }

    /**
     * Schedules the failed message for later re-consumption.
     *
     * <p>If {@code metadata} carries a {@link PulsarReconsumeLaterMetadata}, its delay and custom
     * properties are used; otherwise the default delay is used and {@code null} custom properties
     * are passed to the consumer. The delay is handed to Pulsar in whole seconds.
     *
     * @param message  the message whose processing failed
     * @param reason   the processing failure; may be {@code null}, in which case no message text is logged
     * @param metadata failure metadata, may be {@code null}
     * @return a {@link Uni} completing once the message has been acked (or nacked after a failure)
     */
    @Override
    public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) {
        Optional<PulsarReconsumeLaterMetadata> reconsumeLater = Optional.ofNullable(metadata)
                .flatMap(m -> m.get(PulsarReconsumeLaterMetadata.class));
        Duration delay = reconsumeLater
                .map(PulsarReconsumeLaterMetadata::getDelay)
                .orElse(this.defaultDelay);
        Map<String, String> customProperties = reconsumeLater
                .map(PulsarReconsumeLaterMetadata::getCustomProperties)
                .orElse(null);

        // Computed once so the logged delay and the delay sent to Pulsar cannot diverge.
        long delaySeconds = delay.toSeconds();
        String reasonMessage = reason != null ? reason.getMessage() : null;
        PulsarLogging.log.messageFailureDelayed(this.channel, delaySeconds, reasonMessage);
        PulsarLogging.log.messageFailureFullCause(reason);

        // The supplier defers the reconsume call until subscription. Everything after emitOn is
        // dispatched through message.runOnMessageContext. The onFailure stage sits after the ack
        // stage, so it covers failures of both the reconsume call and the ack.
        // NOTE(review): if PulsarIncomingMessage.nack routes back into this handler, a persistent
        // reconsume failure could re-enter handle() repeatedly -- confirm against nack's implementation.
        return Uni.createFrom()
                .completionStage(() -> this.consumer.reconsumeLaterAsync(
                        message.unwrap(), customProperties, delaySeconds, TimeUnit.SECONDS))
                .emitOn(message::runOnMessageContext)
                .onItem().transformToUni(unused -> Uni.createFrom().completionStage(message.ack()))
                .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
    }

    /**
     * Factory bean, identified by {@link #STRATEGY_NAME}, that builds {@link PulsarReconsumeLater}
     * instances.
     */
    @ApplicationScoped
    @Identifier(STRATEGY_NAME)
    public static class Factory implements PulsarFailureHandler.Factory {
        /**
         * Creates a handler using the channel name and the reconsume-later delay (interpreted as
         * seconds) from {@code config}.
         *
         * @param consumer      consumer the handler will call {@code reconsumeLaterAsync} on
         * @param config        incoming channel configuration
         * @param reportFailure not used by this strategy
         * @return a new {@link PulsarReconsumeLater}
         */
        @Override
        public PulsarFailureHandler create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration config,
                BiConsumer<Throwable, Boolean> reportFailure) {
            return new PulsarReconsumeLater(consumer, config.getChannel(),
                    Duration.ofSeconds(config.getReconsumeLaterDelay()));
        }
    }
}

