/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.scattered.ScatteredStateProvider;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.OutboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateProviderImpl;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/**
 * {@link ScatteredStateProvider} implementation built on top of {@link StateProviderImpl}.
 *
 * <p>On a rebalance it pushes the entries of the segments this node keeps owning to the
 * next member of the topology and asks the remaining members to invalidate the versions
 * that were pushed. It also serves key-only transfers ({@link #startKeysTransfer}) in which
 * values are replaced by {@link RemoteMetadata} pointing back at this node.
 */
public class ScatteredStateProviderImpl extends StateProviderImpl implements ScatteredStateProvider {
    private static final Log log = LogFactory.getLog(ScatteredStateProviderImpl.class);
    // Captured once at class initialization; later changes to the log level are not observed.
    private static final boolean trace = log.isTraceEnabled();

    @Inject
    protected ScatteredVersionManager svm;

    /** Starts the provider; currently only delegates to the superclass. */
    @Override
    public void start() {
        super.start();
    }

    /**
     * Reacts to a topology update.
     *
     * @param cacheTopology the topology being installed
     * @param isRebalance   whether the update is a rebalance; nothing is done otherwise
     * @return a future completed once replication and the triggered invalidations have finished,
     *         or an already completed future when the update is not a rebalance
     */
    @Override
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        if (isRebalance) {
            return this.replicateAndInvalidate(cacheTopology);
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Sends the entries of all segments owned by this node in both the current and the pending
     * consistent hash to the next member, and invalidates the transferred versions on every
     * other member.
     *
     * @param cacheTopology the topology of the rebalance
     * @return a future that completes (always normally) when the transfer and all invalidation
     *         RPCs started for it are done; completes immediately when there is no next member,
     *         this node is not in the current CH, or there is no segment to replicate
     */
    private CompletableFuture<Void> replicateAndInvalidate(CacheTopology cacheTopology) {
        Address nextMember = this.getNextMember(cacheTopology);
        if (nextMember == null) {
            return CompletableFutures.completedNull();
        }
        Address localAddress = this.rpcManager.getAddress();
        if (!cacheTopology.getCurrentCH().getMembers().contains(localAddress)) {
            log.trace("Local address is not a member of currentCH, returning");
            return CompletableFutures.completedNull();
        }

        // Work on a private mutable copy: retainAll() below must neither modify the set handed
        // out by the consistent hash nor fail if that set happens to be immutable.
        IntSet oldSegments = IntSets.mutableCopyFrom(cacheTopology.getCurrentCH().getSegmentsForOwner(localAddress));
        oldSegments.retainAll(cacheTopology.getPendingCH().getSegmentsForOwner(localAddress));
        // Format lazily so the segment set is only rendered when trace logging is enabled.
        log.tracef("Segments to replicate and invalidate: %s", oldSegments);
        if (oldSegments.isEmpty()) {
            return CompletableFutures.completedNull();
        }

        // Everybody except this node and the node receiving the data gets the invalidation.
        Set<Address> otherMembers = new HashSet<>(cacheTopology.getActualMembers());
        otherMembers.remove(localAddress);
        otherMembers.remove(nextMember);

        // Counts pending work: starts at 1 for the transfer itself, each invalidation RPC adds 1.
        // Whoever brings it back to 0 completes invalidationFuture.
        AtomicInteger outboundInvalidations = new AtomicInteger(1);
        CompletableFuture<Void> invalidationFuture = new CompletableFuture<>();
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(nextMember, oldSegments,
                cacheTopology.getCurrentCH().getNumSegments(), this.chunkSize, cacheTopology.getTopologyId(),
                this.keyPartitioner,
                chunks -> this.invalidateChunks(chunks, otherMembers, outboundInvalidations, invalidationFuture, cacheTopology),
                this.rpcManager, this.commandsFactory, this.timeout, this.cacheName, true, true);
        outboundTransferTask
                .execute(Flowable.concat(this.publishDataContainerEntries(oldSegments), this.publishStoreEntries(oldSegments)))
                .whenComplete((ignored, throwable) -> {
                    // A failed transfer is only logged; the returned future still completes normally.
                    if (throwable != null) {
                        this.logError(outboundTransferTask, throwable);
                    }
                    if (outboundInvalidations.decrementAndGet() == 0) {
                        invalidationFuture.complete(null);
                    }
                });
        return invalidationFuture;
    }

    /**
     * Asks {@code otherMembers} to invalidate the versioned entries contained in the given chunks.
     *
     * @param stateChunks           chunks handed over by the outbound transfer task
     * @param otherMembers          targets of the invalidation command
     * @param outboundInvalidations pending-work counter shared with {@link #replicateAndInvalidate}
     * @param invalidationFuture    completed when the counter drops to zero
     * @param cacheTopology         topology whose id is sent with the command
     */
    private void invalidateChunks(Collection<StateChunk> stateChunks, Set<Address> otherMembers,
            AtomicInteger outboundInvalidations, CompletableFuture<Void> invalidationFuture, CacheTopology cacheTopology) {
        int numEntries = stateChunks.stream().mapToInt(chunk -> chunk.getCacheEntries().size()).sum();
        if (numEntries == 0) {
            log.tracef("Nothing to invalidate");
            return;
        }
        // The arrays are sized for every entry, but only versioned entries are filled in, so
        // trailing slots may stay null/0.
        // NOTE(review): the arrays are deliberately not trimmed here - confirm how the receiving
        // command treats unused trailing slots before changing this.
        Object[] keys = new Object[numEntries];
        int[] topologyIds = new int[numEntries];
        long[] versions = new long[numEntries];
        int versioned = 0;
        for (StateChunk chunk : stateChunks) {
            for (InternalCacheEntry<?, ?> entry : chunk.getCacheEntries()) {
                // Entries without a version are skipped: there is no version to invalidate.
                if (entry.getMetadata() == null || entry.getMetadata().version() == null) {
                    continue;
                }
                SimpleClusteredVersion version = (SimpleClusteredVersion) entry.getMetadata().version();
                keys[versioned] = entry.getKey();
                topologyIds[versioned] = version.getTopologyId();
                versions[versioned] = version.getVersion();
                ++versioned;
            }
        }
        if (trace) {
            // Report the number of entries actually put into the command, not the chunk total.
            log.tracef("Invalidating %d entries from segments %s", versioned,
                    stateChunks.stream().map(StateChunk::getSegmentId).collect(Collectors.toList()));
        }
        // Register this RPC before it is sent so the counter cannot reach zero while it is in flight.
        outboundInvalidations.incrementAndGet();
        this.rpcManager.invokeCommand(otherMembers,
                (ReplicableCommand) this.commandsFactory.buildInvalidateVersionsCommand(cacheTopology.getTopologyId(), keys, topologyIds, versions, true),
                MapResponseCollector.ignoreLeavers(otherMembers.size()), this.rpcManager.getSyncRpcOptions())
                .whenComplete((r, t) -> {
                    try {
                        if (t != null) {
                            log.failedInvalidatingRemoteCache(t);
                        }
                    } finally {
                        if (outboundInvalidations.decrementAndGet() == 0) {
                            invalidationFuture.complete(null);
                        }
                    }
                });
    }

    /**
     * Finds the member following the local node in the topology's actual-members list,
     * wrapping around to the first member after the last one.
     *
     * @param cacheTopology topology to inspect
     * @return the next member, or {@code null} when the topology has exactly one member
     * @throws IllegalStateException if the local address is not among the actual members
     */
    private Address getNextMember(CacheTopology cacheTopology) {
        Address myAddress = this.rpcManager.getAddress();
        List<Address> members = cacheTopology.getActualMembers();
        if (members.size() == 1) {
            return null;
        }
        Iterator<Address> it = members.iterator();
        while (it.hasNext()) {
            Address member = it.next();
            if (member.equals(myAddress)) {
                return it.hasNext() ? it.next() : members.get(0);
            }
        }
        throw new IllegalStateException("Local address " + myAddress + " is not among the actual members " + members);
    }

    /**
     * Starts an outbound transfer of the keys (without values) of the given segments.
     *
     * @param segments segments whose keys are sent
     * @param origin   node the keys are sent to
     */
    @Override
    public void startKeysTransfer(IntSet segments, Address origin) {
        LocalizedCacheTopology cacheTopology = this.distributionManager.getCacheTopology();
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(origin, segments,
                cacheTopology.getCurrentCH().getNumSegments(), this.chunkSize, cacheTopology.getTopologyId(),
                this.keyPartitioner, chunks -> {}, this.rpcManager, this.commandsFactory, this.timeout,
                this.cacheName, true, false);
        this.addTransfer(outboundTransferTask);
        outboundTransferTask
                .execute(Flowable.concat(this.publishDataContainerKeys(segments), this.publishStoreKeys(segments)))
                .whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        this.logError(outboundTransferTask, throwable);
                    }
                    this.onTaskCompletion(outboundTransferTask);
                });
    }

    /**
     * Publishes the versioned entries of the data container for the given segments as
     * value-less entries carrying {@link RemoteMetadata} (local address + entry version).
     */
    private Flowable<InternalCacheEntry<Object, Object>> publishDataContainerKeys(IntSet segments) {
        Address localAddress = this.rpcManager.getAddress();
        return Flowable.fromIterable(() -> this.dataContainer.iterator(segments))
                .filter(ice -> ice.getMetadata() != null && ice.getMetadata().version() != null)
                .map(ice -> this.entryFactory.create(ice.getKey(), null, new RemoteMetadata(localAddress, ice.getMetadata().version())));
    }

    /**
     * Publishes the versioned store entries of the given segments whose keys are not in the data
     * container, as value-less entries carrying {@link RemoteMetadata}.
     */
    private Flowable<InternalCacheEntry<Object, Object>> publishStoreKeys(IntSet segments) {
        Address localAddress = this.rpcManager.getAddress();
        // The publisher is passed straight to fromPublisher instead of being held in a raw
        // Publisher local, so the element type is kept for the filter/map lambdas.
        return Flowable.fromPublisher(this.persistenceManager.publishEntries(segments,
                        k -> !this.dataContainer.containsKey(k), true, true, Configurations::isStateTransferStore))
                .filter(me -> me.getMetadata() != null && me.getMetadata().version() != null)
                .map(me -> this.entryFactory.create(me.getKey(), null, new RemoteMetadata(localAddress, me.getMetadata().version())));
    }

    /**
     * @param topologyId topology to wait for
     * @return the state transfer lock's future for {@code topologyId}
     */
    @Override
    public CompletableFuture<Void> confirmRevokedSegments(int topologyId) {
        return this.stateTransferLock.topologyFuture(topologyId);
    }
}

