/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spring.pubsub.integration.inbound;

import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.PubSubHeaderMapper;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubAcknowledgmentCallback;
import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.util.Assert;

public class PubSubMessageSource
extends AbstractFetchLimitingMessageSource<Object> {
    private final String subscriptionName;
    private final PubSubSubscriberOperations pubSubSubscriberOperations;
    private AckMode ackMode = AckMode.AUTO;
    private HeaderMapper<Map<String, String>> headerMapper = new PubSubHeaderMapper();
    private Class<?> payloadType = byte[].class;
    private boolean blockOnPull;
    private final ArrayDeque<ConvertedAcknowledgeablePubsubMessage<?>> cachedMessages = new ArrayDeque();

    public PubSubMessageSource(PubSubSubscriberOperations pubSubSubscriberOperations, String subscriptionName) {
        Assert.notNull((Object)pubSubSubscriberOperations, (String)"Pub/Sub subscriber template can't be null.");
        Assert.notNull((Object)subscriptionName, (String)"Pub/Sub subscription name can't be null.");
        this.pubSubSubscriberOperations = pubSubSubscriberOperations;
        this.subscriptionName = subscriptionName;
    }

    public void setAckMode(AckMode ackMode) {
        Assert.notNull((Object)((Object)ackMode), (String)"The acknowledgement mode can't be null.");
        this.ackMode = ackMode;
    }

    public void setPayloadType(Class<?> payloadType) {
        Assert.notNull(payloadType, (String)"The payload type cannot be null.");
        this.payloadType = payloadType;
    }

    public void setHeaderMapper(HeaderMapper<Map<String, String>> headerMapper) {
        Assert.notNull(headerMapper, (String)"The header mapper can't be null.");
        this.headerMapper = headerMapper;
    }

    public void setBlockOnPull(boolean blockOnPull) {
        this.blockOnPull = blockOnPull;
    }

    protected Object doReceive(int fetchSize) {
        if (this.cachedMessages.isEmpty()) {
            Integer maxMessages = fetchSize > 0 ? fetchSize : 1;
            List<ConvertedAcknowledgeablePubsubMessage<?>> messages = this.pubSubSubscriberOperations.pullAndConvert(this.subscriptionName, maxMessages, !this.blockOnPull, this.payloadType);
            if (messages.isEmpty()) {
                return null;
            }
            if (messages.size() == 1) {
                return this.processMessage(messages.get(0));
            }
            this.cachedMessages.addAll(messages);
        }
        return this.processMessage(this.cachedMessages.pollFirst());
    }

    public String getComponentType() {
        return "gcp-pubsub:message-source";
    }

    private AbstractIntegrationMessageBuilder<?> processMessage(ConvertedAcknowledgeablePubsubMessage<?> message) {
        if (message == null) {
            return null;
        }
        Map messageHeaders = this.headerMapper.toHeaders((Object)message.getPubsubMessage().getAttributesMap());
        messageHeaders.put("gcp_pubsub_original_message", message);
        messageHeaders.put("acknowledgmentCallback", new PubSubAcknowledgmentCallback(message, this.ackMode));
        return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(messageHeaders);
    }
}

