/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarMessages;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Metadata;

public class PulsarIncomingBatchMessage<T>
implements PulsarBatchMessage<T>,
MetadataInjectableMessage<List<T>> {
    private final Messages<T> delegate;
    private final List<T> payload;
    private Metadata metadata;
    private final List<PulsarMessage<T>> incomingMessages;

    public PulsarIncomingBatchMessage(Messages<T> messages, PulsarAckHandler ackHandler, PulsarFailureHandler nackHandler) {
        this.delegate = Objects.requireNonNull(messages, PulsarMessages.msg.isRequired("messages"));
        Objects.requireNonNull(ackHandler, PulsarMessages.msg.isRequired("ack"));
        Objects.requireNonNull(nackHandler, PulsarMessages.msg.isRequired("nack"));
        ArrayList incomings = new ArrayList();
        ArrayList<Object> payloads = new ArrayList<Object>();
        for (Message message : messages) {
            incomings.add(new PulsarIncomingMessage(message, ackHandler, nackHandler));
            payloads.add(message.getValue());
        }
        this.incomingMessages = Collections.unmodifiableList(incomings);
        this.payload = Collections.unmodifiableList(payloads);
        this.metadata = ContextAwareMessage.captureContextMetadata((Object[])new Object[]{new PulsarIncomingBatchMessageMetadata(messages)});
    }

    public List<T> getPayload() {
        return this.payload;
    }

    public Messages<T> unwrap() {
        return this.delegate;
    }

    @Override
    public List<PulsarMessage<T>> getMessages() {
        return this.incomingMessages;
    }

    @Override
    public Iterator<PulsarMessage<T>> iterator() {
        return this.incomingMessages.iterator();
    }

    public Metadata getMetadata() {
        return this.metadata;
    }

    public CompletionStage<Void> ack(Metadata metadata) {
        return Multi.createFrom().iterable(this.incomingMessages).plug(stream -> {
            Optional txnMetadata = this.getMetadata(PulsarTransactionMetadata.class);
            if (txnMetadata.isPresent()) {
                return stream.onItem().invoke(m -> ((PulsarIncomingMessage)m).injectMetadata(txnMetadata.get()));
            }
            return stream;
        }).onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage((CompletionStage)m.getAckWithMetadata().apply(metadata))).toUni().subscribeAsCompletionStage();
    }

    public Function<Metadata, CompletionStage<Void>> getAckWithMetadata() {
        return this::ack;
    }

    public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
        return Multi.createFrom().iterable(this.incomingMessages).onItem().transformToUniAndMerge(m -> Uni.createFrom().completionStage(() -> m.nack(reason, metadata))).toUni().subscribeAsCompletionStage();
    }

    public BiFunction<Throwable, Metadata, CompletionStage<Void>> getNackWithMetadata() {
        return this::nack;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        PulsarIncomingBatchMessage that = (PulsarIncomingBatchMessage)o;
        return this.delegate.equals(that.delegate);
    }

    public int hashCode() {
        return Objects.hash(this.delegate);
    }

    public synchronized void injectMetadata(Object metadataObject) {
        this.metadata = this.metadata.with(metadataObject);
    }
}

