/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.test.support;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.test.support.ConditionTimeoutException;
import org.springframework.pulsar.test.support.ConditionsSpec;
import org.springframework.pulsar.test.support.ConsumedMessagesCondition;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.pulsar.test.support.SchemaSpec;
import org.springframework.pulsar.test.support.TopicSpec;
import org.springframework.util.Assert;

public final class PulsarConsumerTestUtil<T>
implements TopicSpec<T>,
SchemaSpec<T>,
ConditionsSpec<T> {
    private static final LogAccessor LOG = new LogAccessor(PulsarConsumerTestUtil.class);
    private final PulsarClient locallyCreatedPulsarClient;
    private final PulsarConsumerFactory<T> consumerFactory;
    private ConsumedMessagesCondition<T> condition;
    private Schema<T> schema;
    private Duration timeout = Duration.ofSeconds(30L);
    private List<String> topics;
    private boolean untilMethodAlreadyCalled = false;

    public static <T> TopicSpec<T> consumeMessages() {
        if (PulsarTestContainerSupport.isContainerStarted()) {
            return PulsarConsumerTestUtil.consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl());
        }
        return PulsarConsumerTestUtil.consumeMessages("pulsar://localhost:6650");
    }

    public static <T> TopicSpec<T> consumeMessages(String url) {
        Assert.notNull((Object)url, (String)"url must not be null");
        try {
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(url).build();
            return PulsarConsumerTestUtil.consumeMessagesInternal(pulsarClient, new DefaultPulsarConsumerFactory(pulsarClient, List.of()));
        }
        catch (PulsarClientException ex) {
            throw new PulsarException((Throwable)ex);
        }
    }

    public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
        Assert.notNull((Object)pulsarClient, (String)"pulsarClient must not be null");
        return PulsarConsumerTestUtil.consumeMessagesInternal(null, new DefaultPulsarConsumerFactory(pulsarClient, List.of()));
    }

    public static <T> TopicSpec<T> consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
        return PulsarConsumerTestUtil.consumeMessagesInternal(null, pulsarConsumerFactory);
    }

    private static <T> TopicSpec<T> consumeMessagesInternal(PulsarClient locallyCreatedPulsarClient, PulsarConsumerFactory<T> pulsarConsumerFactory) {
        return new PulsarConsumerTestUtil<T>(locallyCreatedPulsarClient, pulsarConsumerFactory);
    }

    private PulsarConsumerTestUtil(@Nullable PulsarClient locallyCreatedPulsarClient, PulsarConsumerFactory<T> consumerFactory) {
        Assert.notNull(consumerFactory, (String)"PulsarConsumerFactory must not be null");
        this.consumerFactory = consumerFactory;
        this.locallyCreatedPulsarClient = locallyCreatedPulsarClient;
    }

    @Override
    public SchemaSpec<T> fromTopic(String topic) {
        Assert.notNull((Object)topic, (String)"Topic must not be null");
        this.topics = List.of(topic);
        return this;
    }

    @Override
    public ConditionsSpec<T> withSchema(Schema<T> schema) {
        Assert.notNull(schema, (String)"Schema must not be null");
        this.schema = schema;
        return this;
    }

    @Override
    public ConditionsSpec<T> awaitAtMost(Duration timeout) {
        Assert.notNull((Object)timeout, (String)"Timeout must not be null");
        this.timeout = timeout;
        return this;
    }

    @Override
    public ConditionsSpec<T> until(ConsumedMessagesCondition<T> condition) {
        if (this.untilMethodAlreadyCalled) {
            throw new IllegalStateException("Multiple calls to 'until' are not allowed. Use 'and' to combine conditions.");
        }
        this.untilMethodAlreadyCalled = true;
        this.condition = condition;
        return this;
    }

    @Override
    public List<Message<T>> get() {
        ArrayList messages = new ArrayList();
        try {
            String subscriptionName = "test-consumer-%s".formatted(UUID.randomUUID());
            try (Consumer consumer = this.consumerFactory.createConsumer(this.schema, this.topics, subscriptionName, c -> c.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest));){
                long loopStartTime;
                long remainingMillis = this.timeout.toMillis();
                do {
                    loopStartTime = System.currentTimeMillis();
                    Message message = consumer.receive(200, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        messages.add(message);
                        consumer.acknowledge(message);
                    }
                    if (this.condition == null || !this.condition.meets(messages)) continue;
                    ArrayList arrayList = messages;
                    return arrayList;
                } while ((remainingMillis -= System.currentTimeMillis() - loopStartTime) > 0L);
            }
        }
        catch (PulsarClientException ex) {
            throw new PulsarException((Throwable)ex);
        }
        finally {
            if (this.locallyCreatedPulsarClient != null && !this.locallyCreatedPulsarClient.isClosed()) {
                try {
                    this.locallyCreatedPulsarClient.close();
                }
                catch (PulsarClientException e) {
                    LOG.error((Throwable)e, () -> "Failed to close locally created Pulsar client due to: " + e.getMessage());
                }
            }
        }
        if (this.condition != null && !this.condition.meets(messages)) {
            throw new ConditionTimeoutException("Condition was not met within %d seconds".formatted(this.timeout.toSeconds()));
        }
        return messages;
    }
}

