/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionSynchronizer;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import java.time.Duration;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class PkRangeIdVersionLeaseStoreBootstrapperImpl
implements Bootstrapper {
    private final Logger logger = LoggerFactory.getLogger(PkRangeIdVersionLeaseStoreBootstrapperImpl.class);
    private final PartitionSynchronizer synchronizer;
    private final LeaseStore leaseStore;
    private final LeaseStoreManager pkRangeIdVersionLeaseStoreManager;
    private final LeaseStoreManager epkRangeVersionLeaseStoreManager;
    private final Duration lockTime;
    private final Duration sleepTime;
    private final ChangeFeedMode changeFeedModeToStart;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private volatile boolean isInitialized;
    private volatile boolean isLockAcquired;
    private volatile boolean isPkRangeIdVersionLeaseStoreLockAcquired;

    public PkRangeIdVersionLeaseStoreBootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime, LeaseStoreManager pkRangeIdVersionLeaseStoreManager, LeaseStoreManager epkRangeVersionLeaseStoreManager, ChangeFeedProcessorOptions changeFeedProcessorOptions, ChangeFeedMode changeFeedModeToStart) {
        Preconditions.checkNotNull(synchronizer, "Argument 'synchronizer' can not be null");
        Preconditions.checkNotNull(leaseStore, "Argument 'leaseStore' can not be null");
        Preconditions.checkArgument(lockTime != null && this.isPositive(lockTime), "lockTime should be non-null and positive");
        Preconditions.checkArgument(sleepTime != null && this.isPositive(sleepTime), "sleepTime should be non-null and positive");
        Preconditions.checkArgument(pkRangeIdVersionLeaseStoreManager != null, "Argument 'pkRangeIdVersionLeaseStoreManager' should not be null");
        this.synchronizer = synchronizer;
        this.leaseStore = leaseStore;
        this.pkRangeIdVersionLeaseStoreManager = pkRangeIdVersionLeaseStoreManager;
        this.epkRangeVersionLeaseStoreManager = epkRangeVersionLeaseStoreManager;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.changeFeedModeToStart = changeFeedModeToStart;
        this.lockTime = lockTime;
        this.sleepTime = sleepTime;
        this.isInitialized = false;
    }

    private boolean isPositive(Duration duration) {
        return !duration.isNegative() && !duration.isZero();
    }

    @Override
    public Mono<Void> initialize() {
        this.isInitialized = false;
        return Mono.just((Object)this).flatMap(value -> this.leaseStore.isInitialized()).flatMap(initialized -> {
            this.isInitialized = initialized;
            if (initialized.booleanValue()) {
                return this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
            }
            this.logger.info("Acquire initialization lock");
            return this.acquireInitializationLock().flatMap(lockAcquired -> {
                if (!lockAcquired.booleanValue()) {
                    this.logger.info("Another instance is initializing the store");
                    return Mono.just((Object)this.isLockAcquired).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
                }
                return this.pkRangeIdVersionLeaseStoreManager.isInitialized().flatMap(pkRangeIdVersionLeaseStoreInitialized -> {
                    if (pkRangeIdVersionLeaseStoreInitialized.booleanValue()) {
                        return this.bootstrapFromPkRangeVersionLeases();
                    }
                    return this.bootstrapFromScratch();
                });
            }).onErrorResume(throwable -> {
                this.logger.warn("Unexpected exception caught while initializing the lock", throwable);
                return Mono.just((Object)this.isLockAcquired);
            }).flatMap(lockAcquired -> {
                if (this.isLockAcquired) {
                    return this.leaseStore.releaseInitializationLock().then(this.releasePkRangeIdVersionLeaseStoreLock());
                }
                return Mono.just((Object)lockAcquired);
            });
        }).repeat(() -> !this.isInitialized).then();
    }

    private Mono<Boolean> releasePkRangeIdVersionLeaseStoreLock() {
        if (this.isPkRangeIdVersionLeaseStoreLockAcquired) {
            return this.pkRangeIdVersionLeaseStoreManager.releaseInitializationLock();
        }
        return Mono.just((Object)Boolean.FALSE);
    }

    private Mono<Boolean> bootstrapFromPkRangeVersionLeases() {
        return this.pkRangeIdVersionLeaseStoreManager.getAllLeases().collectList().flatMap(pkRangeIdVersionLeases -> this.synchronizer.createMissingLeases((List<Lease>)pkRangeIdVersionLeases).thenReturn(pkRangeIdVersionLeases)).flatMap(pkRangeIdVersionLeases -> this.leaseStore.markInitialized().flatMap(isInitialized -> {
            if (isInitialized.booleanValue()) {
                return this.pkRangeIdVersionLeaseStoreManager.deleteAll((List<Lease>)pkRangeIdVersionLeases);
            }
            return Mono.empty();
        }).thenReturn((Object)this.isLockAcquired));
    }

    private Mono<Void> validateLeaseCFModeInteroperabilityForEpkRangeBasedLease() {
        return this.epkRangeVersionLeaseStoreManager.getTopLeases(1).next().flatMap(lease -> {
            ChangeFeedState changeFeedState;
            if (lease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE && !Strings.isNullOrEmpty(lease.getId()) && !Strings.isNullOrEmpty(lease.getContinuationToken()) && (changeFeedState = ChangeFeedState.fromString(lease.getContinuationToken())).getMode() != this.changeFeedModeToStart) {
                String errorMessage = String.format("ChangeFeedProcessor#handleLatestVersionChanges cannot be invoked when ChangeFeedProcessor#handleAllVersionsAndDeletes was also started for lease prefix : %s", this.changeFeedProcessorOptions.getLeasePrefix());
                return Mono.error((Throwable)new IllegalStateException(errorMessage));
            }
            return Mono.empty();
        });
    }

    private Mono<Boolean> bootstrapFromScratch() {
        return this.synchronizer.createMissingLeases().thenReturn((Object)this).flatMap(bootstrapper -> this.leaseStore.markInitialized()).flatMap(leaseStoreInitialized -> {
            if (leaseStoreInitialized.booleanValue()) {
                return this.pkRangeIdVersionLeaseStoreManager.markInitialized();
            }
            return Mono.empty();
        }).thenReturn((Object)this.isLockAcquired);
    }

    private Mono<Boolean> acquireInitializationLock() {
        return this.leaseStore.acquireInitializationLock(this.lockTime).flatMap(isLockAcquired -> {
            this.isLockAcquired = isLockAcquired;
            if (!isLockAcquired.booleanValue()) {
                return Mono.just((Object)Boolean.FALSE);
            }
            return this.pkRangeIdVersionLeaseStoreManager.isInitialized().flatMap(pkRangeIdVersionLeaseStoreInitialized -> {
                if (pkRangeIdVersionLeaseStoreInitialized.booleanValue()) {
                    return Mono.just((Object)Boolean.TRUE);
                }
                return this.pkRangeIdVersionLeaseStoreManager.acquireInitializationLock(this.lockTime).flatMap(pkRangeIdVersionLeaseStoreLockAcquired -> {
                    this.isPkRangeIdVersionLeaseStoreLockAcquired = pkRangeIdVersionLeaseStoreLockAcquired;
                    return Mono.just((Object)this.isPkRangeIdVersionLeaseStoreLockAcquired);
                });
            });
        });
    }
}

