/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.anchored.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletionStage;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.InternalCacheSet;
import org.infinispan.anchored.impl.AnchorManager;
import org.infinispan.anchored.impl.AnchoredReadCommittedEntry;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.KeySetCommand;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.IracPutKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.impl.EntryFactory;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.SingleKeyNonTxInvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.impl.BaseRpcInterceptor;
import org.infinispan.notifications.Listener;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ValidSingleResponseCollector;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Listener
@Scope(value=Scopes.NAMED_CACHE)
public class AnchoredFetchInterceptor<K, V>
extends BaseRpcInterceptor {
    private static final Log log = LogFactory.getLog(AnchoredFetchInterceptor.class);
    @Inject
    CommandsFactory cf;
    @Inject
    EntryFactory entryFactory;
    @Inject
    DistributionManager distributionManager;
    @Inject
    ComponentRef<Cache<K, V>> cache;
    @Inject
    AnchorManager anchorManager;

    protected Log getLog() {
        return log;
    }

    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, false));
    }

    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, false));
    }

    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, true));
    }

    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, true));
    }

    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, true));
    }

    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchAllContextValues(ctx, (FlagAffectedCommand)command, true));
    }

    public Object visitIracPutKeyValueCommand(InvocationContext ctx, IracPutKeyValueCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchSingleContextValue(ctx, (DataCommand)command, true));
    }

    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) {
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, this.fetchAllContextValues(ctx, (FlagAffectedCommand)command, false));
    }

    public Object visitKeySetCommand(InvocationContext ctx, KeySetCommand command) {
        return this.invokeNext(ctx, (VisitableCommand)command);
    }

    public Object visitEntrySetCommand(InvocationContext ctx, EntrySetCommand command) {
        if (command.hasAnyFlag(FlagBitSets.STATE_TRANSFER_PROGRESS)) {
            return this.invokeNext(ctx, (VisitableCommand)command);
        }
        return new BackingEntrySet((CacheSet)this.invokeNext(ctx, (VisitableCommand)command));
    }

    public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) {
        return this.invokeNext(ctx, (VisitableCommand)command);
    }

    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) {
        return this.invokeNext(ctx, (VisitableCommand)command);
    }

    protected Object handleDefault(InvocationContext ctx, VisitableCommand command) {
        throw new IllegalStateException("Command " + command.getClass().getName() + " is not yet supported");
    }

    private CompletionStage<Void> fetchSingleContextValue(InvocationContext ctx, DataCommand command, boolean isWrite) {
        CacheEntry ctxEntry;
        Object key = command.getKey();
        CompletionStage<CacheEntry<Object, V>> stage = this.fetchContextValue(ctx, (FlagAffectedCommand)command, key, ctxEntry = ((SingleKeyNonTxInvocationContext)ctx).getCacheEntry(), command.getSegment(), isWrite);
        if (stage == null) {
            return CompletableFutures.completedNull();
        }
        return stage.thenAccept(externalEntry -> this.entryFactory.wrapExternalEntry(ctx, key, externalEntry, true, isWrite));
    }

    private CompletionStage<Void> fetchAllContextValues(InvocationContext ctx, FlagAffectedCommand command, boolean isWrite) {
        if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP)) {
            return CompletableFutures.completedNull();
        }
        AggregateCompletionStage fetchStage = CompletionStages.aggregateCompletionStage();
        ArrayList stages = new ArrayList(ctx.lookedUpEntriesCount());
        ctx.forEachEntry((key, ctxEntry) -> {
            DistributionInfo distributionInfo = this.distributionManager.getCacheTopology().getDistribution(key);
            CompletionStage<CacheEntry<Object, V>> stage = this.fetchContextValue(ctx, command, (K)key, (CacheEntry<?, ?>)ctxEntry, distributionInfo.segmentId(), isWrite);
            stages.add(stage);
            if (stage != null) {
                fetchStage.dependsOn(stage);
            }
        });
        return fetchStage.freeze().thenAccept(__ -> {
            Iterator iterator = stages.iterator();
            ctx.forEachEntry((key, ctxEntry) -> {
                CompletionStage stage = (CompletionStage)iterator.next();
                if (stage != null) {
                    CacheEntry ownerEntry = (CacheEntry)CompletionStages.join((CompletionStage)stage);
                    this.entryFactory.wrapExternalEntry(ctx, key, ownerEntry, true, isWrite);
                }
            });
        });
    }

    private CompletionStage<CacheEntry<K, V>> fetchContextValue(InvocationContext ctx, FlagAffectedCommand command, K key, CacheEntry<?, ?> ctxEntry, int segment, boolean isWrite) {
        if (ctxEntry.getValue() != null) {
            return null;
        }
        if (ctxEntry.getMetadata() instanceof RemoteMetadata) {
            RemoteMetadata remoteMetadata = (RemoteMetadata)ctxEntry.getMetadata();
            Address keyLocation = remoteMetadata.getAddress();
            if (isWrite && !this.isLocalModeForced(command)) {
                Address newLocation = this.anchorManager.updateLocation(keyLocation);
                ((AnchoredReadCommittedEntry)ctxEntry).setLocation(newLocation);
            }
            DistributionInfo distributionInfo = this.distributionManager.getCacheTopology().getSegmentDistribution(segment);
            if (isWrite && !this.shouldLoad(ctx, command, distributionInfo)) {
                return null;
            }
            return this.getRemoteValue(keyLocation, key, segment, isWrite);
        }
        if (isWrite && !this.isLocalModeForced(command)) {
            Address currentWriter = this.anchorManager.getCurrentWriter();
            ((AnchoredReadCommittedEntry)ctxEntry).setLocation(currentWriter);
        }
        return null;
    }

    private CompletionStage<CacheEntry<K, V>> getRemoteValue(Address keyLocation, K key, Integer segment, boolean isWrite) {
        ClusteredGetCommand getCommand = this.cf.buildClusteredGetCommand(key, segment, FlagBitSets.SKIP_OWNERSHIP_CHECK);
        getCommand.setTopologyId(0);
        getCommand.setWrite(isWrite);
        FetchResponseCollector collector = new FetchResponseCollector(key);
        return this.rpcManager.invokeCommand(keyLocation, (ReplicableCommand)getCommand, collector, this.rpcManager.getSyncRpcOptions());
    }

    private class BackingEntrySet
    extends InternalCacheSet<CacheEntry<K, V>> {
        protected final CacheSet<CacheEntry<K, V>> next;

        public BackingEntrySet(CacheSet<CacheEntry<K, V>> next) {
            this.next = next;
        }

        public Publisher<CacheEntry<K, V>> localPublisher(int segment) {
            return this.fixPublisher(this.next.localPublisher(segment), segment);
        }

        public Publisher<CacheEntry<K, V>> localPublisher(IntSet segments) {
            return this.fixPublisher(this.next.localPublisher(segments), null);
        }

        private Flowable<CacheEntry<K, V>> fixPublisher(Publisher<CacheEntry<K, V>> nextPublisher, Integer segment) {
            return Flowable.fromPublisher(nextPublisher).groupBy(entry -> entry.getMetadata() instanceof RemoteMetadata).flatMap(gf -> {
                if (!((Boolean)gf.getKey()).booleanValue()) {
                    return gf;
                }
                return gf.concatMapMaybe(incompleteEntry -> this.fixPublisherEntry(segment, (CacheEntry)incompleteEntry));
            }, 2);
        }

        private Maybe<CacheEntry<K, V>> fixPublisherEntry(Integer segment, CacheEntry<K, V> localEntry) {
            RemoteMetadata remoteMetadata = (RemoteMetadata)localEntry.getMetadata();
            Address keyLocation = remoteMetadata.getAddress();
            return Maybe.fromCompletionStage(AnchoredFetchInterceptor.this.getRemoteValue(keyLocation, localEntry.getKey(), segment, false));
        }
    }

    private static class FetchResponseCollector<K, V>
    extends ValidSingleResponseCollector<CacheEntry<K, V>> {
        private final K key;

        public FetchResponseCollector(K key) {
            this.key = key;
        }

        protected CacheEntry<K, V> withValidResponse(Address sender, ValidResponse response) {
            Object responseValue = response.getResponseValue();
            if (responseValue == null) {
                return NullCacheEntry.getInstance();
            }
            return ((InternalCacheValue)responseValue).toInternalCacheEntry(this.key);
        }

        protected CacheEntry<K, V> targetNotFound(Address sender) {
            return NullCacheEntry.getInstance();
        }
    }
}

