/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collection;
import java.util.Optional;

/**
 * {@link BatchPublisher} built on top of {@link SingleConnection}.
 *
 * <p>Outgoing batches are wrapped in a {@link PublishRequest} and handed to {@code sendToStream};
 * each incoming message response is converted to the {@link Offset} of its start cursor and handed
 * to {@code sendToClient}. The most recently accepted offset is remembered so that a response whose
 * offset is not strictly greater than the previous one is rejected.
 */
class BatchPublisherImpl extends SingleConnection<PublishRequest, PublishResponse, Offset>
        implements BatchPublisher {
    /** Monitor protecting {@link #lastOffset}. */
    private final CloseableMonitor monitor = new CloseableMonitor();

    /** Offset of the last accepted message response; empty until the first one is accepted. */
    @GuardedBy("monitor.monitor")
    private Optional<Offset> lastOffset = Optional.empty();

    /**
     * Passes the stream factory and client observer to the superclass, then calls
     * {@code initialize} with {@code initialRequest}.
     *
     * @param streamFactory factory forwarded to the superclass constructor
     * @param publishCompleteStream observer forwarded to the superclass constructor
     * @param initialRequest request handed to {@code initialize}
     */
    private BatchPublisherImpl(
            StreamFactories.PublishStreamFactory streamFactory,
            ResponseObserver<Offset> publishCompleteStream,
            PublishRequest initialRequest) {
        super(streamFactory, publishCompleteStream);
        initialize(initialRequest);
    }

    /**
     * Packs {@code messages} into the message-publish part of a single {@link PublishRequest}
     * and passes the built request to {@code sendToStream}.
     *
     * @param messages the messages to add to the request
     */
    @Override
    public void publish(Collection<PubSubMessage> messages) {
        PublishRequest.Builder request = PublishRequest.newBuilder();
        request.getMessagePublishRequestBuilder().addAllMessages(messages);
        sendToStream(request.build());
    }

    /**
     * Verifies that {@code response} carries an initial response.
     *
     * @param response the response to validate
     * @throws CheckedApiException presumably raised by the precondition check when
     *     {@code response} has no initial response (behavior of {@code checkState} is defined
     *     outside this file)
     */
    @Override
    protected void handleInitialResponse(PublishResponse response) throws CheckedApiException {
        // The message is built eagerly, exactly as before, whether or not the check passes.
        String failureMessage = "First stream response is not an initial response: " + response;
        CheckedApiPreconditions.checkState(response.hasInitialResponse(), failureMessage);
    }

    /**
     * Validates a stream response, records the offset of its start cursor and forwards that
     * offset via {@code sendToClient}.
     *
     * @param response the response to process; must be a message response and not an initial
     *     response
     * @throws CheckedApiException if the offset is not strictly greater than the previously
     *     accepted one; presumably also when either precondition check fails
     */
    @Override
    protected void handleStreamResponse(PublishResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(
                !response.hasInitialResponse(), "Received duplicate initial response.");
        CheckedApiPreconditions.checkState(
                response.hasMessageResponse(),
                "Received response on stream which was neither a message or initial response.");

        MessagePublishResponse messageResponse = response.getMessageResponse();
        Offset startOffset = Offset.of(messageResponse.getStartCursor().getOffset());
        acceptOffset(startOffset);
        // Forwarding happens only after the monitor hold taken in acceptOffset has been closed.
        sendToClient(startOffset);
    }

    /**
     * Stores {@code candidate} as the last accepted offset while holding the monitor.
     *
     * @param candidate the offset taken from the newest message response
     * @throws CheckedApiException with code {@code FAILED_PRECONDITION} if a previous offset
     *     exists and {@code candidate} is not strictly greater than it
     */
    private void acceptOffset(Offset candidate) throws CheckedApiException {
        try (CloseableMonitor.Hold hold = monitor.enter()) {
            if (lastOffset.isPresent()) {
                Offset previous = lastOffset.get();
                if (previous.value() >= candidate.value()) {
                    throw new CheckedApiException(
                            "Received out of order offsets on stream.",
                            StatusCode.Code.FAILED_PRECONDITION);
                }
            }
            lastOffset = Optional.of(candidate);
        }
    }

    /** {@link BatchPublisherFactory} that creates {@link BatchPublisherImpl} instances. */
    static class Factory implements BatchPublisherFactory {
        Factory() {
        }

        /**
         * Creates a publisher whose stream factory delegates to {@code streamFactory.New}.
         *
         * @param streamFactory source of the underlying stream
         * @param clientStream observer that is given to the new publisher
         * @param initialRequest initial request given to the new publisher
         * @return the newly constructed publisher
         */
        @Override
        public BatchPublisherImpl New(
                StreamFactory<PublishRequest, PublishResponse> streamFactory,
                ResponseObserver<Offset> clientStream,
                PublishRequest initialRequest) {
            return new BatchPublisherImpl(streamFactory::New, clientStream, initialRequest);
        }
    }
}

