/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spring.pubsub.core.subscriber;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils;
import com.google.cloud.spring.pubsub.support.SubscriberFactory;
import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.converter.ConvertedBasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter;
import com.google.cloud.spring.pubsub.support.converter.SimplePubSubMessageConverter;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.Assert;

public class PubSubSubscriberTemplate
implements PubSubSubscriberOperations,
DisposableBean {
    /** Factory used to create subscribers, pull requests and subscriber stubs. */
    private final SubscriberFactory subscriberFactory;
    /** Converter applied to message payloads; starts out as a {@link SimplePubSubMessageConverter}. */
    private PubSubMessageConverter pubSubMessageConverter = new SimplePubSubMessageConverter();
    /** Single-threaded executor created by this template and shut down in {@link #destroy()}. */
    private final ExecutorService defaultAckExecutor = Executors.newSingleThreadExecutor();
    /** Executor on which ack / modify-deadline completion callbacks run; initially {@link #defaultAckExecutor}. */
    private Executor ackExecutor = this.defaultAckExecutor;
    /** Executor on which asynchronous pull callbacks run; the default simply invokes {@code Runnable.run()} inline. */
    private Executor asyncPullExecutor = Runnable::run;
    /** Subscriber stubs keyed by subscription name; populated on first use and closed in {@link #destroy()}. */
    private final ConcurrentHashMap<String, SubscriberStub> subscriptionNameToStubMap = new ConcurrentHashMap();

    /**
     * Creates a template that delegates subscriber, pull-request and stub creation to the given factory.
     *
     * @param subscriberFactory the factory to use; must not be {@code null}
     */
    public PubSubSubscriberTemplate(SubscriberFactory subscriberFactory) {
        Assert.notNull(subscriberFactory, "The subscriberFactory can't be null.");
        this.subscriberFactory = subscriberFactory;
    }

    /**
     * Returns the converter currently used to turn message payloads into typed objects.
     *
     * @return the configured message converter
     */
    public PubSubMessageConverter getMessageConverter() {
        return pubSubMessageConverter;
    }

    /**
     * Replaces the converter used for payload conversion.
     *
     * @param pubSubMessageConverter the new converter; must not be {@code null}
     */
    public void setMessageConverter(PubSubMessageConverter pubSubMessageConverter) {
        Assert.notNull(pubSubMessageConverter, "The pubSubMessageConverter can't be null.");
        this.pubSubMessageConverter = pubSubMessageConverter;
    }

    /**
     * Sets the executor on which ack / modify-deadline completion callbacks are run.
     *
     * @param ackExecutor the executor to use; must not be {@code null}
     */
    public void setAckExecutor(Executor ackExecutor) {
        Assert.notNull(ackExecutor, "ackExecutor can't be null.");
        this.ackExecutor = ackExecutor;
    }

    /**
     * Sets the executor on which asynchronous pull completion callbacks are run.
     *
     * @param asyncPullExecutor the executor to use; must not be {@code null}
     */
    public void setAsyncPullExecutor(Executor asyncPullExecutor) {
        Assert.notNull(asyncPullExecutor, "asyncPullExecutor can't be null.");
        this.asyncPullExecutor = asyncPullExecutor;
    }

    /**
     * Creates a subscriber for the subscription, starts it asynchronously, and hands every received
     * message to {@code messageConsumer} wrapped as an acknowledgeable message.
     *
     * @param subscription the subscription to listen on
     * @param messageConsumer callback invoked for each received message; must not be {@code null}
     * @return the started subscriber
     */
    @Override
    public Subscriber subscribe(String subscription, Consumer<BasicAcknowledgeablePubsubMessage> messageConsumer) {
        Assert.notNull(messageConsumer, "The messageConsumer can't be null.");
        Subscriber subscriber = this.subscriberFactory.createSubscriber(subscription, (message, ackReplyConsumer) -> {
            // The subscription name is resolved for each delivered message, using the factory's project id.
            ProjectSubscriptionName resolvedName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscription, this.subscriberFactory.getProjectId());
            messageConsumer.accept(new PushedAcknowledgeablePubsubMessage(resolvedName, message, ackReplyConsumer));
        });
        subscriber.startAsync();
        return subscriber;
    }

    /**
     * Same as {@link #subscribe}, but each message's payload is first converted to {@code payloadType}
     * with the converter returned by {@link #getMessageConverter()}.
     *
     * @param subscription the subscription to listen on
     * @param messageConsumer callback invoked for each converted message; must not be {@code null}
     * @param payloadType the type the payload is converted to
     * @param <T> the payload type
     * @return the started subscriber
     */
    @Override
    public <T> Subscriber subscribeAndConvert(String subscription, Consumer<ConvertedBasicAcknowledgeablePubsubMessage<T>> messageConsumer, Class<T> payloadType) {
        Assert.notNull(messageConsumer, "The messageConsumer can't be null.");
        Subscriber subscriber = this.subscriberFactory.createSubscriber(subscription, (message, ackReplyConsumer) -> {
            ProjectSubscriptionName resolvedName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscription, this.subscriberFactory.getProjectId());
            T payload = this.getMessageConverter().fromPubSubMessage(message, payloadType);
            messageConsumer.accept(new ConvertedPushedAcknowledgeablePubsubMessage<T>(resolvedName, message, payload, ackReplyConsumer));
        });
        subscriber.startAsync();
        return subscriber;
    }

    /**
     * Executes the pull request synchronously on the stub for the request's subscription and wraps
     * the received messages.
     *
     * @param pullRequest the request to execute; must not be {@code null}
     * @return the received messages as acknowledgeable messages
     */
    private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
        Assert.notNull(pullRequest, "The pull request can't be null.");
        String subscriptionName = pullRequest.getSubscription();
        PullResponse response = this.getSubscriberStub(subscriptionName).pullCallable().call(pullRequest);
        return this.toAcknowledgeablePubsubMessageList(response.getReceivedMessagesList(), subscriptionName);
    }

    /**
     * Executes the pull request asynchronously on the stub for the request's subscription.
     *
     * <p>The returned future is completed from a callback run on {@code asyncPullExecutor}: with the
     * wrapped messages on success, or exceptionally when the pull fails or when wrapping the
     * response throws.
     *
     * @param pullRequest the request to execute; must not be {@code null}
     * @return a future holding the received messages as acknowledgeable messages
     */
    private CompletableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(final PullRequest pullRequest) {
        Assert.notNull(pullRequest, "The pull request can't be null.");
        ApiFuture<PullResponse> pullFuture = this.getSubscriberStub(pullRequest.getSubscription()).pullCallable().futureCall(pullRequest);
        final CompletableFuture<List<AcknowledgeablePubsubMessage>> completableFuture = new CompletableFuture<>();
        ApiFutures.addCallback(pullFuture, new ApiFutureCallback<PullResponse>() {

            @Override
            public void onFailure(Throwable throwable) {
                completableFuture.completeExceptionally(throwable);
            }

            @Override
            public void onSuccess(PullResponse pullResponse) {
                try {
                    List<AcknowledgeablePubsubMessage> result = PubSubSubscriberTemplate.this.toAcknowledgeablePubsubMessageList(pullResponse.getReceivedMessagesList(), pullRequest.getSubscription());
                    completableFuture.complete(result);
                }
                catch (RuntimeException e) {
                    // Without this, a failure while wrapping the response would leave the future pending forever.
                    completableFuture.completeExceptionally(e);
                }
            }
        }, this.asyncPullExecutor);
        return completableFuture;
    }

    /**
     * Wraps each received message, together with its ack id and the resolved subscription name, as
     * an acknowledgeable message.
     *
     * @param messages the messages returned by a pull
     * @param subscriptionId the subscription the messages were pulled from
     * @return the wrapped messages, in the same order
     */
    private List<AcknowledgeablePubsubMessage> toAcknowledgeablePubsubMessageList(List<ReceivedMessage> messages, String subscriptionId) {
        return messages.stream().<AcknowledgeablePubsubMessage>map(received -> {
            ProjectSubscriptionName resolvedName = PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionId, this.subscriberFactory.getProjectId());
            return new PulledAcknowledgeablePubsubMessage(resolvedName, received.getMessage(), received.getAckId());
        }).toList();
    }

    /**
     * Builds a pull request through the subscriber factory and executes it synchronously.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the factory's {@code createPullRequest}
     * @param returnImmediately forwarded to the factory's {@code createPullRequest}
     * @return the pulled messages as acknowledgeable messages
     */
    @Override
    public List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxMessages, Boolean returnImmediately) {
        PullRequest request = this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately);
        return this.pull(request);
    }

    /**
     * Builds a pull request through the subscriber factory and executes it asynchronously.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the factory's {@code createPullRequest}
     * @param returnImmediately forwarded to the factory's {@code createPullRequest}
     * @return a future holding the pulled messages as acknowledgeable messages
     */
    @Override
    public CompletableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately) {
        PullRequest request = this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately);
        return this.pullAsync(request);
    }

    /**
     * Pulls messages synchronously and converts each payload to {@code payloadType}.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the underlying pull
     * @param returnImmediately forwarded to the underlying pull
     * @param payloadType the type each payload is converted to
     * @param <T> the payload type
     * @return the pulled messages, each carrying its converted payload
     */
    @Override
    public <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
        return this.toConvertedAcknowledgeablePubsubMessages(payloadType, this.pull(subscription, maxMessages, returnImmediately));
    }

    /**
     * Pulls messages asynchronously and converts each payload to {@code payloadType}.
     *
     * <p>The returned future fails if the pull fails or if the conversion throws.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the underlying pull
     * @param returnImmediately forwarded to the underlying pull
     * @param payloadType the type each payload is converted to
     * @param <T> the payload type
     * @return a future holding the pulled messages, each carrying its converted payload
     */
    @Override
    public <T> CompletableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription, Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
        CompletableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> convertedFuture = new CompletableFuture<>();
        this.pullAsync(subscription, maxMessages, returnImmediately).whenComplete((ackableMessages, failure) -> {
            if (failure == null) {
                try {
                    convertedFuture.complete(this.toConvertedAcknowledgeablePubsubMessages(payloadType, ackableMessages));
                }
                catch (Exception e) {
                    convertedFuture.completeExceptionally(e);
                }
            } else {
                convertedFuture.completeExceptionally(failure);
            }
        });
        return convertedFuture;
    }

    /**
     * Converts the payload of each message with the configured converter and pairs it with the
     * original acknowledgeable message.
     *
     * @param payloadType the type each payload is converted to
     * @param ackableMessages the messages to convert
     * @param <T> the payload type
     * @return the converted messages, in the same order
     */
    private <T> List<ConvertedAcknowledgeablePubsubMessage<T>> toConvertedAcknowledgeablePubsubMessages(Class<T> payloadType, List<AcknowledgeablePubsubMessage> ackableMessages) {
        return ackableMessages.stream().<ConvertedAcknowledgeablePubsubMessage<T>>map(ackable -> {
            T payload = this.pubSubMessageConverter.fromPubSubMessage(ackable.getPubsubMessage(), payloadType);
            return new ConvertedPulledAcknowledgeablePubsubMessage<T>(ackable, payload);
        }).toList();
    }

    /**
     * Pulls messages synchronously, issues an acknowledgement for all of them, and returns the raw
     * messages.
     *
     * <p>The future returned by the acknowledgement is not awaited or inspected here.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the factory's {@code createPullRequest}
     * @param returnImmediately forwarded to the factory's {@code createPullRequest}
     * @return the pulled messages
     */
    @Override
    public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately) {
        List<AcknowledgeablePubsubMessage> pulled = this.pull(this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately));
        boolean nothingPulled = pulled.isEmpty();
        if (!nothingPulled) {
            // ack(...) rejects empty collections, so it is only called when something was pulled.
            this.ack(pulled);
        }
        return pulled.stream().map(AcknowledgeablePubsubMessage::getPubsubMessage).toList();
    }

    /**
     * Pulls messages asynchronously, issues an acknowledgement for all of them, and completes the
     * returned future with the raw messages.
     *
     * <p>The future fails if the pull fails, or if issuing the acknowledgement or extracting the
     * messages throws. The future returned by the acknowledgement itself is not awaited.
     *
     * @param subscription the subscription to pull from
     * @param maxMessages forwarded to the factory's {@code createPullRequest}
     * @param returnImmediately forwarded to the factory's {@code createPullRequest}
     * @return a future holding the pulled messages
     */
    @Override
    public CompletableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately) {
        PullRequest pullRequest = this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately);
        CompletableFuture<List<PubsubMessage>> completableFuture = new CompletableFuture<>();
        this.pullAsync(pullRequest).whenComplete((ackableMessages, exception) -> {
            if (exception != null) {
                completableFuture.completeExceptionally(exception);
                return;
            }
            try {
                if (!ackableMessages.isEmpty()) {
                    this.ack(ackableMessages);
                }
                List<PubsubMessage> messages = ackableMessages.stream().map(BasicAcknowledgeablePubsubMessage::getPubsubMessage).toList();
                completableFuture.complete(messages);
            }
            catch (Exception e) {
                // An exception escaping this callback would otherwise leave completableFuture pending forever.
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    /**
     * Pulls and acknowledges a single message, requesting at most one message with
     * {@code returnImmediately} set to {@code true}.
     *
     * @param subscription the subscription to pull from
     * @return the pulled message, or {@code null} if none was returned
     */
    @Override
    public PubsubMessage pullNext(String subscription) {
        List<PubsubMessage> pulled = this.pullAndAck(subscription, 1, true);
        if (pulled.isEmpty()) {
            return null;
        }
        return pulled.get(0);
    }

    /**
     * Asynchronous counterpart of {@link #pullNext(String)}.
     *
     * @param subscription the subscription to pull from
     * @return a future holding the pulled message, or {@code null} if none was returned; it fails if
     *     the underlying pull-and-ack fails
     */
    @Override
    public CompletableFuture<PubsubMessage> pullNextAsync(String subscription) {
        CompletableFuture<PubsubMessage> nextMessage = new CompletableFuture<>();
        this.pullAndAckAsync(subscription, 1, true).whenComplete((messages, failure) -> {
            if (failure != null) {
                nextMessage.completeExceptionally(failure);
            } else if (messages.isEmpty()) {
                nextMessage.complete(null);
            } else {
                nextMessage.complete(messages.get(0));
            }
        });
        return nextMessage;
    }

    /**
     * Returns the factory this template was constructed with.
     *
     * @return the subscriber factory
     */
    public SubscriberFactory getSubscriberFactory() {
        return subscriberFactory;
    }

    /**
     * Acknowledges the given messages, sending one acknowledge request per subscription.
     *
     * @param acknowledgeablePubsubMessages the messages to acknowledge; must not be empty
     * @return a future that completes once every request has succeeded, or fails with the first error
     */
    @Override
    public CompletableFuture<Void> ack(Collection<? extends AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages) {
        Assert.notEmpty(acknowledgeablePubsubMessages, "The acknowledgeablePubsubMessages can't be empty.");
        BiFunction<String, List<String>, ApiFuture<Empty>> acknowledge = (subscriptionName, ackIds) -> this.ack(subscriptionName, ackIds);
        return this.doBatchedAsyncOperation(acknowledgeablePubsubMessages, acknowledge);
    }

    /**
     * Negatively acknowledges the given messages by setting their ack deadline to zero seconds.
     *
     * @param acknowledgeablePubsubMessages the messages to nack; must not be empty
     * @return the future returned by {@link #modifyAckDeadline(Collection, int)}
     */
    @Override
    public CompletableFuture<Void> nack(Collection<? extends AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages) {
        final int immediateDeadlineSeconds = 0;
        return this.modifyAckDeadline(acknowledgeablePubsubMessages, immediateDeadlineSeconds);
    }

    /**
     * Changes the ack deadline of the given messages, sending one request per subscription.
     *
     * @param acknowledgeablePubsubMessages the messages to modify; must not be empty
     * @param ackDeadlineSeconds the new deadline in seconds; must not be negative
     * @return a future that completes once every request has succeeded, or fails with the first error
     */
    @Override
    public CompletableFuture<Void> modifyAckDeadline(Collection<? extends AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages, int ackDeadlineSeconds) {
        Assert.notEmpty(acknowledgeablePubsubMessages, "The acknowledgeablePubsubMessages can't be empty.");
        Assert.isTrue(ackDeadlineSeconds >= 0, "The ackDeadlineSeconds must not be negative.");
        BiFunction<String, List<String>, ApiFuture<Empty>> modifyDeadline = (subscriptionName, ackIds) -> this.modifyAckDeadline(subscriptionName, ackIds, ackDeadlineSeconds);
        return this.doBatchedAsyncOperation(acknowledgeablePubsubMessages, modifyDeadline);
    }

    /**
     * Releases the resources held by this template: shuts down the default ack executor and closes
     * every cached subscriber stub.
     *
     * <p>Every stub is closed even if closing an earlier one fails. The first failure is rethrown
     * after the loop, with any later failures attached as suppressed exceptions.
     */
    @Override
    public void destroy() {
        this.defaultAckExecutor.shutdown();
        RuntimeException firstFailure = null;
        for (SubscriberStub stub : this.subscriptionNameToStubMap.values()) {
            try {
                stub.close();
            }
            catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Sends a single acknowledge request for the given ack ids on one subscription.
     *
     * @param subscriptionName the subscription the ack ids belong to
     * @param ackIds the ack ids to acknowledge
     * @return the future of the acknowledge call
     */
    private ApiFuture<Empty> ack(String subscriptionName, Collection<String> ackIds) {
        AcknowledgeRequest.Builder requestBuilder = AcknowledgeRequest.newBuilder();
        requestBuilder.addAllAckIds(ackIds);
        requestBuilder.setSubscription(subscriptionName);
        AcknowledgeRequest request = requestBuilder.build();
        return this.getSubscriberStub(subscriptionName).acknowledgeCallable().futureCall(request);
    }

    /**
     * Sends a single modify-ack-deadline request for the given ack ids on one subscription.
     *
     * @param subscriptionName the subscription the ack ids belong to
     * @param ackIds the ack ids whose deadline is changed
     * @param ackDeadlineSeconds the new deadline in seconds
     * @return the future of the modify-ack-deadline call
     */
    private ApiFuture<Empty> modifyAckDeadline(String subscriptionName, Collection<String> ackIds, int ackDeadlineSeconds) {
        ModifyAckDeadlineRequest.Builder requestBuilder = ModifyAckDeadlineRequest.newBuilder();
        requestBuilder.setAckDeadlineSeconds(ackDeadlineSeconds);
        requestBuilder.addAllAckIds(ackIds);
        requestBuilder.setSubscription(subscriptionName);
        ModifyAckDeadlineRequest request = requestBuilder.build();
        return this.getSubscriberStub(subscriptionName).modifyAckDeadlineCallable().futureCall(request);
    }

    /**
     * Groups the messages' ack ids by subscription, applies {@code asyncOperation} once per
     * subscription, and aggregates the outcomes into a single future.
     *
     * <p>The returned future completes normally once every per-subscription call has succeeded, and
     * completes exceptionally as soon as any of them fails. Outcome callbacks run on
     * {@code ackExecutor}.
     *
     * @param acknowledgeablePubsubMessages the messages to operate on; all must belong to one project
     * @param asyncOperation invoked with the subscription name and the ack ids of that subscription
     * @return the aggregated future
     * @throws IllegalStateException if the messages do not all share exactly one project id
     */
    private CompletableFuture<Void> doBatchedAsyncOperation(Collection<? extends AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages, BiFunction<String, List<String>, ApiFuture<Empty>> asyncOperation) {
        Map<ProjectSubscriptionName, List<String>> groupedMessages = acknowledgeablePubsubMessages.stream().collect(Collectors.groupingBy(AcknowledgeablePubsubMessage::getProjectSubscriptionName, Collectors.mapping(AcknowledgeablePubsubMessage::getAckId, Collectors.toList())));
        Assert.state(groupedMessages.keySet().stream().map(ProjectSubscriptionName::getProject).distinct().count() == 1L, "The project id of all messages must match.");
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        final int numExpectedFutures = groupedMessages.size();
        final AtomicInteger numCompletedFutures = new AtomicInteger();
        groupedMessages.forEach((psName, ackIds) -> {
            ApiFuture<Empty> ackApiFuture = asyncOperation.apply(psName.toString(), ackIds);
            ApiFutures.addCallback(ackApiFuture, new ApiFutureCallback<Empty>() {

                @Override
                public void onFailure(Throwable throwable) {
                    this.processResult(throwable);
                }

                @Override
                public void onSuccess(Empty empty) {
                    this.processResult(null);
                }

                private void processResult(Throwable throwable) {
                    if (throwable != null) {
                        completableFuture.completeExceptionally(throwable);
                    } else if (numCompletedFutures.incrementAndGet() == numExpectedFutures) {
                        // Only the callback that observes the final success completes the future.
                        completableFuture.complete(null);
                    }
                }
            }, this.ackExecutor);
        });
        return completableFuture;
    }

    /**
     * Returns the cached stub for the subscription, creating it through the subscriber factory on
     * first use.
     *
     * @param subscription the subscription name used as cache key
     * @return the stub associated with the subscription
     */
    private SubscriberStub getSubscriberStub(String subscription) {
        // Plain lookup first; NOTE(review): presumably a fast path that skips computeIfAbsent
        // when the stub is already cached - confirm the intent.
        SubscriberStub cachedStub = this.subscriptionNameToStubMap.get(subscription);
        if (cachedStub != null) {
            return cachedStub;
        }
        return this.subscriptionNameToStubMap.computeIfAbsent(subscription, this.subscriberFactory::createSubscriberStub);
    }

    /**
     * A pulled message that additionally carries its payload converted to {@code T}.
     *
     * @param <T> the converted payload type
     */
    private class ConvertedPulledAcknowledgeablePubsubMessage<T>
    extends PulledAcknowledgeablePubsubMessage
    implements ConvertedAcknowledgeablePubsubMessage<T> {
        /** The already-converted payload handed in at construction time. */
        private final T convertedPayload;

        /**
         * Copies subscription name, raw message and ack id from {@code message} and attaches the
         * converted payload.
         */
        ConvertedPulledAcknowledgeablePubsubMessage(AcknowledgeablePubsubMessage message, T payload) {
            super(message.getProjectSubscriptionName(), message.getPubsubMessage(), message.getAckId());
            convertedPayload = payload;
        }

        @Override
        public T getPayload() {
            return convertedPayload;
        }
    }

    /**
     * A message obtained through a pull; acknowledgement operations are routed back through the
     * enclosing template using the message's ack id.
     */
    private class PulledAcknowledgeablePubsubMessage
    extends AbstractBasicAcknowledgeablePubsubMessage
    implements AcknowledgeablePubsubMessage {
        /** Ack id received together with the message. */
        private final String ackId;

        PulledAcknowledgeablePubsubMessage(ProjectSubscriptionName projectSubscriptionName, PubsubMessage message, String ackId) {
            super(projectSubscriptionName, message);
            this.ackId = ackId;
        }

        @Override
        public String getAckId() {
            return ackId;
        }

        /** Acknowledges this single message via the enclosing template. */
        @Override
        public CompletableFuture<Void> ack() {
            return PubSubSubscriberTemplate.this.ack(Collections.singleton(this));
        }

        /** Nacks this message by setting its ack deadline to zero seconds. */
        @Override
        public CompletableFuture<Void> nack() {
            return modifyAckDeadline(0);
        }

        /** Changes the ack deadline of this single message via the enclosing template. */
        @Override
        public CompletableFuture<Void> modifyAckDeadline(int ackDeadlineSeconds) {
            return PubSubSubscriberTemplate.this.modifyAckDeadline(Collections.singleton(this), ackDeadlineSeconds);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PulledAcknowledgeablePubsubMessage{projectId='");
            text.append(getProjectSubscriptionName().getProject());
            text.append("', subscriptionName='");
            text.append(getProjectSubscriptionName().getSubscription());
            text.append("', message=");
            text.append(getPubsubMessage());
            text.append(", ackId='");
            text.append(ackId);
            text.append("'}");
            return text.toString();
        }
    }

    /**
     * A subscriber-delivered message that additionally carries its payload converted to {@code T}.
     *
     * @param <T> the converted payload type
     */
    private static class ConvertedPushedAcknowledgeablePubsubMessage<T>
    extends PushedAcknowledgeablePubsubMessage
    implements ConvertedBasicAcknowledgeablePubsubMessage<T> {
        /** The already-converted payload handed in at construction time. */
        private final T convertedPayload;

        ConvertedPushedAcknowledgeablePubsubMessage(ProjectSubscriptionName projectSubscriptionName, PubsubMessage message, T payload, AckReplyConsumer ackReplyConsumer) {
            super(projectSubscriptionName, message, ackReplyConsumer);
            convertedPayload = payload;
        }

        @Override
        public T getPayload() {
            return convertedPayload;
        }
    }

    /**
     * A message delivered by a subscriber; ack and nack are forwarded to the
     * {@link AckReplyConsumer} that accompanied the message.
     */
    private static class PushedAcknowledgeablePubsubMessage
    extends AbstractBasicAcknowledgeablePubsubMessage {
        private final AckReplyConsumer ackReplyConsumer;

        PushedAcknowledgeablePubsubMessage(ProjectSubscriptionName projectSubscriptionName, PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
            super(projectSubscriptionName, message);
            this.ackReplyConsumer = ackReplyConsumer;
        }

        /** Forwards to {@code AckReplyConsumer.ack()}; the returned future is already completed. */
        @Override
        public CompletableFuture<Void> ack() {
            // A lambda (not a method reference) so the consumer is dereferenced inside the helper's try block.
            return replyAndSettle(() -> this.ackReplyConsumer.ack());
        }

        /** Forwards to {@code AckReplyConsumer.nack()}; the returned future is already completed. */
        @Override
        public CompletableFuture<Void> nack() {
            return replyAndSettle(() -> this.ackReplyConsumer.nack());
        }

        /** Runs the reply and reports its outcome through an already-completed future. */
        private static CompletableFuture<Void> replyAndSettle(Runnable reply) {
            CompletableFuture<Void> outcome = new CompletableFuture<>();
            try {
                reply.run();
                outcome.complete(null);
            }
            catch (Exception e) {
                outcome.completeExceptionally(e);
            }
            return outcome;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PushedAcknowledgeablePubsubMessage{projectId='");
            text.append(getProjectSubscriptionName().getProject());
            text.append("', subscriptionName='");
            text.append(getProjectSubscriptionName().getSubscription());
            text.append("', message=");
            text.append(getPubsubMessage());
            text.append("}");
            return text.toString();
        }
    }

    /**
     * Base class holding the two values common to every acknowledgeable message in this file: the
     * subscription it came from and the raw Pub/Sub message.
     */
    private abstract static class AbstractBasicAcknowledgeablePubsubMessage
    implements BasicAcknowledgeablePubsubMessage {
        private final ProjectSubscriptionName subscriptionName;
        private final PubsubMessage pubsubMessage;

        AbstractBasicAcknowledgeablePubsubMessage(ProjectSubscriptionName projectSubscriptionName, PubsubMessage message) {
            subscriptionName = projectSubscriptionName;
            pubsubMessage = message;
        }

        @Override
        public ProjectSubscriptionName getProjectSubscriptionName() {
            return subscriptionName;
        }

        @Override
        public PubsubMessage getPubsubMessage() {
            return pubsubMessage;
        }
    }
}

