/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.container.versioning.irac;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.irac.IracTombstoneCleanupCommand;
import org.infinispan.commands.irac.IracTombstonePrimaryCheckCommand;
import org.infinispan.commands.irac.IracTombstoneRemoteSiteCheckCommand;
import org.infinispan.commands.irac.IracTombstoneStateResponseCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.XSiteStateTransferConfiguration;
import org.infinispan.container.versioning.irac.IracTombstoneInfo;
import org.infinispan.container.versioning.irac.IracTombstoneManager;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.irac.DefaultIracManager;
import org.infinispan.xsite.irac.IracExecutor;
import org.infinispan.xsite.irac.IracManager;
import org.infinispan.xsite.irac.IracXSiteBackup;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

/**
 * Default {@link IracTombstoneManager} implementation.
 *
 * <p>Tombstones are kept in a {@link ConcurrentHashMap} indexed by key. A cleanup round
 * ({@link #performCleanup()}) is triggered through the {@link IracExecutor}, which the inner
 * {@link Scheduler} submits to the timeout scheduled executor with a delay that is re-estimated
 * after every round.
 *
 * <p>In a cleanup round each stored tombstone is handled according to the segment distribution of
 * its segment:
 * <ul>
 *    <li>{@code isWriteOwner()} is {@code false}: the tombstone is removed from the map;</li>
 *    <li>the {@link IracManager} still contains the key: the tombstone is skipped;</li>
 *    <li>{@code isPrimary()} is {@code false}: the tombstone is added to a
 *    {@link IracTombstonePrimaryCheckCommand} batch addressed to the primary;</li>
 *    <li>otherwise: a {@link CleanupTask} queries the asynchronous backup sites and, when allowed,
 *    sends an {@link IracTombstoneCleanupCommand} to the write owners.</li>
 * </ul>
 */
