/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.container.versioning.irac.IracVersionGenerator;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.ReplicatedConsistentHash;
import org.infinispan.distribution.ch.impl.ScatteredConsistentHashFactory;
import org.infinispan.distribution.ch.impl.SyncConsistentHashFactory;
import org.infinispan.distribution.ch.impl.SyncReplicatedConsistentHashFactory;
import org.infinispan.distribution.ch.impl.TopologyAwareSyncConsistentHashFactory;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.globalstate.GlobalStateManager;
import org.infinispan.globalstate.ScopedPersistentState;
import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.persistence.manager.PreloadManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyHandler;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.irac.IracManager;

/**
 * {@link StateTransferManager} implementation that joins the cache through the
 * {@link LocalTopologyManager} on start and, for every topology update received afterwards,
 * notifies the cache listeners, the {@link StateConsumer}, the {@link StateProvider}, the
 * {@link PartitionHandlingManager} and the IRAC components.
 *
 * <p>It also tracks whether the initial state transfer of the local node has finished, which is
 * what {@link #waitForInitialStateTransferToComplete()} and {@link #isJoinComplete()} report.
 */
@MBean(objectName = "StateTransferManager", description = "Component that handles state transfer")
@Scope(Scopes.NAMED_CACHE)
public class StateTransferManagerImpl implements StateTransferManager {
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);

    /** Name of the cache that keeps its configured capacity factor even on a zero-capacity node. */
    private static final String CONFIG_STATE_CACHE_NAME = "org.infinispan.CONFIG";

    @ComponentName("cacheName")
    @Inject
    protected String cacheName;
    @Inject
    StateConsumer stateConsumer;
    @Inject
    StateProvider stateProvider;
    @Inject
    PartitionHandlingManager partitionHandlingManager;
    @Inject
    DistributionManager distributionManager;
    @Inject
    CacheNotifier<?, ?> cacheNotifier;
    @Inject
    Configuration configuration;
    @Inject
    GlobalConfiguration globalConfiguration;
    @Inject
    RpcManager rpcManager;
    @Inject
    LocalTopologyManager localTopologyManager;
    @Inject
    KeyPartitioner keyPartitioner;
    @Inject
    GlobalStateManager globalStateManager;
    @Inject
    PreloadManager preloadManager;
    @Inject
    PerCacheInboundInvocationHandler inboundInvocationHandler;
    @Inject
    IracManager iracManager;
    @Inject
    IracVersionGenerator iracVersionGenerator;

    /**
     * Completed (always with {@code null}) once the local node is considered joined, when waiting
     * is pointless (see {@link #waitForInitialStateTransferToComplete()}), or on {@link #stop()}.
     */
    private final CompletableFuture<Void> initialStateTransferComplete = new CompletableFuture<>();

    /**
     * Builds the {@link CacheJoinInfo} from the cache configuration, joins the cache via the
     * {@link LocalTopologyManager} and blocks until the initial topology has been received.
     *
     * @throws Exception if joining the cache fails
     */
    @Override
    @Start(priority = 60)
    public void start() throws Exception {
        if (log.isTraceEnabled()) {
            log.tracef("Starting StateTransferManager of cache %s on node %s", cacheName, rpcManager.getAddress());
        }

        Optional<Integer> persistentStateChecksum;
        if (globalStateManager != null) {
            persistentStateChecksum = globalStateManager.readScopedState(cacheName)
                    .map(ScopedPersistentState::getChecksum);
        } else {
            persistentStateChecksum = Optional.empty();
        }

        // A zero-capacity node owns no data, except for the cache named by CONFIG_STATE_CACHE_NAME.
        float capacityFactor;
        if (globalConfiguration.isZeroCapacityNode() && !CONFIG_STATE_CACHE_NAME.equals(cacheName)) {
            capacityFactor = 0.0f;
        } else {
            capacityFactor = configuration.clustering().hash().capacityFactor();
        }

        CacheJoinInfo joinInfo = new CacheJoinInfo(
                pickConsistentHashFactory(globalConfiguration, configuration),
                configuration.clustering().hash().numSegments(),
                configuration.clustering().hash().numOwners(),
                configuration.clustering().stateTransfer().timeout(),
                configuration.clustering().cacheMode(),
                capacityFactor,
                localTopologyManager.getPersistentUUID(),
                persistentStateChecksum);

        CacheTopologyHandler topologyHandler = new CacheTopologyHandler() {
            @Override
            public CompletionStage<Void> updateConsistentHash(CacheTopology cacheTopology) {
                return doTopologyUpdate(cacheTopology, false);
            }

            @Override
            public CompletionStage<Void> rebalance(CacheTopology cacheTopology) {
                return doTopologyUpdate(cacheTopology, true);
            }
        };

        CompletionStage<CacheTopology> stage =
                localTopologyManager.join(cacheName, joinInfo, topologyHandler, partitionHandlingManager);
        CacheTopology initialTopology = CompletionStages.join(stage);
        if (log.isTraceEnabled()) {
            log.tracef("StateTransferManager of cache %s on node %s received initial topology %s", cacheName, rpcManager.getAddress(), initialTopology);
        }
    }

    /**
     * Returns the consistent hash factory to use for a cache.
     *
     * <p>An explicitly configured factory always wins. Otherwise a default is chosen from the
     * cache mode: distributed caches get a topology-aware factory when the transport has topology
     * information and a plain sync factory otherwise; replicated and invalidation caches get the
     * replicated factory; scattered caches get the scattered factory.
     *
     * @param globalConfiguration the global configuration, consulted for transport topology info
     * @param configuration       the cache configuration
     * @return the configured or default factory, or {@code null} when none is configured and the
     *         cache mode is not clustered
     * @throws CacheException if the cache mode is clustered but matches none of the known modes
     */
    public static ConsistentHashFactory pickConsistentHashFactory(GlobalConfiguration globalConfiguration, Configuration configuration) {
        // Wildcard type: the candidate factories produce different consistent hash types.
        ConsistentHashFactory<?> configuredFactory = configuration.clustering().hash().consistentHashFactory();
        if (configuredFactory != null) {
            return configuredFactory;
        }

        CacheMode cacheMode = configuration.clustering().cacheMode();
        if (!cacheMode.isClustered()) {
            return null;
        }
        if (cacheMode.isDistributed()) {
            if (globalConfiguration.transport().hasTopologyInfo()) {
                return new TopologyAwareSyncConsistentHashFactory();
            }
            return new SyncConsistentHashFactory();
        }
        if (cacheMode.isReplicated() || cacheMode.isInvalidation()) {
            return new SyncReplicatedConsistentHashFactory();
        }
        if (cacheMode.isScattered()) {
            return new ScatteredConsistentHashFactory();
        }
        throw new CacheException("Unexpected cache mode: " + cacheMode);
    }

    /**
     * Installs a new cache topology.
     *
     * <p>In order: rejects topologies older than the current one, records the first topology in
     * which the local node is a member, informs the IRAC manager, fires the "pre" topology-changed
     * notification, updates the state consumer and provider, fires the "post" notification, and
     * finally updates the initial-transfer flag, the partition handling manager and the IRAC
     * version generator.
     *
     * @param newCacheTopology the topology to install
     * @param isRebalance      {@code true} when invoked for a rebalance, {@code false} for a plain
     *                         consistent hash update
     * @return a stage that completes when all of the steps above have finished
     * @throws IllegalStateException if the current topology id is higher than the new one
     */
    private CompletionStage<Void> doTopologyUpdate(CacheTopology newCacheTopology, boolean isRebalance) {
        // Single snapshot of the current topology, used for every "old topology" decision below.
        LocalizedCacheTopology oldCacheTopology = distributionManager.getCacheTopology();
        int newTopologyId = newCacheTopology.getTopologyId();
        if (oldCacheTopology != null && oldCacheTopology.getTopologyId() > newTopologyId) {
            throw new IllegalStateException("Old topology is higher: old=" + oldCacheTopology + ", new=" + newCacheTopology);
        }
        if (log.isTraceEnabled()) {
            log.tracef("Installing new cache topology %s on cache %s", newCacheTopology, cacheName);
        }

        Address localAddress = rpcManager.getAddress();
        if (newCacheTopology.getMembers().contains(localAddress)) {
            // A missing or disconnected old topology counts as "not a member yet".
            boolean wasMember = oldCacheTopology != null
                    && oldCacheTopology.isConnected()
                    && oldCacheTopology.getMembersSet().contains(localAddress);
            if (!wasMember) {
                if (log.isTraceEnabled()) {
                    log.tracef("This is the first topology %d in which the local node is a member", newTopologyId);
                }
                inboundInvocationHandler.setFirstTopologyAsMember(newTopologyId);
            }
        }

        int newRebalanceId = newCacheTopology.getRebalanceId();
        CacheTopology.Phase phase = newCacheTopology.getPhase();
        iracManager.onTopologyUpdate(oldCacheTopology, newCacheTopology);

        return cacheNotifier.notifyTopologyChanged(oldCacheTopology, newCacheTopology, newTopologyId, true)
                .thenCompose(ignored -> updateProviderAndConsumer(isRebalance, newTopologyId, newCacheTopology, newRebalanceId, phase))
                .thenCompose(ignored -> cacheNotifier.notifyTopologyChanged(oldCacheTopology, newCacheTopology, newTopologyId, false))
                .thenRun(() -> {
                    completeInitialTransferIfNeeded(newCacheTopology, phase);
                    partitionHandlingManager.onTopologyUpdate(newCacheTopology);
                    iracVersionGenerator.onTopologyChange(newCacheTopology);
                });
    }

    /**
     * Forwards the topology update to the state consumer and the state provider and, once both
     * have finished, confirms the rebalance phase to the {@link LocalTopologyManager} for the
     * phases that are confirmed.
     *
     * @return the consumer's outer stage, i.e. the one that completes when the consumer has
     *         applied the topology update; it does not wait for the inner transfer stage or for
     *         the provider
     */
    private CompletionStage<?> updateProviderAndConsumer(boolean isRebalance, int newTopologyId, CacheTopology newCacheTopology, int newRebalanceId, CacheTopology.Phase phase) {
        CompletionStage<CompletionStage<Void>> consumerUpdateFuture =
                stateConsumer.onTopologyUpdate(newCacheTopology, isRebalance);
        // Flatten the nested stage so it completes only when the inner stage has completed too.
        CompletionStage<Void> consumerTransferFuture = consumerUpdateFuture.thenCompose(Function.identity());
        CompletableFuture<Void> providerFuture = stateProvider.onTopologyUpdate(newCacheTopology, isRebalance);

        // NOTE(review): the stage returned by runAfterBoth is discarded, so if either future fails
        // (or the confirmation itself throws) nothing is reported from here.
        consumerTransferFuture.runAfterBoth(providerFuture, () -> {
            switch (phase) {
                case TRANSITORY:
                case READ_OLD_WRITE_ALL:
                case READ_ALL_WRITE_ALL:
                case READ_NEW_WRITE_ALL:
                    localTopologyManager.confirmRebalancePhase(cacheName, newTopologyId, newRebalanceId, null);
                    break;
                default:
                    // No confirmation is sent for any other phase.
                    break;
            }
        });
        return consumerUpdateFuture;
    }

    /**
     * Marks the initial state transfer as complete when the new topology is in the
     * {@code NO_REBALANCE} phase and the local node is a member of its read consistent hash.
     * Does nothing if the initial transfer is already marked complete.
     */
    private void completeInitialTransferIfNeeded(CacheTopology newCacheTopology, CacheTopology.Phase phase) {
        if (initialStateTransferComplete.isDone()) {
            return;
        }
        assert distributionManager.getCacheTopology().getTopologyId() == newCacheTopology.getTopologyId();

        boolean isJoined = phase == CacheTopology.Phase.NO_REBALANCE
                && newCacheTopology.getReadConsistentHash().getMembers().contains(rpcManager.getAddress());
        if (isJoined) {
            initialStateTransferComplete.complete(null);
            if (log.isTraceEnabled()) {
                log.tracef("Initial state transfer complete for cache %s on node %s", cacheName, rpcManager.getAddress());
            }
        }
    }

    /**
     * Blocks until the initial state transfer has completed, for at most the configured state
     * transfer timeout. Returns immediately when {@code awaitInitialTransfer} is disabled.
     *
     * <p>If rebalancing is disabled for this cache, or the cache is in degraded mode, the initial
     * transfer is marked complete right away instead of being waited for.
     *
     * @throws CacheException on timeout, on interruption (the thread's interrupt flag is restored
     *                        before throwing), or if waiting fails for any other reason
     */
    @Override
    public void waitForInitialStateTransferToComplete() {
        if (!configuration.clustering().stateTransfer().awaitInitialTransfer()) {
            return;
        }
        try {
            boolean rebalancingDisabled = !localTopologyManager.isCacheRebalancingEnabled(cacheName);
            if (rebalancingDisabled || partitionHandlingManager.getAvailabilityMode() == AvailabilityMode.DEGRADED_MODE) {
                initialStateTransferComplete.complete(null);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Waiting for initial state transfer to finish for cache %s on %s", cacheName, rpcManager.getAddress());
            }
            initialStateTransferComplete.get(configuration.clustering().stateTransfer().timeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw log.initialStateTransferTimeout(cacheName, rpcManager.getAddress());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new CacheException(e);
        } catch (CacheException e) {
            throw e;
        } catch (Exception e) {
            throw new CacheException(e);
        }
    }

    /**
     * Releases any thread waiting for the initial state transfer and leaves the cache, using the
     * configured remote timeout.
     */
    @Override
    @Stop(priority = 0)
    public void stop() {
        if (log.isTraceEnabled()) {
            log.tracef("Shutting down StateTransferManager of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
        initialStateTransferComplete.complete(null);
        localTopologyManager.leave(cacheName, configuration.clustering().remoteTimeout());
    }

    /**
     * @return {@code true} once the initial state transfer has been marked complete
     */
    @Override
    @ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.", displayName = "Is join completed?", dataType = DataType.TRAIT)
    public boolean isJoinComplete() {
        return initialStateTransferComplete.isDone();
    }

    /**
     * @return the string form of the rebalancing status reported by the
     *         {@link LocalTopologyManager} for this cache
     * @throws Exception if the status cannot be retrieved
     */
    @Override
    @ManagedAttribute(description = "Retrieves the rebalancing status for this cache. Possible values are PENDING, SUSPENDED, IN_PROGRESS, COMPLETE", displayName = "Rebalancing progress", dataType = DataType.TRAIT)
    public String getRebalancingStatus() throws Exception {
        return localTopologyManager.getRebalancingStatus(cacheName).toString();
    }

    /**
     * @return whatever the {@link StateConsumer} reports for its state-transfer-in-progress flag
     */
    @Override
    @ManagedAttribute(description = "Checks whether the local node is receiving state from other nodes", displayName = "Is state transfer in progress?", dataType = DataType.TRAIT)
    public boolean isStateTransferInProgress() {
        return stateConsumer.isStateTransferInProgress();
    }

    /**
     * Re-sends a command that was issued in an older topology to the write owners of its keys in
     * the current topology, excluding the local node and the origin.
     *
     * <p>When forwarding happens, the command's topology id is overwritten with the local one
     * before it is sent.
     *
     * @param command      the command to forward if its topology id is older than the local one
     * @param affectedKeys the keys whose current write owners are the forwarding targets
     * @param origin       the node the command came from; never a forwarding target
     * @return always an empty map
     */
    @Override
    public Map<Address, Response> forwardCommandIfNeeded(TopologyAffectedCommand command, Set<Object> affectedKeys, Address origin) {
        LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology();
        if (cacheTopology == null) {
            if (log.isTraceEnabled()) {
                log.tracef("Not fowarding command %s because topology is null.", command);
            }
            return Collections.emptyMap();
        }

        int cmdTopologyId = command.getTopologyId();
        int localTopologyId = cacheTopology.getTopologyId();
        if (log.isTraceEnabled()) {
            log.tracef("CommandTopologyId=%s, localTopologyId=%s", cmdTopologyId, localTopologyId);
        }
        if (cmdTopologyId >= localTopologyId) {
            return Collections.emptyMap();
        }

        Set<Address> newTargets = new HashSet<>(cacheTopology.getWriteOwners(affectedKeys));
        newTargets.remove(rpcManager.getAddress());
        newTargets.remove(origin);
        if (!newTargets.isEmpty()) {
            command.setTopologyId(localTopologyId);
            if (log.isTraceEnabled()) {
                log.tracef("Forwarding command %s to new targets %s", command, newTargets);
            }
            rpcManager.sendToMany(newTargets, command, DeliverOrder.NONE);
        }
        return Collections.emptyMap();
    }

    /**
     * @return the injected {@link StateConsumer}
     */
    @Override
    public StateConsumer getStateConsumer() {
        return stateConsumer;
    }

    /**
     * @return the injected {@link StateProvider}
     */
    @Override
    public StateProvider getStateProvider() {
        return stateProvider;
    }

    @Override
    public String toString() {
        return "StateTransferManagerImpl [" + cacheName + "@" + rpcManager.getAddress() + "]";
    }
}

