/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.annotation.UsedBy;
import com.couchbase.client.core.api.kv.CoreKvResponseMetadata;
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
import com.couchbase.client.core.api.query.CoreQueryContext;
import com.couchbase.client.core.api.query.CoreQueryOps;
import com.couchbase.client.core.api.query.CoreQueryOptions;
import com.couchbase.client.core.api.query.CoreQueryOptionsTransactions;
import com.couchbase.client.core.api.query.CoreQueryResult;
import com.couchbase.client.core.api.query.CoreQueryStatus;
import com.couchbase.client.core.classic.query.ClassicCoreQueryResult;
import com.couchbase.client.core.classic.query.ClassicCoreReactiveQueryResult;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.events.transaction.IllegalDocumentStateEvent;
import com.couchbase.client.core.config.BucketCapabilities;
import com.couchbase.client.core.deps.com.fasterxml.jackson.core.JsonProcessingException;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.BooleanNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.IntNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.TextNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.util.RawValue;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.DocumentExistsException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.error.transaction.ActiveTransactionRecordEntryNotFoundException;
import com.couchbase.client.core.error.transaction.ActiveTransactionRecordFullException;
import com.couchbase.client.core.error.transaction.ActiveTransactionRecordNotFoundException;
import com.couchbase.client.core.error.transaction.AttemptExpiredException;
import com.couchbase.client.core.error.transaction.AttemptNotFoundOnQueryException;
import com.couchbase.client.core.error.transaction.CommitNotPermittedException;
import com.couchbase.client.core.error.transaction.ConcurrentOperationsDetectedOnSameDocumentException;
import com.couchbase.client.core.error.transaction.ForwardCompatibilityFailureException;
import com.couchbase.client.core.error.transaction.PreviousOperationFailedException;
import com.couchbase.client.core.error.transaction.RetryTransactionException;
import com.couchbase.client.core.error.transaction.RollbackNotPermittedException;
import com.couchbase.client.core.error.transaction.TransactionAlreadyAbortedException;
import com.couchbase.client.core.error.transaction.TransactionAlreadyCommittedException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionCommitAmbiguousException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionExpiredException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionFailedException;
import com.couchbase.client.core.error.transaction.internal.ForwardCompatibilityRequiresRetryException;
import com.couchbase.client.core.error.transaction.internal.RetryAtrCommitException;
import com.couchbase.client.core.error.transaction.internal.RetryOperationException;
import com.couchbase.client.core.error.transaction.internal.WrappedTransactionOperationFailedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.kv.CodecFlags;
import com.couchbase.client.core.msg.kv.DurabilityLevel;
import com.couchbase.client.core.msg.kv.InsertResponse;
import com.couchbase.client.core.msg.kv.SubdocCommandType;
import com.couchbase.client.core.msg.kv.SubdocGetRequest;
import com.couchbase.client.core.msg.kv.SubdocMutateRequest;
import com.couchbase.client.core.msg.kv.SubdocMutateResponse;
import com.couchbase.client.core.msg.query.QueryChunkRow;
import com.couchbase.client.core.msg.query.QueryChunkTrailer;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.retry.reactor.RetryExhaustedException;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.transaction.CoreTransactionContext;
import com.couchbase.client.core.transaction.CoreTransactionGetMultiResult;
import com.couchbase.client.core.transaction.CoreTransactionGetResult;
import com.couchbase.client.core.transaction.CoreTransactionOptionalGetMultiResult;
import com.couchbase.client.core.transaction.CoreTransactionResult;
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
import com.couchbase.client.core.transaction.atr.ActiveTransactionRecordIds;
import com.couchbase.client.core.transaction.cleanup.CleanupRequest;
import com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecord;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordEntry;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordUtil;
import com.couchbase.client.core.transaction.components.CoreTransactionGetMultiSpec;
import com.couchbase.client.core.transaction.components.DocRecord;
import com.couchbase.client.core.transaction.components.DocumentGetter;
import com.couchbase.client.core.transaction.components.DocumentMetadata;
import com.couchbase.client.core.transaction.components.DurabilityLevelUtil;
import com.couchbase.client.core.transaction.components.TransactionLinks;
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
import com.couchbase.client.core.transaction.error.internal.ErrorClass;
import com.couchbase.client.core.transaction.forwards.CoreTransactionsExtension;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
import com.couchbase.client.core.transaction.forwards.ForwardCompatibilityStage;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiOptions;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiPhase;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiSignal;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiSignalAndReason;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiState;
import com.couchbase.client.core.transaction.getmulti.CoreGetMultiUtil;
import com.couchbase.client.core.transaction.getmulti.CoreTransactionGetMultiMode;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import com.couchbase.client.core.transaction.support.AttemptState;
import com.couchbase.client.core.transaction.support.OptionsUtil;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.support.StagedMutation;
import com.couchbase.client.core.transaction.support.StagedMutationType;
import com.couchbase.client.core.transaction.util.CoreTransactionAttemptContextHooks;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.LockTokens;
import com.couchbase.client.core.transaction.util.LogDeferThrowable;
import com.couchbase.client.core.transaction.util.MeteringUnits;
import com.couchbase.client.core.transaction.util.MonoBridge;
import com.couchbase.client.core.transaction.util.QueryUtil;
import com.couchbase.client.core.transaction.util.ReactiveLock;
import com.couchbase.client.core.transaction.util.ReactiveWaitGroup;
import com.couchbase.client.core.transaction.util.TransactionKVHandler;
import com.couchbase.client.core.transaction.util.TriFunction;
import com.couchbase.client.core.util.BucketConfigUtil;
import com.couchbase.client.core.util.CbPreconditions;
import com.couchbase.client.core.util.Either;
import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

@Stability.Internal
public class CoreTransactionAttemptContext {
    /** The four ASCII bytes {@code n, u, l, l}, i.e. the JSON literal {@code null}. */
    public static final byte[] NEAR_EMPTY_BYTE_ARRAY = new byte[]{110, 117, 108, 108};

    // Single-bit flags (1, 2, 4, 8). Together they occupy exactly the bits selected by
    // STATE_BITS_MASK_BITS (15 == 0b0000_1111).
    // NOTE(review): presumably OR-ed into stateBits; the code doing so is outside this section - confirm.
    public static final int TRANSACTION_STATE_BIT_COMMIT_NOT_ALLOWED = 1;
    public static final int TRANSACTION_STATE_BIT_APP_ROLLBACK_NOT_ALLOWED = 2;
    public static final int TRANSACTION_STATE_BIT_SHOULD_NOT_ROLLBACK = 4;
    public static final int TRANSACTION_STATE_BIT_SHOULD_NOT_RETRY = 8;

    // STATE_BITS_MASK_FINAL_ERROR is 112 == 0b0111_0000: three bits starting at bit
    // STATE_BITS_POSITION_FINAL_ERROR (4), directly above the four flag bits.
    public static final int STATE_BITS_POSITION_FINAL_ERROR = 4;
    public static final int STATE_BITS_MASK_FINAL_ERROR = 112;
    public static final int STATE_BITS_MASK_BITS = 15;

    /** Read once from system property {@code com.couchbase.transactions.unstagingParallelism}; defaults to 1000. */
    public static final int UNSTAGING_PARALLELISM = Integer.parseInt(System.getProperty("com.couchbase.transactions.unstagingParallelism", "1000"));

    private final AtomicInteger stateBits = new AtomicInteger(0);
    private final CoreTransactionAttemptContextHooks hooks;
    private final Core core;
    private final CoreMergedTransactionConfig config;

    // Mutations staged so far in this attempt. The accessors in this class that read it
    // (staged*Locked, makeQueryTxDataLocked) call assertLocked(...) first.
    private final ArrayList<StagedMutation> stagedMutationsLocked = new ArrayList<>();

    private final String attemptId;
    private final CoreTransactionContext overall;

    // A System.nanoTime() reading captured in the constructor, wrapped in a Duration.
    private final Duration startTimeClient;

    // Both start empty; makeQueryTxDataLocked emits the full "atr" object only when both are present.
    private Optional<String> atrId = Optional.empty();
    private Optional<CollectionIdentifier> atrCollection = Optional.empty();

    // Package-private; taken from overall.LOGGER in the constructor.
    final CoreTransactionLogger LOGGER;

    private AttemptState state = AttemptState.NOT_STARTED;
    private final CoreTransactionsReactive parent;

    // Span for the whole attempt; used as the parent of the per-operation spans created in get(...) etc.
    private final SpanWrapper attemptSpan;

    // Set to true by checkExpiryPreCommitAndSetExpiryOvertimeMode once client-side expiry is detected;
    // errorIfExpiredAndNotInExpiryOvertimeMode skips its expiry check while this is true.
    private volatile boolean expiryOvertimeMode = false;

    @Nullable
    private volatile TransactionQueryContext queryContext = null;

    // Incremented (getAndIncrement) for each statement issued via getWithQueryLocked.
    private final AtomicInteger queryStatementIdx = new AtomicInteger(0);

    // Debug/behaviour switches, each read from a system property at construction time.
    private final boolean lockDebugging = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.debug.lock", "false"));
    private final boolean monoBridgeDebugging = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.debug.monoBridge", "false"));
    private final boolean threadSafetyEnabled = Boolean.parseBoolean(System.getProperty("com.couchbase.transactions.threadSafety", "true"));

    // Both are initialised after lockDebugging because they read it; keep the declaration order.
    private final ReactiveWaitGroup kvOps = new ReactiveWaitGroup(this, this.lockDebugging);
    private final ReactiveLock mutex = new ReactiveLock(this, this.lockDebugging);

    // From system property com.couchbase.transactions.expiryThresholdMs, default 10.
    // NOTE(review): presumably milliseconds, going by the property name - confirm at the use site.
    private final int EXPIRY_THRESHOLD = Integer.parseInt(System.getProperty("com.couchbase.transactions.expiryThresholdMs", "10"));

    private final CoreQueryOps queryOps;
    private MeteringUnits.MeteringUnitsBuilder meteringUnitsBuilder = new MeteringUnits.MeteringUnitsBuilder();

    /** Fixed 3ms delay; used below as the backoff of {@code RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY}. */
    public static final Duration DEFAULT_DELAY_RETRYING_OPERATION = Duration.ofMillis(3L);

    // Retries on RetryOperationException with exponential backoff between 1ms and 100ms plus random jitter.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY = Retry.anyOf(RetryOperationException.class).exponentialBackoff(Duration.ofMillis(1L), Duration.ofMillis(100L)).jitter(Jitter.random()).toReactorRetry();

    // Retries on RetryOperationException with a fixed DEFAULT_DELAY_RETRYING_OPERATION backoff.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY = Retry.anyOf(RetryOperationException.class).fixedBackoff(DEFAULT_DELAY_RETRYING_OPERATION).toReactorRetry();