@Scope(value=Scopes.NAMED_CACHE)
public class DefaultIracTombstoneManager
implements IracTombstoneManager {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    /**
     * Lower bound for the cleanup delay. A delay of zero can never grow again because the next
     * delay is the geometric mean of the current delay and the estimation.
     */
    private static final long MIN_DELAY_MILLIS = 1L;
    @Inject
    DistributionManager distributionManager;
    @Inject
    RpcManager rpcManager;
    @Inject
    CommandsFactory commandsFactory;
    @Inject
    TakeOfflineManager takeOfflineManager;
    @Inject
    ComponentRef<IracManager> iracManager;
    @ComponentName(value="org.infinispan.executors.timeout")
    @Inject
    ScheduledExecutorService scheduledExecutorService;
    @Inject
    @ComponentName(value="org.infinispan.executors.blocking")
    Executor blockingExecutor;
    private final Map<Object, IracTombstoneInfo> tombstoneMap;
    private final IracExecutor iracExecutor = new IracExecutor(this::performCleanup);
    private final Collection<IracXSiteBackup> asyncBackups;
    private final Scheduler scheduler;
    private volatile boolean stopped = true;
    private final int batchSize;

    /**
     * Creates the manager from the cache configuration.
     *
     * <p>The tombstone map capacity and the scheduler target size both come from
     * {@code sites().tombstoneMapSize()}; the batch size is the largest state transfer chunk size
     * among the asynchronous backups, and at least 1.
     *
     * @param configuration the cache configuration
     */
    public DefaultIracTombstoneManager(Configuration configuration) {
        this.asyncBackups = DefaultIracManager.asyncBackups(configuration);
        this.tombstoneMap = new ConcurrentHashMap<>(configuration.sites().tombstoneMapSize());
        this.scheduler = new Scheduler(configuration.sites().tombstoneMapSize(), configuration.sites().maxTombstoneCleanupDelay());
        this.batchSize = configuration.sites().asyncBackupsStream().map(BackupConfiguration::stateTransfer).map(XSiteStateTransferConfiguration::chunkSize).reduce(1, Integer::max);
    }

    /**
     * Starts the component: drops the local site from the backup list, configures the executor
     * and schedules the first cleanup round.
     */
    @Start
    public void start() {
        Transport transport = this.rpcManager.getTransport();
        transport.checkCrossSiteAvailable();
        String localSiteName = transport.localSiteName();
        this.asyncBackups.removeIf(xSiteBackup -> localSiteName.equals(xSiteBackup.getSiteName()));
        this.iracExecutor.setBackOff(ExponentialBackOff.NO_OP);
        this.iracExecutor.setExecutor(this.blockingExecutor);
        this.stopped = false;
        this.scheduler.disabled = false;
        this.scheduler.scheduleWithCurrentDelay();
    }

    /**
     * Stops the component: marks it stopped, cancels the scheduled cleanup and clears all
     * tombstones.
     */
    @Stop
    public void stop() {
        this.stopped = true;
        this.stopCleanupTask();
        this.tombstoneMap.clear();
    }

    /**
     * Disables the scheduler and cancels the pending cleanup task, if any. Stored tombstones are
     * kept.
     */
    public void stopCleanupTask() {
        this.scheduler.disable();
    }

    /** Stores a tombstone for {@code key}, replacing any tombstone already stored for it. */
    @Override
    public void storeTombstone(int segment, Object key, IracMetadata metadata) {
        this.tombstoneMap.put(key, new IracTombstoneInfo(key, segment, metadata));
    }

    /** Stores {@code tombstone} unless its key already has one; a {@code null} argument is ignored. */
    @Override
    public void storeTombstoneIfAbsent(IracTombstoneInfo tombstone) {
        if (tombstone != null) {
            this.tombstoneMap.putIfAbsent(tombstone.getKey(), tombstone);
        }
    }

    /**
     * @return the metadata of the tombstone stored for {@code key}, or {@code null} if none is stored
     */
    @Override
    public IracMetadata getTombstone(Object key) {
        IracTombstoneInfo stored = this.tombstoneMap.get(key);
        return stored == null ? null : stored.getMetadata();
    }

    /**
     * Removes the tombstone only if the stored value is equal to {@code tombstone}; a {@code null}
     * argument is ignored.
     */
    @Override
    public void removeTombstone(IracTombstoneInfo tombstone) {
        if (tombstone != null) {
            this.tombstoneMap.remove(tombstone.getKey(), tombstone);
        }
    }

    /** Removes whatever tombstone is stored for {@code key}. */
    @Override
    public void removeTombstone(Object key) {
        this.tombstoneMap.remove(key);
    }

    @Override
    public boolean isEmpty() {
        return this.tombstoneMap.isEmpty();
    }

    @Override
    public int size() {
        return this.tombstoneMap.size();
    }

    /** @return {@code true} between the start of a cleanup round and the following re-schedule */
    @Override
    public boolean isTaskRunning() {
        return this.scheduler.running;
    }

    /** @return the delay, in milliseconds, currently used to schedule the cleanup round */
    @Override
    public long getCurrentDelayMillis() {
        return this.scheduler.currentDelayMillis;
    }

    /**
     * Sends the tombstones belonging to {@code segments} to {@code requestor}, in chunks of at most
     * {@code batchSize} tombstones, one {@link IracTombstoneStateResponseCommand} after the other.
     * The outcome is only logged; nothing is reported to the caller.
     *
     * @param requestor the node that receives the tombstones
     * @param segments  the segments whose tombstones are sent
     */
    @Override
    public void sendStateTo(Address requestor, IntSet segments) {
        StateTransferHelper helper = new StateTransferHelper(requestor, segments);
        Flowable.fromIterable(this.tombstoneMap.values())
                .filter(helper)
                .buffer(this.batchSize)
                .concatMapCompletableDelayError(helper)
                .subscribe(helper);
    }

    /**
     * For every tombstone whose segment has {@code isPrimary()} set in the current topology and
     * which is not equal to the one stored here, sends an {@link IracTombstoneCleanupCommand} to
     * the write owners of its segment.
     *
     * @param tombstones the tombstones to check
     */
    @Override
    public void checkStaleTombstone(Collection<? extends IracTombstoneInfo> tombstones) {
        LocalizedCacheTopology topology = this.distributionManager.getCacheTopology();
        for (IracTombstoneInfo tombstone : tombstones) {
            IracTombstoneInfo stored = this.tombstoneMap.get(tombstone.getKey());
            DistributionInfo info = topology.getSegmentDistribution(tombstone.getSegment());
            if (!info.isPrimary() || tombstone.equals(stored)) {
                // Either not the primary, or the tombstone is the one stored here: nothing to send.
                continue;
            }
            IracTombstoneCleanupCommand cmd = this.commandsFactory.buildIracTombstoneCleanupCommand(tombstone);
            this.rpcManager.sendToMany(info.writeOwners(), cmd, DeliverOrder.NONE);
        }
    }

    /** Triggers a cleanup round through the {@link IracExecutor}. */
    public void startCleanupTombstone() {
        this.iracExecutor.run();
    }

    /** Runs a cleanup round and blocks the calling thread until it completes. */
    public void runCleanupAndWait() {
        this.performCleanup().toCompletableFuture().join();
    }

    /**
     * @param tombstone the tombstone to look for; must not be {@code null}
     * @return {@code true} if the tombstone stored for the same key is equal to {@code tombstone}
     */
    public boolean contains(IracTombstoneInfo tombstone) {
        return tombstone.equals(this.tombstoneMap.get(tombstone.getKey()));
    }

    /**
     * Executes one cleanup round over all stored tombstones (see the class description for the
     * per-tombstone rules).
     *
     * <p>The returned stage depends only on the {@link CleanupTask}s started in this round; the
     * primary-check batches are not tracked by it. When the stage completes the {@link Scheduler}
     * recomputes the delay and schedules the next round. If anything throws while the round is
     * being set up, the exception is logged and the next round is scheduled with the current delay.
     *
     * @return a stage that completes when this round's cleanup tasks are done; already completed
     *         if the manager is stopped or the setup failed
     */
    private CompletionStage<Void> performCleanup() {
        if (this.stopped) {
            return CompletableFutures.completedNull();
        }
        this.scheduler.onTaskStarted(this.tombstoneMap.size());
        try {
            AggregateCompletionStage<Void> stage = CompletionStages.aggregateCompletionStage();
            RpcOptions rpcOptions = this.rpcManager.getSyncRpcOptions();
            // Keyed by primary address, so it stays small regardless of the number of tombstones.
            Map<Address, BackPressure<IracTombstonePrimaryCheckCommand, Void>> pendingChecks = new HashMap<>();
            for (IracTombstoneInfo tombstone : this.tombstoneMap.values()) {
                DistributionInfo info = this.getSegmentDistribution(tombstone.getSegment());
                if (!info.isWriteOwner()) {
                    this.removeTombstone(tombstone);
                    continue;
                }
                if (this.iracManager.running().containsKey(tombstone.getKey())) {
                    // The IracManager still tracks this key; leave the tombstone for a later round.
                    continue;
                }
                if (info.isPrimary()) {
                    stage.dependsOn(new CleanupTask(tombstone).checkRemoteSites());
                } else {
                    this.queuePrimaryCheck(pendingChecks, info.primary(), tombstone, rpcOptions);
                }
            }
            this.flushPrimaryChecks(pendingChecks);
            return stage.freeze().whenComplete(this.scheduler);
        }
        catch (Throwable t) {
            log.debug("Unexpected exception", t);
            this.scheduler.scheduleWithCurrentDelay();
            return CompletableFutures.completedNull();
        }
    }

    /**
     * Adds {@code tombstone} to the batch pending for {@code primary}. When the batch reaches
     * {@code batchSize} it is sent, after the previous batch for the same primary has been
     * answered, and a fresh empty batch chained to that request takes its place.
     */
    private void queuePrimaryCheck(Map<Address, BackPressure<IracTombstonePrimaryCheckCommand, Void>> pendingChecks,
            Address primary, IracTombstoneInfo tombstone, RpcOptions rpcOptions) {
        BackPressure<IracTombstonePrimaryCheckCommand, Void> pending =
                pendingChecks.computeIfAbsent(primary, unused -> this.newPrimaryCheck(null));
        if (pending.element.addTombstone(tombstone) != this.batchSize) {
            return;
        }
        IracTombstonePrimaryCheckCommand fullBatch = pending.element;
        CompletionStage<Void> rsp;
        if (pending.delay == null) {
            rsp = this.rpcManager.invokeCommand(primary, fullBatch, VoidResponseCollector.ignoreLeavers(), rpcOptions);
        } else {
            rsp = pending.delay.thenComposeAsync(
                    unused -> this.rpcManager.invokeCommand(primary, fullBatch, VoidResponseCollector.ignoreLeavers(), rpcOptions),
                    this.blockingExecutor);
        }
        pendingChecks.put(primary, this.newPrimaryCheck(rsp));
    }

    /**
     * Sends every non-empty batch left over after the round, each one after the previous batch for
     * the same primary (if any) has completed. No response is awaited.
     */
    private void flushPrimaryChecks(Map<Address, BackPressure<IracTombstonePrimaryCheckCommand, Void>> pendingChecks) {
        for (Map.Entry<Address, BackPressure<IracTombstonePrimaryCheckCommand, Void>> entry : pendingChecks.entrySet()) {
            Address primary = entry.getKey();
            IracTombstonePrimaryCheckCommand remainder = entry.getValue().element;
            CompletionStage<Void> previous = entry.getValue().delay;
            if (remainder.isEmpty()) {
                continue;
            }
            if (previous == null) {
                this.rpcManager.sendTo(primary, remainder, DeliverOrder.NONE);
            } else {
                previous.thenRunAsync(() -> this.rpcManager.sendTo(primary, remainder, DeliverOrder.NONE), this.blockingExecutor);
            }
        }
    }

    /** Creates an empty primary-check batch that must wait for {@code delay} (may be {@code null}). */
    private BackPressure<IracTombstonePrimaryCheckCommand, Void> newPrimaryCheck(CompletionStage<Void> delay) {
        return new BackPressure<>(this.commandsFactory.buildIracTombstonePrimaryCheckCommand(this.batchSize), delay);
    }

    /** @return the distribution of {@code segment} in the current cache topology */
    private DistributionInfo getSegmentDistribution(int segment) {
        return this.distributionManager.getCacheTopology().getSegmentDistribution(segment);
    }

    /**
     * Schedules the cleanup rounds and adapts the delay between them.
     *
     * <p>It is registered as the completion callback of each round. The delay is always kept at or
     * above {@link #MIN_DELAY_MILLIS}.
     */
    private final class Scheduler
    implements BiConsumer<Void, Throwable> {
        final int targetSize;
        final long maxDelayMillis;
        /** Number of tombstones when the current round started. */
        int preCleanupSize;
        /** Number of tombstones when the previous round finished. */
        int previousPostCleanupSize;
        long currentDelayMillis;
        volatile boolean running;
        volatile boolean disabled;
        @GuardedBy(value="this")
        ScheduledFuture<?> future;

        private Scheduler(int targetSize, long maxDelayMillis) {
            this.targetSize = targetSize;
            this.maxDelayMillis = maxDelayMillis;
            // Never start at zero: a maximum of 1ms would otherwise pin the delay to 0 forever.
            this.currentDelayMillis = Math.max(MIN_DELAY_MILLIS, maxDelayMillis / 2L);
        }

        /** Records that a round started with {@code size} tombstones stored. */
        void onTaskStarted(int size) {
            this.running = true;
            this.preCleanupSize = size;
        }

        /**
         * Recomputes the delay and schedules the next round.
         *
         * <p>If the map is still at or above the target size the next round runs after the minimum
         * delay. Otherwise the creation rate is estimated from the growth observed since the
         * previous round, the time to reach the target size at that rate is capped at the maximum
         * delay, and the new delay is the geometric mean of the current delay and that estimation.
         *
         * @param postCleanupSize number of tombstones stored after the round
         */
        void onTaskCompleted(int postCleanupSize) {
            if (postCleanupSize >= this.targetSize) {
                this.currentDelayMillis = MIN_DELAY_MILLIS;
            } else {
                double tombstoneCreationRate = (double)(this.preCleanupSize - this.previousPostCleanupSize) / (double)this.currentDelayMillis;
                double estimationMillis;
                if (tombstoneCreationRate <= 0.0) {
                    estimationMillis = (double)this.maxDelayMillis;
                } else {
                    estimationMillis = Math.min((double)(this.targetSize - postCleanupSize) / tombstoneCreationRate + 1.0, (double)this.maxDelayMillis);
                }
                long nextDelayMillis = Math.round(Math.sqrt((double)this.currentDelayMillis * estimationMillis));
                this.currentDelayMillis = Math.max(MIN_DELAY_MILLIS, nextDelayMillis);
            }
            this.previousPostCleanupSize = postCleanupSize;
            this.scheduleWithCurrentDelay();
        }

        /**
         * Marks the round as finished and, unless the manager is stopped or the scheduler is
         * disabled, cancels the pending task (if any) and schedules a new one with the current
         * delay.
         */
        synchronized void scheduleWithCurrentDelay() {
            this.running = false;
            if (DefaultIracTombstoneManager.this.stopped || this.disabled) {
                return;
            }
            if (this.future != null) {
                this.future.cancel(true);
            }
            this.future = DefaultIracTombstoneManager.this.scheduledExecutorService.schedule(DefaultIracTombstoneManager.this.iracExecutor, this.currentDelayMillis, TimeUnit.MILLISECONDS);
        }

        /** Disables further scheduling and cancels the pending task, if any. */
        synchronized void disable() {
            this.disabled = true;
            if (this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }
        }

        /** Completion callback of a cleanup round; a failure is logged and does not stop the schedule. */
        @Override
        public void accept(Void unused, Throwable throwable) {
            if (throwable != null) {
                log.debug("Tombstone cleanup round completed exceptionally", throwable);
            }
            this.onTaskCompleted(DefaultIracTombstoneManager.this.tombstoneMap.size());
        }
    }

    /**
     * The three stages of {@link #sendStateTo(Address, IntSet)} in one object: the segment filter,
     * the function sending each chunk, and the observer logging the final outcome.
     */
    private class StateTransferHelper
    implements Predicate<IracTombstoneInfo>,
    Function<Collection<IracTombstoneInfo>, CompletableSource>,
    CompletableObserver {
        private final Address requestor;
        private final IntSet segments;

        private StateTransferHelper(Address requestor, IntSet segments) {
            this.requestor = requestor;
            this.segments = segments;
        }

        /** Accepts only tombstones that belong to one of the requested segments. */
        @Override
        public boolean test(IracTombstoneInfo tombstone) {
            return this.segments.contains(tombstone.getSegment());
        }

        /** Sends one chunk to the requestor; the result completes when the request does. */
        @Override
        public CompletableSource apply(Collection<IracTombstoneInfo> state) {
            RpcOptions rpcOptions = DefaultIracTombstoneManager.this.rpcManager.getSyncRpcOptions();
            IracTombstoneStateResponseCommand cmd = DefaultIracTombstoneManager.this.commandsFactory.buildIracTombstoneStateResponseCommand(state);
            CompletionStage<Void> rsp = DefaultIracTombstoneManager.this.rpcManager.invokeCommand(this.requestor, cmd, VoidResponseCollector.ignoreLeavers(), rpcOptions);
            return Completable.fromCompletionStage(rsp);
        }

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            // Nothing to do: the transfer is never cancelled from here.
        }

        @Override
        public void onComplete() {
            if (log.isDebugEnabled()) {
                log.debugf("Tombstones transferred to %s for segments %s", this.requestor, this.segments);
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            log.failedToTransferTombstones(this.requestor, this.segments, e);
        }
    }

    /**
     * Pairs an element with the stage that must complete before the element may be sent.
     *
     * @param <E> the element type
     * @param <T> the result type of the stage
     */
    private static final class BackPressure<E, T> {
        final E element;
        /** Stage to wait for before sending {@link #element}; {@code null} when there is nothing to wait for. */
        final CompletionStage<T> delay;

        private BackPressure(E element, CompletionStage<T> delay) {
            this.element = element;
            this.delay = delay;
        }
    }

    /**
     * Cleanup of a single tombstone. The same object is used as the continuation of the remote
     * site check ({@link #apply(Boolean)}) and as the final local removal ({@link #run()}).
     */
    private final class CleanupTask
    implements java.util.function.Function<Boolean, CompletionStage<Void>>,
    Runnable {
        private final IracTombstoneInfo tombstone;

        private CleanupTask(IracTombstoneInfo tombstone) {
            this.tombstone = tombstone;
        }

        /**
         * Sends an {@link IracTombstoneRemoteSiteCheckCommand} to every asynchronous backup site
         * whose state is not {@link SiteState#OFFLINE} and combines the answers with a boolean OR.
         * A failed check is mapped to {@code true}, i.e. the tombstone is kept.
         *
         * @return a stage that completes once the tombstone has been kept or removed
         */
        CompletionStage<Void> checkRemoteSites() {
            AggregateCompletionStage<Boolean> stage = CompletionStages.orBooleanAggregateCompletionStage();
            for (XSiteBackup backup : DefaultIracTombstoneManager.this.asyncBackups) {
                if (DefaultIracTombstoneManager.this.takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
                    continue;
                }
                IracTombstoneRemoteSiteCheckCommand cmd = DefaultIracTombstoneManager.this.commandsFactory.buildIracTombstoneRemoteSiteCheckCommand(this.tombstone.getKey());
                stage.dependsOn(DefaultIracTombstoneManager.this.rpcManager.invokeXSite(backup, cmd));
            }
            return stage.freeze()
                    .exceptionally(CompletableFutures.toTrueFunction())
                    .thenComposeAsync(this, DefaultIracTombstoneManager.this.blockingExecutor);
        }

        /**
         * When the tombstone does not have to be kept, sends the cleanup command to the write
         * owners of its segment and removes the tombstone locally once they have answered.
         *
         * @param keepTombstone {@code true} to leave the tombstone untouched
         */
        @Override
        public CompletionStage<Void> apply(Boolean keepTombstone) {
            if (keepTombstone.booleanValue()) {
                return CompletableFutures.completedNull();
            }
            IracTombstoneCleanupCommand cmd = DefaultIracTombstoneManager.this.commandsFactory.buildIracTombstoneCleanupCommand(this.tombstone);
            Collection<Address> owners = DefaultIracTombstoneManager.this.getSegmentDistribution(this.tombstone.getSegment()).writeOwners();
            RpcOptions rpcOptions = DefaultIracTombstoneManager.this.rpcManager.getSyncRpcOptions();
            return DefaultIracTombstoneManager.this.rpcManager
                    .invokeCommand(owners, (ReplicableCommand)cmd, VoidResponseCollector.validOnly(), rpcOptions)
                    .thenRunAsync(this, DefaultIracTombstoneManager.this.blockingExecutor);
        }

        /** Removes the tombstone from the local map if it is still the stored one. */
        @Override
        public void run() {
            DefaultIracTombstoneManager.this.removeTombstone(this.tombstone);
        }
    }
}

