/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.CommitState;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitter;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitterFactory;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitterImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnection;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionImpl;
import com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Monitor;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Optional;

public class CommitterImpl
extends ProxyService
implements Committer,
RetryingConnectionObserver<SequencedCommitCursorResponse> {
    private final StreamingCommitCursorRequest initialRequest;
    private final CloseableMonitor monitor = new CloseableMonitor();
    private final Monitor.Guard isEmptyOrError;
    @GuardedBy(value="monitor.monitor")
    private final RetryingConnection<StreamingCommitCursorRequest, ConnectedCommitter> connection;
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown;
    @GuardedBy(value="monitor.monitor")
    private Optional<CheckedApiException> permanentError;
    @GuardedBy(value="monitor.monitor")
    private final CommitState state;

    @VisibleForTesting
    CommitterImpl(StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory, ConnectedCommitterFactory factory, InitialCommitCursorRequest initialRequest) throws ApiException {
        super(new ApiService[0]);
        this.isEmptyOrError = new Monitor.Guard(this.monitor.monitor){

            public boolean isSatisfied() {
                return CommitterImpl.this.state.isEmpty() || CommitterImpl.this.permanentError.isPresent();
            }
        };
        this.shutdown = false;
        this.permanentError = Optional.empty();
        this.state = new CommitState();
        this.initialRequest = StreamingCommitCursorRequest.newBuilder().setInitial(initialRequest).build();
        this.connection = new RetryingConnectionImpl<StreamingCommitCursorRequest, StreamingCommitCursorResponse, SequencedCommitCursorResponse, ConnectedCommitter>(streamFactory, factory, this, this.initialRequest);
        this.addServices(this.connection);
    }

    public CommitterImpl(StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory, InitialCommitCursorRequest request) throws ApiException {
        this(streamFactory, new ConnectedCommitterImpl.Factory(), request);
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.permanentError = Optional.of(error);
            this.shutdown = true;
            this.state.abort(error);
        }
    }

    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
        }
        h = this.monitor.enterWhenUninterruptibly(this.isEmptyOrError);
        if (h != null) {
            h.close();
        }
    }

    @Override
    public void triggerReinitialize(CheckedApiException streamError) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.connection.reinitialize(this.initialRequest);
            Optional<Offset> offsetOr = this.state.reinitializeAndReturnToSend();
            if (!offsetOr.isPresent()) {
                return;
            }
            this.connection.modifyConnection(connectedCommitter -> {
                Preconditions.checkArgument((boolean)connectedCommitter.isPresent());
                ((ConnectedCommitter)connectedCommitter.get()).commit((Offset)offsetOr.get());
            });
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
        }
    }

    @Override
    public void onClientResponse(SequencedCommitCursorResponse value) throws CheckedApiException {
        Preconditions.checkArgument((value.getAcknowledgedCommits() > 0L ? 1 : 0) != 0);
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.state.complete(value.getAcknowledgedCommits());
        }
    }

    @Override
    public ApiFuture<Void> commitOffset(Offset offset) {
        CloseableMonitor.Hold h = this.monitor.enter();
        try {
            CheckedApiPreconditions.checkState(!this.shutdown, "Committed after the stream shut down.");
            this.connection.modifyConnection(connectedCommitter -> connectedCommitter.ifPresent(committer -> committer.commit(offset)));
            ApiFuture<Void> apiFuture = this.state.addCommit(offset);
            if (h != null) {
                h.close();
            }
            return apiFuture;
        }
        catch (Throwable throwable) {
            try {
                if (h != null) {
                    try {
                        h.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
            catch (CheckedApiException e) {
                this.onPermanentError(e);
                return ApiFutures.immediateFailedFuture((Throwable)e);
            }
        }
    }

    @Override
    public void waitUntilEmpty() throws CheckedApiException {
        try (CloseableMonitor.Hold h = this.monitor.enterWhenUninterruptibly(this.isEmptyOrError);){
            if (this.permanentError.isPresent()) {
                throw this.permanentError.get();
            }
        }
    }
}