    /**
     * Creates the context for one attempt of a transaction.
     *
     * @param core       the core instance; also the source of {@code queryOps}
     * @param overall    the enclosing transaction context; its {@code LOGGER} becomes this attempt's logger
     * @param config     the merged transaction configuration
     * @param attemptId  identifier of this attempt
     * @param parent     the owning {@link CoreTransactionsReactive}
     * @param parentSpan optional parent span; when empty, {@code null} is passed as the parent of the attempt span
     * @param hooks      hooks invoked at various points of the attempt
     * @throws NullPointerException if {@code core}, {@code overall}, {@code overall.LOGGER}, {@code config},
     *                              {@code attemptId}, {@code parent} or {@code hooks} is null; the message
     *                              names the offending argument
     */
    public CoreTransactionAttemptContext(Core core, CoreTransactionContext overall, CoreMergedTransactionConfig config, String attemptId, CoreTransactionsReactive parent, Optional<SpanWrapper> parentSpan, CoreTransactionAttemptContextHooks hooks) {
        this.core = Objects.requireNonNull(core, "core");
        this.overall = Objects.requireNonNull(overall, "overall");
        this.LOGGER = Objects.requireNonNull(overall.LOGGER, "overall.LOGGER");
        this.config = Objects.requireNonNull(config, "config");
        this.attemptId = Objects.requireNonNull(attemptId, "attemptId");
        this.startTimeClient = Duration.ofNanos(System.nanoTime());
        this.parent = Objects.requireNonNull(parent, "parent");
        this.hooks = Objects.requireNonNull(hooks, "hooks");
        this.queryOps = core.queryOps();
        // Created last: it receives `this` and calls tracer(), so the fields above must already be assigned.
        this.attemptSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "transaction_attempt", parentSpan.orElse(null));
    }

    /**
     * Builds the JSON object describing this attempt: its ids, remaining time, config,
     * the staged mutations, and (when known) the ATR location.
     * Asserts that the lock is held before reading the staged mutations.
     *
     * @return a new {@link ObjectNode} with keys {@code id}, {@code state}, {@code config},
     *         {@code mutations} and, optionally, {@code atr}
     */
    private ObjectNode makeQueryTxDataLocked() {
        this.assertLocked("makeQueryTxData");
        ObjectNode txData = Mapper.createObjectNode();

        ObjectNode idNode = Mapper.createObjectNode();
        idNode.put("txn", this.transactionId());
        idNode.put("atmpt", this.attemptId);
        txData.set("id", idNode);

        ObjectNode stateNode = Mapper.createObjectNode();
        stateNode.put("timeLeftMs", this.expiryRemainingMillis());
        txData.set("state", stateNode);

        ObjectNode configNode = Mapper.createObjectNode();
        configNode.put("kvTimeoutMs", this.core.context().environment().timeoutConfig().kvDurableTimeout().toMillis());
        configNode.put("durabilityLevel", this.config.durabilityLevel().name());
        configNode.put("numAtrs", this.config.numAtrs());
        txData.set("config", configNode);

        // One entry per staged mutation, in staging order.
        ArrayNode mutations = Mapper.createArrayNode();
        for (StagedMutation staged : this.stagedMutationsLocked) {
            ObjectNode entry = Mapper.createObjectNode();
            entry.put("scp", staged.collection.scope().orElse("_default"));
            entry.put("coll", staged.collection.collection().orElse("_default"));
            entry.put("bkt", staged.collection.bucket());
            entry.put("id", staged.id);
            entry.put("cas", Long.toString(staged.cas));
            entry.put("type", staged.type.name());
            mutations.add(entry);
        }
        txData.set("mutations", mutations);

        if (this.atrCollection.isPresent() && this.atrId.isPresent()) {
            // ATR already chosen: include its id and full keyspace.
            ObjectNode atrNode = Mapper.createObjectNode();
            atrNode.put("id", this.atrId.get());
            atrNode.put("bkt", this.atrCollection.get().bucket());
            atrNode.put("scp", this.atrCollection.get().scope().orElse("_default"));
            atrNode.put("coll", this.atrCollection.get().collection().orElse("_default"));
            txData.set("atr", atrNode);
        } else if (this.config.metadataCollection().isPresent()) {
            // No ATR yet: fall back to the configured metadata collection (no "id" key).
            CollectionIdentifier metadata = this.config.metadataCollection().get();
            ObjectNode atrNode = Mapper.createObjectNode();
            atrNode.put("bkt", metadata.bucket());
            atrNode.put("scp", metadata.scope().orElse("_default"));
            atrNode.put("coll", metadata.collection().orElse("_default"));
            txData.set("atr", atrNode);
        }
        return txData;
    }

    /** @return the {@link Core} this attempt was constructed with */
    public Core core() {
        return core;
    }

    /**
     * @return the scheduler returned by {@code transactionsSchedulers().schedulerBlocking()}
     *         on the core's environment
     */
    public Scheduler scheduler() {
        return core
                .context()
                .environment()
                .transactionsSchedulers()
                .schedulerBlocking();
    }

    /** @return the identifier of this attempt, as supplied to the constructor */
    public String attemptId() {
        return attemptId;
    }

    /** @return the transaction id reported by the enclosing {@link CoreTransactionContext} */
    public String transactionId() {
        return overall.transactionId();
    }

    /**
     * Collects the staged mutations of type {@code REPLACE}, in staging order.
     * Asserts that the lock is held and that the attempt is not in query mode.
     *
     * @return a new list of the staged replaces (possibly empty)
     */
    private List<StagedMutation> stagedReplacesLocked() {
        this.assertLocked("stagedReplaces");
        this.assertNotQueryMode("stagedReplaces");
        List<StagedMutation> replaces = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.REPLACE) {
                replaces.add(mutation);
            }
        }
        return replaces;
    }

    /**
     * Collects the staged mutations of type {@code REMOVE}, in staging order.
     * Asserts that the lock is held and that the attempt is not in query mode.
     *
     * @return a new list of the staged removes (possibly empty)
     */
    private List<StagedMutation> stagedRemovesLocked() {
        this.assertLocked("stagedRemoves");
        this.assertNotQueryMode("stagedRemoves");
        List<StagedMutation> removes = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.REMOVE) {
                removes.add(mutation);
            }
        }
        return removes;
    }

    /**
     * Collects the staged mutations of type {@code INSERT}, in staging order.
     * Asserts that the attempt is not in query mode and that the lock is held.
     *
     * @return a new list of the staged inserts (possibly empty)
     */
    private List<StagedMutation> stagedInsertsLocked() {
        this.assertNotQueryMode("stagedInserts");
        this.assertLocked("stagedInserts");
        List<StagedMutation> inserts = new ArrayList<>();
        for (StagedMutation mutation : this.stagedMutationsLocked) {
            if (mutation.type == StagedMutationType.INSERT) {
                inserts.add(mutation);
            }
        }
        return inserts;
    }

    /**
     * Looks for a mutation this attempt has already staged against the given document.
     * Staged replaces are searched first, then staged inserts; staged removes are not considered.
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @return the first matching staged replace, otherwise the first matching staged insert,
     *         otherwise empty
     */
    private Optional<StagedMutation> checkForOwnWriteLocked(CollectionIdentifier collection, String id) {
        this.assertLocked("checkForOwnWrite");
        this.assertNotQueryMode("checkForOwnWrite");

        Predicate<StagedMutation> sameDocument = m -> m.collection.equals(collection) && m.id.equals(id);

        Optional<StagedMutation> replaced = this.stagedReplacesLocked().stream().filter(sameDocument).findFirst();
        if (replaced.isPresent()) {
            return replaced;
        }
        // Inserts are only consulted when no replace matched.
        return this.stagedInsertsLocked().stream().filter(sameDocument).findFirst();
    }

    /**
     * Performs the client-side expiry check unless the attempt is already in expiry-overtime mode.
     *
     * @param stage name of the stage being checked; used in logging and in the error message
     * @param docId optional document id passed through to the expiry check
     * @return an empty Mono when the check is skipped or the attempt has not expired;
     *         otherwise a Mono failing with {@link AttemptExpiredException}
     */
    private Mono<Void> errorIfExpiredAndNotInExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (!this.expiryOvertimeMode) {
            if (!this.hasExpiredClientSide(stage, docId)) {
                return Mono.empty();
            }
            this.LOGGER.info(this.attemptId, "has expired in stage {}", stage);
            return Mono.error(new AttemptExpiredException("Attempt has expired in stage " + stage));
        }
        this.LOGGER.info(this.attemptId, "not doing expiry check in {} as already in expiry-overtime-mode", stage);
        return Mono.empty();
    }

    /**
     * Checks client-side expiry; if expired, switches the attempt into expiry-overtime mode and
     * throws an operation-failed error that raises {@code TRANSACTION_EXPIRED}.
     *
     * @param stage name of the stage being checked; used in logging
     * @param docId optional document id passed through to the expiry check
     */
    private void checkExpiryPreCommitAndSetExpiryOvertimeMode(String stage, Optional<String> docId) {
        boolean expired = this.hasExpiredClientSide(stage, docId);
        if (!expired) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage {}, setting expiry-overtime-mode", stage);
        // The flag is set before the exception is built and thrown.
        this.expiryOvertimeMode = true;
        TransactionOperationFailedException.Builder expiredError = TransactionOperationFailedException.Builder.createError()
                .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
        throw this.operationFailed(expiredError.build());
    }

    /**
     * Fetches a document inside a KV operation wrapper, using the query path when the attempt
     * is in query mode and the KV path (non-replica) otherwise.
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @param pspan      the parent span handed to {@code doKVOperation}
     * @return the document if found, otherwise an empty Optional
     */
    private Mono<Optional<CoreTransactionGetResult>> getInternal(CollectionIdentifier collection, String id, SpanWrapper pspan) {
        String description = "get " + DebugUtil.docId(collection, id);
        return this.doKVOperation(description, pspan, "get", collection, id, (operationId, span, lockToken) -> Mono.defer(() -> {
            if (!this.queryModeLocked()) {
                return this.getWithKVLocked(collection, id, Optional.empty(), (SpanWrapper)span, (ReactiveLock.Waiter)lockToken, false);
            }
            return this.getWithQueryLocked(collection, id, (ReactiveLock.Waiter)lockToken, (SpanWrapper)span);
        }));
    }

    /**
     * Fetches a document via the KV path in preferred-replica mode. Fails with
     * {@link FeatureNotAvailableException} when the attempt is already in query mode.
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @param pspan      the parent span handed to {@code doKVOperation}
     * @return the document if found, otherwise an empty Optional
     */
    private Mono<Optional<CoreTransactionGetResult>> getReplicaFromPreferredServerGroupInternal(CollectionIdentifier collection, String id, SpanWrapper pspan) {
        String description = "get " + DebugUtil.docId(collection, id);
        return this.doKVOperation(description, pspan, "get", collection, id, (operationId, span, lockToken) -> Mono.defer(() -> {
            if (!this.queryModeLocked()) {
                return this.getWithKVLocked(collection, id, Optional.empty(), (SpanWrapper)span, (ReactiveLock.Waiter)lockToken, true);
            }
            return Mono.error(new FeatureNotAvailableException("getReplicaFromPreferredServerGroup cannot presently be used in a transaction that has previously involved the query service.  It can however be used before any query call."));
        }));
    }

    /**
     * Gets a document over KV. Must be entered with the lock held (asserted); the lock is released
     * before the actual fetch, or as soon as the answer is found among this attempt's own staged mutations.
     *
     * <p>Resolution order: a staged replace/insert with non-null content is returned directly; a staged
     * remove yields an empty result; otherwise the document is fetched through {@code DocumentGetter.getAsync}.
     *
     * @param collection               the document's collection
     * @param id                       the document's key
     * @param resolvingMissingATREntry attempt id passed on to {@code DocumentGetter.getAsync}; set on the
     *                                 recursive call made after an ATR (entry) not-found error
     * @param pspan                    span used for elapsed-time logging and passed to the getter
     * @param lockToken                the lock token currently held by the caller
     * @param preferredReplicaMode     passed through to {@code DocumentGetter.getAsync}
     * @return the document if found, otherwise an empty Optional
     */
    private Mono<Optional<CoreTransactionGetResult>> getWithKVLocked(CollectionIdentifier collection, String id, Optional<String> resolvingMissingATREntry, SpanWrapper pspan, ReactiveLock.Waiter lockToken, boolean preferredReplicaMode) {
        return Mono.defer(() -> {
            Optional<StagedMutation> ownRemove;
            this.assertLocked("getWithKV");
            this.LOGGER.info(this.attemptId, "getting doc {}, resolvingMissingATREntry={}, preferredReplicaMode={}", DebugUtil.docId(collection, id), resolvingMissingATREntry.orElse("<empty>"), preferredReplicaMode);
            // Read-your-own-writes: a staged replace or insert of this doc, if any.
            Optional<StagedMutation> ownWrite = this.checkForOwnWriteLocked(collection, id);
            if (ownWrite.isPresent()) {
                StagedMutation ow = ownWrite.get();
                // Only usable when the staged content is held in memory; otherwise fall through to the checks below.
                boolean usable = ow.content != null;
                this.LOGGER.info(this.attemptId, "found own-write of mutated doc {}, usable = {}", DebugUtil.docId(collection, id), usable);
                if (usable) {
                    return this.unlock(lockToken, "found own-write of mutation").then(Mono.just(Optional.of(this.createTransactionGetResult(ow.operationId, collection, id, ow.content, ow.stagedUserFlags, null, 0, ow.cas, ow.documentMetadata, ow.type.toString(), ow.crc32))));
                }
            }
            // A doc this attempt has staged for removal is reported as not found.
            if ((ownRemove = this.stagedRemovesLocked().stream().filter(v -> v.collection.equals(collection) && v.id.equals(id)).findFirst()).isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of removed doc {}", DebugUtil.docId(collection, id));
                return this.unlock(lockToken, "found own-write of removed").then(Mono.just(Optional.empty()));
            }
            // Metering units accumulated by this fetch; folded in via addUnits(...) on both the error and success paths.
            MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
            // Sequence: beforeUnlockGet hook -> unlock -> beforeDocGet hook -> fetch -> continue on this.scheduler().
            return this.hooks.beforeUnlockGet.apply(this, id).then(this.unlock(lockToken, "standard")).then(this.hooks.beforeDocGet.apply(this, id)).then(DocumentGetter.getAsync(this.core, this.LOGGER, collection, this.config, id, this.attemptId, false, pspan, resolvingMissingATREntry, units, this.overall.supported(), preferredReplicaMode)).publishOn(this.scheduler()).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                MeteringUnits built = this.addUnits(units.build());
                this.LOGGER.warn(this.attemptId, "got error while getting doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(built), pspan.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                // Propagated unchanged.
                if (err instanceof DocumentUnretrievableException) {
                    return Mono.error((Throwable)err);
                }
                // Both forward-compatibility errors surface with a ForwardCompatibilityFailureException cause;
                // only the "requires retry" variant additionally marks the transaction for retry.
                if (err instanceof ForwardCompatibilityRequiresRetryException || err instanceof ForwardCompatibilityFailureException) {
                    TransactionOperationFailedException.Builder error = TransactionOperationFailedException.Builder.createError().cause(new ForwardCompatibilityFailureException());
                    if (err instanceof ForwardCompatibilityRequiresRetryException) {
                        error.retryTransaction();
                    }
                    return Mono.error((Throwable)this.operationFailed(error.build()));
                }
                // Already classified as an operation failure: propagated without re-wrapping.
                if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                // ATR or ATR entry missing: take the lock again and redo the get, passing the attempt id
                // carried by the exception as resolvingMissingATREntry. If that nested get fails, the
                // newly acquired token is unlocked before the error is propagated.
                if (err instanceof ActiveTransactionRecordNotFoundException || err instanceof ActiveTransactionRecordEntryNotFoundException) {
                    String attemptIdToCheck = err instanceof ActiveTransactionRecordNotFoundException ? ((ActiveTransactionRecordNotFoundException)err).attemptId() : ((ActiveTransactionRecordEntryNotFoundException)err).attemptId();
                    return this.lock("get relock").flatMap(newLockToken -> this.getWithKVLocked(collection, id, Optional.of(attemptIdToCheck), pspan, (ReactiveLock.Waiter)newLockToken, preferredReplicaMode).onErrorResume(e -> this.unlock((ReactiveLock.Waiter)newLockToken, "relock error").then(Mono.error((Throwable)e))));
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)this.operationFailed(builder.retryTransaction().build()));
                }
                // Any other error class: operation failure with the builder's default settings.
                return Mono.error((Throwable)this.operationFailed(builder.build()));
            }).flatMap(v -> {
                // Success path: record metering units, log, then run the afterGetComplete hook.
                long elapsed = pspan.elapsedMicros();
                MeteringUnits built = this.addUnits(units.build());
                if (v.isPresent()) {
                    this.LOGGER.info(this.attemptId, "completed get of {}{} in {}us", v.get(), DebugUtil.dbg(built), elapsed);
                } else {
                    this.LOGGER.info(this.attemptId, "completed get of {}{}, could not find, in {}us", DebugUtil.docId(collection, id), DebugUtil.dbg(built), elapsed);
                }
                return this.hooks.afterGetComplete.apply(this, id).thenReturn(v);
            }).flatMap(doc -> {
                // A found document is subject to the forward-compatibility check for the GETS stage.
                if (doc.isPresent()) {
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStage.GETS, doc.flatMap(v -> v.links().forwardCompatibility())).thenReturn(doc);
                }
                return Mono.just((Object)doc);
            });
        });
    }

    /** @return a new JSON object containing the single field {@code "kv": true} */
    private ObjectNode makeTxdata() {
        ObjectNode txdata = Mapper.createObjectNode();
        txdata.put("kv", true);
        return txdata;
    }

    /**
     * Gets a document through the query service by running {@code EXECUTE __get} with the
     * keyspace and document id as positional arguments. Must be entered with the lock held (asserted).
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @param lockToken  the lock token currently held by the caller
     * @param span       span handed to the query wrapper; also receives any error
     * @return the document built from the first result row, or an empty Optional when there are
     *         no rows or the query fails with {@link DocumentNotFoundException}
     */
    private Mono<Optional<CoreTransactionGetResult>> getWithQueryLocked(CollectionIdentifier collection, String id, ReactiveLock.Waiter lockToken, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("getWithQuery");
            int sidx = this.queryStatementIdx.getAndIncrement();
            // The token is handed over in a mutable holder; the final unlock reads it back via lt.get().
            // NOTE(review): presumably queryWrapperBlockingLocked may swap the token - confirm there.
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            ArrayNode params = Mapper.createArrayNode().add(CoreTransactionAttemptContext.makeKeyspace(collection)).add(id);
            CoreQueryOptionsTransactions queryOptions = new CoreQueryOptionsTransactions();
            queryOptions.raw("args", params);
            return this.queryWrapperBlockingLocked(sidx, this.queryContext.queryContext, "EXECUTE __get", queryOptions, "queryKvGet", false, true, this.makeTxdata(), params, span, false, lt, true).map(result -> result.collectRows()).map(rows -> {
                Optional<Object> ret;
                if (rows.isEmpty()) {
                    ret = Optional.empty();
                } else {
                    byte[] content;
                    ObjectNode row;
                    // Only the first row is read: "doc" is re-serialized to bytes as the document content.
                    try {
                        row = Mapper.reader().readValue(((QueryChunkRow)rows.get(0)).data(), ObjectNode.class);
                        JsonNode doc = row.path("doc");
                        content = Mapper.writer().writeValueAsBytes(doc);
                    }
                    catch (IOException e) {
                        throw new DecodingFailureException(e);
                    }
                    // "scas" carries the CAS as a decimal string.
                    String scas = row.path("scas").textValue();
                    long cas = Long.parseLong(scas);
                    JsonNode txnMeta = row.path("txnMeta");
                    Optional<String> crc32 = Optional.ofNullable(row.path("crc32").textValue());
                    this.logger().info(this.attemptId, "got doc {} from query with scas={} meta={}", DebugUtil.docId(collection, id), scas, txnMeta.isMissingNode() ? "null" : txnMeta.textValue());
                    // Content is always flagged as JSON; txnMeta is attached only when the row contains it.
                    ret = Optional.of(new CoreTransactionGetResult(id, content, CodecFlags.JSON_COMMON_FLAGS, cas, collection, null, Optional.empty(), txnMeta.isMissingNode() ? Optional.empty() : Optional.of(txnMeta), crc32));
                }
                return ret;
            }).onErrorResume(err -> {
                // ec is computed but not consulted by the branches below.
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                span.recordExceptionAndSetErrorStatus((Throwable)err);
                // Not-found is a normal outcome here: mapped to an empty result.
                if (err instanceof DocumentNotFoundException) {
                    return Mono.just(Optional.empty());
                }
                if (err instanceof TransactionOperationFailedException) {
                    return Mono.error((Throwable)err);
                }
                return Mono.error((Throwable)this.operationFailed(builder.build()));
            // Placed after onErrorResume so the unlock also runs for the not-found case. It only runs when a
            // value is emitted. NOTE(review): on the error path this unlock is skipped; whether the lock is
            // released elsewhere is not visible from this method - confirm.
            }).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "getWithQueryLocked end", false).thenReturn(result));
        });
    }

    /**
     * Gets a document inside this attempt, tracing the call under a {@code transaction_get} span.
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @return the document; fails with {@link DocumentNotFoundException} when it does not exist
     */
    public Mono<CoreTransactionGetResult> get(CollectionIdentifier collection, String id) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "transaction_get", this.attemptSpan);
            Mono<Optional<CoreTransactionGetResult>> fetched = this.getInternal(collection, id, span)
                    .doOnError(err -> span.finishWithErrorStatus());
            return fetched.flatMap(doc -> {
                // The fetch itself succeeded, so the span finishes normally even when the doc is absent.
                span.finish();
                if (!doc.isPresent()) {
                    return Mono.error(new DocumentNotFoundException(ReducedKeyValueErrorContext.create(id)));
                }
                return Mono.just(doc.get());
            });
        });
    }

    /**
     * Gets a document in preferred-replica mode, tracing the call under a
     * {@code transaction_get_replica_from_preferred_server_group} span.
     *
     * @param collection the document's collection
     * @param id         the document's key
     * @return the document; fails with {@link DocumentUnretrievableException} when none is found
     */
    public Mono<CoreTransactionGetResult> getReplicaFromPreferredServerGroup(CollectionIdentifier collection, String id) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "transaction_get_replica_from_preferred_server_group", this.attemptSpan);
            Mono<Optional<CoreTransactionGetResult>> fetched = this.getReplicaFromPreferredServerGroupInternal(collection, id, span)
                    .doOnError(err -> span.finishWithErrorStatus());
            return fetched.flatMap(doc -> {
                // The fetch itself succeeded, so the span finishes normally even when the doc is absent.
                span.finish();
                if (!doc.isPresent()) {
                    return Mono.error(new DocumentUnretrievableException(ReducedKeyValueErrorContext.create(id)));
                }
                return Mono.just(doc.get());
            });
        });
    }

    /**
     * Entry point for fetching several documents in one operation.
     *
     * <p>Fails immediately with {@link CouchbaseException} when replica reads are requested but the
     * environment has no preferred server group, and with {@link FeatureNotAvailableException} when
     * the attempt is already in query mode. Otherwise the lock is released and the work is delegated
     * to {@code getMultiAlgoInternal}.
     *
     * @param specs                            the documents to fetch
     * @param pspan                            parent span
     * @param options                          getMulti options
     * @param replicasFromPreferredServerGroup whether to read replicas from the preferred server group
     * @return one optional result per spec
     */
    public Mono<List<CoreTransactionOptionalGetMultiResult>> getMultiAlgo(List<CoreTransactionGetMultiSpec> specs, SpanWrapper pspan, CoreGetMultiOptions options, boolean replicasFromPreferredServerGroup) {
        // The environment is only consulted when replica mode was actually requested.
        boolean serverGroupMissing = replicasFromPreferredServerGroup && this.core.environment().preferredServerGroup() == null;
        if (serverGroupMissing) {
            return Mono.error(new CouchbaseException("Preferred server group must be set previously at the environment level"));
        }
        return this.doKVOperation("getMulti", pspan, "getMulti", null, null, (operationId, span, lockToken) -> Mono.defer(() -> {
            if (!this.queryModeLocked()) {
                return this.unlock((ReactiveLock.Waiter)lockToken, "getMulti").then(this.getMultiAlgoInternal(specs, pspan, options, replicasFromPreferredServerGroup));
            }
            return Mono.error(new FeatureNotAvailableException("getMulti cannot be used in a transaction after any SQL++ commands have been executed.  If possible then move the getMulti to before any SQL++ commands."));
        }));
    }

    /**
     * Drives the getMulti state machine: document fetch, then document disambiguation, then (when
     * disambiguation signals CONTINUE) read-skew resolution.
     *
     * <p>Each stage reports a signal. {@code RETRY} and {@code RESET_AND_RETRY} are turned into
     * marker exceptions so that the surrounding {@code retryWhen} re-runs the whole pipeline
     * (resetting the shared state first for {@code RESET_AND_RETRY}). {@code BOUND_EXCEEDED} is
     * turned into a {@code BoundExceeded} error which is handled at the end: if every requested
     * spec already has a result those results are returned, otherwise the operation fails with a
     * retry-transaction error.
     *
     * @param specs the documents to fetch
     * @param pspan parent span (not used inside this method)
     * @param options multi-get options stored in the operation state
     * @param replicasFromPreferredServerGroup stored in the operation state for the fetch stage
     * @return a Mono yielding the sorted list of results
     */
    private Mono<List<CoreTransactionOptionalGetMultiResult>> getMultiAlgoInternal(List<CoreTransactionGetMultiSpec> specs, SpanWrapper pspan, CoreGetMultiOptions options, boolean replicasFromPreferredServerGroup) {
        return Mono.defer(() -> {
            // The first fetch round is bounded by DEFAULT_INITIAL_DOC_FETCH_BOUND from "now".
            Instant deadline = Instant.now().plus(CoreGetMultiState.DEFAULT_INITIAL_DOC_FETCH_BOUND);
            CoreGetMultiState operationState = new CoreGetMultiState(specs, deadline, replicasFromPreferredServerGroup, options);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "getMulti: state={}", operationState.toString());
                this.throwIfExpired("getMulti");
                return this.getMultiDocumentFetch(operationState).flatMap(getMultiDocumentSignal -> {
                    switch (getMultiDocumentSignal) {
                        case CONTINUE: {
                            return Mono.empty();
                        }
                        case COMPLETED: {
                            return Mono.empty();
                        }
                        case RESET_AND_RETRY: {
                            return Mono.error((Throwable)new ResetAndRetryGetMulti("reset and retry in getMultiDocumentFetch"));
                        }
                        case RETRY: {
                            return Mono.error((Throwable)new RetryGetMulti("retry in getMultiDocumentFetch"));
                        }
                        case BOUND_EXCEEDED: {
                            return Mono.error((Throwable)new BoundExceeded());
                        }
                    }
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new RuntimeException("Internal bug: Unexpected signal " + (Object)getMultiDocumentSignal + " received")).build()));
                }).then(this.getMultiDocumentDisambiguation(operationState)).flatMap(documentDisambiguationSignal -> {
                    this.LOGGER.info(this.attemptId, "getMulti: documentDisambiguation returned {}", documentDisambiguationSignal);
                    switch (documentDisambiguationSignal.signal) {
                        case COMPLETED: {
                            return Mono.empty();
                        }
                        case RESET_AND_RETRY: {
                            return Mono.error((Throwable)new ResetAndRetryGetMulti(documentDisambiguationSignal.reason));
                        }
                        case RETRY: {
                            return Mono.error((Throwable)new RetryGetMulti(documentDisambiguationSignal.reason));
                        }
                        case BOUND_EXCEEDED: {
                            return Mono.error((Throwable)new BoundExceeded());
                        }
                        case CONTINUE: {
                            // Only a CONTINUE from disambiguation leads into read-skew resolution.
                            return this.getMultiReadSkewResolution(operationState).flatMap(readSkewSignal -> {
                                this.LOGGER.info(this.attemptId, "getMulti: readSkewResolution returned {}", readSkewSignal);
                                switch (readSkewSignal.signal) {
                                    case COMPLETED: {
                                        return Mono.empty();
                                    }
                                    case RESET_AND_RETRY: {
                                        return Mono.error((Throwable)new ResetAndRetryGetMulti(readSkewSignal.reason));
                                    }
                                    case RETRY: {
                                        return Mono.error((Throwable)new RetryGetMulti(readSkewSignal.reason));
                                    }
                                    case BOUND_EXCEEDED: {
                                        return Mono.error((Throwable)new BoundExceeded());
                                    }
                                }
                                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new RuntimeException("Internal bug: Unexpected signal " + readSkewSignal + " received")).build()));
                            });
                        }
                    }
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new RuntimeException("Internal bug: Unexpected signal " + documentDisambiguationSignal + " received")).build()));
                }).then(Mono.defer(() -> Mono.just(operationState.alreadyFetched().stream().sorted().collect(Collectors.toList()))));
            }).retryWhen((reactor.util.retry.Retry)reactor.util.retry.Retry.backoff((long)Integer.MAX_VALUE, (Duration)Duration.ofMillis(1L)).filter(err -> err instanceof ResetAndRetryGetMulti || err instanceof RetryGetMulti).doBeforeRetry(v -> {
                boolean resetFirst = v.failure() instanceof ResetAndRetryGetMulti;
                // Two placeholders for two values: previously the "after resetting" marker was passed but never rendered.
                this.LOGGER.info(this.attemptId, "Retrying multi-get {} {}", v.failure().getMessage(), resetFirst ? "after resetting" : "");
                if (resetFirst) {
                    operationState.reset(this.LOGGER);
                }
            })).onErrorResume(err -> {
                if (err instanceof BoundExceeded) {
                    // Bound hit: succeed with what we have only if every original spec has a result.
                    if (operationState.alreadyFetched().size() == operationState.originalSpecs.size()) {
                        this.LOGGER.info(this.attemptId, "getMulti: deadline expiring and have all docs, returning");
                        return Mono.just(operationState.alreadyFetched().stream().sorted().collect(Collectors.toList()));
                    }
                    this.LOGGER.info(this.attemptId, "getMulti: deadline expiring and do not have all docs, failing");
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().cause(new RuntimeException("Operation timeout was exceeded, but do not have results for all documents (have " + operationState.alreadyFetched().size() + " of " + operationState.originalSpecs.size() + ")")).build()));
                }
                return Mono.error((Throwable)err);
            });
        });
    }

    /**
     * Fetches a single document on behalf of getMulti.
     *
     * <p>The per-fetch timeout is the smaller of the time left until the operation-state deadline
     * and the non-mutating KV timeout. When the fetched document is in a transaction, forward
     * compatibility is checked for the {@code GET_MULTI_GET} and {@code GETS} stages before the
     * result is produced.
     *
     * <p>Error handling: {@code DocumentUnretrievableException} becomes an empty (left) result; a
     * {@code TimeoutException} either triggers a retry (mode {@code PRIORITISE_READ_SKEW_DETECTION})
     * or yields the {@code BOUND_EXCEEDED} signal (right); {@code FAIL_TRANSIENT} errors are
     * retried; anything else fails the operation. Retries are driven by {@code RetryGetMulti}
     * through the trailing {@code retryWhen}.
     *
     * @param spec the document to fetch
     * @param operationState shared getMulti state supplying the deadline, options and replica preference
     * @return left = the (possibly empty) document result, right = a signal for the caller
     */
    public Mono<Either<CoreTransactionOptionalGetMultiResult, CoreGetMultiSignal>> getMultiSingleDocumentFetch(CoreTransactionGetMultiSpec spec, CoreGetMultiState operationState) {
        return Mono.defer(() -> {
            long start = System.nanoTime();
            Duration operationTimeout = Duration.ofMillis(Math.min(Duration.between(Instant.now(), operationState.deadline).toMillis(), this.kvTimeoutNonMutating().toMillis()));
            this.LOGGER.info(this.attemptId, "getMulti: getting doc {} with timeout {}", spec, operationTimeout);
            this.throwIfExpired("getMultiIndividualDocument");
            return DocumentGetter.justGetDoc(this.core, spec.collectionIdentifier, spec.id, operationTimeout, null, !operationState.replicasFromPreferredServerGroup, this.LOGGER, this.meteringUnitsBuilder, operationState.replicasFromPreferredServerGroup).flatMap(doc -> {
                this.LOGGER.info(this.attemptId, "getMulti: completed get of {} in {}us, present={}, inTransaction={}", spec, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start), doc.isPresent(), doc.map(v -> ((CoreTransactionGetResult)v.getT1()).isInTransaction()));
                Mono possibleForwardCompatCheck = Mono.empty();
                if (doc.isPresent() && ((CoreTransactionGetResult)((Tuple2)doc.get()).getT1()).isInTransaction()) {
                    possibleForwardCompatCheck = this.forwardCompatibilityCheck(ForwardCompatibilityStage.GET_MULTI_GET, Objects.requireNonNull(((CoreTransactionGetResult)((Tuple2)doc.get()).getT1()).links()).forwardCompatibility()).then(this.forwardCompatibilityCheck(ForwardCompatibilityStage.GETS, Objects.requireNonNull(((CoreTransactionGetResult)((Tuple2)doc.get()).getT1()).links()).forwardCompatibility()));
                }
                return possibleForwardCompatCheck.then(doc.map(d -> Mono.just((Object)new CoreTransactionOptionalGetMultiResult(spec, Optional.of((CoreTransactionGetResult)d.getT1())))).orElseGet(() -> Mono.just((Object)new CoreTransactionOptionalGetMultiResult(spec, Optional.empty()))));
            }).map(Either::ofLeft).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                // Tag the line with the attempt id like every other log call in this class.
                this.LOGGER.info(this.attemptId, "getMulti: document-fetch hit error on {}: {}", spec, err);
                if (err instanceof DocumentUnretrievableException) {
                    return Mono.just(Either.ofLeft(new CoreTransactionOptionalGetMultiResult(spec, Optional.empty())));
                }
                if (err instanceof TimeoutException) {
                    if (operationState.options.mode == CoreTransactionGetMultiMode.PRIORITISE_READ_SKEW_DETECTION) {
                        return Mono.error((Throwable)new RetryGetMulti("Retrying individual single doc fetch"));
                    }
                    return Mono.just(Either.ofRight(CoreGetMultiSignal.BOUND_EXCEEDED));
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)new RetryGetMulti("Retrying individual single doc fetch"));
                }
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).rollbackAttempt(ec == ErrorClass.FAIL_HARD).build()));
            });
        }).retryWhen((reactor.util.retry.Retry)reactor.util.retry.Retry.backoff((long)Integer.MAX_VALUE, (Duration)Duration.ofMillis(1L)).filter(err -> err instanceof RetryGetMulti).doBeforeRetry(v -> this.LOGGER.info(this.attemptId, "getMulti: document fetch {} retry attempt {}", spec, v.totalRetries())));
    }

    /**
     * Fetches every spec still listed in {@code operationState.toFetch()} (up to 100 concurrently)
     * and folds the outcomes into the operation state.
     *
     * <p>All left-side results are appended to the already-fetched list and the to-fetch list is
     * cleared. If any individual fetch produced a signal, one such signal is returned. Otherwise
     * the result is {@code COMPLETED} when read-skew detection is disabled, else {@code CONTINUE};
     * on the first fetch round the deadline is additionally replaced (a read-skew bound for
     * {@code PRIORITISE_LATENCY}, the remaining transaction expiry otherwise) and the phase moves
     * to {@code SUBSEQUENT_TO_FIRST_DOC_FETCH}.
     *
     * @param operationState shared getMulti state, mutated by this call
     * @return the signal telling the caller how to proceed
     */
    public Mono<CoreGetMultiSignal> getMultiDocumentFetch(CoreGetMultiState operationState) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "getMulti: document fetch {}", operationState.toString());
            int desiredParallelism = 100;
            long start = System.nanoTime();
            return Flux.fromIterable(operationState.toFetch()).flatMap(spec -> this.getMultiSingleDocumentFetch((CoreTransactionGetMultiSpec)spec, operationState), desiredParallelism).collectList().map(allDocs -> {
                // Uses {} for both values, matching the placeholder style of every other log call here.
                this.LOGGER.info(this.attemptId, "getMulti: have got {} doc reports (not necessarily docs) in {}us", allDocs.size(), TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
                ArrayList<CoreTransactionOptionalGetMultiResult> alreadyFetched = new ArrayList<CoreTransactionOptionalGetMultiResult>();
                alreadyFetched.addAll(operationState.alreadyFetched());
                alreadyFetched.addAll(allDocs.stream().filter(v -> v.left().isPresent()).map(v -> (CoreTransactionOptionalGetMultiResult)v.left().get()).collect(Collectors.toList()));
                operationState.update(this.LOGGER, Collections.emptyList(), alreadyFetched, operationState.phase(), operationState.deadline);
                // Any signal raised by an individual fetch takes precedence over the normal flow.
                Optional<Either> anySignal = allDocs.stream().filter(v -> v.right().isPresent()).findAny();
                if (anySignal.isPresent()) {
                    CoreGetMultiSignal signal = (CoreGetMultiSignal)((Object)((Object)((Object)anySignal.get().right().get())));
                    this.LOGGER.info(this.attemptId, "getMulti: returning signal {} from individual doc fetch", new Object[]{signal});
                    return signal;
                }
                if (operationState.options.mode == CoreTransactionGetMultiMode.DISABLE_READ_SKEW_DETECTION) {
                    return CoreGetMultiSignal.COMPLETED;
                }
                if (operationState.phase() == CoreGetMultiPhase.FIRST_DOC_FETCH) {
                    Instant newDeadline = operationState.options.mode == CoreTransactionGetMultiMode.PRIORITISE_LATENCY ? Instant.now().plus(CoreGetMultiState.DEFAULT_READ_SKEW_BOUND) : Instant.now().plus(Duration.ofMillis(this.expiryRemainingMillis()));
                    this.LOGGER.info(this.attemptId, "getMulti: setting deadline to {}", newDeadline);
                    operationState.update(this.LOGGER, operationState.toFetch(), operationState.alreadyFetched(), CoreGetMultiPhase.SUBSEQUENT_TO_FIRST_DOC_FETCH, newDeadline);
                }
                return CoreGetMultiSignal.CONTINUE;
            });
        });
    }

    /**
     * Decides, from the documents fetched so far, whether getMulti can finish or needs read-skew
     * resolution.
     *
     * <ul>
     *   <li>No document present: {@code COMPLETED}.</li>
     *   <li>Exactly one present: it is re-read through {@code DocumentGetter.getAsync}, the stored
     *       result for that spec is replaced, then {@code COMPLETED}.</li>
     *   <li>Several present: the staged transaction ids of documents that are in a transaction are
     *       collected. None gives {@code COMPLETED}; more than one raises {@code RetryGetMulti};
     *       exactly one gives {@code CONTINUE}.</li>
     * </ul>
     *
     * @param operationState shared getMulti state, updated in the single-document case
     * @return the signal for the caller
     */
    private Mono<CoreGetMultiSignalAndReason> getMultiDocumentDisambiguation(CoreGetMultiState operationState) {
        return Mono.defer(() -> {
            List<CoreTransactionGetMultiResult> present = operationState.fetchedAndPresent();
            if (present.isEmpty()) {
                this.LOGGER.info(this.attemptId, "getMultiDD: no docs present");
                return Mono.just((Object)CoreGetMultiSignalAndReason.COMPLETED);
            }
            if (present.size() == 1) {
                this.LOGGER.info(this.attemptId, "getMultiDD: just one doc present, performing MAV");
                CoreTransactionGetMultiResult only = present.get(0);
                return DocumentGetter.getAsync(this.core, this.LOGGER, only.spec.collectionIdentifier, this.config, only.spec.id, this.attemptId, false, this.attemptSpan, Optional.empty(), this.meteringUnitsBuilder, this.overall.supported(), false).flatMap(reread -> {
                    CoreTransactionOptionalGetMultiResult replacement = new CoreTransactionOptionalGetMultiResult(only.spec, (Optional<CoreTransactionGetResult>)reread);
                    // Keep every stored result except the one belonging to the re-read spec.
                    List<CoreTransactionOptionalGetMultiResult> others = operationState.alreadyFetched().stream().filter(stored -> stored.spec.specIndex != only.spec.specIndex).collect(Collectors.toList());
                    ArrayList<CoreTransactionOptionalGetMultiResult> refreshed = new ArrayList<CoreTransactionOptionalGetMultiResult>(others);
                    refreshed.add(replacement);
                    operationState.update(this.logger(), operationState.toFetch(), refreshed, operationState.phase(), operationState.deadline);
                    return Mono.just((Object)CoreGetMultiSignalAndReason.COMPLETED);
                });
            }
            Set otherTransactionIds = present.stream().filter(r -> Objects.requireNonNull(r.internal.links()).isDocumentInTransaction()).map(r -> Objects.requireNonNull(r.internal.links()).stagedTransactionId().get()).collect(Collectors.toSet());
            this.LOGGER.info(this.attemptId, "getMultiDD: involves these other transactions: {}", otherTransactionIds);
            int involved = otherTransactionIds.size();
            if (involved > 1) {
                return Mono.error((Throwable)new RetryGetMulti("Too many transaction ids involved " + involved));
            }
            if (involved == 0) {
                return Mono.just((Object)CoreGetMultiSignalAndReason.COMPLETED);
            }
            return Mono.just((Object)CoreGetMultiSignalAndReason.CONTINUE);
        });
    }

    /**
     * Read-skew resolution stage of getMulti.
     *
     * <p>After the state assertion and a deadline check, this picks one fetched document's
     * transaction links (referred to as T1), looks up T1's entry in its Active Transaction Record
     * and hands the lookup result to {@code getMultiReadSkewResolutionAfterT1AtrEntryRead}.
     *
     * @param operationState shared getMulti state
     * @return the signal (with reason) for the caller
     */
    private Mono<CoreGetMultiSignalAndReason> getMultiReadSkewResolution(CoreGetMultiState operationState) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "getMultiRSR: {}", operationState.toString());
            // Anything other than CONTINUE from the state assertion is passed straight back.
            CoreGetMultiSignalAndReason signal = operationState.assertInReadSkewResolutionState();
            if (signal.signal != CoreGetMultiSignal.CONTINUE) {
                return Mono.just((Object)signal);
            }
            if (operationState.deadlineExceededSoon()) {
                return Mono.just((Object)CoreGetMultiSignalAndReason.BOUND_EXCEEDED);
            }
            List<CoreTransactionGetMultiResult> fetchedAndPresent = operationState.fetchedAndPresent();
            // NOTE(review): the filter predicate below is a decompiler failure (raw LambdaMetafactory
            // call to synthetic lambda$getMultiReadSkewResolution$59); its body is not visible here.
            // Presumably it selects documents that are in a transaction — confirm against the original source.
            TransactionLinks t1 = fetchedAndPresent.stream().filter((Predicate<CoreTransactionGetMultiResult>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$getMultiReadSkewResolution$59(com.couchbase.client.core.transaction.CoreTransactionGetMultiResult ), (Lcom/couchbase/client/core/transaction/CoreTransactionGetMultiResult;)Z)()).findFirst().get().internal.links();
            CollectionIdentifier atrCollection = new CollectionIdentifier(t1.atrBucketName().get(), t1.atrScopeName(), t1.atrCollectionName());
            String t1TransactionId = t1.stagedTransactionId().get();
            // Timeout is the smaller of the time left until the deadline and the non-mutating KV timeout.
            Duration operationTimeout = Duration.ofMillis(Math.min(Duration.between(Instant.now(), operationState.deadline).toMillis(), this.kvTimeoutNonMutating().toMillis()));
            this.LOGGER.info(this.attemptId, "getMultiRSR: getting T1 entry {} with timeout {}", t1TransactionId, operationTimeout);
            return ActiveTransactionRecord.findEntryForTransaction(this.core, atrCollection, t1.atrId().get(), t1.stagedAttemptId().get(), this.config, null, this.LOGGER, this.meteringUnitsBuilder, operationTimeout).flatMap(atrResult -> this.getMultiReadSkewResolutionAfterT1AtrEntryRead(operationState, (Optional<ActiveTransactionRecordEntry>)atrResult, t1TransactionId));
        });
    }

    /**
     * Reports whether {@code results} contains an entry for the same document as {@code check},
     * where "same" means equal collection and equal document id.
     *
     * @param results the results to search
     * @param check the result whose document is being looked for
     * @return true if a matching entry exists
     */
    private boolean isIn(List<CoreTransactionGetMultiResult> results, CoreTransactionGetMultiResult check) {
        for (CoreTransactionGetMultiResult candidate : results) {
            if (!candidate.internal.collection().equals(check.internal.collection())) {
                continue;
            }
            if (candidate.internal.id().equals(check.internal.id())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Continues read-skew resolution once T1's ATR entry lookup has finished.
     *
     * <p>The fetched results are split into those staged by T1 and everything else. Then:
     * <ul>
     *   <li>ATR entry missing: on the first occurrence T1's documents are queued for refetch and
     *       {@code RETRY} is returned (phase {@code RESOLVING_T1_ATR_ENTRY_MISSING}); if already in
     *       that phase the result is {@code COMPLETED} when some documents are still in T1, else
     *       {@code RESET_AND_RETRY}.</li>
     *   <li>{@code PENDING} / {@code ABORTED}: {@code COMPLETED}.</li>
     *   <li>{@code COMMITTED}: depending on the phase, either the T1 documents are converted to
     *       their post-transaction form and the result is {@code COMPLETED}, or documents that the
     *       ATR entry lists but that were fetched outside T1 are queued for refetch with
     *       {@code RETRY}.</li>
     *   <li>Any other state: {@code RESET_AND_RETRY}.</li>
     * </ul>
     *
     * <p>Results of {@code operationState.update} are checked via their {@code signal} field, the
     * same way the other call sites in this class do, rather than by object identity.
     *
     * @param operationState shared getMulti state, mutated by this call
     * @param atrResult T1's ATR entry, if it was found
     * @param t1TransactionId transaction id of T1
     * @return the signal (with reason) for the caller
     */
    private Mono<CoreGetMultiSignalAndReason> getMultiReadSkewResolutionAfterT1AtrEntryRead(CoreGetMultiState operationState, Optional<ActiveTransactionRecordEntry> atrResult, String t1TransactionId) {
        return Mono.fromSupplier(() -> {
            List<CoreTransactionGetMultiResult> fetchedAndPresent = operationState.fetchedAndPresent();
            List<CoreTransactionGetMultiResult> fetchedInT1 = fetchedAndPresent.stream().filter(result -> result.internal.isInTransaction(t1TransactionId)).collect(Collectors.toList());
            List<CoreTransactionOptionalGetMultiResult> fetchedNotInT1 = operationState.alreadyFetched().stream().filter(result -> !result.isPresent() || !this.isIn(fetchedInT1, result.get())).collect(Collectors.toList());
            this.LOGGER.info(this.attemptId, "getMultiRSRA: T1={} fetchedInT1={} fetchedNotInT1={} state={}", atrResult, fetchedInT1.stream().map(v -> DebugUtil.docId(v.spec.collectionIdentifier, v.spec.id)).collect(Collectors.toList()), fetchedNotInT1.size(), operationState.toString());
            // Sanity check: the two partitions together must cover every original spec.
            if (fetchedNotInT1.size() + fetchedInT1.size() != operationState.originalSpecs.size()) {
                return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RESET_AND_RETRY, "Internal bug");
            }
            if (!atrResult.isPresent()) {
                if (operationState.phase() == CoreGetMultiPhase.RESOLVING_T1_ATR_ENTRY_MISSING) {
                    if (!fetchedInT1.isEmpty()) {
                        return CoreGetMultiSignalAndReason.COMPLETED;
                    }
                    return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RESET_AND_RETRY, "Remain in ambiguous state after refetching when T1 ATR entry is missing.  Polling until simpler state");
                }
                CoreGetMultiSignalAndReason signal = operationState.update(this.LOGGER, CoreGetMultiUtil.toSpecs(fetchedInT1), fetchedNotInT1, CoreGetMultiPhase.RESOLVING_T1_ATR_ENTRY_MISSING, operationState.deadline);
                if (signal.signal != CoreGetMultiSignal.CONTINUE) {
                    return signal;
                }
                return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RETRY, "Have found T1's ATR entry missing.  Refetching its docs to disambiguate T1 state.");
            }
            AttemptState state = ((ActiveTransactionRecordEntry)atrResult.get()).state();
            switch (state) {
                case PENDING: {
                    return CoreGetMultiSignalAndReason.COMPLETED;
                }
                case ABORTED: {
                    return CoreGetMultiSignalAndReason.COMPLETED;
                }
                case COMMITTED: {
                    if (operationState.phase() == CoreGetMultiPhase.SUBSEQUENT_TO_FIRST_DOC_FETCH) {
                        // Documents the ATR entry lists but which were read outside T1's staged state.
                        List<CoreTransactionGetMultiResult> wereInT1 = fetchedNotInT1.stream().filter(CoreTransactionOptionalGetMultiResult::isPresent).map(CoreTransactionOptionalGetMultiResult::get).filter(v -> ((ActiveTransactionRecordEntry)atrResult.get()).containsDocument(v.internal.collection(), v.internal.id())).collect(Collectors.toList());
                        List<CoreTransactionOptionalGetMultiResult> remainder = operationState.alreadyFetched().stream().filter(v -> !v.isPresent() || !this.isIn(wereInT1, v.get())).collect(Collectors.toList());
                        if (wereInT1.isEmpty()) {
                            ArrayList<CoreTransactionOptionalGetMultiResult> toReturn = new ArrayList<CoreTransactionOptionalGetMultiResult>(fetchedNotInT1);
                            toReturn.addAll(fetchedInT1.stream().map(v -> v.convertToPostTransaction().toOptional()).collect(Collectors.toList()));
                            CoreGetMultiSignalAndReason signal = operationState.update(this.LOGGER, Collections.emptyList(), toReturn, CoreGetMultiPhase.SUBSEQUENT_TO_FIRST_DOC_FETCH, operationState.deadline);
                            // Compare the enum, not the wrapper instance: a CONTINUE carried by a
                            // different wrapper object must not be treated as an early-exit signal.
                            if (signal.signal != CoreGetMultiSignal.CONTINUE) {
                                return signal;
                            }
                            return CoreGetMultiSignalAndReason.COMPLETED;
                        }
                        CoreGetMultiSignalAndReason signal = operationState.update(this.LOGGER, CoreGetMultiUtil.toSpecs(wereInT1), remainder, CoreGetMultiPhase.SUBSEQUENT_TO_FIRST_DOC_FETCH, operationState.deadline);
                        if (signal.signal != CoreGetMultiSignal.CONTINUE) {
                            return signal;
                        }
                        return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RETRY, "Fetched some docs that we've later discovered to be involved in T1: refetching them");
                    }
                    if (operationState.phase() == CoreGetMultiPhase.DISCOVERED_DOCS_IN_T1) {
                        CoreGetMultiSignalAndReason signal = operationState.update(this.LOGGER, Collections.emptyList(), operationState.alreadyFetched().stream().map(CoreTransactionOptionalGetMultiResult::convertToPostTransaction).collect(Collectors.toList()), CoreGetMultiPhase.SUBSEQUENT_TO_FIRST_DOC_FETCH, operationState.deadline);
                        if (signal.signal != CoreGetMultiSignal.CONTINUE) {
                            return signal;
                        }
                        return CoreGetMultiSignalAndReason.COMPLETED;
                    }
                    return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RESET_AND_RETRY, "Impossible situation");
                }
            }
            return new CoreGetMultiSignalAndReason(CoreGetMultiSignal.RESET_AND_RETRY, "Unknown ATR state " + (Object)((Object)state));
        });
    }

    /**
     * Checks for client-side expiry, either real (reported by the overall transaction context) or
     * injected through the {@code hasExpiredClientSideHook} hook. Both sources are always
     * consulted, and each one that reports expiry is logged.
     *
     * @param place label of the call site, used for logging and passed to the hook
     * @param docId optional document id passed to the hook
     * @return true if either source reports expiry
     */
    boolean hasExpiredClientSide(String place, Optional<String> docId) {
        boolean reallyExpired = this.overall.hasExpiredClientSide();
        boolean injectedExpiry = this.hooks.hasExpiredClientSideHook.apply(this, place, docId);
        if (reallyExpired) {
            this.LOGGER.info(this.attemptId, "expired in {}", place);
        }
        if (injectedExpiry) {
            this.LOGGER.info(this.attemptId, "fake expiry in {}", place);
        }
        if (reallyExpired) {
            return true;
        }
        return injectedExpiry;
    }

    /**
     * @return the value of the {@code atrId} field of this attempt
     */
    public Optional<String> atrId() {
        return atrId;
    }

    /**
     * @return the value of the {@code atrCollection} field of this attempt
     */
    public Optional<CollectionIdentifier> atrCollection() {
        return atrCollection;
    }

    /**
     * Picks the collection that holds the ATR for a document: the configured metadata collection
     * when one is set, otherwise the {@code _default} scope and collection of the document's bucket.
     *
     * @param docCollection the collection of the document being mutated
     * @return the collection to use for the ATR
     */
    private CollectionIdentifier getAtrCollection(CollectionIdentifier docCollection) {
        return this.config.metadataCollection().orElseGet(() -> new CollectionIdentifier(docCollection.bucket(), Optional.of("_default"), Optional.of("_default")));
    }

    /**
     * Builds the backtick-quoted keyspace string {@code default:`bucket`.`scope`.`collection`} for
     * a collection, substituting {@code _default} for an absent scope or collection name.
     *
     * @param collection the collection to render
     * @return the keyspace string
     */
    private static String makeKeyspace(CollectionIdentifier collection) {
        String bucketName = collection.bucket();
        String scopeName = collection.scope().orElse("_default");
        String collectionName = collection.collection().orElse("_default");
        return String.format("default:`%s`.`%s`.`%s`", bucketName, scopeName, collectionName);
    }

    /**
     * Inserts a document inside the transaction, running {@code insertInternal} through
     * {@code doKVOperation}.
     *
     * @param collection target collection
     * @param id document id
     * @param content document body
     * @param flagsToStage flags passed on for the staged content
     * @param pspan parent span
     * @return a Mono yielding the inserted document's transactional result
     */
    public Mono<CoreTransactionGetResult> insert(CollectionIdentifier collection, String id, byte[] content, int flagsToStage, SpanWrapper pspan) {
        String lockDebug = "insert " + DebugUtil.docId(collection, id);
        return this.doKVOperation(lockDebug, pspan, "insert", collection, id, (operationId, span, lockToken) -> {
            return this.insertInternal((String)operationId, collection, id, content, flagsToStage, (SpanWrapper)span, (ReactiveLock.Waiter)lockToken);
        });
    }

    /**
     * Legacy overload that inserts using {@code CodecFlags.JSON_COMMON_FLAGS}.
     *
     * @deprecated use the overload that takes explicit flags
     */
    @Deprecated
    @UsedBy(value=UsedBy.Project.SPRING_DATA_COUCHBASE)
    public Mono<CoreTransactionGetResult> insert(CollectionIdentifier collection, String id, byte[] content, SpanWrapper pspan) {
        int jsonFlags = CodecFlags.JSON_COMMON_FLAGS;
        return this.insert(collection, id, content, jsonFlags, pspan);
    }

    /**
     * Routes an insert to the KV path or, when the attempt is in query mode, to the query path.
     * The decision is made at subscription time.
     */
    private Mono<CoreTransactionGetResult> insertInternal(String operationId, CollectionIdentifier collection, String id, byte[] content, int flagsToStage, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            if (!this.queryModeLocked()) {
                return this.insertWithKVLocked(operationId, collection, id, content, flagsToStage, span, lockToken);
            }
            return this.insertWithQueryLocked(collection, id, content, flagsToStage, lockToken, span);
        });
    }

    /**
     * KV-mode insert; the caller must hold the lock.
     *
     * <p>An existing staged INSERT or REPLACE for the same document fails with
     * {@code DocumentExistsException}. Otherwise the ATR is initialised if needed, the
     * {@code beforeUnlockInsert} hook runs, the lock is released, and the document is staged:
     * as a replace over a previously staged REMOVE, or as a plain staged insert.
     */
    private Mono<CoreTransactionGetResult> insertWithKVLocked(String operationId, CollectionIdentifier collection, String id, byte[] content, int flagsToStage, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        this.assertLocked("insertWithKV");
        Optional<StagedMutation> existing = this.findStagedMutationLocked(collection, id);
        StagedMutation previous = existing.orElse(null);
        boolean alreadyWritten = previous != null && (previous.type == StagedMutationType.INSERT || previous.type == StagedMutationType.REPLACE);
        if (alreadyWritten) {
            return Mono.error((Throwable)new DocumentExistsException(null));
        }
        return this.initAtrIfNeededLocked(collection, id, span).then(this.hooks.beforeUnlockInsert.apply(this, id)).then(this.unlock(lockToken, "standard")).then(Mono.defer(() -> {
            if (previous == null || previous.type != StagedMutationType.REMOVE) {
                return this.createStagedInsert(operationId, collection, id, content, flagsToStage, span, Optional.empty());
            }
            // Inserting over a staged remove is staged as a replace against that mutation's metadata.
            return this.createStagedReplace(operationId, previous.collection, previous.id, previous.cas, previous.documentMetadata, previous.crc32, content, flagsToStage, null, flagsToStage, span, false);
        }));
    }

    /**
     * Query-mode insert: performs the insert by executing the prepared {@code __insert} statement.
     *
     * <p>The content must be non-binary. The statement arguments are the keyspace, the document
     * id, the raw content and an empty options object. After the query completes the lock is
     * released and the first returned row is parsed for its {@code scas} field, which becomes the
     * CAS of the returned result.
     *
     * <p>Errors: {@code TransactionOperationFailedException} and {@code DocumentExistsException}
     * pass through unchanged; anything else is wrapped via {@code operationFailed} and recorded
     * on the span.
     *
     * @param collection target collection
     * @param id document id
     * @param content document body (interpreted as UTF-8 text)
     * @param flags content flags, validated as non-binary and stored on the result
     * @param lockToken the lock token currently held
     * @param span span for the operation
     * @return a Mono yielding the inserted document's transactional result
     */
    private Mono<CoreTransactionGetResult> insertWithQueryLocked(CollectionIdentifier collection, String id, byte[] content, int flags, ReactiveLock.Waiter lockToken, SpanWrapper span) {
        return Mono.defer(() -> {
            this.requireNonBinaryContent(flags);
            ArrayNode params = Mapper.createArrayNode().add(CoreTransactionAttemptContext.makeKeyspace(collection)).add(id).addRawValue(new RawValue(new String(content, StandardCharsets.UTF_8))).add(Mapper.createObjectNode());
            int sidx = this.queryStatementIdx.getAndIncrement();
            // Held in a reference so the query wrapper can swap the token; the latest one is unlocked afterwards.
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            CoreQueryOptionsTransactions queryOptions = new CoreQueryOptionsTransactions();
            queryOptions.raw("args", params);
            return this.queryWrapperBlockingLocked(sidx, this.queryContext.queryContext, "EXECUTE __insert", queryOptions, "queryKvInsert", false, true, this.makeTxdata(), params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "insertWithQueryLocked end", false).thenReturn(result.collectRows())).map(rows -> {
                ObjectNode row;
                if (rows.isEmpty()) {
                    throw this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new IllegalStateException("Did not get any rows back while KV inserting with query")).build());
                }
                try {
                    row = Mapper.reader().readValue(((QueryChunkRow)rows.get(0)).data(), ObjectNode.class);
                }
                catch (IOException e) {
                    throw new DecodingFailureException(e);
                }
                String scas = row.path("scas").textValue();
                long cas = Long.parseLong(scas);
                return CoreTransactionGetResult.createFromInsert(collection, id, content, flags, this.transactionId(), this.attemptId, null, null, null, null, cas);
            }).onErrorResume(err -> {
                if (err instanceof TransactionOperationFailedException) {
                    return Mono.error((Throwable)err);
                }
                if (err instanceof DocumentExistsException) {
                    return Mono.error((Throwable)err);
                }
                // The error class was previously computed here but never used; the dead call is removed.
                TransactionOperationFailedException out = this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).build());
                span.recordExceptionAndSetErrorStatus((Throwable)err);
                return Mono.error((Throwable)out);
            });
        });
    }

    /**
     * Chooses an ATR id for a vbucket. The {@code randomAtrIdForVbucket} hook may supply the id;
     * when it does not, the id generated by {@code ActiveTransactionRecordIds} is used. The
     * generated id is computed unconditionally, after the hook has been consulted.
     *
     * @param self the attempt context passed to the hook
     * @param vbucketIdForDoc vbucket of the document
     * @param numAtrs number of ATRs to choose among
     * @return the ATR id
     */
    protected String randomAtrIdForVbucket(CoreTransactionAttemptContext self, Integer vbucketIdForDoc, int numAtrs) {
        Optional<String> fromHook = this.hooks.randomAtrIdForVbucket.apply(self);
        String generated = ActiveTransactionRecordIds.randomAtrIdForVbucket(vbucketIdForDoc, numAtrs);
        return fromHook.orElse(generated);
    }

    /**
     * Acquires the attempt mutex, waiting at most for the remaining transaction expiry. Completes
     * empty without locking when thread safety is disabled.
     *
     * @param dbg debug label passed to the mutex
     * @return a Mono yielding the lock token, or empty when thread safety is disabled
     */
    private Mono<ReactiveLock.Waiter> lock(String dbg) {
        return Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            Duration maxWait = Duration.ofMillis(this.expiryRemainingMillis());
            return this.mutex.lock(dbg, maxWait);
        });
    }

    /**
     * Releases the attempt mutex for the given token. Does nothing when thread safety is disabled.
     *
     * @param lockToken token obtained from {@code lock}
     * @param dbgExtra debug label passed to the mutex
     * @param removeFromWaiters passed through to the mutex
     * @return a Mono completing once the unlock has been issued
     */
    private Mono<Void> unlock(ReactiveLock.Waiter lockToken, String dbgExtra, boolean removeFromWaiters) {
        return Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            return this.mutex.unlock(lockToken, dbgExtra, removeFromWaiters);
        });
    }

    /**
     * Convenience overload of {@code unlock} that passes {@code false} for {@code removeFromWaiters}.
     */
    private Mono<Void> unlock(ReactiveLock.Waiter lockToken, String dbgExtra) {
        final boolean removeFromWaiters = false;
        return this.unlock(lockToken, dbgExtra, removeFromWaiters);
    }

    /**
     * Takes the lock and then registers a new entry in the {@code kvOps} wait group, returning
     * both tokens together.
     *
     * @param dbg debug label used for both the lock and the wait-group entry
     * @return a Mono yielding the pair of tokens
     */
    private Mono<LockTokens> lockAndIncKVOps(String dbg) {
        return this.lock(dbg).flatMap(lockToken -> {
            return this.kvOps.add(dbg).map(kvOpToken -> new LockTokens((ReactiveLock.Waiter)lockToken, (ReactiveWaitGroup.Waiter)kvOpToken));
        });
    }

    /**
     * Waits until the {@code kvOps} wait group has drained, then takes the lock. If operations are
     * registered again by the time the lock is held, the lock is released and the whole procedure
     * repeats (with an extended debug label).
     *
     * @param dbg debug label
     * @return a Mono yielding the lock token held on completion
     */
    private Mono<ReactiveLock.Waiter> waitForAllKVOpsThenLock(String dbg) {
        Runnable announce = () -> {
            this.assertNotLocked(dbg);
            this.logger().info(this.attemptId, "waiting for {} KV ops finish for {}", this.kvOps.waitingCount(), dbg);
        };
        return Mono.fromRunnable(announce).then(Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            return this.kvOps.await(Duration.ofMillis(this.expiryRemainingMillis()));
        })).then(this.lock(dbg)).flatMap(lockToken -> {
            if (this.kvOps.waitingCount() <= 0) {
                return Mono.just((Object)lockToken);
            }
            // New KV ops arrived between the drain and the lock: let go and start over.
            String retryLabel = dbg + " still waiting for KV ops";
            return this.unlock((ReactiveLock.Waiter)lockToken, retryLabel).then(this.waitForAllKVOpsThenLock(retryLabel));
        });
    }

    /**
     * Waits for outstanding operations via {@code waitForAllOps}, acquires the lock, and passes the
     * lock token to a continuation together with {@code dbg}, {@code span} and {@code doUnderLock}.
     * On error the span's status is set to {@code ERROR}.
     *
     * <p>NOTE(review): the continuation is the decompiler-emitted synthetic
     * {@code lambda$waitForAllOpsThenDoUnderLock$85}, whose body is not visible here; presumably it
     * runs {@code doUnderLock} and releases the lock — confirm against the original source.
     *
     * <p>NOTE(review): {@code span} is declared {@code @Nullable} but is dereferenced
     * unconditionally in {@code doOnError} — confirm callers never pass null on this path.
     *
     * @param dbg debug label
     * @param span span whose status is marked on error
     * @param doUnderLock work to perform while the lock is held
     */
    private Mono<Void> waitForAllOpsThenDoUnderLock(String dbg, @Nullable SpanWrapper span, Supplier<Mono<Void>> doUnderLock) {
        return Mono.defer(() -> this.waitForAllOps(dbg).then(this.lock(dbg)).flatMap(arg_0 -> this.lambda$waitForAllOpsThenDoUnderLock$85(dbg, span, (Supplier)doUnderLock, arg_0)).doOnError(err -> span.span().status(RequestSpan.StatusCode.ERROR)));
    }

    /**
     * Asserts the lock is not held, logs how many KV operations are outstanding, then waits for
     * the {@code kvOps} wait group to drain (bounded by the remaining transaction expiry). The wait
     * is skipped when thread safety is disabled.
     *
     * @param dbg debug label
     * @return a Mono completing when the wait is over
     */
    private Mono<Void> waitForAllOps(String dbg) {
        Runnable announce = () -> {
            this.assertNotLocked(dbg);
            this.logger().info(this.attemptId, "waiting for {} KV ops in {}", this.kvOps.waitingCount(), dbg);
        };
        return Mono.fromRunnable(announce).then(Mono.defer(() -> {
            if (!this.threadSafetyEnabled) {
                return Mono.empty();
            }
            return this.kvOps.await(Duration.ofMillis(this.expiryRemainingMillis()));
        }));
    }

    /**
     * Replaces a document inside the transaction, running the work through {@code doKVOperation} under the
     * stage name {@code "replace"}.
     *
     * @param doc     the previously fetched document to replace
     * @param content the new content
     * @param flags   the user flags describing {@code content}
     * @param pspan   the span handed to {@code doKVOperation}
     * @return a {@link Mono} emitting the result produced by {@code replaceInternalLocked}
     */
    public Mono<CoreTransactionGetResult> replace(CoreTransactionGetResult doc, byte[] content, int flags, SpanWrapper pspan) {
        String lockDebug = "replace " + DebugUtil.docId(doc);
        return doKVOperation(lockDebug, pspan, "replace", doc.collection(), doc.id(),
                (operationId, span, lockToken) -> replaceInternalLocked((String)operationId, doc, content, flags, (SpanWrapper)span, (ReactiveLock.Waiter)lockToken));
    }

    /**
     * Overload of {@link #replace(CoreTransactionGetResult, byte[], int, SpanWrapper)} that supplies
     * {@code CodecFlags.JSON_COMMON_FLAGS} as the flags.
     *
     * @param doc     the previously fetched document to replace
     * @param content the new content
     * @param pspan   the span forwarded to the four-argument overload
     * @return the {@link Mono} returned by the four-argument overload
     */
    @Deprecated
    @UsedBy(value=UsedBy.Project.SPRING_DATA_COUCHBASE)
    public Mono<CoreTransactionGetResult> replace(CoreTransactionGetResult doc, byte[] content, SpanWrapper pspan) {
        final int jsonFlags = CodecFlags.JSON_COMMON_FLAGS;
        return replace(doc, content, jsonFlags, pspan);
    }

    /**
     * Wraps {@code internal} in a {@code MonoBridge} when thread-safety is enabled; otherwise returns it untouched.
     *
     * @param debug    debug label given to the bridge
     * @param internal the publisher to wrap
     * @return the bridge's external {@link Mono}, or {@code internal} itself when thread-safety is disabled
     */
    private <T> Mono<T> createMonoBridge(String debug, Mono<T> internal) {
        if (!threadSafetyEnabled) {
            return internal;
        }
        // The bridge only receives a logger when bridge debugging is switched on.
        return new MonoBridge<T>(internal, debug, this, monoBridgeDebugging ? LOGGER : null).external();
    }

    /**
     * Common wrapper for KV operations: takes the lock, registers with the {@code kvOps} wait group, runs
     * pre-checks, and then invokes {@code op}.
     *
     * @param lockDebugOrig debug label; a short slice of the generated operation id is appended to it
     * @param span          span that receives attributes up front and an error status if the chain fails
     * @param stageName     stage name used for the client-side expiry check and its log/error message
     * @param docCollection collection of the document involved, may be {@code null}
     * @param docId         id of the document involved, may be {@code null}
     * @param op            the operation body; receives the operation id, the span and the lock token
     * @return the {@link Mono} produced by {@code op}, passed through {@code createMonoBridge}
     */
    private <T> Mono<T> doKVOperation(String lockDebugOrig, SpanWrapper span, String stageName, @Nullable CollectionIdentifier docCollection, @Nullable String docId, TriFunction<String, SpanWrapper, ReactiveLock.Waiter, Mono<T>> op) {
        return this.createMonoBridge(lockDebugOrig, Mono.defer(() -> {
            // A fresh random id per subscription; its first five characters tag the lock debug label.
            String operationId = UUID.randomUUID().toString();
            String lockDebug = lockDebugOrig + " - " + operationId.substring(0, 5);
            SpanWrapperUtil.setAttributes(span, this, docCollection, docId);
            return this.lockAndIncKVOps(lockDebug).subscribeOn(this.scheduler()).flatMap(lockTokens -> Mono.defer(() -> {
                // A non-null result from canPerformOperation aborts the operation with that exception.
                TransactionOperationFailedException returnEarly = this.canPerformOperation(lockDebug);
                if (returnEarly != null) {
                    return Mono.error((Throwable)returnEarly);
                }
                // On client-side expiry, switch into expiry-overtime mode and fail with TRANSACTION_EXPIRED.
                if (this.hasExpiredClientSide(stageName, Optional.ofNullable(docId))) {
                    this.LOGGER.info(this.attemptId, "has expired in stage {}, setting expiry-overtime-mode", stageName);
                    this.expiryOvertimeMode = true;
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new AttemptExpiredException("Attempt expired in stage " + stageName)).raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).build()));
                }
                return Mono.empty();
            }).then((Mono)op.apply(operationId, span, lockTokens.mutexToken)).doFinally(v -> {
                // The lock is only released here on cancel/error; the wait-group token is always marked done.
                if (v == SignalType.CANCEL || v == SignalType.ON_ERROR) {
                    this.LOGGER.info(this.attemptId, "doKVOperation {} got signal {}", lockDebug, v);
                    this.unlock(lockTokens.mutexToken, "doKVOperation", v == SignalType.CANCEL).block();
                }
                this.kvOps.done(lockTokens.waitGroupToken).block();
            })).doOnError(err -> span.setErrorStatus());
        }));
    }

    /**
     * Common wrapper for query operations: allocates a statement index, creates a {@code "transaction_query"}
     * span, takes the lock, and then invokes {@code op}.
     *
     * @param lockDebugIn debug label; {@code " q" + statementIndex} is appended to it
     * @param statement   the statement text, recorded on the span as {@code db.statement}
     * @param pspan       optional parent span
     * @param op          the operation body; receives the statement index, a holder for the current lock token, and the span
     * @return the {@link Mono} produced by {@code op}, passed through {@code createMonoBridge}
     */
    public <T> Mono<T> doQueryOperation(String lockDebugIn, String statement, @Nullable SpanWrapper pspan, TriFunction<Integer, AtomicReference<ReactiveLock.Waiter>, SpanWrapper, Mono<T>> op) {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            String lockDebug = lockDebugIn + " q" + sidx;
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "transaction_query", pspan).attribute("db.statement", statement);
            return this.createMonoBridge(lockDebug, Mono.defer(() -> {
                // The token lives in an AtomicReference that is handed to op; the doFinally below releases whatever it holds at that point.
                AtomicReference lt = new AtomicReference();
                return this.lock(lockDebug).subscribeOn(this.scheduler()).flatMap(lockToken -> {
                    lt.set(lockToken);
                    return ((Mono)op.apply(sidx, lt, span)).doFinally(v -> {
                        if (v == SignalType.CANCEL || v == SignalType.ON_ERROR) {
                            this.LOGGER.info(this.attemptId, "doQueryOperation {} got signal {}", lockDebug, v);
                        }
                        // NOTE(review): unlike doKVOperation, the unlock here runs on every terminal signal, not just
                        // cancel/error. Confirm against the op implementations that this is the intended owner of the
                        // final unlock on the success path.
                        this.unlock((ReactiveLock.Waiter)lt.get(), "doQueryOperation", v == SignalType.CANCEL).block();
                    });
                });
            })).doOnError(err -> span.finishWithErrorStatus()).doOnNext(ignored -> span.finish());
        });
    }

    /**
     * Logs the replace and dispatches it to the query-based or the KV-based implementation, depending on
     * {@code queryModeLocked()}.
     *
     * @param operationId id of this operation (only used by the KV path and the log line)
     * @param doc         the document being replaced
     * @param content     the new content
     * @param flags       the user flags describing {@code content}
     * @param pspan       span forwarded to the chosen implementation
     * @param lockToken   the lock token forwarded to the chosen implementation
     * @return the {@link Mono} produced by the chosen implementation
     */
    private Mono<CoreTransactionGetResult> replaceInternalLocked(String operationId, CoreTransactionGetResult doc, byte[] content, int flags, SpanWrapper pspan, ReactiveLock.Waiter lockToken) {
        LOGGER.info(attemptId, "replace doc {}, operationId = {}", doc, operationId);
        return queryModeLocked()
                ? replaceWithQueryLocked(doc, content, flags, lockToken, pspan)
                : replaceWithKVLocked(operationId, doc, content, flags, pspan, lockToken);
    }

    /**
     * KV implementation of replace. Captures state, releases the lock, and then stages either an insert or a
     * replace depending on any mutation already staged for this document.
     *
     * @param operationId id of this operation
     * @param doc         the document being replaced
     * @param content     the new content
     * @param flags       the user flags describing {@code content}
     * @param pspan       span forwarded to the staging calls
     * @param lockToken   the lock token, released before any staging is attempted
     * @return a {@link Mono} emitting the result of the staged insert or staged replace
     */
    private Mono<CoreTransactionGetResult> replaceWithKVLocked(String operationId, CoreTransactionGetResult doc, byte[] content, int flags, SpanWrapper pspan, ReactiveLock.Waiter lockToken) {
        // Both values are read eagerly, before the chain below releases the lock.
        Optional<StagedMutation> existing = this.findStagedMutationLocked(doc);
        boolean mayNeedToWriteATR = this.state == AttemptState.NOT_STARTED;
        return this.hooks.beforeUnlockReplace.apply(this, doc.id()).then(this.unlock(lockToken, "standard")).then(Mono.defer(() -> {
            if (existing.isPresent()) {
                StagedMutation op = (StagedMutation)existing.get();
                // Replacing a document with a staged remove fails with a DocumentNotFoundException cause.
                if (op.type == StagedMutationType.REMOVE) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new DocumentNotFoundException(null)).build()));
                }
            }
            return this.checkAndHandleBlockingTxn(doc, pspan, ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_REPLACING, existing).then(this.initATRIfNeeded(mayNeedToWriteATR, doc.collection(), doc.id(), pspan)).then(Mono.defer(() -> {
                // A replace on top of a staged insert is staged as another insert, using the document's current CAS.
                if (existing.isPresent() && ((StagedMutation)existing.get()).type == StagedMutationType.INSERT) {
                    return this.createStagedInsert(operationId, doc.collection(), doc.id(), content, flags, pspan, Optional.of(doc.cas()));
                }
                return this.createStagedReplace(operationId, doc.collection(), doc.id(), doc.cas(), doc.documentMetadata(), doc.crc32OfGet(), content, flags, doc.contentAsBytes(), doc.userFlags(), pspan, doc.links().isDeleted());
            }));
        }));
    }

    /**
     * If the attempt is still {@code NOT_STARTED} at subscription time, selects an ATR and writes the pending
     * entry to it; otherwise does nothing.
     *
     * @param docCollection collection of the document that triggered ATR selection
     * @param docId         id of the document that triggered ATR selection
     * @param pspan         span forwarded to {@code atrPendingLocked}
     * @return a {@link Mono} that completes when the ATR work (if any) is done
     */
    private Mono<Void> initAtrIfNeededLocked(CollectionIdentifier docCollection, String docId, SpanWrapper pspan) {
        return Mono.defer(() -> {
            if (state != AttemptState.NOT_STARTED) {
                return Mono.empty();
            }
            return Mono.fromCallable(() -> selectAtrLocked(docCollection, docId))
                    .flatMap(chosenAtrCollection -> atrPendingLocked((CollectionIdentifier)chosenAtrCollection, pspan))
                    .then();
        });
    }

    /**
     * Runs {@link #initAtrIfNeededLocked} through {@code doUnderLock} when the caller indicated the ATR may
     * still need to be written; otherwise does nothing.
     *
     * @param mayNeedToWriteATR whether the ATR may still need initialising
     * @param docCollection     collection of the document that triggered ATR selection
     * @param docId             id of the document that triggered ATR selection
     * @param pspan             span forwarded to {@code initAtrIfNeededLocked}
     * @return a {@link Mono} that completes when the ATR work (if any) is done
     */
    private Mono<Void> initATRIfNeeded(boolean mayNeedToWriteATR, CollectionIdentifier docCollection, String docId, SpanWrapper pspan) {
        return Mono.defer(() -> {
            if (!mayNeedToWriteATR) {
                return Mono.empty();
            }
            String lockDebug = "before ATR " + DebugUtil.docId(docCollection, docId);
            return doUnderLock(lockDebug, () -> initAtrIfNeededLocked(docCollection, docId, pspan));
        });
    }

    /**
     * Picks the ATR id and ATR collection for this attempt and stores both on the instance.
     *
     * @param docCollection collection of the first mutated document
     * @param docId         id of the first mutated document; its vbucket drives the ATR id choice
     * @return the collection that will hold the ATR
     * @throws IllegalStateException if an ATR id has already been chosen
     */
    private CollectionIdentifier selectAtrLocked(CollectionIdentifier docCollection, String docId) {
        if (atrId.isPresent()) {
            throw new IllegalStateException("Internal bug: two operations have concurrently initialised the ATR");
        }
        long docVbucket = ActiveTransactionRecordIds.vbucketForKey(docId, 1024);
        String chosenAtr = randomAtrIdForVbucket(this, (int)docVbucket, config.numAtrs());
        atrId = Optional.of(chosenAtr);
        // A configured metadata collection takes precedence over one derived from the document's collection.
        if (config.metadataCollection().isPresent()) {
            atrCollection = Optional.of(config.metadataCollection().get());
        } else {
            atrCollection = Optional.of(getAtrCollection(docCollection));
        }
        LOGGER.info(attemptId, "First mutated doc in txn is '{}' on vbucket {}, so using atr {}", DebugUtil.docId(docCollection, docId), docVbucket, chosenAtr);
        return atrCollection.get();
    }

    /**
     * Rejects content whose common-format flags mark it as binary.
     *
     * @param userFlags the user flags of the content to check
     * @throws TransactionOperationFailedException (as produced by {@code operationFailed}) with a
     *         {@code FeatureNotAvailableException} cause when the content is binary
     */
    private void requireNonBinaryContent(int userFlags) {
        boolean binary = CodecFlags.extractCommonFormatFlags(userFlags) == CodecFlags.CommonFlags.BINARY.ordinal();
        if (!binary) {
            return;
        }
        FeatureNotAvailableException notSupported = new FeatureNotAvailableException("Binary documents are only supported in a KV-only transaction");
        throw operationFailed(TransactionOperationFailedException.Builder.createError().cause(notSupported).build());
    }

    /**
     * Query-mode implementation of replace: runs {@code EXECUTE __update} with the document's keyspace, id and
     * new content, releases the lock, and builds the result from the first returned row.
     *
     * @param doc       the document being replaced
     * @param content   the new content; it is sent as raw JSON, and binary content is rejected up front
     * @param flags     the user flags describing {@code content}
     * @param lockToken the lock token currently held; released after the query returns
     * @param span      span that records any failure
     * @return a {@link Mono} emitting the updated document, carrying the CAS and crc32 read from the row
     */
    private Mono<CoreTransactionGetResult> replaceWithQueryLocked(CoreTransactionGetResult doc, byte[] content, int flags, ReactiveLock.Waiter lockToken, SpanWrapper span) {
        return Mono.defer(() -> {
            this.requireNonBinaryContent(flags);
            ObjectNode txData = this.makeTxdata();
            txData.put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.set("txnMeta", (JsonNode)v));
            // Positional args are [keyspace, id, content, options]. The content is passed exactly once, as raw
            // JSON; a second binary copy would push the options object out of its position.
            ArrayNode params = Mapper.createArrayNode().add(CoreTransactionAttemptContext.makeKeyspace(doc.collection())).add(doc.id()).addRawValue(new RawValue(new String(content, StandardCharsets.UTF_8))).add(Mapper.createObjectNode());
            int sidx = this.queryStatementIdx.getAndIncrement();
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            CoreQueryOptionsTransactions queryOptions = new CoreQueryOptionsTransactions();
            queryOptions.raw("args", params);
            return this.queryWrapperBlockingLocked(sidx, this.queryContext.queryContext, "EXECUTE __update", queryOptions, "queryKvReplace", false, true, txData, params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "replaceWithQueryLocked end", false).thenReturn(result.collectRows())).map(rows -> {
                ObjectNode row;
                if (rows.isEmpty()) {
                    throw this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new IllegalStateException("Did not get any rows back while KV replacing with query")).build());
                }
                try {
                    row = Mapper.reader().readValue(((QueryChunkRow)rows.get(0)).data(), ObjectNode.class);
                }
                catch (IOException e) {
                    throw new DecodingFailureException(e);
                }
                String scas = row.path("scas").textValue();
                long cas = Long.parseLong(scas);
                Optional<String> crc32 = Optional.ofNullable(row.path("crc32").textValue());
                return new CoreTransactionGetResult(doc.id(), content, CodecFlags.JSON_COMMON_FLAGS, cas, doc.collection(), null, Optional.empty(), Optional.empty(), crc32);
            }).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                span.recordExceptionAndSetErrorStatus((Throwable)err);
                // Already-converted failures pass through untouched.
                if (err instanceof TransactionOperationFailedException) {
                    return Mono.error((Throwable)err);
                }
                // Doc-not-found and CAS-mismatch retry the transaction.
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND || ec == ErrorClass.FAIL_CAS_MISMATCH) {
                    TransactionOperationFailedException out = this.operationFailed(builder.retryTransaction().build());
                    return Mono.error((Throwable)out);
                }
                TransactionOperationFailedException out = this.operationFailed(builder.build());
                return Mono.error((Throwable)out);
            });
        });
    }

    /**
     * Query-mode implementation of remove: runs {@code EXECUTE __delete} with the document's keyspace and id,
     * then releases the lock.
     *
     * @param doc       the document being removed
     * @param lockToken the lock token currently held; released after the query returns
     * @param span      span that records any failure
     * @return a {@link Mono} that completes once the query has run and the lock has been released
     */
    private Mono<Void> removeWithQueryLocked(CoreTransactionGetResult doc, ReactiveLock.Waiter lockToken, SpanWrapper span) {
        return Mono.defer(() -> {
            ObjectNode txData = this.makeTxdata();
            txData.put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.set("txnMeta", (JsonNode)v));
            // Positional args: [keyspace, id, empty object].
            ArrayNode params = Mapper.createArrayNode().add(CoreTransactionAttemptContext.makeKeyspace(doc.collection())).add(doc.id()).add(Mapper.createObjectNode());
            int sidx = this.queryStatementIdx.getAndIncrement();
            AtomicReference<ReactiveLock.Waiter> lt = new AtomicReference<ReactiveLock.Waiter>(lockToken);
            CoreQueryOptionsTransactions queryOptions = new CoreQueryOptionsTransactions();
            queryOptions.raw("args", params);
            return this.queryWrapperBlockingLocked(sidx, this.queryContext.queryContext, "EXECUTE __delete", queryOptions, "queryKvRemove", false, true, txData, params, span, false, lt, true).flatMap(result -> this.unlock((ReactiveLock.Waiter)lt.get(), "removeWithQueryLocked end", false)).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                span.recordExceptionAndSetErrorStatus((Throwable)err);
                // Already-converted failures pass through untouched.
                if (err instanceof TransactionOperationFailedException) {
                    return Mono.error((Throwable)err);
                }
                // Doc-not-found and CAS-mismatch retry the transaction.
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND || ec == ErrorClass.FAIL_CAS_MISMATCH) {
                    TransactionOperationFailedException out = this.operationFailed(builder.retryTransaction().build());
                    return Mono.error((Throwable)out);
                }
                TransactionOperationFailedException out = this.operationFailed(builder.build());
                return Mono.error((Throwable)out);
            });
        });
    }

    /**
     * Runs {@code ForwardCompatibility.check} for the given stage and maps any failure to an operation failure
     * caused by a {@code ForwardCompatibilityFailureException}.
     *
     * @param stage the forward-compatibility stage being checked
     * @param fc    the forward-compatibility data to check, if any
     * @return a {@link Mono} that completes when the check passes, or errors with the mapped failure
     */
    private Mono<Void> forwardCompatibilityCheck(ForwardCompatibilityStage stage, Optional<ForwardCompatibility> fc) {
        return ForwardCompatibility.check(core, stage, fc, logger(), overall.supported()).onErrorResume(failure -> {
            TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause(new ForwardCompatibilityFailureException());
            boolean retryRequested = failure instanceof ForwardCompatibilityRequiresRetryException;
            if (retryRequested) {
                builder.retryTransaction();
            }
            return Mono.error((Throwable)operationFailed(builder.build()));
        });
    }

    /**
     * Looks up the ATR entry of the transaction that has staged a write on {@code doc} and decides whether
     * this attempt may overwrite it.
     *
     * @param doc        the document whose links identify the blocking transaction's ATR and attempt
     * @param collection the collection in which to look up the ATR
     * @param span       span forwarded to the ATR lookup
     * @param units      metering-units builder forwarded to the ATR lookup
     * @return a {@link Mono} that completes when it is fine to proceed, or errors with a retry-the-transaction failure
     */
    private Mono<Void> checkATREntryForBlockingDocInternal(CoreTransactionGetResult doc, CollectionIdentifier collection, SpanWrapper span, MeteringUnits.MeteringUnitsBuilder units) {
        return Mono.fromRunnable(() -> this.checkExpiryPreCommitAndSetExpiryOvertimeMode("staging.check_atr_entry_blocking_doc", Optional.empty())).then(this.hooks.beforeCheckATREntryForBlockingDoc.apply(this, doc.links().atrId().get())).then(ActiveTransactionRecord.findEntryForTransaction(this.core, collection, doc.links().atrId().get(), doc.links().stagedAttemptId().get(), this.config, span, this.logger(), units, null).flatMap(atrEntry -> {
            if (atrEntry.isPresent()) {
                ActiveTransactionRecordEntry ae = (ActiveTransactionRecordEntry)atrEntry.get();
                this.LOGGER.info(this.attemptId, "fetched ATR entry for blocking txn: hasExpired={} entry={}", ae.hasExpired(), ae);
                return this.forwardCompatibilityCheck(ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_READING_ATR, ae.forwardCompatibility()).then(Mono.defer(() -> {
                    // Only a COMPLETED or ROLLED_BACK blocking attempt lets us proceed; any other state is retried.
                    switch (ae.state()) {
                        case COMPLETED: 
                        case ROLLED_BACK: {
                            this.LOGGER.info(this.attemptId, "ATR entry state of {} indicates we can proceed to overwrite", new Object[]{((ActiveTransactionRecordEntry)atrEntry.get()).state()});
                            return Mono.empty();
                        }
                    }
                    return Mono.error((Throwable)new RetryOperationException());
                }));
            }
            this.LOGGER.info(this.attemptId, "blocking txn {}'s entry has been removed indicating the txn expired, so proceeding to overwrite", doc.links().stagedAttemptId().get());
            return Mono.empty();
        // RetryOperationException is retried with exponential backoff (50ms to 500ms) under a 1s timeout.
        })).retryWhen(Retry.anyOf(RetryOperationException.class).exponentialBackoff(Duration.ofMillis(50L), Duration.ofMillis(500L)).timeout(Duration.ofSeconds(1L)).toReactorRetry()).publishOn(this.scheduler()).onErrorResume(err -> {
            if (err instanceof RetryExhaustedException) {
                this.LOGGER.info(this.attemptId, "still blocked by a valid transaction, retrying to unlock documents");
            } else {
                // A missing ATR document is treated as success.
                if (err instanceof DocumentNotFoundException) {
                    this.LOGGER.info(this.attemptId, "blocking txn's ATR has been removed so proceeding to overwrite");
                    return Mono.empty();
                }
                this.LOGGER.warn(this.attemptId, "got error in checkATREntryForBlockingDoc: {}", CoreTransactionAttemptContext.dbg(err));
            }
            // Everything else (including exhausted retries) fails the operation and asks for a transaction retry.
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).retryTransaction().build()));
        }).then();
    }

    /**
     * Runs {@link #checkATREntryForBlockingDocInternal} against the collection named in the document's links,
     * and records the metering units gathered once the check terminates.
     *
     * @param doc   the document blocked by another transaction
     * @param pspan span forwarded to the internal check
     * @return a {@link Mono} that completes or errors exactly as the internal check does
     */
    private Mono<Void> checkATREntryForBlockingDoc(CoreTransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            CollectionIdentifier atrLocation = doc.links().collection();
            MeteringUnits.MeteringUnitsBuilder unitsBuilder = new MeteringUnits.MeteringUnitsBuilder();
            Mono<Void> check = checkATREntryForBlockingDocInternal(doc, atrLocation, pspan, unitsBuilder);
            return check.doOnTerminate(() -> addUnits(unitsBuilder.build()));
        });
    }

    /**
     * Builds the debug representation of an ATR, substituting {@code "-"} when no ATR id is present.
     *
     * @param collection the collection holding the ATR
     * @param atrId      the ATR id, if known
     * @return the value produced by {@code ActiveTransactionRecordUtil.getAtrDebug}
     */
    private RedactableArgument getAtrDebug(CollectionIdentifier collection, Optional<String> atrId) {
        String idOrPlaceholder = atrId.orElse("-");
        return ActiveTransactionRecordUtil.getAtrDebug(collection, idOrPlaceholder);
    }

    /**
     * Builds the debug representation of an ATR when both the collection and the id are optional; delegates
     * directly to {@code ActiveTransactionRecordUtil.getAtrDebug}.
     *
     * @param collection the collection holding the ATR, if known
     * @param atrId      the ATR id, if known
     * @return the value produced by {@code ActiveTransactionRecordUtil.getAtrDebug}
     */
    private RedactableArgument getAtrDebug(Optional<CollectionIdentifier> collection, Optional<String> atrId) {
        return ActiveTransactionRecordUtil.getAtrDebug(collection, atrId);
    }

    /**
     * Computes how many milliseconds remain before the configured expiration time is reached.
     *
     * @return the remaining time in milliseconds, clamped to the range {@code [0, configured expiration time]}
     */
    long expiryRemainingMillis() {
        long elapsedMillis = overall.timeSinceStartOfTransactionsMillis(System.nanoTime());
        long budgetMillis = config.expirationTime().toMillis();
        long remaining = budgetMillis - elapsedMillis;
        // Clamp to the upper bound first, then to zero.
        if (remaining > budgetMillis) {
            remaining = budgetMillis;
        }
        return remaining < 0L ? 0L : remaining;
    }

    /**
     * Fetches the request tracer from the core's resources.
     *
     * @return the {@link RequestTracer} exposed by {@code core.context().coreResources()}
     */
    private RequestTracer tracer() {
        return core.context().coreResources().requestTracer();
    }

    /**
     * Serializes {@code in} to bytes using {@code Mapper.writer()}.
     *
     * @param in the value to serialize
     * @return the serialized bytes
     * @throws DecodingFailureException wrapping the {@link JsonProcessingException} if serialization fails
     */
    private byte[] serialize(Object in) {
        byte[] encoded;
        try {
            encoded = Mapper.writer().writeValueAsBytes(in);
        }
        catch (JsonProcessingException jsonFailure) {
            throw new DecodingFailureException(jsonFailure);
        }
        return encoded;
    }

    /**
     * Writes this attempt's entry into the ATR document and, on success, moves the attempt to {@code PENDING}.
     * Asserts that the lock is held.
     *
     * @param collection the collection holding the ATR
     * @param pspan      parent span for the {@code "transaction_atr_pending"} span created here
     * @return a {@link Mono} that completes once the entry has been written, or errors with the mapped failure
     */
    private Mono<Void> atrPendingLocked(CollectionIdentifier collection, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrPending");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, this.atrId.orElse(null), "transaction_atr_pending", pspan);
            // All fields of this attempt live under "attempts.<attemptId>".
            String prefix = "attempts." + this.attemptId;
            if (!this.atrId.isPresent()) {
                return Mono.error((Throwable)new IllegalStateException("atrId not present"));
            }
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR {} to Pending", this.getAtrDebug(collection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode("atrPending", Optional.empty());
            // The mutateIn below adds tid, st, tst, exp and d under the prefix, then sets a one-byte document body.
            // NOTE(review): the "exp" and "d" commands both pass 3 as their last constructor argument; confirm
            // whether that value is an index that is expected to be unique.
            }).then(this.hooks.beforeAtrPending.apply(this)).then(TransactionKVHandler.mutateIn(this.core, collection, this.atrId.get(), this.kvTimeoutMutating(), false, true, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, this.durabilityLevel(), OptionsUtil.createClientContext("atrPending"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, prefix + "." + "tid", this.serialize(this.transactionId()), true, true, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, prefix + "." + "st", this.serialize(AttemptState.PENDING.name()), false, true, false, 1), new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, prefix + "." + "tst", this.serialize("${Mutation.CAS}"), false, true, true, 2), new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, prefix + "." + "exp", this.serialize(this.expiryRemainingMillis()), false, true, false, 3), new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, prefix + "." + "d", this.serialize(DurabilityLevelUtil.convertDurabilityLevel(this.config.durabilityLevel())), false, true, false, 3), new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", new byte[]{0}, false, false, false, 4)), this.logger())).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterAtrPending.apply(this).map(x -> v)).doOnNext(v -> {
                // Success: record units, flip the state to PENDING, and register the collection for cleanup.
                long elapsed = span.elapsedMicros();
                this.addUnits(v.flexibleExtras());
                this.LOGGER.info(this.attemptId, "set ATR {} to Pending in {}us{}", this.getAtrDebug(collection, this.atrId), elapsed, DebugUtil.dbg(v.flexibleExtras()));
                this.setStateLocked(AttemptState.PENDING);
                this.overall.cleanup().addToCleanupSet(collection);
            }).then().onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder out = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                long elapsed = span.finish((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR {} to Pending{} in {}us: {}", this.getAtrDebug(collection, this.atrId), DebugUtil.dbg(units), elapsed, CoreTransactionAttemptContext.dbg(err));
                // Expiry-overtime mode is checked before the error class is consulted.
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, "atrPending", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClass.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, "atrPending", ec);
                }
                if (ec == ErrorClass.FAIL_ATR_FULL) {
                    return Mono.error((Throwable)this.operationFailed(out.cause(new ActiveTransactionRecordFullException((Throwable)err)).build()));
                }
                // Ambiguous outcome: wait, then run the whole write again with this span as the parent.
                if (ec == ErrorClass.FAIL_AMBIGUOUS) {
                    this.LOGGER.info(this.attemptId, "retrying the op on {} to resolve ambiguity", new Object[]{ec});
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)this.scheduler()).then(this.atrPendingLocked(collection, span));
                }
                // Path already exists: treated as success.
                if (ec == ErrorClass.FAIL_PATH_ALREADY_EXISTS) {
                    this.LOGGER.info(this.attemptId, "assuming this is caused by resolved ambiguity, and proceeding as though successful", new Object[]{ec});
                    return Mono.empty();
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    this.LOGGER.info(this.attemptId, "transient error likely to be solved by retry", new Object[]{ec});
                    return Mono.error((Throwable)this.operationFailed(out.retryTransaction().build()));
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(out.doNotRollbackAttempt().build()));
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Sets the attempt state, after asserting the lock is held and logging the change.
     *
     * @param newState the state to move to
     */
    private void setStateLocked(AttemptState newState) {
        String assertionLabel = "setState " + newState;
        assertLocked(assertionLabel);
        logger().info(attemptId, "changed state to {}", new Object[]{newState});
        state = newState;
    }

    /**
     * Stages a replace on a document by writing the transaction metadata and the new content into the
     * document's {@code txn} fields, then records the staged mutation. Asserts that the lock is not held.
     *
     * @param operationId                 id of this operation
     * @param collection                  collection of the document
     * @param id                          id of the document
     * @param cas                         CAS passed to the mutation
     * @param documentMetadata            metadata of the fetched document, if any
     * @param crc32OfGet                  crc32 recorded when the document was fetched, if any
     * @param contentToStage              the new content to stage
     * @param userFlagsOfContentToStage   user flags of the new content; these decide whether it is staged as binary
     * @param contentOfExistingDocument   content of the existing document
     * @param userFlagsOfExistingDocument user flags of the existing document
     * @param pspan                       parent span for the {@code "transaction_replace_stage"} span created here
     * @param accessDeleted               logged here and forwarded when the operation is retried
     * @return a {@link Mono} emitting the document result built after the staged write
     */
    private Mono<CoreTransactionGetResult> createStagedReplace(String operationId, CollectionIdentifier collection, String id, long cas, Optional<DocumentMetadata> documentMetadata, Optional<String> crc32OfGet, byte[] contentToStage, int userFlagsOfContentToStage, byte[] contentOfExistingDocument, int userFlagsOfExistingDocument, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            this.assertNotLocked("createStagedReplace");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "transaction_replace_stage", pspan);
            boolean isBinary = CodecFlags.extractCommonFormatFlags(userFlagsOfContentToStage) == CodecFlags.CommonFlags.BINARY.ordinal();
            byte[] txn = this.createDocumentMetadata("replace", operationId, documentMetadata, userFlagsOfContentToStage);
            // Commands: upsert "txn"; upsert-then-delete the staged-content field NOT used for this content type
            // ("txn.op.stgd" when binary, "txn.op.bin" otherwise); add the content under the field that IS used;
            // add "txn.op.crc32".
            return this.hooks.beforeStagedReplace.apply(this, id).then(TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, true, false, cas, userFlagsOfExistingDocument, this.durabilityLevel(), OptionsUtil.createClientContext("createStagedReplace"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn", txn, true, true, false, 0), isBinary ? new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn.op.stgd", NEAR_EMPTY_BYTE_ARRAY, false, true, false, 1) : new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn.op.bin", NEAR_EMPTY_BYTE_ARRAY, false, true, false, 1), isBinary ? new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn.op.stgd", null, false, true, false, 2) : new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn.op.bin", null, false, true, false, 2), isBinary ? new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "txn.op.bin", contentToStage, false, true, false, true, 3) : new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "txn.op.stgd", contentToStage, false, true, false, false, 3), new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "txn.op.crc32", this.serialize("${Mutation.value_crc32c}"), false, true, true, 4)))).publishOn(this.scheduler()).doOnSubscribe(v -> this.LOGGER.info(this.attemptId, "about to replace doc {} with cas {}, accessDeleted={}", DebugUtil.docId(collection, id), cas, accessDeleted)).flatMap(result -> this.hooks.afterStagedReplaceComplete.apply(this, id).map(x -> result)).doOnNext(updatedDoc -> {
                long elapsed = span.elapsedMicros();
                this.addUnits(updatedDoc.flexibleExtras());
                this.LOGGER.info(this.attemptId, "replaced doc {}{} got cas {}, in {}us", DebugUtil.docId(collection, id), DebugUtil.dbg(updatedDoc.flexibleExtras()), updatedDoc.cas(), elapsed);
            }).flatMap(updatedDoc -> {
                // NOTE(review): the result's body is built from contentToStage; contentOfExistingDocument is only
                // forwarded to the retry below. Confirm this is intended.
                CoreTransactionGetResult out = this.createTransactionGetResult(operationId, collection, id, contentToStage, userFlagsOfExistingDocument, contentToStage, userFlagsOfContentToStage, updatedDoc.cas(), documentMetadata, "replace", crc32OfGet);
                // The staged content is only kept on the StagedMutation when supportsReplaceBodyWithXattr reports false.
                return this.supportsReplaceBodyWithXattr(collection.bucket()).flatMap(supports -> this.addStagedMutation(new StagedMutation(operationId, id, collection, updatedDoc.cas(), documentMetadata, crc32OfGet, userFlagsOfExistingDocument, supports != false ? null : contentToStage, userFlagsOfContentToStage, StagedMutationType.REPLACE)).thenReturn((Object)out));
            // Errors go to handleErrorOnStagedMutation, which is given a callback that re-runs this method with a new CAS.
            }).onErrorResume(err -> this.handleErrorOnStagedMutation("replacing", collection, id, (Throwable)err, span, crc32OfGet, newCas -> this.createStagedReplace(operationId, collection, id, (long)newCas, documentMetadata, crc32OfGet, contentToStage, userFlagsOfContentToStage, contentOfExistingDocument, userFlagsOfExistingDocument, pspan, accessDeleted))).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Builds a {@link CoreTransactionGetResult} whose links describe a mutation staged by this attempt.
     *
     * @param operationId      id of the staging operation
     * @param collection       collection of the document
     * @param id               id of the document
     * @param bodyContent      content to expose as the document body, may be {@code null}
     * @param currentUserFlags user flags to expose for the document body
     * @param stagedContent    the staged content, may be {@code null}
     * @param stagedUserFlags  user flags of the staged content; these decide which of the two staged-content slots is filled
     * @param cas              CAS of the document
     * @param documentMetadata metadata of the document, if any
     * @param opType           the staged operation type recorded in the links
     * @param crc32OfFetch     crc32 recorded at fetch time, if any
     * @return the assembled result
     */
    private CoreTransactionGetResult createTransactionGetResult(String operationId, CollectionIdentifier collection, String id, @Nullable byte[] bodyContent, int currentUserFlags, @Nullable byte[] stagedContent, int stagedUserFlags, long cas, Optional<DocumentMetadata> documentMetadata, String opType, Optional<String> crc32OfFetch) {
        boolean stagedIsBinary = CodecFlags.extractCommonFormatFlags(stagedUserFlags) == CodecFlags.CommonFlags.BINARY.ordinal();
        // Exactly one of the first two arguments can carry the staged content: the first when the
        // content is not binary, the second when it is.
        TransactionLinks stagedLinks = new TransactionLinks(
                stagedIsBinary ? Optional.empty() : Optional.ofNullable(stagedContent),
                stagedIsBinary ? Optional.ofNullable(stagedContent) : Optional.empty(),
                atrId,
                atrCollection.map(CollectionIdentifier::bucket),
                atrCollection.flatMap(CollectionIdentifier::scope),
                atrCollection.flatMap(CollectionIdentifier::collection),
                Optional.of(transactionId()),
                Optional.of(attemptId),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.of(opType),
                true,
                Optional.empty(),
                Optional.empty(),
                Optional.of(operationId),
                Optional.of(stagedUserFlags));
        return new CoreTransactionGetResult(id, bodyContent, currentUserFlags, cas, collection, stagedLinks, documentMetadata, Optional.empty(), crc32OfFetch);
    }

    /**
     * Builds the JSON written to a document's {@code txn} field when a mutation is staged.
     * <p>
     * The object contains {@code id}, {@code atr}, {@code op} and {@code aux}, plus {@code restore} when any
     * of the fetched document's metadata is available, plus {@code fc} when binary content is being staged by
     * a replace or an insert.
     *
     * @param opType           the staged operation type, written to {@code op.type}
     * @param operationId      id of the staging operation, written to {@code id.op}
     * @param documentMetadata metadata of the fetched document, if any
     * @param userFlagsToStage user flags of the content being staged, written to {@code aux.uf}
     * @return the serialized JSON bytes
     * @throws DecodingFailureException wrapping the {@link JsonProcessingException} if serialization fails
     */
    private byte[] createDocumentMetadata(String opType, String operationId, Optional<DocumentMetadata> documentMetadata, int userFlagsToStage) {
        boolean stagingBinary = CodecFlags.extractCommonFormatFlags(userFlagsToStage) == CodecFlags.CommonFlags.BINARY.ordinal();

        ObjectNode opNode = Mapper.createObjectNode();
        opNode.put("type", opType);
        ObjectNode auxNode = Mapper.createObjectNode();
        auxNode.put("uf", userFlagsToStage);

        ObjectNode root = Mapper.createObjectNode();
        ObjectNode idNode = Mapper.createObjectNode().put("txn", transactionId()).put("atmpt", attemptId).put("op", operationId);
        root.set("id", idNode);

        ObjectNode atrNode = Mapper.createObjectNode().put("id", atrId.get());
        atrNode.put("bkt", atrCollection.get().bucket())
                .put("scp", atrCollection.get().scope().orElse("_default"))
                .put("coll", atrCollection.get().collection().orElse("_default"));
        root.set("atr", atrNode);
        root.set("op", opNode);
        root.set("aux", auxNode);

        // "restore" is only attached when at least one metadata field is present.
        ObjectNode restoreNode = Mapper.createObjectNode();
        documentMetadata.map(DocumentMetadata::cas).ifPresent(v -> restoreNode.put("CAS", (String)v));
        documentMetadata.map(DocumentMetadata::exptime).ifPresent(v -> restoreNode.put("exptime", (Long)v));
        documentMetadata.map(DocumentMetadata::revid).ifPresent(v -> restoreNode.put("revid", (String)v));
        if (restoreNode.size() > 0) {
            root.set("restore", restoreNode);
        }

        if (stagingBinary && (opType.equals("replace") || opType.equals("insert"))) {
            // The same single-entry array is attached under each of the four stages below.
            ObjectNode requirement = Mapper.createObjectNode().put("e", CoreTransactionsExtension.EXT_BINARY_SUPPORT.value()).put("b", "f");
            ArrayNode requirements = Mapper.createArrayNode().add(requirement);
            ObjectNode forwardCompat = Mapper.createObjectNode();
            forwardCompat.set(ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_INSERTING.value(), requirements);
            forwardCompat.set(ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_INSERTING_GET.value(), requirements);
            forwardCompat.set(ForwardCompatibilityStage.GETS.value(), requirements);
            forwardCompat.set(ForwardCompatibilityStage.CLEANUP_ENTRY.value(), requirements);
            root.set("fc", forwardCompat);
        }

        try {
            return Mapper.writer().writeValueAsBytes(root);
        }
        catch (JsonProcessingException jsonFailure) {
            throw new DecodingFailureException(jsonFailure);
        }
    }

    /**
     * Stages the removal of a document: writes this attempt's transaction metadata under the
     * document's {@code txn} path, then records a {@link StagedMutationType#REMOVE} staged mutation.
     *
     * @param operationId   id of this operation; stored in the metadata and in the staged mutation
     * @param doc           document being removed; its CAS is overwritten with the CAS of the staging write
     * @param cas           CAS sent with the staging write
     * @param pspan         parent of the {@code transaction_remove_stage} span created here
     * @param accessDeleted forwarded unchanged to {@code TransactionKVHandler.mutateIn}
     * @return a Mono that completes once the staged mutation has been recorded; failures are routed
     *         through {@code handleErrorOnStagedMutation}, which is given a callback that re-runs
     *         this method with a new CAS
     */
    private Mono<Void> createStagedRemove(String operationId, CoreTransactionGetResult doc, long cas, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "transaction_remove_stage", pspan);
            this.LOGGER.info(this.attemptId, "about to remove doc {} with cas {}", DebugUtil.docId(doc), cas);
            byte[] txn = this.createDocumentMetadata("remove", operationId, doc.documentMetadata(), doc.userFlags());
            return this.hooks.beforeStagedRemove.apply(this, doc.id())
                    // The client context is labelled after this method, matching the convention used by
                    // createStagedInsert / overwriteStagedInsert / atrComplete (it previously said "createStagedReplace").
                    .then(TransactionKVHandler.mutateIn(this.core, doc.collection(), doc.id(), this.kvTimeoutMutating(), false, false, false, accessDeleted, false, cas, doc.userFlags(), this.durabilityLevel(), OptionsUtil.createClientContext("createStagedRemove"), span, Arrays.asList(
                            new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn", txn, true, true, false, 0),
                            new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, "txn.op.crc32", this.serialize("${Mutation.value_crc32c}"), false, true, true, 2))))
                    .publishOn(this.scheduler())
                    .flatMap(updatedDoc -> this.hooks.afterStagedRemoveComplete.apply(this, doc.id()).thenReturn(updatedDoc))
                    .flatMap(response -> {
                        long elapsed = span.elapsedMicros();
                        this.addUnits(response.flexibleExtras());
                        this.LOGGER.info(this.attemptId, "staged remove of doc {}{} got cas {}, in {}us", DebugUtil.docId(doc), DebugUtil.dbg(response.flexibleExtras()), response.cas(), elapsed);
                        // Later writes to this document must use the CAS produced by the staging write.
                        doc.cas(response.cas());
                        return this.addStagedMutation(new StagedMutation(operationId, doc.id(), doc.collection(), doc.cas(), doc.documentMetadata(), doc.crc32OfGet(), doc.userFlags(), null, 0, StagedMutationType.REMOVE));
                    })
                    .then()
                    .onErrorResume(err -> this.handleErrorOnStagedMutation("removing", doc.collection(), doc.id(), (Throwable)err, span, doc.crc32OfGet(), newCas -> this.createStagedRemove(operationId, doc, (long)newCas, span, accessDeleted)))
                    .doOnError(err -> span.finish((Throwable)err))
                    .doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Acquires the lock, runs a unit of work, and releases the lock when that work terminates.
     *
     * @param dbg         label passed to {@code lock(...)}
     * @param whileLocked supplier of the work to run once the lock token has been obtained
     * @return the Mono produced for the work, with the unlock attached via {@code doFinally}
     */
    private Mono<Void> doUnderLock(String dbg, Supplier<Mono<Void>> whileLocked) {
        // NOTE(review): lambda$doUnderLock$154 is a compiler-synthesized lambda surfaced by the decompiler;
        // its body is not visible here - presumably it just returns whileLocked.get(). Confirm against the original source.
        // The unlock Mono is waited on with block() inside doFinally; the signal type is appended to the unlock label.
        return this.lock(dbg).flatMap(lockToken -> Mono.defer(() -> CoreTransactionAttemptContext.lambda$doUnderLock$154((Supplier)whileLocked)).doFinally(v -> this.unlock((ReactiveLock.Waiter)lockToken, "doUnderLock on signal " + v).block()));
    }

    /**
     * Records a staged mutation on this attempt while holding the lock. Any staged mutation
     * already recorded for the same collection and document id is removed first.
     *
     * @param sm the staged mutation to record
     * @return a Mono that completes once the mutation has been stored
     */
    private Mono<Void> addStagedMutation(StagedMutation sm) {
        return Mono.defer(() -> {
            String lockLabel = "addStagedMutation " + DebugUtil.docId(sm.collection, sm.id);
            return this.doUnderLock(lockLabel, () -> Mono.fromRunnable(() -> {
                // Replace rather than accumulate: drop the previous entry for this document, then add the new one.
                this.removeStagedMutationLocked(sm.collection, sm.id);
                this.stagedMutationsLocked.add(sm);
            }));
        });
    }

    /**
     * Translates a failure raised while staging a mutation into the outcome for this attempt.
     *
     * <ul>
     *   <li>{@code FAIL_EXPIRY}: delegates to {@code setExpiryOvertimeModeAndFail}.</li>
     *   <li>{@code FAIL_CAS_MISMATCH}: delegates to {@code handleDocChangedDuringStaging}, handing it {@code callback}.</li>
     *   <li>{@code FAIL_DOC_NOT_FOUND}: fails with a fresh error (no cause attached) marked to retry the transaction.</li>
     *   <li>{@code FAIL_AMBIGUOUS} / {@code FAIL_TRANSIENT}: fails with the wrapped error marked to retry the transaction.</li>
     *   <li>{@code FAIL_HARD}: fails with the wrapped error marked not to roll back the attempt.</li>
     *   <li>anything else: fails with the wrapped error.</li>
     * </ul>
     *
     * @param stage        description of the staging step, used in the log line and passed on for expiry handling
     * @param collection   collection of the document being staged
     * @param id           id of the document being staged
     * @param err          the failure to translate
     * @param pspan        span whose elapsed time is logged and which is passed to the CAS-mismatch handler
     * @param crc32FromGet passed to the CAS-mismatch handler
     * @param callback     passed to the CAS-mismatch handler; takes a {@code Long} and yields the Mono to continue with
     * @return a Mono carrying the translated outcome
     */
    private <T> Mono<T> handleErrorOnStagedMutation(String stage, CollectionIdentifier collection, String id, Throwable err, SpanWrapper pspan, Optional<String> crc32FromGet, Function<Long, Mono<T>> callback) {
        ErrorClass ec = ErrorClass.classify(err);
        TransactionOperationFailedException.Builder out = TransactionOperationFailedException.Builder.createError().cause(err);
        MeteringUnits units = this.addUnits(MeteringUnits.from(err));
        this.LOGGER.info(this.attemptId, "error while {} doc {}{} in {}us: {}", stage, DebugUtil.docId(collection, id), DebugUtil.dbg(units), pspan.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
        if (this.expiryOvertimeMode) {
            this.LOGGER.warn(this.attemptId, "should not reach here in expiryOvertimeMode");
        }

        // The two classes that hand off to dedicated handlers.
        if (ec == ErrorClass.FAIL_EXPIRY) {
            return this.setExpiryOvertimeModeAndFail(err, stage, ec);
        }
        if (ec == ErrorClass.FAIL_CAS_MISMATCH) {
            return this.handleDocChangedDuringStaging(pspan, id, collection, crc32FromGet, callback);
        }

        // Everything else raises an error; only the builder configuration differs.
        TransactionOperationFailedException.Builder failure;
        if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
            failure = TransactionOperationFailedException.Builder.createError().retryTransaction();
        } else if (ec == ErrorClass.FAIL_AMBIGUOUS || ec == ErrorClass.FAIL_TRANSIENT) {
            failure = out.retryTransaction();
        } else if (ec == ErrorClass.FAIL_HARD) {
            failure = out.doNotRollbackAttempt();
        } else {
            failure = out;
        }
        return Mono.error((Throwable)this.operationFailed(failure.build()));
    }

    /**
     * Looks up the staged mutation recorded for the given document, keyed by its collection and id.
     *
     * @param doc the document to look up
     * @return the matching staged mutation, or empty if none is recorded
     */
    private Optional<StagedMutation> findStagedMutationLocked(CoreTransactionGetResult doc) {
        CollectionIdentifier where = doc.collection();
        String key = doc.id();
        return this.findStagedMutationLocked(where, key);
    }

    /**
     * Returns the first staged mutation whose collection and id both match the arguments.
     * Asserts that the lock is held before scanning.
     *
     * @param collection collection to match
     * @param docId      document id to match
     * @return the first match in iteration order, or empty if there is none
     */
    private Optional<StagedMutation> findStagedMutationLocked(CollectionIdentifier collection, String docId) {
        this.assertLocked("findStagedMutation");
        for (StagedMutation candidate : this.stagedMutationsLocked) {
            if (candidate.collection.equals(collection) && candidate.id.equals(docId)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Removes every staged mutation whose collection and id both match the arguments.
     * Asserts that the lock is held before modifying the list.
     *
     * @param collection collection to match
     * @param id         document id to match
     */
    private void removeStagedMutationLocked(CollectionIdentifier collection, String id) {
        this.assertLocked("removeStagedMutation");
        this.stagedMutationsLocked.removeIf(staged -> {
            if (!staged.collection.equals(collection)) {
                return false;
            }
            return staged.id.equals(id);
        });
    }

    /**
     * Shorthand for {@code DebugUtil.dbg(Throwable)}, used when passing errors to the logger.
     *
     * @param err the error to wrap
     * @return whatever {@code DebugUtil.dbg} returns for {@code err}
     */
    private static LogDeferThrowable dbg(Throwable err) {
        LogDeferThrowable deferred = DebugUtil.dbg(err);
        return deferred;
    }

    /**
     * Recovery path for a staged insert that found the document already present. createStagedInsert
     * calls this for the FAIL_DOC_ALREADY_EXISTS and FAIL_CAS_MISMATCH error classes.
     *
     * <p>The document is fetched and one of the following happens:
     * <ul>
     *   <li>fetch fails: raises an operation-failed error, marked to retry the transaction for
     *       FAIL_TRANSIENT and FAIL_PATH_NOT_FOUND;</li>
     *   <li>nothing found: raises an operation-failed error marked to retry the transaction;</li>
     *   <li>tombstone that is not in a transaction: the staged insert is re-attempted with the fetched CAS;</li>
     *   <li>live document that is not in a transaction: {@code DocumentExistsException};</li>
     *   <li>staged by this same attempt with the same operation id: the staged mutation is recorded and the fetched doc returned;</li>
     *   <li>staged by this same attempt with a different operation id: fails with {@code ConcurrentOperationsDetectedOnSameDocumentException} as cause;</li>
     *   <li>staged by something else but not as an "insert": {@code DocumentExistsException};</li>
     *   <li>otherwise: runs the blocking-transaction check, then {@code overwriteStagedInsert}.</li>
     * </ul>
     *
     * @param operationId id of the insert operation
     * @param collection  collection of the document
     * @param id          id of the document
     * @param content     content being inserted
     * @param flags       flags of the content being inserted
     * @param pspan       parent span passed to the fetch and to the follow-up operations
     * @return a Mono yielding the resulting document, or an error as described above
     */
    private Mono<CoreTransactionGetResult> handleDocExistsDuringStagedInsert(String operationId, CollectionIdentifier collection, String id, byte[] content, int flags, SpanWrapper pspan) {
        // Common prefix for every log line emitted from this method.
        String bp = "DocExists on " + DebugUtil.docId(collection, id) + ": ";
        MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
        return this.hooks.beforeGetDocInExistsDuringStagedInsert.apply(this, id).then(DocumentGetter.justGetDoc(this.core, collection, id, this.kvTimeoutNonMutating(), pspan, true, this.logger(), units, false)).publishOn(this.scheduler()).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "{} getting doc (which may be a tombstone)", bp)).onErrorResume(err -> {
            // This handler sits before the flatMap below, so it only sees failures of the fetch itself.
            this.addUnits(units.build());
            ErrorClass ec = ErrorClass.classify(err);
            TransactionOperationFailedException.Builder e = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
            this.LOGGER.warn(this.attemptId, "{} got error while getting doc: {}", bp, CoreTransactionAttemptContext.dbg(err));
            if (ec == ErrorClass.FAIL_TRANSIENT || ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                e.retryTransaction();
            }
            return Mono.error((Throwable)this.operationFailed(e.build()));
        }).flatMap(v -> {
            if (v.isPresent()) {
                Tuple2 results = (Tuple2)v.get();
                CoreTransactionGetResult r = (CoreTransactionGetResult)results.getT1();
                CoreSubdocGetResult lir = (CoreSubdocGetResult)results.getT2();
                MeteringUnits built = this.addUnits(units.build());
                this.LOGGER.info(this.attemptId, "{} doc {} exists inTransaction={} isDeleted={}{}", bp, DebugUtil.docId(collection, id), r.links(), lir.tombstone(), DebugUtil.dbg(built));
                return this.forwardCompatibilityCheck(ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_INSERTING_GET, r.links().forwardCompatibility()).then(Mono.defer(() -> {
                    if (lir.tombstone() && !r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "{} doc {} is a regular tombstone without txn metadata, proceeding to overwrite", bp, DebugUtil.docId(collection, id));
                        return this.createStagedInsert(operationId, collection, id, content, flags, pspan, Optional.of(r.cas()));
                    }
                    if (!r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "{} doc {} exists but is not in txn, raising DocumentExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)new DocumentExistsException(ReducedKeyValueErrorContext.create(id)));
                    }
                    // From here on the document carries transaction metadata.
                    if (r.links().stagedAttemptId().get().equals(this.attemptId)) {
                        if (r.links().stagedOperationId().isPresent() && r.links().stagedOperationId().get().equals(operationId)) {
                            this.LOGGER.info(this.attemptId, "{} doc {} has the same operation id, must be a resolved ambiguity, proceeding", bp, DebugUtil.docId(collection, id));
                            return this.addStagedMutation(new StagedMutation(operationId, r.id(), r.collection(), r.cas(), r.documentMetadata(), r.crc32OfGet(), flags, r.links().stagedContentJsonOrBinary().get(), flags, StagedMutationType.INSERT)).thenReturn((Object)r);
                        }
                        this.LOGGER.info(this.attemptId, "{} doc {} has the same attempt id but a different operation id, must be racing with a concurrent attempt to write the same doc", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new ConcurrentOperationsDetectedOnSameDocumentException()).build()));
                    }
                    if (!r.links().op().get().equals("insert")) {
                        this.LOGGER.info(this.attemptId, "{} doc {} is in a txn but is not a staged insert, raising DocumentExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)new DocumentExistsException(ReducedKeyValueErrorContext.create(id)));
                    }
                    // A staged insert written by a different attempt: check whether it blocks us, then overwrite it.
                    return this.checkAndHandleBlockingTxn(r, pspan, ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_INSERTING, Optional.empty()).then(this.overwriteStagedInsert(operationId, collection, id, content, flags, pspan, bp, r, lir));
                }));
            }
            this.LOGGER.info(this.attemptId, "{} completed get of {}, could not find, throwing to retry txn which should succeed now", bp, DebugUtil.docId(collection, id));
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().build()));
        });
    }

    /**
     * Replaces a staged insert found on the document with this attempt's staged insert.
     *
     * <p>If the fetched document is a tombstone, the insert is staged directly using the fetched CAS.
     * Otherwise the document is removed first (using the CAS of the sub-document lookup) and the
     * insert is then staged without a CAS.
     *
     * @param operationId id of the insert operation
     * @param collection  collection of the document
     * @param id          id of the document
     * @param content     content being inserted
     * @param flags       flags of the content being inserted
     * @param pspan       parent span passed to the KV operations
     * @param bp          log prefix supplied by the caller
     * @param r           the fetched document; must be in a transaction with op "insert" (checked below)
     * @param lir         the sub-document lookup result for the same fetch
     * @return a Mono yielding the result of the staged insert; a failed removal raises an
     *         operation-failed error, marked to retry the transaction for FAIL_DOC_NOT_FOUND,
     *         FAIL_CAS_MISMATCH and FAIL_TRANSIENT
     */
    private Mono<CoreTransactionGetResult> overwriteStagedInsert(String operationId, CollectionIdentifier collection, String id, byte[] content, int flags, SpanWrapper pspan, String bp, CoreTransactionGetResult r, CoreSubdocGetResult lir) {
        return Mono.defer(() -> {
            // Preconditions the caller is expected to have established.
            CbPreconditions.check(r.links().isDocumentInTransaction());
            CbPreconditions.check(r.links().op().get().equals("insert"));
            if (lir.tombstone()) {
                return this.createStagedInsert(operationId, collection, id, content, flags, pspan, Optional.of(r.cas()));
            }
            this.LOGGER.info(this.attemptId, "{} removing {} as it's a protocol 1.0 staged insert", bp, DebugUtil.docId(collection, id));
            return this.hooks.beforeOverwritingStagedInsertRemoval.apply(this, id).then(TransactionKVHandler.remove(this.core, collection, id, this.kvTimeoutMutating(), lir.cas(), this.durabilityLevel(), OptionsUtil.createClientContext("overwriteStagedInsert"), pspan)).doOnNext(v -> this.addUnits(v.flexibleExtras())).onErrorResume(err -> {
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                this.LOGGER.warn(this.attemptId, "{} hit error {} while removing {}{}", bp, DebugUtil.dbg(err), DebugUtil.docId(collection, id), DebugUtil.dbg(units));
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder out = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND || ec == ErrorClass.FAIL_CAS_MISMATCH || ec == ErrorClass.FAIL_TRANSIENT) {
                    out.retryTransaction();
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            // After a successful removal the insert is staged with no CAS (Optional.empty()).
            }).then(this.createStagedInsert(operationId, collection, id, content, flags, pspan, Optional.empty()));
        });
    }

    /**
     * Reports whether the named bucket advertises the {@code SUBDOC_REVIVE_DOCUMENT} capability.
     * Waits for the bucket config for at most the time remaining before this attempt expires.
     *
     * @param bucketName name of the bucket to inspect
     * @return a Mono yielding {@code true} if the capability is present in the bucket config
     */
    private Mono<Boolean> supportsReplaceBodyWithXattr(String bucketName) {
        Duration remaining = Duration.of(this.expiryRemainingMillis(), ChronoUnit.MILLIS);
        return BucketConfigUtil.waitForBucketConfig(this.core, bucketName, remaining)
                .map(bucketConfig -> bucketConfig.bucketCapabilities().contains((Object)BucketCapabilities.SUBDOC_REVIVE_DOCUMENT));
    }

    /**
     * Stages an insert: writes this attempt's transaction metadata and the staged content onto the
     * document with a single mutateIn, then records a {@code StagedMutationType.INSERT} staged mutation.
     *
     * <p>Error handling, in the order checked:
     * <ul>
     *   <li>{@code FeatureNotAvailableException}: raised as an operation-failed error;</li>
     *   <li>already in expiry-overtime mode: mapped via {@code mapErrorInOvertimeToExpired};</li>
     *   <li>FAIL_EXPIRY: {@code setExpiryOvertimeModeAndFail};</li>
     *   <li>FAIL_AMBIGUOUS: waits {@code DEFAULT_DELAY_RETRYING_OPERATION}, then calls this method again;</li>
     *   <li>FAIL_TRANSIENT: operation-failed error marked to retry the transaction;</li>
     *   <li>FAIL_HARD: operation-failed error marked not to roll back the attempt;</li>
     *   <li>FAIL_DOC_ALREADY_EXISTS / FAIL_CAS_MISMATCH: {@code handleDocExistsDuringStagedInsert};</li>
     *   <li>anything else: operation-failed error.</li>
     * </ul>
     *
     * @param operationId           id of the insert operation
     * @param collection            collection of the document
     * @param id                    id of the document
     * @param content               content to stage
     * @param flagsOfContentToStage flags of the content; also used to decide whether the content is binary
     * @param pspan                 parent of the {@code transaction_insert_stage} span created here
     * @param cas                   CAS to send with the write; when empty, 0 is sent
     * @return a Mono yielding the document built from the insert, or an error as described above
     */
    private Mono<CoreTransactionGetResult> createStagedInsert(String operationId, CollectionIdentifier collection, String id, byte[] content, int flagsOfContentToStage, SpanWrapper pspan, Optional<Long> cas) {
        return Mono.defer(() -> {
            this.assertNotLocked("createStagedInsert");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "transaction_insert_stage", pspan);
            // Binary content is staged under "txn.op.bin", everything else under "txn.op.stgd" (see the command list below).
            boolean isBinary = CodecFlags.extractCommonFormatFlags(flagsOfContentToStage) == CodecFlags.CommonFlags.BINARY.ordinal();
            byte[] txn = this.createDocumentMetadata("insert", operationId, Optional.empty(), flagsOfContentToStage);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to insert staged doc {} as shadow document, cas={}, operationId={}", DebugUtil.docId(collection, id), cas, operationId);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode("createdStagedInsert", Optional.of(id));
            }).then(this.hooks.beforeStagedInsert.apply(this, id)).then(TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), !cas.isPresent(), false, false, true, true, cas.orElse(0L), flagsOfContentToStage, this.durabilityLevel(), OptionsUtil.createClientContext("createStagedInsert"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn", txn, true, true, false, 0), isBinary ? new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn.op.bin", content, false, true, false, true, 3) : new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn.op.stgd", content, false, true, false, 3), new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn.op.crc32", this.serialize("${Mutation.value_crc32c}"), false, true, true, 4)))).publishOn(this.scheduler()).flatMap(response -> this.hooks.afterStagedInsertComplete.apply(this, id).thenReturn(response)).doOnNext(response -> {
                long elapsed = span.elapsedMicros();
                this.addUnits(response.flexibleExtras());
                this.LOGGER.info(this.attemptId, "inserted doc {}{} got cas {}, in {}us", DebugUtil.docId(collection, id), DebugUtil.dbg(response.flexibleExtras()), response.cas(), elapsed);
            }).flatMap(updatedDoc -> {
                CoreTransactionGetResult out = CoreTransactionGetResult.createFromInsert(collection, id, content, flagsOfContentToStage, this.transactionId(), this.attemptId, this.atrId.get(), this.atrCollection.get().bucket(), this.atrCollection.get().scope().get(), this.atrCollection.get().collection().get(), updatedDoc.cas());
                // The staged mutation keeps the content only when the bucket lacks SUBDOC_REVIVE_DOCUMENT; otherwise null is stored.
                return this.supportsReplaceBodyWithXattr(collection.bucket()).flatMap(supports -> this.addStagedMutation(new StagedMutation(operationId, out.id(), out.collection(), out.cas(), out.documentMetadata(), Optional.empty(), flagsOfContentToStage, supports != false ? null : content, flagsOfContentToStage, StagedMutationType.INSERT)).thenReturn((Object)out));
            }).onErrorResume(err -> {
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                this.LOGGER.info(this.attemptId, "got err while staging insert of {}{}: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), CoreTransactionAttemptContext.dbg(err));
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder out = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                if (err instanceof FeatureNotAvailableException) {
                    return Mono.error((Throwable)this.operationFailed(out.build()));
                }
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, "createdStagedInsert", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClass.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, "createdStagedInsert", ec);
                }
                if (ec == ErrorClass.FAIL_AMBIGUOUS) {
                    // Retry the same operation after a delay; note that this call's span becomes the parent span of the retry.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)this.scheduler()).then(this.createStagedInsert(operationId, collection, id, content, flagsOfContentToStage, span, cas));
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)this.operationFailed(out.retryTransaction().build()));
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(out.doNotRollbackAttempt().build()));
                }
                if (ec == ErrorClass.FAIL_DOC_ALREADY_EXISTS || ec == ErrorClass.FAIL_CAS_MISMATCH) {
                    return this.handleDocExistsDuringStagedInsert(operationId, collection, id, content, flagsOfContentToStage, span);
                }
                return Mono.error((Throwable)this.operationFailed(out.build()));
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Removes a document within this transaction attempt.
     *
     * @param doc   the document to remove
     * @param pspan parent span for the operation
     * @return a Mono that completes when the removal has been staged, or errors if it could not be
     */
    public Mono<Void> remove(CoreTransactionGetResult doc, SpanWrapper pspan) {
        String description = "remove " + DebugUtil.docId(doc);
        return this.doKVOperation(description, pspan, "remove", doc.collection(), doc.id(),
                        // The KV wrapper expects a value, so a placeholder is returned and discarded by then().
                        (operationId, span, lockToken) -> this.removeInternalLocked((String)operationId, doc, (SpanWrapper)span, (ReactiveLock.Waiter)lockToken).thenReturn((Object)1))
                .then();
    }

    /**
     * Dispatches a remove to the query-mode or the KV implementation, depending on
     * {@code queryModeLocked()}.
     *
     * @param operationId id of the remove operation
     * @param doc         the document to remove
     * @param span        span for the operation
     * @param lockToken   the lock token held by the caller
     * @return the Mono produced by the chosen implementation
     */
    private Mono<Void> removeInternalLocked(String operationId, CoreTransactionGetResult doc, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "remove doc {}, operationId={}", DebugUtil.docId(doc), operationId);
            return this.queryModeLocked()
                    ? this.removeWithQueryLocked(doc, lockToken, span)
                    : this.removeWithKVLocked(operationId, doc, span, lockToken);
        });
    }

    /**
     * KV implementation of remove. Reads the attempt state and any earlier staged mutation for the
     * document, releases the lock, and then:
     * <ul>
     *   <li>earlier mutation was a REMOVE: fails with {@code DocumentNotFoundException} as cause;</li>
     *   <li>earlier mutation was an INSERT: delegates to {@code removeStagedInsert};</li>
     *   <li>otherwise: blocking-transaction check, ATR initialisation if needed, then {@code createStagedRemove}.</li>
     * </ul>
     *
     * @param operationId id of the remove operation
     * @param doc         the document to remove
     * @param span        span for the operation
     * @param lockToken   the lock token to release before doing the KV work
     * @return a Mono that completes when the removal has been staged
     */
    private Mono<Void> removeWithKVLocked(String operationId, CoreTransactionGetResult doc, SpanWrapper span, ReactiveLock.Waiter lockToken) {
        return Mono.defer(() -> {
            // Both values are read before the unlock below is subscribed to.
            boolean atrMayBeUnwritten = this.state == AttemptState.NOT_STARTED;
            Optional<StagedMutation> priorWrite = this.findStagedMutationLocked(doc);
            return this.hooks.beforeUnlockRemove.apply(this, doc.id())
                    .then(this.unlock(lockToken, "standard"))
                    .then(Mono.defer(() -> {
                        if (priorWrite.isPresent()) {
                            StagedMutation earlier = priorWrite.get();
                            this.LOGGER.info(this.attemptId, "found previous write of {} as {} on remove", new Object[]{DebugUtil.docId(doc), earlier.type});
                            if (earlier.type == StagedMutationType.INSERT) {
                                return this.removeStagedInsert(doc, span);
                            }
                            if (earlier.type == StagedMutationType.REMOVE) {
                                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new DocumentNotFoundException(null)).build()));
                            }
                        }
                        return this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStage.WRITE_WRITE_CONFLICT_REMOVING, priorWrite)
                                .then(this.initATRIfNeeded(atrMayBeUnwritten, doc.collection(), doc.id(), span))
                                .then(this.createStagedRemove(operationId, doc, doc.cas(), span, doc.links().isDeleted()));
                    }));
        });
    }

    /**
     * Decides whether a staged write already present on {@code doc} prevents this attempt from
     * writing to it.
     *
     * <ul>
     *   <li>No staged write: nothing to do.</li>
     *   <li>Staged by this transaction and this attempt: fails with
     *       {@code ConcurrentOperationsDetectedOnSameDocumentException} as cause unless a staged
     *       mutation is known whose CAS equals the document's CAS.</li>
     *   <li>Staged by this transaction otherwise: ok to continue.</li>
     *   <li>Staged by another transaction with ATR id and ATR bucket name present: forward-compatibility
     *       check followed by {@code checkATREntryForBlockingDoc}.</li>
     *   <li>Staged by another transaction without that ATR information: logged, then continues.</li>
     * </ul>
     *
     * @param doc         the document about to be written
     * @param pspan       parent span passed to the ATR check
     * @param stage       forward-compatibility stage to check against
     * @param existingOpt the staged mutation this attempt already holds for the document, if any
     * @return an empty Mono when it is ok to continue, otherwise an error or the ATR check's outcome
     */
    private Mono<Void> checkAndHandleBlockingTxn(CoreTransactionGetResult doc, SpanWrapper pspan, ForwardCompatibilityStage stage, Optional<StagedMutation> existingOpt) {
        if (!doc.links().hasStagedWrite()) {
            return Mono.empty();
        }

        boolean sameTransaction = doc.links().stagedTransactionId().get().equals(this.transactionId());
        if (sameTransaction) {
            boolean sameAttempt = doc.links().stagedAttemptId().get().equals(this.attemptId);
            if (sameAttempt) {
                if (!existingOpt.isPresent()) {
                    this.LOGGER.info(this.attemptId, "concurrent op race detected on doc {}: can see the KV result of another op, but stagedMutation not yet written", DebugUtil.docId(doc));
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new ConcurrentOperationsDetectedOnSameDocumentException()).build()));
                }
                StagedMutation known = existingOpt.get();
                if (known.cas != doc.cas()) {
                    this.LOGGER.info(this.attemptId, "concurrent op race detected on doc {}: have read a document before a concurrent op wrote its stagedMutation", DebugUtil.docId(doc));
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new ConcurrentOperationsDetectedOnSameDocumentException()).build()));
                }
            }
            this.LOGGER.info(this.attemptId, "doc {} has been written by a different attempt in transaction, ok to continue", DebugUtil.docId(doc));
            return Mono.empty();
        }

        // Staged by a different transaction: the ATR entry can only be consulted if we know where it lives.
        boolean atrLocationKnown = doc.links().atrId().isPresent() && doc.links().atrBucketName().isPresent();
        if (!atrLocationKnown) {
            this.LOGGER.info(this.attemptId, "doc {} is in another txn {}, cannot check ATR entry - probably a bug, so proceeding to overwrite", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
            return Mono.empty();
        }
        this.LOGGER.info(this.attemptId, "doc {} is in another txn {}, checking ATR entry {}/{}/{} to see if blocked", DebugUtil.docId(doc), doc.links().stagedAttemptId().get(), doc.links().atrBucketName().orElse(""), doc.links().atrCollectionName().orElse(""), doc.links().atrId().orElse(""));
        return this.forwardCompatibilityCheck(stage, doc.links().forwardCompatibility()).then(this.checkATREntryForBlockingDoc(doc, pspan));
    }

    /**
     * Serializes staged mutations to a JSON array of document records. Each record has the keys
     * {@code id}, {@code bkt}, {@code scp} and {@code col}; a missing scope or collection is
     * written as {@code _default}.
     *
     * @param docs the staged mutations to serialize
     * @return the JSON array encoded as bytes
     * @throws JsonProcessingException if the array cannot be written
     */
    private byte[] listToDocRecords(List<StagedMutation> docs) throws JsonProcessingException {
        ArrayNode records = Mapper.createArrayNode();
        for (StagedMutation staged : docs) {
            ObjectNode entry = Mapper.createObjectNode();
            entry.set("id", Mapper.convertValue((Object)staged.id, JsonNode.class));
            entry.set("bkt", Mapper.convertValue((Object)staged.collection.bucket(), JsonNode.class));
            entry.set("scp", Mapper.convertValue((Object)staged.collection.scope().orElse("_default"), JsonNode.class));
            entry.set("col", Mapper.convertValue((Object)staged.collection.collection().orElse("_default"), JsonNode.class));
            records.add(entry);
        }
        return Mapper.writer().writeValueAsBytes(records);
    }

    /**
     * Builds the three sub-document commands that write this attempt's staged inserts, replaces and
     * removes under {@code attempts.<attemptId>.ins}, {@code .rep} and {@code .rem}.
     *
     * @param baseIndex index given to the first command; the next two use {@code baseIndex + 1} and {@code baseIndex + 2}
     * @return the three commands, in ins / rep / rem order
     * @throws RuntimeException wrapping the {@code JsonProcessingException} if a list cannot be serialized
     */
    private List<SubdocMutateRequest.Command> addDocsToBuilder(int baseIndex) {
        String prefix = "attempts." + this.attemptId;
        try {
            byte[] inserted = this.listToDocRecords(this.stagedInsertsLocked());
            byte[] replaced = this.listToDocRecords(this.stagedReplacesLocked());
            byte[] removed = this.listToDocRecords(this.stagedRemovesLocked());
            return Arrays.asList(
                    new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, prefix + ".ins", inserted, false, true, false, baseIndex),
                    new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, prefix + ".rep", replaced, false, true, false, baseIndex + 1),
                    new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, prefix + ".rem", removed, false, true, false, baseIndex + 2));
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a cleanup request for this attempt unless one of the skip conditions applies:
     * regular cleanup is disabled or no cleanup instance was supplied, the attempt is in query
     * mode, no ATR id/collection is set, or the attempt state is NOT_STARTED, COMPLETED or
     * ROLLED_BACK.
     *
     * @param cleanup the cleanup instance, or {@code null} if there is none
     * @return the cleanup request, or {@code null} when it was skipped
     */
    @Nullable
    private CleanupRequest createCleanupRequestIfNeeded(@Nullable CoreTransactionsCleanup cleanup) {
        boolean regularCleanupActive = this.config.cleanupConfig().runRegularAttemptsCleanupThread() && cleanup != null;
        if (!regularCleanupActive) {
            this.LOGGER.trace(this.attemptId(), "skipping addition of cleanup request on failure as regular cleanup disabled");
            return null;
        }
        if (this.queryModeUnlocked()) {
            this.LOGGER.info(this.attemptId(), "Skipping cleanup request as in query mode");
            return null;
        }
        if (!this.atrId().isPresent() || !this.atrCollection().isPresent()) {
            this.LOGGER.trace(this.attemptId(), "Skipping cleanup request as no ATR entry to remove (due to no mutations)");
            return null;
        }
        switch (this.state()) {
            case NOT_STARTED:
            case COMPLETED:
            case ROLLED_BACK: {
                this.LOGGER.trace(this.attemptId(), "Skipping addition of cleanup request in state {}", new Object[]{this.state()});
                return null;
            }
            default: {
                this.LOGGER.trace(this.attemptId(), "Adding cleanup request for {}/{}", this.atrCollection().get().collection(), this.atrId().get());
                return this.createCleanupRequest();
            }
        }
    }

    /**
     * Builds a cleanup request describing this attempt: its ATR location, current state, the
     * staged replaces, removes and inserts, the elapsed transaction time and the configured
     * durability level. The attempt must not be in state NOT_STARTED or COMPLETED.
     *
     * @return the cleanup request
     */
    private CleanupRequest createCleanupRequest() {
        CbPreconditions.check(this.state != AttemptState.NOT_STARTED);
        CbPreconditions.check(this.state != AttemptState.COMPLETED);
        long elapsedNanos = System.nanoTime() - this.overall.startTimeClient();
        long transactionElapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);

        // Split the staged mutations by type; the request takes them in replace, remove, insert order.
        List<StagedMutation> replaces = this.stagedMutationsLocked.stream().filter(m -> m.type == StagedMutationType.REPLACE).collect(Collectors.toList());
        List<StagedMutation> removes = this.stagedMutationsLocked.stream().filter(m -> m.type == StagedMutationType.REMOVE).collect(Collectors.toList());
        List<StagedMutation> inserts = this.stagedMutationsLocked.stream().filter(m -> m.type == StagedMutationType.INSERT).collect(Collectors.toList());

        return new CleanupRequest(
                this.attemptId,
                this.atrId().get(),
                this.atrCollection().get(),
                this.state,
                this.toDocRecords(replaces),
                this.toDocRecords(removes),
                this.toDocRecords(inserts),
                Duration.ZERO,
                Optional.empty(),
                transactionElapsedTimeMillis,
                Optional.of(this.config.durabilityLevel()));
    }

    /**
     * Commits this transaction attempt. Delegates to {@code commitInternal()}.
     *
     * @return a Mono that completes when the commit has finished, or errors if it failed
     */
    public Mono<Void> commit() {
        Mono<Void> committing = this.commitInternal();
        return committing;
    }

    /**
     * Commits the attempt on the caller's behalf unless state bit 1 is already set or the attempt
     * is running in single-query-transaction mode, in which case nothing is done.
     *
     * @param singleQueryTransactionMode whether this attempt is a single query transaction
     * @return a Mono that completes once the commit has been performed or skipped
     */
    Mono<Void> implicitCommit(boolean singleQueryTransactionMode) {
        return Mono.defer(() -> {
            boolean nothingToDo = this.hasStateBit(1) || singleQueryTransactionMode;
            if (nothingToDo) {
                // The emitted value is discarded by the trailing then().
                return Mono.just((Object)this);
            }
            this.LOGGER.info(this.attemptId(), "doing implicit commit");
            return this.commitInternal();
        }).then();
    }

    /**
     * Runs the commit inside a {@code transaction_commit} span: waits for all outstanding
     * operations, then performs {@code commitInternalLocked} under the lock. The whole thing is
     * wrapped with {@code createMonoBridge("commit", ...)}.
     *
     * @return a Mono that completes when the commit has finished, or errors if it failed
     */
    Mono<Void> commitInternal() {
        return this.createMonoBridge("commit", Mono.defer(() -> {
            SpanWrapper commitSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "transaction_commit", this.attemptSpan);
            this.assertNotLocked("commit");
            return this.waitForAllOpsThenDoUnderLock("commit", commitSpan, () -> this.commitInternalLocked(commitSpan))
                    .doOnError(failure -> commitSpan.finish((Throwable)failure))
                    .doOnTerminate(() -> commitSpan.finish());
        }));
    }

    /**
     * Performs the commit while the lock is held. Fails early if {@code canPerformCommit} returns
     * an error; otherwise sets the commit state bits and then either commits through query,
     * completes immediately when no ATR id/collection is set, or runs {@code commitActualLocked}.
     *
     * @param span span for the commit
     * @return a Mono, subscribed on this attempt's scheduler, that completes when the commit has finished
     */
    private Mono<Void> commitInternalLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("commitInternal");
            TransactionOperationFailedException blocker = this.canPerformCommit("commit");
            if (blocker != null) {
                this.logger().info(this.attemptId, "commit raising {}", DebugUtil.dbg(blocker));
                return Mono.error((Throwable)blocker);
            }
            this.setStateBits("commit", 3, 0);
            if (this.queryModeLocked()) {
                return this.commitWithQueryLocked(span);
            }
            this.LOGGER.info(this.attemptId, "commit {}", this);
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode("commit", Optional.empty());

            boolean atrKnown = this.atrCollection.isPresent() && this.atrId.isPresent();
            if (atrKnown) {
                return this.commitActualLocked(span);
            }
            // Without an ATR there is nothing to commit.
            return Mono.create(sink -> {
                this.LOGGER.info(this.attemptId, "calling commit on attempt that's got no mutations, skipping");
                sink.success();
            });
        }).subscribeOn(this.scheduler());
    }

    /**
     * Runs the three commit phases in order: marks the ATR entry COMMITTED (together with the
     * staged document lists), commits the documents, then completes the ATR entry.
     *
     * @param span span for the commit
     * @return a Mono that completes when all three phases have finished
     */
    private Mono<Void> commitActualLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            String attemptPath = "attempts." + this.attemptId;
            ArrayList<SubdocMutateRequest.Command> commands = new ArrayList<SubdocMutateRequest.Command>();
            commands.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, attemptPath + ".st", this.serialize(AttemptState.COMMITTED.name()), false, true, false, 0));
            commands.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, attemptPath + ".tsc", this.serialize("${Mutation.CAS}"), false, true, true, 1));
            // Each following command takes its index from the number of commands already queued.
            int docsBaseIndex = commands.size();
            commands.addAll(this.addDocsToBuilder(docsBaseIndex));
            int lastIndex = commands.size();
            commands.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_ADD, attemptPath + ".p", this.serialize(0), false, true, false, lastIndex));

            AtomicReference<Long> overallStartTime = new AtomicReference<Long>(0L);
            return this.atrCommitLocked(commands, overallStartTime, span)
                    .then(this.commitDocsLocked(span))
                    .then(this.atrCompleteLocked(attemptPath, overallStartTime, span))
                    .doOnSuccess(ignore -> this.LOGGER.info(this.attemptId, "overall commit completed"))
                    .then();
        });
    }

    /**
     * Commits the transaction by issuing a {@code COMMIT} statement through query. On success the
     * attempt state becomes COMPLETED. Errors are mapped as follows:
     * <ul>
     *   <li>already a transaction-operation-failed error: passed through unchanged;</li>
     *   <li>FAIL_EXPIRY: wrapped, raised as TRANSACTION_COMMIT_AMBIGUOUS, attempt not rolled back;</li>
     *   <li>anything else: wrapped, attempt not rolled back.</li>
     * </ul>
     *
     * @param span span for the commit
     * @return a Mono that completes when the query commit has finished
     */
    private Mono<Void> commitWithQueryLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            int statementIndex = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapperBlockingLocked(statementIndex, this.queryContext.queryContext, "COMMIT", null, "queryCommit", false, false, null, null, span, false, null, true)
                    .doOnNext(ignored -> this.setStateLocked(AttemptState.COMPLETED))
                    .onErrorResume(err -> {
                        ErrorClass ec = ErrorClass.classify(err);
                        if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                            return Mono.error((Throwable)err);
                        }
                        TransactionOperationFailedException wrapped;
                        if (ec == ErrorClass.FAIL_EXPIRY) {
                            wrapped = this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).doNotRollbackAttempt().build());
                        } else {
                            wrapped = this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).doNotRollbackAttempt().build());
                        }
                        return Mono.error((Throwable)wrapped);
                    })
                    .then();
        });
    }

    /**
     * Switches the attempt into expiry-overtime mode if it has expired client-side and is not
     * already in that mode. When already in expiry-overtime mode, expiry is only logged.
     * Asserts that the lock is held.
     *
     * @param stage name of the stage being checked, used for the assertion, the expiry check and logging
     * @param id    optional document id passed to the expiry check
     */
    private void checkExpiryDuringCommitOrRollbackLocked(String stage, Optional<String> id) {
        this.assertLocked("checkExpiryDuringCommitOrRollbackLocked in stage " + stage);
        if (this.expiryOvertimeMode) {
            this.LOGGER.info(this.attemptId, "ignoring expiry in stage {}, as in expiry-overtime mode", stage);
            return;
        }
        if (this.hasExpiredClientSide(stage, id)) {
            this.LOGGER.info(this.attemptId, "has expired in stage {}, entering expiry-overtime mode (one attempt to complete)", stage);
            this.expiryOvertimeMode = true;
        }
    }

    /**
     * Removes this attempt's entry from the ATR document once the documents have been committed.
     * <p>
     * Errors at this point are treated as tidy-up failures: unless the error classifies as
     * {@code FAIL_HARD}, it is logged and the returned Mono still completes successfully.
     *
     * @param prefix           sub-document path that is deleted from the ATR document
     * @param overallStartTime nanosecond timestamp that is subtracted from {@link System#nanoTime()}
     *                         to log the overall commit duration
     * @param pspan            parent of the {@code transaction_atr_complete} span created here
     * @return a Mono that completes empty on success or on an ignored error, and errors only
     *         for {@code FAIL_HARD}
     */
    private Mono<Void> atrCompleteLocked(String prefix, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrComplete");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "transaction_atr_complete", pspan);
            this.LOGGER.info(this.attemptId, "about to remove ATR entry {}", this.getAtrDebug(this.atrCollection, this.atrId));
            return Mono.defer(() -> {
                // Expiry (when not already in expiry-overtime mode) is signalled as an
                // AttemptExpiredException; it flows into the onErrorResume below, where any
                // error not classified FAIL_HARD is logged and turned into success.
                if (!this.expiryOvertimeMode && this.hasExpiredClientSide("atrComplete", Optional.empty())) {
                    String msg = "has expired in stage atrComplete, but transaction has successfully completed so returning success";
                    this.LOGGER.info(this.attemptId, msg);
                    return Mono.error((Throwable)new AttemptExpiredException(msg));
                }
                return Mono.empty();
            }).then(this.hooks.beforeAtrComplete.apply(this)).then(TransactionKVHandler.mutateIn(this.core, this.atrCollection.get(), this.atrId.get(), this.kvTimeoutMutating(), false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, this.durabilityLevel(), OptionsUtil.createClientContext("atrComplete"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, prefix, null, false, true, false, 0)))).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterAtrComplete.apply(this).thenReturn(v)).doOnNext(v -> {
                // Only reached when the ATR mutation and the after-hook both succeeded.
                this.setStateLocked(AttemptState.COMPLETED);
                this.addUnits(v.flexibleExtras());
                long now = System.nanoTime();
                long elapsed = span.elapsedMicros();
                this.LOGGER.info(this.attemptId, "removed ATR {} in {}us{}, overall commit completed in {}us", this.getAtrDebug(this.atrCollection, this.atrId), elapsed, DebugUtil.dbg(v.flexibleExtras()), TimeUnit.NANOSECONDS.toMicros(now - (Long)overallStartTime.get()));
            }).then().onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                this.LOGGER.info(this.attemptId, "error '{}' ec={} while removing ATR {}{}", new Object[]{err, ec, this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(units)});
                if (ec == ErrorClass.FAIL_HARD) {
                    // The only error that escapes; note the original error is not attached as cause.
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).doNotRollbackAttempt().build()));
                }
                this.LOGGER.info(this.attemptId, "ignoring error during transaction tidyup, regarding as success");
                return Mono.empty();
                // NOTE(review): on the error path both doOnError and doOnTerminate fire, so the span
                // is finished twice — confirm SpanWrapper.finish tolerates repeated calls.
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Converts an error raised while in expiry-overtime mode into a failed-operation error
     * whose cause is an {@code AttemptExpiredException} wrapping the original error.
     * The resulting error is marked do-not-rollback.
     *
     * @param updateAppState forwarded unchanged to {@code operationFailed}
     * @param stage          stage name, used for logging only
     * @param err            the original error
     * @param toRaise        the final error the transaction should surface
     * @param <T>            element type expected by the caller's pipeline; no element is ever emitted
     * @return a Mono that always terminates with the converted error
     */
    private <T> Mono<T> mapErrorInOvertimeToExpired(boolean updateAppState, String stage, Throwable err, TransactionOperationFailedException.FinalErrorToRaise toRaise) {
        this.LOGGER.info(this.attemptId, "in expiry-overtime mode so changing error '{}' to raise {} in stage '{}'; no rollback will be tried", new Object[]{err, toRaise, stage});
        boolean calledOutsideOvertime = !this.expiryOvertimeMode;
        if (calledOutsideOvertime) {
            // Callers are expected to check the mode first; flag it loudly if they did not.
            this.LOGGER.warn(this.attemptId, "not in expiry-overtime mode handling error '{}' in stage {}, possibly a bug", err, stage);
        }
        TransactionOperationFailedException.Builder expired = TransactionOperationFailedException.Builder.createError()
                .doNotRollbackAttempt()
                .raiseException(toRaise)
                .cause(new AttemptExpiredException(err));
        return Mono.error((Throwable)this.operationFailed(updateAppState, expired.build()));
    }

    /**
     * Unstages a staged removal during commit by removing the document.
     * <p>
     * Error handling, in order:
     * <ul>
     *   <li>in expiry-overtime mode, the error is converted via {@code mapErrorInOvertimeToExpired};</li>
     *   <li>{@code FAIL_AMBIGUOUS} retries the removal after {@code DEFAULT_DELAY_RETRYING_OPERATION}
     *       with {@code ambiguityResolutionMode = true};</li>
     *   <li>every other error class fails the operation with {@code TRANSACTION_FAILED_POST_COMMIT}
     *       and no rollback.</li>
     * </ul>
     *
     * @param span                    span passed to the KV remove and marked ERROR on failure
     * @param collection              collection holding the document
     * @param id                      id of the document to remove
     * @param ambiguityResolutionMode whether this call is a retry after an ambiguous failure;
     *                                only logged by this method
     * @return a Mono that completes empty once the document is removed and the post-retry hook has run
     */
    private Mono<Void> removeDocLocked(SpanWrapper span, CollectionIdentifier collection, String id, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("removeDoc");
            // NOTE(review): the remove is issued with kvTimeoutNonMutating() although it is a
            // mutation — confirm this is intentional.
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "about to remove doc {}, ambiguityResolutionMode={}", DebugUtil.docId(collection, id), ambiguityResolutionMode);
                this.checkExpiryDuringCommitOrRollbackLocked("removeDoc", Optional.of(id));
            }).then(this.hooks.beforeDocRemoved.apply(this, id)).then(TransactionKVHandler.remove(this.core, collection, id, this.kvTimeoutNonMutating(), 0L, this.durabilityLevel(), OptionsUtil.createClientContext("commitRemove"), span)).flatMap(mutationResult -> this.hooks.afterDocRemovedPreRetry.apply(this, id).thenReturn(mutationResult)).doOnNext(mutationResult -> {
                this.addUnits(mutationResult.flexibleExtras());
                this.LOGGER.info(this.attemptId, "commit - removed doc {}{}, mt = {}", DebugUtil.docId(collection, id), DebugUtil.dbg(mutationResult.flexibleExtras()), mutationResult.mutationToken());
            }).then().onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder e = TransactionOperationFailedException.Builder.createError().cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                // The attempt id must be the first argument; without it the format string is
                // taken as the attempt id and the doc id as the format.
                this.LOGGER.info(this.attemptId, "got error while removing doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, "removeDoc", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                }
                if (ec == ErrorClass.FAIL_AMBIGUOUS) {
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION, (Scheduler)this.scheduler()).then(this.removeDocLocked(span, collection, id, true));
                }
                // FAIL_DOC_NOT_FOUND, FAIL_HARD and all remaining classes are handled identically.
                return Mono.error((Throwable)this.operationFailed(e.build()));
            }).then(this.hooks.afterDocRemovedPostRetry.apply(this, id)).then().doOnError(err -> span.span().status(RequestSpan.StatusCode.ERROR));
        });
    }

    /**
     * Unstages every staged mutation, running up to {@code UNSTAGING_PARALLELISM} of them
     * concurrently, then invokes the {@code afterDocsCommitted} hook.
     *
     * @param span span passed to each per-document commit and marked ERROR if anything fails
     * @return a Mono that completes empty once all documents are committed and the hook has run
     */
    private Mono<Void> commitDocsLocked(SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("commitDocs");
            long startedAtNanos = System.nanoTime();
            return Flux.fromIterable(this.stagedMutationsLocked)
                    .publishOn(this.scheduler())
                    .flatMap(mutation -> this.commitDocWrapperLocked(span, (StagedMutation)mutation), UNSTAGING_PARALLELISM)
                    .then(Mono.defer(() -> {
                        // Runs only after every per-document commit has completed.
                        long elapsedMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startedAtNanos);
                        this.LOGGER.info(this.attemptId, "commit - all {} docs committed in {}us", this.stagedMutationsLocked.size(), elapsedMicros);
                        return this.hooks.afterDocsCommitted.apply(this);
                    }))
                    .then()
                    .doOnError(failure -> span.span().status(RequestSpan.StatusCode.ERROR));
        });
    }

    /**
     * Builds the warning text used when a document is found to have been modified by another
     * party between staging and committing.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return the warning message, including the debug form of the document id
     */
    private static String msgDocChangedUnexpectedly(CollectionIdentifier collection, String id) {
        return new StringBuilder("Tried committing document ")
                .append(DebugUtil.docId(collection, id))
                .append(", but found that it has been modified by another party in-between staging and committing.  ")
                .append("The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document. ")
                .append("This document may need manual review to verify that no changes have been lost.")
                .toString();
    }

    /**
     * Builds the warning text used when a document is found to have been removed by another
     * party between staging and committing.
     *
     * @param collection    collection holding the document
     * @param id            document id
     * @param willBeWritten selects the closing sentence: {@code true} says the document will be
     *                      written, {@code false} says it will be left removed
     * @return the warning message, including the debug form of the document id
     */
    private static String msgDocRemovedUnexpectedly(CollectionIdentifier collection, String id, boolean willBeWritten) {
        // Both variants share everything up to the final sentence.
        String outcome = willBeWritten
                ? "The document will be written."
                : "The document will be left removed, and the transaction's changes will not be written to this document";
        return "Tried committing document " + DebugUtil.docId(collection, id) + ", but found that it has been removed by another party in-between staging and committing.  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.  " + outcome;
    }

    /**
     * Dispatches one staged mutation to the matching unstaging routine: removals go to
     * {@code removeDocLocked}, everything else to {@code commitDocLocked} (in insert mode
     * when the mutation is an INSERT).
     *
     * @param pspan  span passed through to the unstaging routine
     * @param staged the staged mutation to unstage
     * @return the Mono produced by the chosen routine
     */
    private Mono<Void> commitDocWrapperLocked(SpanWrapper pspan, StagedMutation staged) {
        return Mono.defer(() -> {
            boolean isRemoval = staged.type == StagedMutationType.REMOVE;
            if (!isRemoval) {
                boolean isInsert = staged.type == StagedMutationType.INSERT;
                return this.commitDocLocked(pspan, staged, staged.cas, isInsert, false);
            }
            return this.removeDocLocked(pspan, staged.collection, staged.id, false);
        });
    }

    /**
     * Unstages one staged insert or replace during commit.
     * <p>
     * The write takes one of four shapes, chosen from {@code insertMode} and
     * {@code staged.supportsReplaceBodyWithXattr()}:
     * <ul>
     *   <li>insert + supported: {@code mutateIn} with {@code REPLACE_BODY_WITH_XATTR} then {@code DELETE "txn"};</li>
     *   <li>insert + unsupported: a plain {@code insert} of {@code staged.content};</li>
     *   <li>replace + supported: {@code mutateIn} with {@code REPLACE_BODY_WITH_XATTR} then {@code DELETE "txn"};</li>
     *   <li>replace + unsupported: {@code mutateIn} that upserts then deletes {@code "txn"} and sets the
     *       document body to {@code staged.content}.</li>
     * </ul>
     * Errors are raised as {@code TRANSACTION_FAILED_POST_COMMIT} with no rollback, except where a
     * specific error class triggers a retry or a recovery path (see the inline comments).
     *
     * @param span                    span passed to the KV operations and marked ERROR on failure
     * @param staged                  the staged mutation being committed
     * @param cas                     CAS value passed to the {@code mutateIn} variants
     * @param insertMode              whether to commit as an insert
     * @param ambiguityResolutionMode whether this call is a retry after an ambiguous failure; changes
     *                                how {@code FAIL_DOC_ALREADY_EXISTS} is handled
     * @return a Mono that completes empty once the document has been committed
     */
    private Mono<Void> commitDocLocked(SpanWrapper span, StagedMutation staged, long cas, boolean insertMode, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("commitDoc");
            String id = staged.id;
            CollectionIdentifier collection = staged.collection;
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "commit - committing doc {}, cas={}, insertMode={}, ambiguity-resolution={} supportsReplaceBodyWithXattr={} binary={} userFlags={}", DebugUtil.docId(collection, id), cas, insertMode, ambiguityResolutionMode, staged.supportsReplaceBodyWithXattr(), staged.isStagedBinary(), staged.stagedUserFlags);
                // May switch the attempt into expiry-overtime mode; it does not throw on expiry.
                this.checkExpiryDuringCommitOrRollbackLocked("commitDoc", Optional.of(id));
            }).then(this.hooks.beforeDocCommitted.apply(this, id)).then(Mono.defer(() -> {
                if (insertMode) {
                    if (staged.supportsReplaceBodyWithXattr()) {
                        // Binary staged content is read from "txn.op.bin", otherwise from "txn.op.stgd".
                        return TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, true, true, false, cas, staged.stagedUserFlags, this.durabilityLevel(), OptionsUtil.createClientContext("commitDocInsert"), span, Arrays.asList(staged.isStagedBinary() ? new SubdocMutateRequest.Command(SubdocCommandType.REPLACE_BODY_WITH_XATTR, "txn.op.bin", null, false, true, false, true, 0) : new SubdocMutateRequest.Command(SubdocCommandType.REPLACE_BODY_WITH_XATTR, "txn.op.stgd", null, false, true, false, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 1))).doOnNext(v -> {
                            this.addUnits(v.flexibleExtras());
                            this.LOGGER.info(this.attemptId, "commit - committed doc insert swap body {} got cas {}{}", DebugUtil.docId(collection, id), v.cas(), DebugUtil.dbg(v.flexibleExtras()));
                        }).map(SubdocMutateResponse::cas);
                    }
                    // Fallback insert: writes the staged content directly; the cas argument is not used.
                    return TransactionKVHandler.insert(this.core, collection, id, staged.content, staged.stagedUserFlags, this.kvTimeoutMutating(), this.durabilityLevel(), OptionsUtil.createClientContext("commitDocInsert"), span).doOnNext(v -> {
                        this.addUnits(v.flexibleExtras());
                        this.LOGGER.info(this.attemptId, "commit - committed doc insert {} got cas {}{}", DebugUtil.docId(collection, id), v.cas(), DebugUtil.dbg(v.flexibleExtras()));
                    }).map(InsertResponse::cas);
                }
                if (staged.supportsReplaceBodyWithXattr()) {
                    return TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, false, false, cas, staged.stagedUserFlags, this.durabilityLevel(), OptionsUtil.createClientContext("commitDoc"), span, Arrays.asList(staged.isStagedBinary() ? new SubdocMutateRequest.Command(SubdocCommandType.REPLACE_BODY_WITH_XATTR, "txn.op.bin", null, false, true, false, true, 0) : new SubdocMutateRequest.Command(SubdocCommandType.REPLACE_BODY_WITH_XATTR, "txn.op.stgd", null, false, true, false, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 1))).doOnNext(v -> {
                        this.addUnits(v.flexibleExtras());
                        this.LOGGER.info(this.attemptId, "commit - committed doc replace swap body {} got cas {}{}", DebugUtil.docId(collection, id), v.cas(), DebugUtil.dbg(v.flexibleExtras()));
                    }).map(SubdocMutateResponse::cas);
                }
                // Fallback replace: upsert-then-delete of "txn", followed by setting the body.
                // NOTE(review): the upsert presumably ensures the DELETE cannot fail on a missing path — confirm.
                return TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, false, false, cas, staged.stagedUserFlags, this.durabilityLevel(), OptionsUtil.createClientContext("commitDoc"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, "txn", this.serialize(null), false, true, false, 0), new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 1), new SubdocMutateRequest.Command(SubdocCommandType.SET_DOC, "", staged.content, false, false, false, 2))).doOnNext(v -> {
                    this.addUnits(v.flexibleExtras());
                    this.LOGGER.info(this.attemptId, "commit - committed doc replace {} got cas {}{}", DebugUtil.docId(collection, id), v.cas(), DebugUtil.dbg(v.flexibleExtras()));
                }).map(SubdocMutateResponse::cas);
            })).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterDocCommittedBeforeSavingCAS.apply(this, id).thenReturn(v)).flatMap(newCas -> this.hooks.afterDocCommitted.apply(this, id)).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                // Default failure: post-commit, no rollback, original error as cause.
                TransactionOperationFailedException.Builder e = TransactionOperationFailedException.Builder.createError().cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                this.LOGGER.info(this.attemptId, "error while committing doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                // In expiry-overtime mode no retry or recovery is attempted.
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(true, "commitDoc", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).thenReturn((Object)0);
                }
                if (ec == ErrorClass.FAIL_AMBIGUOUS) {
                    // Retried immediately (no delay) with ambiguityResolutionMode = true.
                    this.LOGGER.warn(this.attemptId, "{} while committing doc {}: as op is ambiguously successful, retrying op in ambiguity-resolution mode", DebugUtil.dbg(err), DebugUtil.docId(collection, id));
                    return this.commitDocLocked(span, staged, cas, insertMode, true).thenReturn((Object)0);
                }
                if (ec == ErrorClass.FAIL_CAS_MISMATCH) {
                    return this.handleDocChangedDuringCommit(span, staged, insertMode).thenReturn((Object)0);
                }
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                    return this.handleDocMissingDuringCommit(span, staged);
                }
                if (ec == ErrorClass.FAIL_DOC_ALREADY_EXISTS) {
                    if (ambiguityResolutionMode) {
                        // NOTE(review): unlike the other failure paths, this error is built but not
                        // passed through operationFailed(...) — confirm this is intentional.
                        return Mono.error((Throwable)e.build());
                    }
                    String msg = CoreTransactionAttemptContext.msgDocChangedUnexpectedly(collection, id);
                    this.LOGGER.warn(this.attemptId, msg);
                    this.LOGGER.eventBus().publish(new IllegalDocumentStateEvent(Event.Severity.WARN, msg, id));
                    if (staged.supportsReplaceBodyWithXattr()) {
                        // Treated as done: nothing further is written for this document.
                        return Mono.empty();
                    }
                    // Retry as a replace (insertMode = false) after the standard delay.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDocLocked(span, staged, cas, false, ambiguityResolutionMode).thenReturn((Object)0));
                }
                // FAIL_HARD and all remaining classes are handled identically.
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(e.build()));
                }
                return Mono.error((Throwable)this.operationFailed(e.build()));
            }).then().doOnError(err -> span.span().status(RequestSpan.StatusCode.ERROR));
        });
    }

    /**
     * Adds the given KV response metadata to this attempt's metering-units builder.
     *
     * @param meta response metadata to add
     */
    private void addUnits(CoreKvResponseMetadata meta) {
        this.meteringUnitsBuilder.add(meta);
    }

    /**
     * Adds the given flexible extras to this attempt's metering-units builder.
     *
     * @param flexibleExtras extras from a KV response; may be {@code null}
     */
    private void addUnits(@Nullable MemcacheProtocol.FlexibleExtras flexibleExtras) {
        this.meteringUnitsBuilder.add(flexibleExtras);
    }

    /**
     * Adds the given units to this attempt's metering-units builder and hands them back so the
     * call can be used inline.
     *
     * @param units units to add; may be {@code null}
     * @return the same {@code units} reference that was passed in
     */
    private MeteringUnits addUnits(@Nullable MeteringUnits units) {
        this.meteringUnitsBuilder.add(units);
        return units;
    }

    /**
     * Recovery path for a document that was not found while committing it.
     * <p>
     * Logs a warning, publishes an {@code IllegalDocumentStateEvent}, then after
     * {@code DEFAULT_DELAY_RETRYING_OPERATION} retries the commit in insert mode with CAS 0.
     *
     * @param pspan  span passed to the retried commit
     * @param staged the staged mutation whose document is missing
     * @return a Mono that emits {@code 0} once the retried commit has completed
     */
    private Mono<Integer> handleDocMissingDuringCommit(SpanWrapper pspan, StagedMutation staged) {
        return Mono.defer(() -> {
            String warning = CoreTransactionAttemptContext.msgDocRemovedUnexpectedly(staged.collection, staged.id, true);
            this.LOGGER.warn(this.attemptId, warning);
            IllegalDocumentStateEvent event = new IllegalDocumentStateEvent(Event.Severity.WARN, warning, staged.id);
            this.LOGGER.eventBus().publish(event);
            // The document is gone, so write it back as a fresh insert.
            return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION)
                    .then(this.commitDocLocked(pspan, staged, 0L, true, false).thenReturn((Object)0));
        });
    }

    /**
     * Returns the underlying {@link RequestSpan} of this attempt's span wrapper.
     *
     * @return the request span held by {@code attemptSpan}
     */
    public RequestSpan span() {
        return this.attemptSpan.span();
    }

    /**
     * Works out how a re-fetched document differs from what this attempt expected, and logs the
     * result.
     *
     * @param gr        the freshly fetched document
     * @param stage     stage name, used for logging only
     * @param crc32Then CRC32 recorded earlier, if one is available
     * @param crc32Now  CRC32 of the freshly fetched document
     * @return a {@code DocChanged} built from the four computed flags
     */
    private DocChanged getDocChanged(CoreTransactionGetResult gr, String stage, Optional<String> crc32Then, String crc32Now) {
        // Without an earlier CRC there is nothing to compare the body against.
        boolean earlierCrcKnown = crc32Then.isPresent();
        boolean unclearIfBodyHasChanged = !earlierCrcKnown;
        boolean bodyHasChanged = earlierCrcKnown && !crc32Now.equals(crc32Then.get());

        boolean inDifferentTransaction = false;
        if (gr.links() != null && gr.links().stagedAttemptId().isPresent()) {
            inDifferentTransaction = !gr.links().stagedAttemptId().get().equals(this.attemptId);
        }

        boolean notInTransaction = true;
        if (gr.links() != null) {
            notInTransaction = !gr.links().isDocumentInTransaction();
        }

        DocChanged verdict = new DocChanged(unclearIfBodyHasChanged, bodyHasChanged, inDifferentTransaction, notInTransaction);
        this.LOGGER.info(this.attemptId, "handling doc changed during {} fetched doc {}, unclearIfBodyHasChanged = {}, bodyHasChanged = {}, inDifferentTransaction = {}, notInTransaction = {}, inSameTransaction = {} links = {} metadata = {} cas = {} crc32Then = {}, crc32Now = {}", stage, DebugUtil.docId(gr.collection(), gr.id()), unclearIfBodyHasChanged, bodyHasChanged, inDifferentTransaction, notInTransaction, verdict.inSameTransaction(), gr.links(), gr.documentMetadata(), gr.cas(), crc32Then, crc32Now);
        return verdict;
    }

    /**
     * Recovery path for a CAS mismatch while committing a document.
     * <p>
     * Re-fetches the document and then:
     * <ul>
     *   <li>if it no longer exists, delegates to {@code handleDocMissingDuringCommit};</li>
     *   <li>if it is staged by a different attempt, or not in a transaction at all, completes
     *       without writing anything;</li>
     *   <li>otherwise (warning first if the body changed) retries {@code commitDocLocked} with the
     *       freshly fetched CAS after {@code DEFAULT_DELAY_RETRYING_OPERATION}.</li>
     * </ul>
     * A client-side expiry detected on entry fails with {@code TRANSACTION_FAILED_POST_COMMIT}.
     *
     * @param span       span passed to the fetch and the retried commit; marked ERROR on failure
     * @param staged     the staged mutation being committed
     * @param insertMode forwarded unchanged to the retried {@code commitDocLocked}
     * @return a Mono that completes empty once the situation has been resolved
     */
    private Mono<Void> handleDocChangedDuringCommit(SpanWrapper span, StagedMutation staged, boolean insertMode) {
        return Mono.defer(() -> {
            String id = staged.id;
            CollectionIdentifier collection = staged.collection;
            // Collects the units consumed by the fetch; added to the attempt totals on both paths below.
            MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "commit - handling doc changed {}, insertMode={}", DebugUtil.docId(collection, id), insertMode);
                if (this.hasExpiredClientSide("commitDocChanged", Optional.of(staged.id))) {
                    this.LOGGER.info(this.attemptId, "has expired in stage {}", "commitDocChanged");
                    throw this.operationFailed(TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).doNotRollbackAttempt().cause(new AttemptExpiredException("Attempt has expired in stage commitDocChanged")).build());
                }
            }).then(this.hooks.beforeDocChangedDuringCommit.apply(this, id)).then(DocumentGetter.getAsync(this.core, this.LOGGER, staged.collection, this.config, staged.id, this.attemptId, true, span, Optional.empty(), units, this.overall.supported(), false)).publishOn(this.scheduler()).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                MeteringUnits built = this.addUnits(units.build());
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "commit - handling doc changed {}{}, got error {}", DebugUtil.docId(collection, id), DebugUtil.dbg(built), CoreTransactionAttemptContext.dbg(err));
                if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    // Signals the retryWhen(...) at the end of the chain.
                    // NOTE(review): presumably RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY retries only on this exception type — confirm.
                    return Mono.error((Throwable)new RetryOperationException());
                }
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).cause((Throwable)err).build()));
            }).flatMap(doc -> {
                this.addUnits(units.build());
                if (doc.isPresent()) {
                    CoreTransactionGetResult gr = (CoreTransactionGetResult)doc.get();
                    // NOTE(review): gr.links() is dereferenced here without a null check, while
                    // getDocChanged() guards against a null links() — confirm it cannot be null.
                    DocChanged dc = this.getDocChanged(gr, "commit", gr.links().crc32OfStaging(), gr.crc32OfGet().get());
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStage.CAS_MISMATCH_DURING_COMMIT, gr.links().forwardCompatibility()).then(Mono.defer(() -> {
                        // The document no longer carries this attempt's staged data: nothing to commit.
                        if (dc.inDifferentTransaction || dc.notInTransaction) {
                            return Mono.empty();
                        }
                        if (dc.bodyHasChanged) {
                            String msg = CoreTransactionAttemptContext.msgDocChangedUnexpectedly(staged.collection, staged.id);
                            this.LOGGER.warn(this.attemptId, msg);
                            this.LOGGER.eventBus().publish(new IllegalDocumentStateEvent(Event.Severity.WARN, msg, staged.id));
                        }
                        // Retry the commit using the CAS just fetched.
                        return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDocLocked(span, staged, gr.cas(), insertMode, false));
                    }));
                }
                return this.handleDocMissingDuringCommit(span, staged);
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).then().doOnError(err -> span.setErrorStatus());
        });
    }

    /**
     * Recovery path for a document found to have changed while staging a mutation on it.
     * <p>
     * Re-fetches the document and then:
     * <ul>
     *   <li>if it no longer exists, fails with a retry-transaction error caused by
     *       {@code DocumentNotFoundException};</li>
     *   <li>if it is staged by a different attempt, runs {@code checkAndHandleBlockingTxn} and then
     *       signals {@code RetryOperationException};</li>
     *   <li>if the body changed, or it cannot be determined whether it did, fails with a
     *       retry-transaction error;</li>
     *   <li>otherwise invokes {@code callback} with the freshly fetched CAS after
     *       {@code DEFAULT_DELAY_RETRYING_OPERATION}.</li>
     * </ul>
     * Throws via {@code throwIfExpired} if the attempt has expired on entry.
     *
     * @param span         span passed to the fetch; finished when the returned Mono terminates
     * @param id           document id
     * @param collection   collection holding the document
     * @param crc32FromGet CRC32 recorded earlier for the document, if available
     * @param callback     continuation that receives the re-fetched CAS
     * @param <T>          result type of the callback
     * @return the callback's Mono, or an error as described above
     */
    private <T> Mono<T> handleDocChangedDuringStaging(SpanWrapper span, String id, CollectionIdentifier collection, Optional<String> crc32FromGet, Function<Long, Mono<T>> callback) {
        return Mono.defer(() -> {
            // Collects the units consumed by the fetch; added to the attempt totals on both paths below.
            MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "handling doc changed during staging {}", DebugUtil.docId(collection, id));
                this.throwIfExpired(id, "stagingDocChanged");
            }).then(this.hooks.beforeDocChangedDuringStaging.apply(this, id)).then(DocumentGetter.getAsync(this.core, this.LOGGER, collection, this.config, id, this.attemptId, true, span, Optional.empty(), units, this.overall.supported(), false)).publishOn(this.scheduler()).onErrorResume(err -> {
                MeteringUnits built = this.addUnits(units.build());
                ErrorClass ec = ErrorClass.classify(err);
                this.LOGGER.info(this.attemptId, "handling doc changed during staging {}{}, got error {}", DebugUtil.docId(collection, id), DebugUtil.dbg(built), CoreTransactionAttemptContext.dbg(err));
                if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    // Signals the retryWhen(...) at the end of the chain.
                    return Mono.error((Throwable)new RetryOperationException());
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    // Hard failures must not trigger a rollback.
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED).cause((Throwable)err).build()));
                }
                // Everything else is retried at the transaction level.
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED).cause((Throwable)err).build()));
            }).flatMap(doc -> {
                this.addUnits(units.build());
                if (doc.isPresent()) {
                    CoreTransactionGetResult gr = (CoreTransactionGetResult)doc.get();
                    DocChanged dc = this.getDocChanged(gr, "staging", crc32FromGet, gr.crc32OfGet().get());
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStage.CAS_MISMATCH_DURING_STAGING, gr.links().forwardCompatibility()).then(Mono.defer(() -> {
                        if (dc.inDifferentTransaction) {
                            return this.checkAndHandleBlockingTxn(gr, span, ForwardCompatibilityStage.CAS_MISMATCH_DURING_STAGING, Optional.empty()).then(Mono.error((Throwable)new RetryOperationException()));
                        }
                        if (dc.bodyHasChanged || dc.unclearIfBodyHasChanged) {
                            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().build()));
                        }
                        // Body unchanged: continue staging with the up-to-date CAS.
                        return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then((Mono)callback.apply(gr.cas()));
                    }));
                }
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().cause(new DocumentNotFoundException(null)).build()));
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).doFinally(v -> span.finish());
        });
    }

    /**
     * Fails the operation if the attempt has expired client-side while working on a document.
     *
     * @param id    id of the document being processed, forwarded to the expiry check
     * @param stage stage name, used for the expiry check, logging and the error message
     * @throws TransactionOperationFailedException raising {@code TRANSACTION_EXPIRED}, marked
     *         do-not-rollback, when the attempt has expired
     */
    private void throwIfExpired(String id, String stage) {
        if (!this.hasExpiredClientSide(stage, Optional.of(id))) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage {}", stage);
        TransactionOperationFailedException.Builder expired = TransactionOperationFailedException.Builder.createError()
                .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED)
                .doNotRollbackAttempt()
                .cause(new AttemptExpiredException("Attempt has expired in stage " + stage));
        throw this.operationFailed(expired.build());
    }

    /**
     * Fails the operation if the attempt has expired client-side; variant for stages that are
     * not tied to a document.
     *
     * @param stage stage name, used for the expiry check, logging and the error message
     * @throws TransactionOperationFailedException raising {@code TRANSACTION_EXPIRED}, marked
     *         do-not-rollback, when the attempt has expired
     */
    private void throwIfExpired(String stage) {
        if (!this.hasExpiredClientSide(stage, Optional.empty())) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage {}", stage);
        TransactionOperationFailedException.Builder expired = TransactionOperationFailedException.Builder.createError()
                .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED)
                .doNotRollbackAttempt()
                .cause(new AttemptExpiredException("Attempt has expired in stage " + stage));
        throw this.operationFailed(expired.build());
    }

    /**
     * Recovery path for a document found to have changed while rolling it back.
     * <p>
     * Re-fetches the document and then:
     * <ul>
     *   <li>if it no longer exists, completes without doing anything;</li>
     *   <li>if it is staged by a different attempt, or not in a transaction at all, completes
     *       without doing anything;</li>
     *   <li>otherwise invokes {@code callback} with the freshly fetched CAS (no delay).</li>
     * </ul>
     * Throws via {@code throwIfExpired} if the attempt has expired on entry.
     *
     * @param span       span passed to the fetch; marked with error status on failure
     * @param id         document id
     * @param collection collection holding the document
     * @param callback   continuation that receives the re-fetched CAS
     * @return a Mono that completes empty once the situation has been resolved
     */
    private Mono<Void> handleDocChangedDuringRollback(SpanWrapper span, String id, CollectionIdentifier collection, Function<Long, Mono<Void>> callback) {
        return Mono.defer(() -> {
            // Collects the units consumed by the fetch; added to the attempt totals on both paths below.
            MeteringUnits.MeteringUnitsBuilder units = new MeteringUnits.MeteringUnitsBuilder();
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "handling doc changed during rollback {}", DebugUtil.docId(collection, id));
                this.throwIfExpired(id, "rollbackDocChanged");
            }).then(this.hooks.beforeDocChangedDuringRollback.apply(this, id)).then(DocumentGetter.getAsync(this.core, this.LOGGER, collection, this.config, id, this.attemptId, true, span, Optional.empty(), units, this.overall.supported(), false)).publishOn(this.scheduler()).onErrorResume(err -> {
                MeteringUnits built = this.addUnits(units.build());
                ErrorClass ec = ErrorClass.classify(err);
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "handling doc changed during rollback {}{}, got error {}", DebugUtil.docId(collection, id), DebugUtil.dbg(built), CoreTransactionAttemptContext.dbg(err));
                if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    // Signals the retryWhen(...) at the end of the chain.
                    return Mono.error((Throwable)new RetryOperationException());
                }
                // FAIL_HARD and all remaining classes currently produce the same error.
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED).cause((Throwable)err).build()));
                }
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_FAILED).cause((Throwable)err).build()));
            }).flatMap(doc -> {
                this.addUnits(units.build());
                if (doc.isPresent()) {
                    CoreTransactionGetResult gr = (CoreTransactionGetResult)doc.get();
                    DocChanged dc = this.getDocChanged(gr, "rollback", gr.links().crc32OfStaging(), gr.crc32OfGet().get());
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStage.CAS_MISMATCH_DURING_ROLLBACK, gr.links().forwardCompatibility()).then(Mono.defer(() -> {
                        // The document no longer carries this attempt's staged data: nothing to roll back.
                        if (dc.inDifferentTransaction || dc.notInTransaction) {
                            return Mono.empty();
                        }
                        return (Mono)callback.apply(gr.cas());
                    }));
                }
                // Document is gone: nothing to roll back.
                return Mono.empty();
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).doOnError(err -> span.setErrorStatus()).then();
        });
    }

    /**
     * Returns the configured durability level in the form the KV handlers expect.
     *
     * @return empty when the configured level is {@code DurabilityLevel.NONE}, otherwise the
     *         configured level
     */
    private Optional<DurabilityLevel> durabilityLevel() {
        if (this.config.durabilityLevel() == DurabilityLevel.NONE) {
            return Optional.empty();
        }
        return Optional.of(this.config.durabilityLevel());
    }

    /**
     * Returns the KV timeout for mutating operations, as resolved by {@code OptionsUtil} from
     * this attempt's {@code core}.
     *
     * @return the timeout to pass to mutating KV calls
     */
    private Duration kvTimeoutMutating() {
        return OptionsUtil.kvTimeoutMutating(this.core);
    }

    /**
     * Returns the KV timeout for non-mutating operations, as resolved by {@code OptionsUtil}
     * from this attempt's {@code core}.
     *
     * @return the timeout to pass to non-mutating KV calls
     */
    private Duration kvTimeoutNonMutating() {
        return OptionsUtil.kvTimeoutNonMutating(this.core);
    }

    /**
     * Resolves an ambiguous ATR commit by reading back this attempt's status field
     * ({@code attempts.<attemptId>.st}) from the ATR document.
     *
     * <p>Outcome by the status read:
     * <ul>
     *   <li>{@code COMMITTED}: completes empty.</li>
     *   <li>{@code ABORTED}: fails with an operation-failed error flagged to retry the transaction.</li>
     *   <li>anything else (including an unparseable status, mapped to {@code "UNKNOWN"}): fails with an
     *       operation-failed error flagged not to roll back, caused by an {@link IllegalStateException}.</li>
     * </ul>
     *
     * <p>Errors from the lookup are classified; {@code RetryAtrCommitException} and errors classified as
     * {@code TRANSACTION_OPERATION_FAILED} are propagated unchanged, {@code FAIL_TRANSIENT}/{@code FAIL_OTHER}
     * become a {@code RetryOperationException} (consumed by the retry spec), and every other class is raised
     * as {@code TRANSACTION_COMMIT_AMBIGUOUS} without rollback.
     *
     * @param overallStartTime set to {@code System.nanoTime()} each time the operation is (re)subscribed.
     * @param span             span passed to the lookup and used to record exceptions / error status.
     * @return a Mono completing empty when the ATR reports the attempt as committed.
     */
    private Mono<Void> atrCommitAmbiguityResolutionLocked(AtomicReference<Long> overallStartTime, SpanWrapper span) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "about to fetch status of ATR {} to resolve ambiguity, expiryOvertimeMode={}", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode);
            overallStartTime.set(System.nanoTime());
            return this.errorIfExpiredAndNotInExpiryOvertimeMode("atrCommitAmbiguityResolution", Optional.empty());
        }).then(this.hooks.beforeAtrCommitAmbiguityResolution.apply(this)).then(TransactionKVHandler.lookupIn(this.core, this.atrCollection.get(), this.atrId.get(), this.kvTimeoutNonMutating(), false, OptionsUtil.createClientContext("atrCommitAmbiguityResolution"), span, false, Arrays.asList(new SubdocGetRequest.Command(SubdocCommandType.GET, "attempts." + this.attemptId + "." + "st", true, 0)))).publishOn(this.scheduler()).flatMap(result -> {
            String status = null;
            try {
                status = Mapper.reader().readValue(result.field(0).value(), String.class);
            }
            catch (IOException e) {
                // Decode explicitly as UTF-8 rather than the platform default charset so the logged
                // raw value does not depend on the JVM's locale settings.
                this.LOGGER.info(this.attemptId, "failed to parse ATR {} status '{}'", this.getAtrDebug(this.atrCollection, this.atrId), new String(result.field(0).value(), java.nio.charset.StandardCharsets.UTF_8));
                status = "UNKNOWN";
            }
            this.addUnits(result.meta());
            this.LOGGER.info(this.attemptId, "got status of ATR {}: '{}'", this.getAtrDebug(this.atrCollection, this.atrId), status);
            AttemptState state = AttemptState.convert(status);
            switch (state) {
                case COMMITTED: {
                    // The earlier commit write did land: nothing more to do.
                    return Mono.empty();
                }
                case ABORTED: {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().retryTransaction().build()));
                }
            }
            // Any other state is unexpected at this point.
            return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().cause(new IllegalStateException("This transaction has been changed by another actor to be in unexpected state " + status)).build()));
        }).then().onErrorResume(err -> {
            ErrorClass ec = ErrorClass.classify(err);
            TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().cause((Throwable)err);
            MeteringUnits units = this.addUnits(MeteringUnits.from(err));
            span.recordException((Throwable)err);
            // Already-final errors (including the ones raised by the status switch above) pass through untouched.
            if (err instanceof RetryAtrCommitException || ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                return Mono.error((Throwable)err);
            }
            this.LOGGER.info(this.attemptId, "error while resolving ATR {} ambiguity{} in {}us: {}", this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
            if (ec == ErrorClass.FAIL_EXPIRY) {
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(new AttemptExpiredException((Throwable)err)).build()));
            }
            if (ec == ErrorClass.FAIL_HARD) {
                return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
            }
            if (ec == ErrorClass.FAIL_TRANSIENT || ec == ErrorClass.FAIL_OTHER) {
                // Signals the retryWhen spec below to re-run the lookup.
                return Mono.error((Throwable)new RetryOperationException());
            }
            if (ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().cause(new ActiveTransactionRecordEntryNotFoundException(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
            }
            if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().cause(new ActiveTransactionRecordNotFoundException(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
            }
            return Mono.error((Throwable)this.operationFailed(builder.raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build()));
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).publishOn(this.scheduler()).doOnError(err -> span.setErrorStatus());
    }

    /**
     * Applies the given sub-document mutation specs to the ATR document in order to mark this attempt as
     * committed, then sets the local attempt state to {@code COMMITTED}.
     *
     * <p>The operation is retried (via {@code RETRY_OPERATION_UNTIL_EXPIRY}) whenever the error handler
     * produces a {@code RetryOperationException}. An {@code ambiguityResolutionMode} flag, local to one
     * invocation, is switched on after a {@code FAIL_AMBIGUOUS} error; while it is on, most failures are
     * raised as {@code TRANSACTION_COMMIT_AMBIGUOUS} rather than as a plain failure.
     *
     * @param specs            the sub-document commands sent in the ATR mutation.
     * @param overallStartTime set to {@code System.nanoTime()} each time the operation is (re)subscribed.
     * @param pspan            parent span for the {@code transaction_atr_commit} span created here.
     * @return a Mono completing empty once the ATR mutation (and the after-commit hook) succeeded.
     */
    private Mono<Void> atrCommitLocked(List<SubdocMutateRequest.Command> specs, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrCommit");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "transaction_atr_commit", pspan);
            AtomicBoolean ambiguityResolutionMode = new AtomicBoolean(false);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR {} to Committed, expiryOvertimeMode={}, ambiguityResolutionMode={}", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode, ambiguityResolutionMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode("atrCommit", Optional.empty());
            }).then(this.hooks.beforeAtrCommit.apply(this)).then(TransactionKVHandler.mutateIn(this.core, this.atrCollection.get(), this.atrId.get(), this.kvTimeoutMutating(), false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, this.durabilityLevel(), OptionsUtil.createClientContext("atrCommit"), span, specs)).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterAtrCommit.apply(this).thenReturn(v)).doOnNext(v -> {
                this.setStateLocked(AttemptState.COMMITTED);
                this.addUnits(v.flexibleExtras());
                this.LOGGER.info(this.attemptId, "set ATR {} to Committed{} in {}us", this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(v.flexibleExtras()), span.elapsedMicros());
            }).then().onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR {} to Committed{} in {}us: {}", this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                // Expiry: never rolled back; reported as ambiguous only if an earlier write was ambiguous.
                if (ec == ErrorClass.FAIL_EXPIRY) {
                    TransactionOperationFailedException.FinalErrorToRaise toRaise = ambiguityResolutionMode.get() ? TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS : TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED;
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(toRaise).cause(new AttemptExpiredException((Throwable)err)).build()));
                }
                // Ambiguous write: remember that and retry the same mutation.
                if (ec == ErrorClass.FAIL_AMBIGUOUS) {
                    ambiguityResolutionMode.set(true);
                    return Mono.error((Throwable)new RetryOperationException());
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    if (ambiguityResolutionMode.get()) {
                        return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause((Throwable)err).build()));
                    }
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                // Transient: retry the operation when resolving ambiguity, otherwise retry the whole transaction.
                if (ec == ErrorClass.FAIL_TRANSIENT) {
                    if (ambiguityResolutionMode.get()) {
                        throw new RetryOperationException();
                    }
                    return Mono.error((Throwable)this.operationFailed(builder.retryTransaction().build()));
                }
                // Path already exists: read the ATR status back to find out what state the attempt is in.
                if (ec == ErrorClass.FAIL_PATH_ALREADY_EXISTS) {
                    return this.atrCommitAmbiguityResolutionLocked(overallStartTime, pspan).onErrorResume(e -> {
                        if (e instanceof RetryAtrCommitException) {
                            // Leave ambiguity-resolution mode and retry the commit write.
                            ambiguityResolutionMode.set(false);
                            throw new RetryOperationException();
                        }
                        return Mono.error((Throwable)e);
                    });
                }
                // Remaining classes: choose a more specific cause where one applies; those three cases
                // also disable rollback, all others keep rollback enabled.
                Throwable cause = err;
                boolean rollback = true;
                switch (ec) {
                    case FAIL_PATH_NOT_FOUND: {
                        cause = new ActiveTransactionRecordEntryNotFoundException(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_DOC_NOT_FOUND: {
                        cause = new ActiveTransactionRecordNotFoundException(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_ATR_FULL: {
                        cause = new ActiveTransactionRecordFullException(cause);
                        rollback = false;
                    }
                }
                if (ambiguityResolutionMode.get()) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(cause).build()));
                }
                return Mono.error((Throwable)this.operationFailed(builder.cause(cause).rollbackAttempt(rollback).build()));
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(this.scheduler()).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Builds a Mono that, when subscribed, logs the transition and flips this attempt into
     * expiry-overtime mode. Nothing happens until subscription.
     *
     * @param stage name of the stage, used only in the log message.
     * @return a Mono that completes empty after the flag has been set.
     */
    private <T> Mono<T> setExpiryOvertimeMode(String stage) {
        Runnable enterOvertime = () -> {
            this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage {}", stage);
            this.expiryOvertimeMode = true;
        };
        return Mono.fromRunnable(enterOvertime);
    }

    /**
     * Immediately (at call time, not at subscription) logs and switches this attempt into
     * expiry-overtime mode, then returns a failed Mono.
     *
     * @param err   the triggering error; wrapped in an {@code AttemptExpiredException} as the cause.
     * @param stage name of the stage, used only in the log message.
     * @param ec    classification of {@code err}; not read by this method.
     * @return a Mono that errors with an operation-failed exception raising {@code TRANSACTION_EXPIRED}.
     */
    private <T> Mono<T> setExpiryOvertimeModeAndFail(Throwable err, String stage, ErrorClass ec) {
        this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage {}, and raising error", stage);
        this.expiryOvertimeMode = true;
        Throwable expired = this.operationFailed(
                TransactionOperationFailedException.Builder.createError()
                        .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED)
                        .cause(new AttemptExpiredException(err))
                        .build());
        return Mono.error(expired);
    }

    /**
     * Rolls back this attempt as an application-requested rollback.
     *
     * <p>Waits for all operations and then runs {@code rollbackInternalLocked(true)} under the lock, with
     * the resulting Mono passed through {@code createMonoBridge} under the name {@code "rollback"}.
     *
     * @return a Mono signalling the outcome of the rollback.
     */
    public Mono<Void> rollback() {
        return this.createMonoBridge("rollback", Mono.defer(() -> this.waitForAllOpsThenDoUnderLock("app-rollback", this.attemptSpan, () -> this.rollbackInternalLocked(true))));
    }

    /**
     * Rolls back this attempt as an automatic (non-application) rollback.
     *
     * <p>Same flow as {@code rollback()}, but runs {@code rollbackInternalLocked(false)} and uses the
     * labels {@code "rollbackAuto"} / {@code "auto-rollback"}.
     *
     * @return a Mono signalling the outcome of the rollback.
     */
    Mono<Void> rollbackAuto() {
        return this.createMonoBridge("rollbackAuto", Mono.defer(() -> this.waitForAllOpsThenDoUnderLock("auto-rollback", this.attemptSpan, () -> this.rollbackInternalLocked(false))));
    }

    /**
     * Core rollback routine shared by the application and automatic rollback entry points.
     *
     * <p>On subscription it opens a {@code transaction_rollback} span, asks {@code canPerformRollback}
     * whether rollback is permitted (failing with its exception if not), updates the state bits, and then
     * dispatches to the query-based or KV-based rollback. A {@code NOT_STARTED} attempt that is not in
     * query mode has nothing to undo and completes empty.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @return a Mono completing when rollback has finished; subscribed on this context's scheduler.
     */
    private Mono<Void> rollbackInternalLocked(boolean isAppRollback) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "transaction_rollback", this.attemptSpan);
            Mono<Void> rollbackWork = Mono.defer(() -> {
                TransactionOperationFailedException notAllowed = this.canPerformRollback("rollbackInternal", isAppRollback);
                if (notAllowed != null) {
                    this.logger().info(this.attemptId, "rollback raising {}", DebugUtil.dbg(notAllowed));
                    return Mono.error(notAllowed);
                }
                String initiator = isAppRollback ? "app" : "auto";
                this.setStateBits("rollback-" + initiator, 3, 0);
                boolean nothingStarted = this.state == AttemptState.NOT_STARTED && !this.queryModeUnlocked();
                if (nothingStarted) {
                    this.LOGGER.info(this.attemptId, "told to auto-rollback but in NOT_STARTED state, so nothing to do - skipping rollback");
                    return Mono.empty();
                }
                return this.queryModeLocked()
                        ? this.rollbackQueryLocked(isAppRollback, span)
                        : this.rollbackWithKVLocked(isAppRollback, span);
            });
            return rollbackWork
                    .doOnError(err -> span.finishWithErrorStatus())
                    .doOnTerminate(() -> span.finish());
        }).subscribeOn(this.scheduler());
    }

    /**
     * KV-based rollback entry point; must be called with the lock held.
     *
     * <p>If the attempt has expired client-side and is not yet in expiry-overtime mode, that mode is
     * entered first. When no ATR collection or ATR id has been recorded, the method only logs and
     * completes; otherwise it hands over to {@code rollbackWithKVActual}.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param span          span forwarded to the KV rollback steps.
     * @return a Mono completing when the KV rollback is done.
     */
    private Mono<Void> rollbackWithKVLocked(boolean isAppRollback, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("rollbackWithKV");
            this.LOGGER.info(this.attemptId, "rollback {} expiryOvertimeMode={} isAppRollback={}", this, this.expiryOvertimeMode, isAppRollback);
            if (!this.expiryOvertimeMode && this.hasExpiredClientSide("rollback", Optional.empty())) {
                this.LOGGER.info(this.attemptId, "has expired before rollback, entering expiry-overtime mode");
                this.expiryOvertimeMode = true;
            }
            boolean atrKnown = this.atrCollection.isPresent() && this.atrId.isPresent();
            if (atrKnown) {
                return this.rollbackWithKVActual(isAppRollback, span);
            }
            return Mono.create(sink -> {
                this.LOGGER.info(this.attemptId, "Calling rollback when it's had no mutations, so nothing to do");
                sink.success();
            });
        });
    }

    /**
     * Runs the three KV rollback stages in sequence: abort the ATR entry, roll back the staged
     * documents, then mark the ATR entry as rollback-complete.
     *
     * <p>An {@code ActiveTransactionRecordNotFoundException} from any stage is logged and treated as a
     * successful rollback; every other error is propagated unchanged.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param span          span forwarded to each stage.
     * @return a Mono completing when all stages have finished.
     */
    private Mono<Void> rollbackWithKVActual(boolean isAppRollback, SpanWrapper span) {
        String attemptPath = "attempts." + this.attemptId;
        Mono<Void> abortAtr = this.atrAbortLocked(attemptPath, span, isAppRollback, false);
        Mono<Void> undoDocs = this.rollbackDocsLocked(isAppRollback, span);
        Mono<Void> completeAtr = this.atrRollbackCompleteLocked(isAppRollback, attemptPath, span);
        return abortAtr.then(undoDocs).then(completeAtr).onErrorResume(failure -> {
            if (!(failure instanceof ActiveTransactionRecordNotFoundException)) {
                return Mono.error(failure);
            }
            this.LOGGER.info(this.attemptId, "ActiveTransactionRecordNotFound indicates that nothing needs to be done for this rollback: treating as successful rollback");
            return Mono.empty();
        });
    }

    /**
     * Rolls back a query-mode attempt by issuing a {@code ROLLBACK} statement through
     * {@code queryWrapperBlockingLocked}, then sets the local state to {@code ROLLED_BACK}.
     * Must be called with the lock held.
     *
     * <p>Error mapping: a {@code TransactionOperationFailedException} is propagated as-is; an
     * {@code AttemptNotFoundOnQueryException} is swallowed (the Mono completes empty); anything else is
     * wrapped in an operation-failed error flagged not to roll back.
     *
     * @param appRollback {@code true} when the application requested the rollback.
     * @param span        span passed to the query and used to record failures.
     * @return a Mono completing when the query rollback has finished.
     */
    private Mono<Void> rollbackQueryLocked(boolean appRollback, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("rollbackQuery");
            // Each statement sent to query consumes the next statement index.
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapperBlockingLocked(statementIdx, this.queryContext.queryContext, "ROLLBACK", null, "queryRollback", false, false, null, null, span, false, null, appRollback).then(Mono.fromRunnable(() -> this.setStateLocked(AttemptState.ROLLED_BACK))).onErrorResume(err -> {
                span.recordExceptionAndSetErrorStatus((Throwable)err);
                if (err instanceof TransactionOperationFailedException) {
                    return Mono.error((Throwable)err);
                }
                if (err instanceof AttemptNotFoundOnQueryException) {
                    // Treated as success: the error is dropped and the rollback completes normally.
                    return Mono.empty();
                }
                TransactionOperationFailedException e = this.operationFailed(TransactionOperationFailedException.Builder.createError().cause((Throwable)err).doNotRollbackAttempt().build());
                return Mono.error((Throwable)e);
            }).then();
        });
    }

    /**
     * Final KV rollback stage: issues a sub-document {@code DELETE} of {@code prefix} on the ATR document
     * and, on success, sets the local state to {@code ROLLED_BACK}. Must be called with the lock held.
     *
     * <p>Error handling, in order: when already in expiry-overtime mode the error is mapped through
     * {@code mapErrorInOvertimeToExpired}; {@code FAIL_EXPIRY} raises {@code TRANSACTION_EXPIRED};
     * {@code FAIL_PATH_NOT_FOUND} / {@code FAIL_DOC_NOT_FOUND} are treated as success; {@code FAIL_HARD}
     * fails without rollback; anything else becomes a {@code RetryOperationException}, which the retry
     * spec uses to re-run the operation.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param prefix        sub-document path of this attempt's ATR entry.
     * @param pspan         parent span for the {@code transaction_atr_rollback} span created here.
     * @return a Mono completing empty when the ATR entry has been handled.
     */
    private Mono<Void> atrRollbackCompleteLocked(boolean isAppRollback, String prefix, SpanWrapper pspan) {
        return Mono.defer(() -> {
            this.assertLocked("atrRollbackComplete");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "transaction_atr_rollback", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "removing ATR {} as rollback complete", this.getAtrDebug(this.atrCollection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode("atrRollbackComplete", Optional.empty());
            }).then(this.hooks.beforeAtrRolledBack.apply(this)).then(TransactionKVHandler.mutateIn(this.core, this.atrCollection.get(), this.atrId.get(), this.kvTimeoutMutating(), false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, this.durabilityLevel(), OptionsUtil.createClientContext("atrRollbackComplete"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, prefix, null, false, true, false, 0)))).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterAtrRolledBack.apply(this).thenReturn(v)).doOnNext(v -> {
                this.setStateLocked(AttemptState.ROLLED_BACK);
                long elapsed = span.elapsedMicros();
                this.addUnits(v.flexibleExtras());
                this.LOGGER.info(this.attemptId, "rollback - atr rolled back{} in {}us", DebugUtil.dbg(v.flexibleExtras()), elapsed);
            }).onErrorResume(err -> {
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while marking ATR {} as rollback complete{} in {}us: {}", this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder error = TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt();
                // The overtime check comes first: once in overtime, no error class below is consulted.
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(isAppRollback, "atrRollbackComplete", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClass.FAIL_EXPIRY) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).build()));
                }
                // Entry or ATR document is missing: treated as success.
                if (ec == ErrorClass.FAIL_PATH_NOT_FOUND || ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                    return Mono.empty();
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, error.build()));
                }
                // Everything else is retried.
                return Mono.error((Throwable)new RetryOperationException());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(this.scheduler()).then().doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Rolls back every staged mutation of this attempt, running up to {@code UNSTAGING_PARALLELISM}
     * rollbacks concurrently.
     *
     * <p>Staged inserts go through {@code rollbackStagedInsertLocked}; all other staged mutation types go
     * through {@code rollbackStagedReplaceOrRemoveLocked}.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param span          span forwarded to each per-document rollback.
     * @return a Mono completing when every staged mutation has been rolled back.
     */
    private Mono<Void> rollbackDocsLocked(boolean isAppRollback, SpanWrapper span) {
        return Mono.defer(() -> Flux.fromIterable(this.stagedMutationsLocked).publishOn(this.scheduler()).flatMap(staged -> {
            switch (staged.type) {
                case INSERT: {
                    return this.rollbackStagedInsertLocked(isAppRollback, span, staged.collection, staged.id, staged.cas);
                }
                default: {
                    return this.rollbackStagedReplaceOrRemoveLocked(isAppRollback, span, staged.collection, staged.id, staged.cas, staged.currentUserFlags);
                }
            }
        }, UNSTAGING_PARALLELISM)
                // The per-document rollbacks are Mono<Void>, so the merged flux never emits an element and
                // a doOnNext callback here could never run. Log on completion instead so the message is
                // actually written once all documents have been rolled back.
                .doOnComplete(() -> this.LOGGER.info(this.attemptId, "rollback - docs rolled back"))
                .then());
    }

    /**
     * Rolls back a staged replace or remove on one document by issuing a sub-document {@code DELETE} of the
     * {@code "txn"} path, using the supplied CAS and user flags.
     *
     * <p>Error handling, in order: in expiry-overtime mode the error is mapped through
     * {@code mapErrorInOvertimeToExpired}; {@code FAIL_EXPIRY} enters expiry-overtime mode and retries;
     * {@code FAIL_PATH_NOT_FOUND} and {@code FAIL_DOC_NOT_FOUND} are treated as success;
     * {@code FAIL_CAS_MISMATCH} defers to {@code handleDocChangedDuringRollback}, whose callback re-invokes
     * this method with the new CAS; {@code FAIL_HARD} fails without rollback; anything else is retried.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param span          span passed to the mutation and used to record failures.
     * @param collection    collection holding the document.
     * @param id            document id.
     * @param cas           CAS sent with the mutation.
     * @param userFlags     user flags sent with the mutation.
     * @return a Mono completing empty when the document has been handled.
     */
    private Mono<Void> rollbackStagedReplaceOrRemoveLocked(boolean isAppRollback, SpanWrapper span, CollectionIdentifier collection, String id, long cas, int userFlags) {
        return Mono.defer(() -> Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rolling back doc {} with cas {} by removing staged mutation", DebugUtil.docId(collection, id), cas);
            return this.errorIfExpiredAndNotInExpiryOvertimeMode("rollbackDoc", Optional.of(id));
        }).then(this.hooks.beforeDocRolledBack.apply(this, id)).then(TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, false, false, cas, userFlags, this.durabilityLevel(), OptionsUtil.createClientContext("rollbackDoc"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 0)))).publishOn(this.scheduler()).flatMap(updatedDoc -> this.hooks.afterRollbackReplaceOrRemove.apply(this, id).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
            this.addUnits(updatedDoc.flexibleExtras());
            this.LOGGER.info(this.attemptId, "rolled back doc {}{}, got cas {} and mt {}", DebugUtil.docId(collection, id), DebugUtil.dbg(updatedDoc.flexibleExtras()), updatedDoc.cas(), updatedDoc.mutationToken());
        }).then().onErrorResume(err -> {
            ErrorClass ec = ErrorClass.classify(err);
            TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().doNotRollbackAttempt().cause((Throwable)err);
            MeteringUnits units = this.addUnits(MeteringUnits.from(err));
            span.recordException((Throwable)err);
            this.logger().info(this.attemptId, "got error while rolling back doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(isAppRollback, "rollbackDoc", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            // First expiry: switch to overtime mode and give the operation another go.
            if (ec == ErrorClass.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode("rollbackDoc").then(Mono.error((Throwable)new RetryOperationException()));
            }
            if (ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                this.LOGGER.info(this.attemptId, "got PATH_NOT_FOUND while cleaning up staged doc {}, it must have already been rolled back, continuing", DebugUtil.docId(collection, id));
                return Mono.empty();
            }
            if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                return Mono.empty();
            }
            // CAS mismatch: the helper receives a callback that retries this rollback with the new CAS.
            if (ec == ErrorClass.FAIL_CAS_MISMATCH) {
                return this.handleDocChangedDuringRollback(span, id, collection, newCas -> this.rollbackStagedReplaceOrRemoveLocked(isAppRollback, span, collection, id, (long)newCas, userFlags));
            }
            if (ec == ErrorClass.FAIL_HARD) {
                return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
            }
            return Mono.error((Throwable)new RetryOperationException());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(this.scheduler()).doOnError(err -> span.setErrorStatus()));
    }

    /**
     * Rolls back a staged insert on one document by issuing a sub-document {@code DELETE} of the
     * {@code "txn"} path with the supplied CAS.
     *
     * <p>Compared with the replace/remove rollback, the mutation is sent with user flags {@code 0} and
     * with one additional boolean option enabled (NOTE(review): presumably access to a deleted/tombstone
     * document; confirm against {@code TransactionKVHandler.mutateIn}).
     *
     * <p>Error handling, in order: in expiry-overtime mode the error is mapped through
     * {@code mapErrorInOvertimeToExpired}; {@code FAIL_EXPIRY} enters expiry-overtime mode and retries;
     * {@code FAIL_DOC_NOT_FOUND} / {@code FAIL_PATH_NOT_FOUND} are treated as success; {@code FAIL_HARD}
     * fails without rollback; {@code FAIL_CAS_MISMATCH} defers to {@code handleDocChangedDuringRollback},
     * whose callback re-invokes this method with the new CAS; anything else is retried.
     *
     * @param isAppRollback {@code true} when the application requested the rollback.
     * @param span          span passed to the mutation and used to record failures.
     * @param collection    collection holding the document.
     * @param id            document id.
     * @param cas           CAS sent with the mutation.
     * @return a Mono completing empty when the document has been handled.
     */
    private Mono<Void> rollbackStagedInsertLocked(boolean isAppRollback, SpanWrapper span, CollectionIdentifier collection, String id, long cas) {
        return Mono.defer(() -> Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rolling back staged insert {} with cas {}", DebugUtil.docId(collection, id), cas);
            return this.errorIfExpiredAndNotInExpiryOvertimeMode("deleteInserted", Optional.of(id));
        }).then(this.hooks.beforeRollbackDeleteInserted.apply(this, id)).then(TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, true, false, cas, 0, this.durabilityLevel(), OptionsUtil.createClientContext("rollbackStagedInsert"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 0)))).publishOn(this.scheduler()).flatMap(updatedDoc -> this.hooks.afterRollbackDeleteInserted.apply(this, id).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
            this.addUnits(updatedDoc.flexibleExtras());
            this.LOGGER.info(this.attemptId, "deleted inserted doc {}{}, mt {}", DebugUtil.docId(collection, id), DebugUtil.dbg(updatedDoc.flexibleExtras()), updatedDoc.mutationToken());
        }).then().onErrorResume(err -> {
            ErrorClass ec = ErrorClass.classify(err);
            TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err);
            MeteringUnits units = this.addUnits(MeteringUnits.from(err));
            span.recordException((Throwable)err);
            this.LOGGER.info(this.attemptId, "error while rolling back inserted doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(isAppRollback, "removeDoc", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            // First expiry: switch to overtime mode and give the operation another go.
            if (ec == ErrorClass.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode("remove").then(Mono.error((Throwable)new RetryOperationException()));
            }
            if (ec == ErrorClass.FAIL_DOC_NOT_FOUND || ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                this.LOGGER.info(this.attemptId, "got {} while removing staged insert doc {}, it must have already been rolled back, continuing", new Object[]{ec, DebugUtil.docId(collection, id)});
                return Mono.empty();
            }
            if (ec == ErrorClass.FAIL_HARD) {
                return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
            }
            // CAS mismatch: the helper receives a callback that retries this rollback with the new CAS.
            if (ec == ErrorClass.FAIL_CAS_MISMATCH) {
                return this.handleDocChangedDuringRollback(span, id, collection, newCas -> this.rollbackStagedInsertLocked(isAppRollback, span, collection, id, (long)newCas));
            }
            return Mono.error((Throwable)new RetryOperationException());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(this.scheduler()).doOnError(err -> span.setErrorStatus()));
    }

    /**
     * Removes a staged insert from a document by issuing a sub-document {@code DELETE} of the
     * {@code "txn"} path using the document's CAS and user flags. Must be called WITHOUT the lock held;
     * the lock is taken internally only for the final bookkeeping step.
     *
     * <p>On success the document's CAS is updated from the mutation result and the corresponding staged
     * mutation is removed under the lock. There is no retry here: if the attempt has expired client-side
     * the Mono fails with {@code TRANSACTION_EXPIRED} (no rollback); a mutation error classified as
     * {@code TRANSACTION_OPERATION_FAILED} is propagated as-is; {@code FAIL_HARD} fails without rollback;
     * any other error fails with the transaction flagged for retry.
     *
     * @param doc  the document whose staged insert is removed; its CAS is updated on success.
     * @param span span passed to the mutation and used to record failures.
     * @return a Mono completing empty once the staged insert and its bookkeeping have been removed.
     */
    private Mono<Void> removeStagedInsert(CoreTransactionGetResult doc, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertNotLocked("removeStagedInsert");
            CollectionIdentifier collection = doc.collection();
            String id = doc.id();
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "removing staged insert {} with cas {}", DebugUtil.docId(collection, id), doc.cas());
                if (this.hasExpiredClientSide("removeStagedInsert", Optional.of(id))) {
                    return Mono.error((Throwable)this.operationFailed(TransactionOperationFailedException.Builder.createError().raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpiredException("Attempt has expired in stage removeStagedInsert")).build()));
                }
                return Mono.empty();
            }).then(this.hooks.beforeRemoveStagedInsert.apply(this, id)).then(TransactionKVHandler.mutateIn(this.core, collection, id, this.kvTimeoutMutating(), false, false, false, true, false, doc.cas(), doc.userFlags(), this.durabilityLevel(), OptionsUtil.createClientContext("removeStagedInsert"), span, Arrays.asList(new SubdocMutateRequest.Command(SubdocCommandType.DELETE, "txn", null, false, true, false, 0)))).publishOn(this.scheduler()).flatMap(updatedDoc -> this.hooks.afterRemoveStagedInsert.apply(this, id).thenReturn(updatedDoc)).doOnNext(v -> this.addUnits(v.flexibleExtras())).onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                // Default outcome for this handler: retry the whole transaction.
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().retryTransaction().cause((Throwable)err);
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while removing staged insert doc {}{} in {}us: {}", DebugUtil.docId(collection, id), DebugUtil.dbg(units), span.elapsedMicros(), CoreTransactionAttemptContext.dbg(err));
                // Already an operation-failed error (e.g. the expiry check above): pass through.
                if (ec == ErrorClass.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(builder.doNotRollbackAttempt().build()));
                }
                return Mono.error((Throwable)this.operationFailed(builder.build()));
            }).flatMap(v -> {
                // Record the post-mutation CAS on the caller's document.
                doc.cas(v.cas());
                long elapsed = span.elapsedMicros();
                this.LOGGER.info(this.attemptId, "removed staged insert from doc {} in {}us", DebugUtil.docId(collection, id), elapsed);
                return this.doUnderLock("removeStagedInsert " + DebugUtil.docId(collection, id), () -> Mono.fromRunnable(() -> this.removeStagedMutationLocked(doc.collection(), doc.id())));
            }).then();
        });
    }

    /**
     * First KV rollback stage: marks this attempt's ATR entry as aborted and, on success, sets the local
     * state to {@code ABORTED}. Must be called with the lock held.
     *
     * <p>The ATR mutation upserts {@code <prefix>.st} with the {@code ABORTED} state name and
     * {@code <prefix>.tsrs} with the {@code ${Mutation.CAS}} macro string, followed by the specs returned
     * by {@code addDocsToBuilder}.
     *
     * <p>Error handling, in order: in expiry-overtime mode the error is mapped through
     * {@code mapErrorInOvertimeToExpired}; {@code FAIL_EXPIRY} enters expiry-overtime mode and retries;
     * {@code FAIL_PATH_NOT_FOUND}, {@code FAIL_DOC_NOT_FOUND} and {@code FAIL_ATR_FULL} fail (no rollback)
     * with a dedicated cause exception; {@code FAIL_HARD} fails without rollback; anything else is retried.
     *
     * @param prefix                  sub-document path of this attempt's ATR entry.
     * @param pspan                   parent span for the {@code transaction_atr_abort} span created here.
     * @param isAppRollback           {@code true} when the application requested the rollback.
     * @param ambiguityResolutionMode only used in the log message within this method.
     * @return a Mono completing empty once the ATR entry has been marked aborted.
     */
    private Mono<Void> atrAbortLocked(String prefix, SpanWrapper pspan, boolean isAppRollback, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            this.assertLocked("atrAbort");
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "transaction_atr_abort", pspan);
            ArrayList<SubdocMutateRequest.Command> specs = new ArrayList<SubdocMutateRequest.Command>();
            specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, prefix + "." + "st", this.serialize(AttemptState.ABORTED.name()), false, true, false, 0));
            specs.add(new SubdocMutateRequest.Command(SubdocCommandType.DICT_UPSERT, prefix + "." + "tsrs", this.serialize("${Mutation.CAS}"), false, true, true, 1));
            // The current spec count is passed on to the helper that builds the remaining specs.
            specs.addAll(this.addDocsToBuilder(specs.size()));
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "aborting ATR {} isAppRollback={} ambiguityResolutionMode={}", this.getAtrDebug(this.atrCollection, this.atrId), isAppRollback, ambiguityResolutionMode);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode("atrAbort", Optional.empty());
            }).then(this.hooks.beforeAtrAborted.apply(this)).then(TransactionKVHandler.mutateIn(this.core, this.atrCollection.get(), this.atrId.get(), this.kvTimeoutMutating(), false, false, false, false, false, 0L, CodecFlags.BINARY_COMMON_FLAGS, this.durabilityLevel(), OptionsUtil.createClientContext("atrAbort"), span, specs)).publishOn(this.scheduler()).flatMap(v -> this.hooks.afterAtrAborted.apply(this).thenReturn(v)).doOnNext(v -> {
                this.setStateLocked(AttemptState.ABORTED);
                this.addUnits(v.flexibleExtras());
                this.LOGGER.info(this.attemptId, "aborted ATR {}{} in {}us", this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(v.flexibleExtras()), span.elapsedMicros());
            }).then().onErrorResume(err -> {
                ErrorClass ec = ErrorClass.classify(err);
                TransactionOperationFailedException.Builder builder = TransactionOperationFailedException.Builder.createError().cause((Throwable)err).doNotRollbackAttempt();
                MeteringUnits units = this.addUnits(MeteringUnits.from(err));
                span.recordException((Throwable)err);
                this.LOGGER.info(this.attemptId, "error {} while aborting ATR {}{}", DebugUtil.dbg(err), this.getAtrDebug(this.atrCollection, this.atrId), DebugUtil.dbg(units));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(isAppRollback, "atrAbort", (Throwable)err, TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                // First expiry: switch to overtime mode and give the operation another go.
                if (ec == ErrorClass.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeMode("atrAbort").then(Mono.error((Throwable)new RetryOperationException()));
                }
                if (ec == ErrorClass.FAIL_PATH_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordEntryNotFoundException(this.atrId.get(), this.attemptId)).build()));
                }
                if (ec == ErrorClass.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordNotFoundException(this.atrId.get(), this.attemptId)).build()));
                }
                if (ec == ErrorClass.FAIL_ATR_FULL) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.cause(new ActiveTransactionRecordFullException((Throwable)err)).build()));
                }
                if (ec == ErrorClass.FAIL_HARD) {
                    return Mono.error((Throwable)this.operationFailed(isAppRollback, builder.doNotRollbackAttempt().build()));
                }
                return Mono.error((Throwable)new RetryOperationException());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).publishOn(this.scheduler()).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> span.finish());
        });
    }

    /**
     * Sanity check that the attempt's mutex is currently held.
     * The check is skipped entirely when {@code threadSafetyEnabled} is false.
     *
     * @param dbg name of the call site, embedded in the failure message
     * @throws IllegalStateException if thread-safety is enabled and the mutex is not locked
     */
    private void assertLocked(String dbg) {
        if (!this.threadSafetyEnabled || this.mutex.isLocked()) {
            return;
        }
        throw new IllegalStateException("Internal bug hit: mutex must be locked in " + dbg + " but isn't");
    }

    /**
     * Sanity check that the attempt's mutex is currently free.
     * Only enforced when thread-safety is enabled and the mutex reports
     * {@code debugAsSingleThreaded()}.
     *
     * @param dbg name of the call site, embedded in the failure message
     * @throws IllegalStateException if the check is active and the mutex is locked
     */
    private void assertNotLocked(String dbg) {
        if (!this.threadSafetyEnabled || !this.mutex.debugAsSingleThreaded() || !this.mutex.isLocked()) {
            return;
        }
        throw new IllegalStateException("Internal bug hit: mutex must be unlocked in " + dbg + " but isn't");
    }

    /**
     * Sanity check that the attempt has not entered query mode.
     * Delegates to {@link #queryModeLocked()}, which itself asserts the mutex is held.
     *
     * @param dbg name of the call site, embedded in the failure message
     * @throws IllegalStateException if the attempt is in query mode
     */
    private void assertNotQueryMode(String dbg) {
        boolean inQueryMode = this.queryModeLocked();
        if (inQueryMode) {
            throw new IllegalStateException("Internal bug hit: must not be in queryMode in " + dbg);
        }
    }

    /**
     * Convenience overload of {@link #canPerformOperation(String, boolean)} with the
     * pending-state check switched on.
     *
     * @param dbg name of the operation, used for logging
     * @return the error that blocks the operation, or {@code null} if it may proceed
     */
    @Nullable
    TransactionOperationFailedException canPerformOperation(String dbg) {
        final boolean checkPendingState = true;
        return this.canPerformOperation(dbg, checkPendingState);
    }

    /**
     * Decides, from the current attempt state, whether another operation may run.
     *
     * @param dbg                    name of the operation, used for logging
     * @param canPerformPendingCheck when true, a not-started/pending attempt is additionally
     *                               rejected if state bit 1 is set (the log message describes
     *                               this as "not allowed to commit")
     * @return the error that blocks the operation, or {@code null} if it may proceed
     */
    @Nullable
    TransactionOperationFailedException canPerformOperation(String dbg, boolean canPerformPendingCheck) {
        switch (this.state) {
            case COMPLETED:
            case COMMITTED:
                return TransactionOperationFailedException.Builder.createError()
                        .cause(new TransactionAlreadyCommittedException())
                        .doNotRollbackAttempt()
                        .build();
            case ROLLED_BACK:
            case ABORTED:
                return TransactionOperationFailedException.Builder.createError()
                        .cause(new TransactionAlreadyAbortedException())
                        .doNotRollbackAttempt()
                        .build();
            case NOT_STARTED:
            case PENDING:
                if (canPerformPendingCheck && this.hasStateBit(1)) {
                    this.logger().info(this.attemptId, "failing operation {} as not allowed to commit (probably as previous operations have failed)", dbg);
                    return TransactionOperationFailedException.Builder.createError()
                            .cause(new PreviousOperationFailedException())
                            .build();
                }
                return null;
            default:
                // Any other state places no restriction on the operation.
                return null;
        }
    }

    /**
     * Decides whether a rollback may be performed.
     *
     * @param dbg         name of the operation, used for logging
     * @param appRollback true when the rollback was requested by the application; such a
     *                    rollback is refused when state bit 2 is set
     * @return the error that blocks the rollback, or {@code null} if it may proceed
     */
    @Nullable
    TransactionOperationFailedException canPerformRollback(String dbg, boolean appRollback) {
        if (!appRollback || !this.hasStateBit(2)) {
            // Fall back to the general state check, without the pending-state bit test.
            return this.canPerformOperation(dbg, false);
        }
        this.LOGGER.info(this.attemptId, "state bits indicate app-rollback is not allowed");
        return TransactionOperationFailedException.Builder.createError()
                .cause(new RollbackNotPermittedException())
                .doNotRollbackAttempt()
                .build();
    }

    /**
     * Decides whether a commit may be performed.
     *
     * @param dbg name of the operation, used for logging
     * @return a {@code CommitNotPermittedException}-caused error when state bit 1 is set,
     *         otherwise whatever {@link #canPerformOperation(String)} reports
     *         ({@code null} meaning the commit may proceed)
     */
    @Nullable
    TransactionOperationFailedException canPerformCommit(String dbg) {
        if (!this.hasStateBit(1)) {
            return this.canPerformOperation(dbg);
        }
        this.LOGGER.info(this.attemptId, "state bits indicate commit is not allowed");
        return TransactionOperationFailedException.Builder.createError()
                .cause(new CommitNotPermittedException())
                .doNotRollbackAttempt()
                .build();
    }

    /**
     * Tests whether any bit of the given mask is set in the current state bits.
     *
     * @param stateBit bit mask to test
     * @return true if the current value of {@code stateBits} shares a set bit with the mask
     */
    private boolean hasStateBit(int stateBit) {
        int current = this.stateBits.get();
        return (current & stateBit) != 0;
    }

    /**
     * Atomically ORs new behaviour flags into {@code stateBits} and, when the supplied
     * final-error code is greater than the one currently stored in bits 4-6 (mask 0x70),
     * replaces the stored code. The change is then logged.
     *
     * @param dbg                  name of the caller, included in the log line
     * @param newBehaviourFlags    flag bits to OR into the state
     * @param newFinalErrorToRaise candidate final-error code (an index into
     *                             {@code FinalErrorToRaise.values()})
     */
    private void setStateBits(String dbg, int newBehaviourFlags, int newFinalErrorToRaise) {
        int before;
        int after;
        // Compare-and-set retry loop: recompute from a fresh read until the swap succeeds.
        do {
            before = this.stateBits.get();
            after = before | newBehaviourFlags;
            if (newFinalErrorToRaise > ((before & 0x70) >> 4)) {
                // Keep only the low four flag bits and install the new error code above them.
                after = (after & 0xF) | (newFinalErrorToRaise << 4);
            }
        } while (!this.stateBits.compareAndSet(before, after));

        TransactionOperationFailedException.FinalErrorToRaise[] errors = TransactionOperationFailedException.FinalErrorToRaise.values();
        TransactionOperationFailedException.FinalErrorToRaise previousToRaise = errors[(before & 0x70) >> 4];
        TransactionOperationFailedException.FinalErrorToRaise currentToRaise = errors[(after & 0x70) >> 4];

        // Flag masks and their log labels, in the order they are reported.
        int[] masks = {4, 8, 1, 2};
        String[] labels = {" shouldNotRollback to ", " shouldNotRetry to ", " shouldNotCommit to ", " appRollbackNotAllowed to "};

        StringBuilder message = new StringBuilder("changed state bits in ").append(dbg).append(", changed");
        for (int i = 0; i < masks.length; i++) {
            boolean wasSet = (before & masks[i]) != 0;
            boolean isSet = (after & masks[i]) != 0;
            if (wasSet != isSet) {
                message.append(labels[i]).append(isSet);
            }
        }
        if (previousToRaise != currentToRaise) {
            message.append(" toRaise from ").append(previousToRaise).append(" to ").append(currentToRaise);
        }
        this.LOGGER.info(this.attemptId, message.toString());
    }

    /**
     * Optionally records a failure in the attempt's state bits.
     *
     * @param updateInternalState when true the error is passed through
     *                            {@link #operationFailed(TransactionOperationFailedException)};
     *                            when false the state is left untouched
     * @param err                 the failure
     * @return {@code err}, for convenient chaining
     */
    public TransactionOperationFailedException operationFailed(boolean updateInternalState, TransactionOperationFailedException err) {
        return updateInternalState ? this.operationFailed(err) : err;
    }

    /**
     * Records a failure in the attempt's state bits. Bit 1 is always set; bit 4 is added
     * when the error does not allow automatic rollback and bit 8 when it does not allow a
     * retry. The ordinal of {@code err.toRaise()} is offered as the new final-error code.
     *
     * @param err the failure
     * @return {@code err}, for convenient chaining
     */
    public TransactionOperationFailedException operationFailed(TransactionOperationFailedException err) {
        int flags = 1;
        flags |= err.autoRollbackAttempt() ? 0 : 4;
        flags |= err.retryTransaction() ? 0 : 8;
        int finalErrorCode = err.toRaise().ordinal();
        this.setStateBits("operationFailed", flags, finalErrorCode);
        return err;
    }

    /**
     * @return the attempt's current state
     */
    AttemptState state() {
        return state;
    }

    /**
     * Reports whether a query context has been established, after asserting that the
     * mutex is held.
     *
     * @return true if {@code queryContext} is set
     */
    boolean queryModeLocked() {
        assertLocked("queryMode");
        return queryContext != null;
    }

    /**
     * Same test as {@link #queryModeLocked()} but without the lock assertion.
     *
     * @return true if {@code queryContext} is set
     */
    boolean queryModeUnlocked() {
        return queryContext != null;
    }

    /**
     * Computes a query timeout as: remaining transaction expiry, plus the environment's
     * configured KV durable timeout, plus one second.
     *
     * @return the computed timeout
     */
    private Duration queryTimeout() {
        Duration remaining = Duration.ofMillis(this.expiryRemainingMillis());
        Duration kvDurable = this.core.context().environment().timeoutConfig().kvDurableTimeout();
        return remaining.plus(kvDurable).plusSeconds(1L);
    }

    /**
     * Sends a single statement to the query service, bracketed by the
     * {@code beforeQuery}/{@code afterQuery} hooks.
     * <p>
     * {@code metrics} is always requested. For a {@code tximplicit} query the transaction
     * options are applied here too, defaulting scan consistency to {@code request_plus}
     * when the caller set none. On the first result, if no query context exists yet, one
     * is created from the node the request was last dispatched to.
     *
     * @param sidx       statement index, used for logging
     * @param qc         optional query context
     * @param statement  the statement to run
     * @param options    options for the request; mutated by this method
     * @param pspan      optional parent span (not used by this method)
     * @param tximplicit whether this is a single-query (implicit) transaction
     * @return the reactive query result
     */
    private Mono<ClassicCoreReactiveQueryResult> queryInternal(int sidx, @Nullable CoreQueryContext qc, String statement, CoreQueryOptionsTransactions options, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return Mono.defer(() -> this.hooks.beforeQuery.apply(this, statement)
                .then(Mono.defer(() -> {
                    this.assertNotLocked("queryInternal");
                    options.put("metrics", BooleanNode.getTrue());
                    if (tximplicit) {
                        options.put("tximplicit", BooleanNode.getTrue());
                        if (options.scanConsistency() == null) {
                            options.put("scan_consistency", TextNode.valueOf("request_plus"));
                        }
                        CoreTransactionAttemptContext.applyQueryOptions(this.config, options, this.expiryRemainingMillis());
                    }
                    this.logger().info(this.attemptId, "q{} using query params {}", sidx, options.toString());

                    // Once a query context exists, reuse the node it recorded.
                    NodeIdentifier pinnedNode = null;
                    if (this.queryContext != null) {
                        pinnedNode = this.queryContext.queryTarget;
                    }
                    return this.queryOps.queryReactive(statement, options, qc, pinnedNode, null)
                            .publishOn(this.scheduler())
                            .map(raw -> (ClassicCoreReactiveQueryResult)raw)
                            .doOnNext(response -> {
                                if (this.queryContext != null) {
                                    return;
                                }
                                this.queryContext = new TransactionQueryContext(response.lastDispatchedTo(), qc);
                                this.logger().info(this.attemptId, "q{} got query node id {}", sidx, RedactableArgument.redactMeta(this.queryContext.queryTarget));
                            });
                }))
                .flatMap(response -> this.hooks.afterQuery.apply(this, statement).thenReturn(response)));
    }

    /**
     * Runs a statement as part of the transaction; must be called with the mutex held.
     * <p>
     * Unless the query is {@code tximplicit}, is itself the BEGIN WORK statement, or the
     * attempt is already in query mode, {@link #beginWorkIfNeeded} is run first. The
     * pre-flight checks in {@link #queryInternalPreLocked} follow, then the statement is
     * dispatched via {@link #queryInternal} on a copy of the caller's options.
     *
     * @param sidx                statement index, used for logging
     * @param qc                  optional query context
     * @param statement           the statement to run
     * @param options             caller-supplied options; copied, never mutated
     * @param hookPoint           stage name passed to the pre-flight expiry check
     * @param isBeginWork         true when {@code statement} is the BEGIN WORK itself
     * @param existingErrorCheck  whether to fail fast on a previously recorded error
     * @param txdata              optional payload sent as the raw {@code txdata} option
     * @param params              optional parameters; only logged here
     * @param span                optional tracing span
     * @param tximplicit          whether this is a single-query (implicit) transaction
     * @param lockToken           holder for the current lock token; may be replaced when
     *                            BEGIN WORK has to be issued
     * @param updateInternalState not used by this method
     * @return the reactive query result
     */
    public Mono<ClassicCoreReactiveQueryResult> queryWrapperLocked(int sidx, @Nullable CoreQueryContext qc, String statement, @Nullable CoreQueryOptions options, String hookPoint, boolean isBeginWork, boolean existingErrorCheck, @Nullable ObjectNode txdata, @Nullable ArrayNode params, @Nullable SpanWrapper span, boolean tximplicit, AtomicReference<ReactiveLock.Waiter> lockToken, boolean updateInternalState) {
        return Mono.defer(() -> {
            this.assertLocked("queryWrapper q" + sidx);
            this.logger().debug(this.attemptId, "q{}: '{}' params={} txdata={} tximplicit={}", sidx, RedactableArgument.redactUser(statement), RedactableArgument.redactUser(params), RedactableArgument.redactUser(txdata), tximplicit);
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            if (!(tximplicit || this.queryModeLocked() || isBeginWork)) {
                beginWorkIfNeeded = this.beginWorkIfNeeded(sidx, qc, statement, lockToken, span);
            }
            CoreQueryOptionsTransactions optionsCopy = new CoreQueryOptionsTransactions(options);
            if (txdata != null) {
                optionsCopy.raw("txdata", txdata);
            }
            return beginWorkIfNeeded.then(this.queryInternalPreLocked(sidx, statement, hookPoint, existingErrorCheck)).then(Mono.defer(() -> {
                // The attempt id is attached to every statement except an implicit
                // transaction and the BEGIN WORK itself.
                if (!tximplicit && !isBeginWork) {
                    optionsCopy.raw("txid", TextNode.valueOf(this.attemptId));
                }
                return this.queryInternal(sidx, qc, statement, optionsCopy, span, tximplicit);
            }));
        });
    }

    /**
     * Variant of {@link #queryWrapperLocked} that buffers all rows and the trailer into
     * a single {@link CoreQueryResult}.
     * <p>
     * Errors from the trailer and from the overall pipeline are passed through
     * {@link #convertQueryError}; a result whose status is {@code FATAL} is turned into
     * a {@link TransactionOperationFailedException}.
     * <p>
     * {@code span} is declared nullable, so elapsed time is read through
     * {@link #elapsedMicrosOrUnknown} rather than dereferenced directly; otherwise a
     * missing span would replace the real query error with a NullPointerException.
     *
     * @param updateInternalState whether pipeline-level failures are recorded in the
     *                            attempt's state bits (trailer failures always are)
     * @return the buffered query result
     */
    private Mono<CoreQueryResult> queryWrapperBlockingLocked(int sidx, @Nullable CoreQueryContext qc, String statement, @Nullable CoreQueryOptions options, String hookPoint, boolean isBeginWork, boolean existingErrorCheck, @Nullable ObjectNode txdata, @Nullable ArrayNode params, @Nullable SpanWrapper span, boolean tximplicit, AtomicReference<ReactiveLock.Waiter> lockToken, boolean updateInternalState) {
        return Mono.defer(() -> this.queryWrapperLocked(sidx, qc, statement, options, hookPoint, isBeginWork, existingErrorCheck, txdata, params, span, tximplicit, lockToken, updateInternalState).map(result -> result.internal()).flatMap(result -> result.rows().collectList().flatMap(rows -> result.trailer().onErrorResume(err -> {
            RuntimeException converted = this.convertQueryError(sidx, (Throwable)err, true);
            long elapsed = CoreTransactionAttemptContext.elapsedMicrosOrUnknown(span);
            this.logger().warn(this.attemptId, "q{} got error on rows stream {} after {}us, converted from {}", sidx, CoreTransactionAttemptContext.dbg(converted), elapsed, CoreTransactionAttemptContext.dbg(err));
            if (converted != null) {
                return Mono.error((Throwable)converted);
            }
            return Mono.error((Throwable)err);
        }).map(trailer -> new ClassicCoreQueryResult(result.header(), (List<QueryChunkRow>)rows, (QueryChunkTrailer)trailer, null)))).onErrorResume(err -> {
            RuntimeException converted = this.convertQueryError(sidx, (Throwable)err, updateInternalState);
            long elapsed = CoreTransactionAttemptContext.elapsedMicrosOrUnknown(span);
            this.logger().warn(this.attemptId, "q{} got error {} after {}us, converted from {}", sidx, CoreTransactionAttemptContext.dbg(converted), elapsed, CoreTransactionAttemptContext.dbg(err));
            if (converted != null) {
                return Mono.error((Throwable)converted);
            }
            return Mono.error((Throwable)err);
        }).flatMap(result -> {
            long elapsed = CoreTransactionAttemptContext.elapsedMicrosOrUnknown(span);
            this.logger().info(this.attemptId, "q{} returned with metrics {} after {}us", sidx, result.metaData().metrics().get(), elapsed);
            if (result.metaData().status() == CoreQueryStatus.FATAL) {
                TransactionOperationFailedException err = this.operationFailed(updateInternalState, TransactionOperationFailedException.Builder.createError().build());
                return Mono.error((Throwable)err);
            }
            return Mono.just(result);
        }));
    }

    /**
     * Reads the elapsed time from an optional span.
     *
     * @param span the span, possibly {@code null}
     * @return the span's elapsed microseconds, or {@code -1} when no span was supplied
     */
    private static long elapsedMicrosOrUnknown(@Nullable SpanWrapper span) {
        return span == null ? -1L : span.elapsedMicros();
    }

    /**
     * Releases the lock, waits for outstanding KV operations, re-acquires the lock and
     * then issues BEGIN WORK unless the attempt entered query mode in the meantime.
     *
     * @param sidx      statement index, used for logging and lock debug names
     * @param qc        optional query context forwarded to BEGIN WORK
     * @param statement the user statement, passed to the {@code beforeUnlockQuery} hook
     * @param lockToken holder for the lock token; updated with the newly acquired token
     * @param span      tracing span forwarded to BEGIN WORK
     * @return completes once BEGIN WORK has run, or immediately if it was not needed
     */
    private Mono<Void> beginWorkIfNeeded(int sidx, @Nullable CoreQueryContext qc, String statement, AtomicReference<ReactiveLock.Waiter> lockToken, SpanWrapper span) {
        return this.hooks.beforeUnlockQuery.apply(this, statement).then(this.unlock(lockToken.get(), "before BEGIN WORK q" + sidx)).then(this.waitForAllKVOpsThenLock("queryWrapper q" + sidx)).flatMap(newLockToken -> {
            lockToken.set((ReactiveLock.Waiter)newLockToken);
            // Query mode may have been entered while the lock was released, so re-check
            // under the new lock and act on exactly the value that gets logged.
            boolean stillNeedsBeginWork = !this.queryModeLocked();
            this.LOGGER.info(this.attemptId, "q{} after reacquiring lock stillNeedsBeginWork={}", sidx, stillNeedsBeginWork);
            if (stillNeedsBeginWork) {
                return this.queryBeginWorkLocked(qc, span);
            }
            return Mono.empty();
        });
    }

    /**
     * Converts a raw query failure via {@code QueryUtil.convertQueryError} and, when the
     * outcome is a {@link TransactionOperationFailedException}, optionally records it in
     * the attempt's state bits.
     *
     * @param sidx                statement index (not used by this method)
     * @param err                 the raw failure
     * @param updateInternalState whether a converted transaction failure is recorded
     * @return the converted exception as produced by {@code QueryUtil}
     */
    private RuntimeException convertQueryError(int sidx, Throwable err, boolean updateInternalState) {
        RuntimeException converted = QueryUtil.convertQueryError(err);
        if (!(converted instanceof TransactionOperationFailedException)) {
            return converted;
        }
        return this.operationFailed(updateInternalState, (TransactionOperationFailedException)converted);
    }

    /**
     * Copies transaction-level settings onto a set of query options: scan consistency
     * (when configured), durability level, transaction timeout, the ATR collection
     * (when a metadata collection is configured) and the number of ATRs.
     *
     * @param config    merged transaction configuration to read from
     * @param options   options to populate; mutated in place
     * @param txtimeout transaction timeout in milliseconds, sent as {@code "<n>ms"}
     * @return the same {@code options} instance
     * @throws IllegalArgumentException if the configured durability level is not recognised
     */
    private static CoreQueryOptionsTransactions applyQueryOptions(CoreMergedTransactionConfig config, CoreQueryOptionsTransactions options, long txtimeout) {
        config.scanConsistency().ifPresent(consistency -> options.put("scan_consistency", TextNode.valueOf(consistency)));

        DurabilityLevel level = config.durabilityLevel();
        final String levelName;
        switch (level) {
            case NONE:
                levelName = "none";
                break;
            case MAJORITY:
                levelName = "majority";
                break;
            case MAJORITY_AND_PERSIST_TO_ACTIVE:
                levelName = "majorityAndPersistActive";
                break;
            case PERSIST_TO_MAJORITY:
                levelName = "persistToMajority";
                break;
            default:
                throw new IllegalArgumentException("Unknown durability level " + level);
        }
        options.put("durability_level", TextNode.valueOf(levelName));
        options.put("txtimeout", TextNode.valueOf(txtimeout + "ms"));

        config.metadataCollection().ifPresent(meta -> {
            // Keyspace is sent as back-tick quoted bucket.scope.collection.
            String keyspace = String.format("`%s`.`%s`.`%s`", meta.bucket(), meta.scope().orElse("_default"), meta.collection().orElse("_default"));
            options.put("atrcollection", TextNode.valueOf(keyspace));
        });
        options.put("numatrs", IntNode.valueOf(config.numAtrs()));
        return options;
    }

    /**
     * Issues {@code BEGIN WORK} to the query service; must be called with the mutex held.
     * <p>
     * Fails if any staged mutation is binary. On success the locally staged mutations
     * are cleared, and a query context must have been established by then.
     *
     * @param qc   optional query context
     * @param span tracing span
     * @return completes when BEGIN WORK has succeeded
     */
    private Mono<Void> queryBeginWorkLocked(@Nullable CoreQueryContext qc, SpanWrapper span) {
        return Mono.defer(() -> {
            this.assertLocked("queryBeginWork");
            this.stagedMutationsLocked.forEach(staged -> {
                if (!staged.isStagedBinary()) {
                    return;
                }
                FeatureNotAvailableException unsupported = new FeatureNotAvailableException("Binary documents are only supported in a KV-only transaction");
                throw this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(unsupported).build());
            });

            ObjectNode txdata = this.makeQueryTxDataLocked();
            CoreQueryOptionsTransactions beginOptions = new CoreQueryOptionsTransactions();
            CoreTransactionAttemptContext.applyQueryOptions(this.config, beginOptions, this.expiryRemainingMillis());
            int statementIdx = this.queryStatementIdx.getAndIncrement();

            return this.queryWrapperBlockingLocked(statementIdx, qc, "BEGIN WORK", beginOptions, "queryBeginWork", true, true, txdata, null, span, false, null, true)
                    .doOnNext(ignored -> {
                        this.assertLocked("beginWork");
                        this.stagedMutationsLocked.clear();
                        if (this.queryContext != null) {
                            return;
                        }
                        throw this.operationFailed(TransactionOperationFailedException.Builder.createError().cause(new IllegalAccessError("Internal error: Must have a queryTarget after BEGIN WORK")).build());
                    })
                    .then();
        });
    }

    /**
     * Runs a statement inside this attempt and returns its fully buffered result.
     * <p>
     * The work is handed to {@code doQueryOperation} (defined outside this section; it
     * supplies the statement index, lock token and span), which in turn invokes
     * {@link #queryWrapperBlockingLocked} with the "query" hook point, the existing-error
     * check enabled and internal state updates enabled.
     *
     * @param statement  the statement to run
     * @param qc         optional query context
     * @param options    optional caller-supplied query options
     * @param tximplicit whether this is a single-query (implicit) transaction
     * @return the buffered query result
     */
    public Mono<CoreQueryResult> queryBlocking(String statement, @Nullable CoreQueryContext qc, @Nullable CoreQueryOptions options, boolean tximplicit) {
        return this.doQueryOperation("query blocking", statement, this.attemptSpan, (sidx, lockToken, span) -> {
            // Tag the span so single-query transactions can be told apart in traces.
            if (tximplicit) {
                span.attribute("db.couchbase.transaction.single_query", true);
            }
            return this.queryWrapperBlockingLocked((int)sidx, qc, statement, options, "query", false, true, null, null, (SpanWrapper)span, tximplicit, (AtomicReference<ReactiveLock.Waiter>)lockToken, true);
        });
    }

    /**
     * Pre-flight checks before a statement is dispatched; must be called with the mutex
     * held.
     * <p>
     * Fails when a previous error forbids further operations (if
     * {@code existingErrorCheck} is set), or when the transaction has expired or has
     * less than {@code EXPIRY_THRESHOLD} milliseconds remaining.
     *
     * @param sidx               statement index, used for logging
     * @param statement          the statement about to run
     * @param hookPoint          stage name used for the expiry check and logging
     * @param existingErrorCheck whether to consult {@link #canPerformOperation(String)}
     * @return empty on success, or an error {@code Mono}
     */
    Mono<Void> queryInternalPreLocked(int sidx, String statement, String hookPoint, boolean existingErrorCheck) {
        return Mono.defer(() -> {
            this.assertLocked("queryInternalPre");
            if (existingErrorCheck) {
                TransactionOperationFailedException existing = this.canPerformOperation("queryInternalPre " + sidx);
                if (existing != null) {
                    return Mono.error(existing);
                }
            }

            long remaining = this.expiryRemainingMillis();
            boolean expiresSoon = remaining < (long)this.EXPIRY_THRESHOLD;
            boolean expired = this.hasExpiredClientSide(hookPoint, Optional.of(statement));
            if (!expired && !expiresSoon) {
                return Mono.empty();
            }

            this.logger().info(this.attemptId, "transaction has expired in stage '{}' remaining={} threshold={}", hookPoint, remaining, this.EXPIRY_THRESHOLD);
            TransactionOperationFailedException expiredError = TransactionOperationFailedException.Builder.createError()
                    .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED)
                    .doNotRollbackAttempt()
                    .build();
            return Mono.error(this.operationFailed(expiredError));
        });
    }

    /**
     * Maps staged mutations to {@link DocRecord}s carrying bucket, scope, collection and
     * document id. A missing scope or collection name is recorded as {@code "_default"}.
     *
     * @param mutations the staged mutations
     * @return one record per mutation, in the same order
     */
    private List<DocRecord> toDocRecords(List<StagedMutation> mutations) {
        return mutations.stream()
                .map(mutation -> {
                    return new DocRecord(
                            mutation.collection.bucket(),
                            mutation.collection.scope().orElse("_default"),
                            mutation.collection.collection().orElse("_default"),
                            mutation.id);
                })
                .collect(Collectors.toList());
    }

    /**
     * Builds a cleanup request for this attempt, if one is needed, and hands it to the
     * given cleanup service. Nothing is queued when either is {@code null}.
     *
     * @param cleanup the cleanup service, possibly {@code null}
     * @return a {@code Mono} that performs the work on subscription
     */
    @Stability.Internal
    private Mono<Void> addCleanup(@Nullable CoreTransactionsCleanup cleanup) {
        return Mono.fromRunnable(() -> {
            CleanupRequest request = this.createCleanupRequestIfNeeded(cleanup);
            if (request == null || cleanup == null) {
                return;
            }
            cleanup.add(request);
        });
    }

    /**
     * Runs the end-of-attempt sequence after the application's lambda has finished:
     * automatic rollback when required, scheduling of cleanup, finishing the attempt
     * span, and finally the retry decision made by {@link #retryIfRequired}.
     *
     * @param cleanup                    cleanup service to queue this attempt with, if any
     * @param err                        the error the lambda ended with, or {@code null}
     * @param singleQueryTransactionMode true for a single-query transaction, which never
     *                                   triggers the automatic rollback here
     * @return completes normally, or fails with whatever {@link #retryIfRequired} raises
     */
    @Stability.Internal
    Mono<Void> lambdaEnd(@Nullable CoreTransactionsCleanup cleanup, @Nullable Throwable err, boolean singleQueryTransactionMode) {
        return Mono.defer(() -> {
            // Decode the state bits: 4 = "should not rollback", 0x70 = final-error code.
            int sb = this.stateBits.get();
            boolean shouldNotRollback = (sb & 4) != 0;
            int maskedFinalError = (sb & 0x70) >> 4;
            TransactionOperationFailedException.FinalErrorToRaise finalError = TransactionOperationFailedException.FinalErrorToRaise.values()[maskedFinalError];
            boolean rollbackNeeded = finalError != TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_SUCCESS && !shouldNotRollback && !singleQueryTransactionMode;
            this.LOGGER.info(this.attemptId, "reached post-lambda in {}us, shouldNotRollback={} finalError={} rollbackNeeded={}, err (only cause of this will be used)={} tximplicit={}{}", new Object[]{TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - this.startTimeClient.toNanos()), shouldNotRollback, finalError, rollbackNeeded, err, singleQueryTransactionMode, this.queryModeUnlocked() ? "" : this.meteringUnitsBuilder.toString()});
            this.core.transactionsContext().counters().attempts().incrementBy(1L);
            return Mono.defer(() -> {
                if (rollbackNeeded) {
                    // A failed auto-rollback is swallowed: retry is disabled (bit 8) and the
                    // sequence carries on so the original error is the one reported.
                    return this.rollbackAuto().onErrorResume(er -> {
                        this.overall.LOGGER.info(this.attemptId, "rollback failed with {}. Original error will be raised as cause, and retry should be disabled", DebugUtil.dbg(er));
                        this.setStateBits("lambdaEnd", 8, 0);
                        return Mono.empty();
                    });
                }
                return Mono.empty();
            }).then(this.addCleanup(cleanup)).doOnTerminate(() -> {
                // Close the attempt span, marking it failed when the lambda raised an error.
                if (err != null) {
                    this.attemptSpan.finishWithErrorStatus();
                } else {
                    this.attemptSpan.finish();
                }
            }).then(Mono.defer(() -> this.retryIfRequired(err)));
        });
    }

    /**
     * Decides how the attempt ends once any rollback has been done.
     * <ul>
     *   <li>retry wanted but the transaction has expired client-side: an expired error;</li>
     *   <li>retry wanted: a {@code RetryTransactionException};</li>
     *   <li>otherwise: {@code err} if present, else normal completion.</li>
     * </ul>
     * A retry is wanted when the stored final error is not {@code TRANSACTION_SUCCESS}
     * and state bit 8 ("should not retry") is clear.
     *
     * @param err the error the lambda ended with, or {@code null}
     * @return a {@code Mono} carrying the decision
     */
    private Mono<Void> retryIfRequired(Throwable err) {
        int bits = this.stateBits.get();
        boolean shouldNotRetry = (bits & 8) != 0;
        int finalErrorCode = (bits & 0x70) >> 4;
        TransactionOperationFailedException.FinalErrorToRaise finalError = TransactionOperationFailedException.FinalErrorToRaise.values()[finalErrorCode];
        boolean retryNeeded = finalError != TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_SUCCESS && !shouldNotRetry;

        if (retryNeeded && this.hasExpiredClientSide("beforeRetry", Optional.empty())) {
            TransactionOperationFailedException expired = TransactionOperationFailedException.Builder.createError()
                    .doNotRollbackAttempt()
                    .raiseException(TransactionOperationFailedException.FinalErrorToRaise.TRANSACTION_EXPIRED)
                    .build();
            return Mono.error(this.operationFailed(expired));
        }

        this.LOGGER.info(this.attemptId, "reached end of lambda post-rollback (if needed), shouldNotRetry={} finalError={} retryNeeded={}", new Object[]{shouldNotRetry, finalError, retryNeeded});
        if (retryNeeded) {
            return Mono.error(new RetryTransactionException());
        }
        return err != null ? Mono.error(err) : Mono.empty();
    }

    /**
     * Produces the final outcome of the whole transaction from the final-error code
     * stored in the state bits.
     * <p>
     * {@code TRANSACTION_FAILED_POST_COMMIT} yields a normal result, as does
     * {@code TRANSACTION_SUCCESS} (except in single-query mode, where {@code err} is
     * re-raised as-is). Expired and commit-ambiguous codes map to their dedicated
     * exceptions; every other code maps to {@code CoreTransactionFailedException}.
     * Only the cause of a {@link TransactionOperationFailedException} is propagated.
     *
     * @param err                        the error the last attempt ended with, or {@code null}
     * @param singleQueryTransactionMode true for a single-query transaction
     * @return the transaction result, or an error {@code Mono}
     */
    @Stability.Internal
    Mono<CoreTransactionResult> transactionEnd(@Nullable Throwable err, boolean singleQueryTransactionMode) {
        return Mono.defer(() -> {
            boolean unstagingComplete = this.state == AttemptState.COMPLETED;
            Duration elapsed = Duration.ofNanos(System.nanoTime() - this.overall.startTimeClient());
            CoreTransactionResult result = new CoreTransactionResult(this.overall.LOGGER, elapsed, this.overall.transactionId(), unstagingComplete);

            int bits = this.stateBits.get();
            int finalErrorCode = (bits & 0x70) >> 4;
            TransactionOperationFailedException.FinalErrorToRaise finalError = TransactionOperationFailedException.FinalErrorToRaise.values()[finalErrorCode];
            this.LOGGER.info(this.attemptId, "reached end of transaction, toRaise={}, err={}", new Object[]{finalError, err});
            this.core.transactionsContext().counters().transactions().incrementBy(1L);

            Throwable cause = null;
            if (err instanceof TransactionOperationFailedException) {
                cause = err.getCause();
            } else if (err != null && !singleQueryTransactionMode) {
                this.logger().info(this.attemptId, "Non-TransactionOperationFailedException '" + DebugUtil.dbg(err) + "' received, this is a bug");
            }

            Throwable toRaise = null;
            switch (finalError) {
                case TRANSACTION_FAILED_POST_COMMIT:
                    // The commit happened; report success to the caller.
                    break;
                case TRANSACTION_SUCCESS:
                    if (singleQueryTransactionMode) {
                        toRaise = err;
                    }
                    break;
                case TRANSACTION_EXPIRED:
                    toRaise = new CoreTransactionExpiredException(cause, this.logger(), this.overall.transactionId(),
                            "Transaction has expired configured timeout of " + this.overall.expirationTime().toMillis() + "ms.  The transaction is not committed.");
                    break;
                case TRANSACTION_COMMIT_AMBIGUOUS:
                    toRaise = new CoreTransactionCommitAmbiguousException(cause, this.logger(), this.overall.transactionId(),
                            "It is ambiguous whether the transaction committed");
                    break;
                default:
                    toRaise = new CoreTransactionFailedException(cause, this.logger(), this.overall.transactionId());
                    break;
            }

            if (toRaise == null) {
                return Mono.just(result);
            }
            this.LOGGER.info(this.attemptId, "raising final error {} based on state bits {} masked {} tximplicit {}", toRaise, bits, finalErrorCode, singleQueryTransactionMode);
            return Mono.error(toRaise);
        });
    }

    /**
     * Normalises an exception thrown by the application's lambda.
     * <p>
     * A {@link TransactionOperationFailedException} is returned unchanged and a wrapped
     * one is unwrapped. In single-query mode any other exception is returned as-is.
     * Otherwise the exception becomes the cause of a new
     * {@link TransactionOperationFailedException} (flagged for retry when it is a
     * {@code RetryTransactionException}), is recorded on the attempt span, and is
     * registered via {@link #operationFailed(TransactionOperationFailedException)}.
     *
     * @param e                          the exception thrown by the lambda
     * @param singleQueryTransactionMode true for a single-query transaction
     * @return the exception to carry forward
     */
    @Stability.Internal
    Throwable convertToOperationFailedIfNeeded(Throwable e, boolean singleQueryTransactionMode) {
        if (e instanceof TransactionOperationFailedException) {
            return e;
        }
        if (e instanceof WrappedTransactionOperationFailedException) {
            return ((WrappedTransactionOperationFailedException)((Object)e)).wrapped();
        }
        if (singleQueryTransactionMode) {
            this.logger().info(this.attemptId(), "Caught exception from application's lambda {}, not converting", DebugUtil.dbg(e));
            return e;
        }

        TransactionOperationFailedException.Builder conversion = TransactionOperationFailedException.Builder.createError().cause(e);
        if (e instanceof RetryTransactionException) {
            conversion.retryTransaction();
        }
        TransactionOperationFailedException converted = conversion.build();
        this.logger().info(this.attemptId(), "Caught exception from application's lambda {}, converted it to {}", DebugUtil.dbg(e), DebugUtil.dbg(converted));
        this.attemptSpan.recordExceptionAndSetErrorStatus(e);
        return this.operationFailed(converted);
    }

    /**
     * @return the transaction logger used by this attempt
     */
    public CoreTransactionLogger logger() {
        return LOGGER;
    }

    /**
     * Short debug representation: the first five characters of the attempt id, the
     * state, the ATR debug string and the staged mutations.
     */
    @Override
    public String toString() {
        return new StringBuilder("AttemptContextReactive{")
                .append("id=").append(this.attemptId.substring(0, 5))
                .append(",state=").append(this.state)
                .append(",atr=").append(ActiveTransactionRecordUtil.getAtrDebug(this.atrCollection, this.atrId))
                .append(",staged=").append(this.stagedMutationsLocked.stream().map(StagedMutation::toString).collect(Collectors.toList()))
                .append('}')
                .toString();
    }

    /**
     * Compiler-generated lambda body (the name suggests it belongs to {@code doUnderLock})
     * that the decompiler could not fold back into its call site.
     * It simply invokes the supplier and returns the {@code Mono} it produces.
     */
    private static /* synthetic */ Mono lambda$doUnderLock$154(Supplier whileLocked) {
        return (Mono)whileLocked.get();
    }

    /**
     * Compiler-generated lambda body (the name suggests it belongs to
     * {@code waitForAllOpsThenDoUnderLock}) that the decompiler could not fold back into
     * its call site. It receives the lock token that was just acquired.
     * <p>
     * If KV operations are still waiting, the lock is released and the whole
     * wait-then-lock sequence is started again. Otherwise the supplied work runs and the
     * lock is released afterwards, on both the success and the error path.
     */
    private /* synthetic */ Mono lambda$waitForAllOpsThenDoUnderLock$85(String dbg, SpanWrapper span, Supplier doUnderLock, ReactiveLock.Waiter lockToken) {
        if (this.kvOps.waitingCount() > 0) {
            return this.unlock(lockToken, dbg + " still waiting for ops").then(this.waitForAllOpsThenDoUnderLock(dbg + " still waiting for ops", span, doUnderLock));
        }
        // On failure the lock is released first and the original error is then re-raised.
        return ((Mono)doUnderLock.get()).then(this.unlock(lockToken, "after doUnderLock")).onErrorResume(err -> this.unlock(lockToken, "onError doUnderLock").then(Mono.error((Throwable)err)));
    }

    /**
     * Compiler-generated predicate (the name suggests it belongs to
     * {@code getMultiReadSkewResolution}) that the decompiler could not fold back into
     * its call site. True when the result's underlying document reports being in a
     * transaction.
     */
    private static /* synthetic */ boolean lambda$getMultiReadSkewResolution$59(CoreTransactionGetMultiResult v) {
        return v.internal.isInTransaction();
    }

    /**
     * Immutable pair of the query node a transaction is bound to and the optional query
     * context it was started with.
     */
    static class TransactionQueryContext {
        /** Node that queries of this transaction are sent to; never {@code null}. */
        public final NodeIdentifier queryTarget;
        /** Query context supplied with the first query, if any. */
        @Nullable
        public final CoreQueryContext queryContext;

        /**
         * @param queryTarget  the query node; must not be {@code null}
         * @param queryContext optional query context
         * @throws NullPointerException if {@code queryTarget} is {@code null}
         */
        public TransactionQueryContext(NodeIdentifier queryTarget, @Nullable CoreQueryContext queryContext) {
            this.queryContext = queryContext;
            this.queryTarget = Objects.requireNonNull(queryTarget);
        }
    }

    /**
     * Immutable set of flags describing how a document compares with an earlier
     * observation of it.
     */
    static class DocChanged {
        public final boolean unclearIfBodyHasChanged;
        public final boolean bodyHasChanged;
        public final boolean inDifferentTransaction;
        public final boolean notInTransaction;

        /**
         * @param unclearIfBodyHasChanged stored as-is in the field of the same name
         * @param bodyHasChanged          stored as-is in the field of the same name
         * @param inDifferentTransaction  stored as-is in the field of the same name
         * @param notInTransaction        stored as-is in the field of the same name
         */
        public DocChanged(boolean unclearIfBodyHasChanged, boolean bodyHasChanged, boolean inDifferentTransaction, boolean notInTransaction) {
            this.notInTransaction = notInTransaction;
            this.inDifferentTransaction = inDifferentTransaction;
            this.bodyHasChanged = bodyHasChanged;
            this.unclearIfBodyHasChanged = unclearIfBodyHasChanged;
        }

        /**
         * @return true when the document is in a transaction and that transaction is not
         *         a different one
         */
        public boolean inSameTransaction() {
            return !(this.notInTransaction || this.inDifferentTransaction);
        }
    }

    /**
     * Unchecked signal exception carrying a message; the name indicates it requests a
     * retry of a get-multi operation.
     */
    static class RetryGetMulti extends RuntimeException {
        /**
         * @param message detail message
         */
        public RetryGetMulti(String message) {
            super(message);
        }
    }

    /**
     * Unchecked signal exception with no message or cause; the name indicates that some
     * bound was exceeded.
     */
    static class BoundExceeded extends RuntimeException {
        BoundExceeded() {
            super();
        }
    }

    /**
     * Unchecked signal exception carrying a message; the name indicates it requests a
     * reset followed by a retry of a get-multi operation.
     */
    static class ResetAndRetryGetMulti extends RuntimeException {
        /**
         * @param message detail message
         */
        public ResetAndRetryGetMulti(String message) {
            super(message);
        }
    }
}

