/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.test.ExceptionRunnable;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Exercises reductions issued through the {@link ClusterPublisherManager} of node 0 while
 * ownership of segment 2 is moved to node 0 at different points of the operation, and asserts
 * that the reduced count matches the expected number of entries.
 *
 * <p>The cluster has four nodes and four segments. Every test temporarily replaces a component
 * of one cache with a blocking mock or spy; each test restores the component it replaced so
 * that later invocations (the tests are parameterized through a data provider) start from a
 * clean cache.
 */
@Test(groups={"functional"}, testName="reactive.publisher.impl.RehashClusterPublisherManagerTest")
public class RehashClusterPublisherManagerTest
extends MultipleCacheManagersTest {
    /** Owner indexes for segments 0..3 that every test starts from. */
    private static final int[][] START_SEGMENT_OWNERS = new int[][]{{0, 1}, {1, 2}, {2, 3}, {3, 0}};
    /** Consistent hash factory shared by all caches; lets the tests dictate segment ownership. */
    protected ControlledConsistentHashFactory factory;

    /**
     * Creates four clustered {@code DIST_SYNC} caches with four segments whose ownership is
     * controlled by {@link #factory}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder builderUsed = new ConfigurationBuilder();
        this.factory = new ControlledConsistentHashFactory.Default(START_SEGMENT_OWNERS);
        builderUsed.clustering().cacheMode(CacheMode.DIST_SYNC).hash().consistentHashFactory((ConsistentHashFactory)this.factory).numSegments(4);
        this.createClusteredCaches(4, TestDataSCI.INSTANCE, builderUsed);
    }

    /**
     * Resets segment ownership to {@link #START_SEGMENT_OWNERS} and waits for the resulting
     * rebalance to finish before each test method runs.
     */
    @BeforeMethod
    protected void beforeMethod() throws Exception {
        this.factory.setOwnerIndexes(START_SEGMENT_OWNERS);
        this.factory.triggerRebalance(this.cache(0));
        TestingUtil.waitForNoRebalance(this.caches());
    }

    /**
     * Supplies every combination of {@link DeliveryGuarantee}, parallel flag and entry flag.
     *
     * @return rows of {@code {DeliveryGuarantee, Boolean parallel, Boolean isEntry}}
     */
    @DataProvider(name="GuaranteeParallelEntry")
    public Object[][] collectionAndVersionsProvider() {
        return Arrays.stream(DeliveryGuarantee.values())
                .flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE)
                        .flatMap(parallel -> Stream.of(Boolean.TRUE, Boolean.FALSE)
                                .map(entry -> new Object[]{dg, parallel, entry})))
                .toArray(Object[][]::new);
    }

    /**
     * Changes the owners of segment 2 from nodes {2, 3} to nodes {0, 3}, triggers a rebalance
     * and blocks until no rebalance is in progress.
     */
    private void triggerRebalanceSegment2MovesToNode0() throws Exception {
        this.factory.setOwnerIndexes(new int[][]{{0, 1}, {1, 2}, {0, 3}, {3, 0}});
        this.factory.triggerRebalance(this.cache(0));
        TestingUtil.waitForNoRebalance(this.caches());
    }

    /**
     * Builds the function that derives the key filter handed to the reduction.
     *
     * @param useKeys whether the reduction should be restricted to an explicit key set
     * @return when {@code useKeys} is {@code false}, a function that always yields {@code null};
     *         otherwise a function yielding the inserted keys except those in segment 1, plus
     *         one freshly created key for cache 3 that was never written
     */
    Function<Map<MagicKey, Object>, Set<MagicKey>> toKeys(boolean useKeys) {
        if (!useKeys) {
            return map -> null;
        }
        return map -> {
            Set<MagicKey> set = new HashSet<>();
            for (MagicKey key : map.keySet()) {
                if (key.getSegment() != 1) {
                    set.add(key);
                }
            }
            // Extra key that is not present in any cache, so it must not contribute to the count.
            set.add(new MagicKey(this.cache(3)));
            return set;
        };
    }

    /**
     * Moves segment 2 to the originator while node 2's data container is blocked inside
     * {@code publisher(...)} for segment 2, and expects every inserted entry to be counted.
     */
    @Test(dataProvider="GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorWhileRetrievingPublisher(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) throws Exception {
        Cache cache2 = this.cache(2);
        CheckPoint checkPoint = new CheckPoint();
        // NOTE(review): relies on blockingMock returning the component it replaced, exactly as the
        // RpcManager based test in this class does; that value is what gets restored below.
        InternalDataContainer original = Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache2, (stub, m) -> ((InternalDataContainer)stub.when(m)).publisher(Mockito.eq((int)2)), new Class[0]);
        try {
            Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache2, (stub, m) -> ((InternalDataContainer)stub.when(m)).publisher((IntSet)Mockito.eq((Object)IntSets.immutableSet((int)2))), new Class[0]);
            int expectedAmount = this.caches().size();
            this.runCommand(deliveryGuarantee, parallel, isEntry, expectedAmount, () -> {
                checkPoint.triggerForever("before_release");
                Future<Void> rebalanceFuture = this.fork(this::triggerRebalanceSegment2MovesToNode0);
                checkPoint.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
                checkPoint.triggerForever("after_release");
                rebalanceFuture.get(10L, TimeUnit.SECONDS);
            });
        }
        finally {
            // Put the real data container back so the blocking mocks do not leak into other tests.
            if (original != null) {
                TestingUtil.replaceComponent(cache2, InternalDataContainer.class, original, true);
            }
        }
    }

    /** Runs the "just before sending remote" scenario with an explicit key filter. */
    @Test(dataProvider="GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforeSendingRemoteKey(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) throws Exception {
        this.testSegmentMovesToOriginatorJustBeforeSendingRemote(deliveryGuarantee, parallel, isEntry, true);
    }

    /** Runs the "just before sending remote" scenario without a key filter. */
    @Test(dataProvider="GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforeSendingRemoteNoKey(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) throws Exception {
        this.testSegmentMovesToOriginatorJustBeforeSendingRemote(deliveryGuarantee, parallel, isEntry, false);
    }

    /**
     * Blocks node 0's {@link RpcManager} right before it sends a
     * {@link ReductionPublisherRequestCommand} to node 2, moves segment 2 to node 0, and then
     * lets the command proceed.
     *
     * <p>The expected count is lowered by one for {@link DeliveryGuarantee#AT_MOST_ONCE} and by
     * one more when a key filter is used (the filter drops the segment 1 key and adds a key that
     * was never stored).
     *
     * @param useKeys whether to restrict the reduction to the key set built by {@link #toKeys}
     */
    private void testSegmentMovesToOriginatorJustBeforeSendingRemote(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry, boolean useKeys) throws Exception {
        Cache cache0 = this.cache(0);
        Address cache2Address = this.address(2);
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("after_release");
        RpcManager original = Mocks.blockingMock(checkPoint, RpcManager.class, cache0, (stub, m) -> ((RpcManager)stub.when(m)).invokeCommand((Address)ArgumentMatchers.eq((Object)cache2Address), (ReplicableCommand)ArgumentMatchers.isA(ReductionPublisherRequestCommand.class), (ResponseCollector)Mockito.any(), (RpcOptions)Mockito.any()), new Class[0]);
        int expectedAmount = this.caches().size();
        if (deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            --expectedAmount;
        }
        if (useKeys) {
            --expectedAmount;
        }
        try {
            this.runCommand(deliveryGuarantee, parallel, isEntry, expectedAmount, () -> {
                checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
                this.triggerRebalanceSegment2MovesToNode0();
                checkPoint.triggerForever("before_release");
            }, this.toKeys(useKeys));
        }
        finally {
            // Restore the real RpcManager so the blocking mock does not affect other tests.
            if (original != null) {
                TestingUtil.replaceComponent(cache0, RpcManager.class, original, true);
            }
        }
    }

    /**
     * Moves segment 2 to the originator while the publisher returned by node 2's
     * {@link LocalPublisherManager} for segment 2 is held back by a checkpoint, and expects
     * every inserted entry to be counted.
     */
    @Test(dataProvider="GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforePublisherCompletes(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry) throws Exception {
        Cache cache2 = this.cache(2);
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("before_release");
        // Remember the real component before it is replaced so it can be restored afterwards.
        LocalPublisherManager original = TestingUtil.extractComponent(cache2, LocalPublisherManager.class);
        try {
            LocalPublisherManager spy = Mocks.replaceComponentWithSpy(cache2, LocalPublisherManager.class);
            Answer blockingLpmAnswer = invocation -> {
                SegmentAwarePublisherSupplier result = (SegmentAwarePublisherSupplier)invocation.callRealMethod();
                return Mocks.blockingPublisherAware(result, checkPoint);
            };
            ((LocalPublisherManager)Mockito.doAnswer((Answer)blockingLpmAnswer).when((Object)spy)).entryPublisher((IntSet)ArgumentMatchers.eq((Object)IntSets.immutableSet((int)2)), (Set)Mockito.any(), (Set)Mockito.any(), ArgumentMatchers.eq((long)EnumUtil.bitSetOf((Enum)Flag.STATE_TRANSFER_PROGRESS)), (DeliveryGuarantee)Mockito.any(), (Function)Mockito.any());
            TestingUtil.replaceComponent(cache2, LocalPublisherManager.class, spy, true);
            int expectedAmount = this.caches().size();
            this.runCommand(deliveryGuarantee, parallel, isEntry, expectedAmount, () -> {
                Future<Void> rebalanceFuture = this.fork(this::triggerRebalanceSegment2MovesToNode0);
                checkPoint.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
                checkPoint.triggerForever("after_release");
                rebalanceFuture.get(10L, TimeUnit.SECONDS);
            });
        }
        finally {
            // Remove the spy so its blocking stub does not leak into other tests.
            if (original != null) {
                TestingUtil.replaceComponent(cache2, LocalPublisherManager.class, original, true);
            }
        }
    }

    /**
     * Runs a reduction without a key filter; see the six-argument overload for details.
     */
    private void runCommand(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry, int expectedAmount, ExceptionRunnable performOperation) throws Exception {
        this.runCommand(deliveryGuarantee, parallel, isEntry, expectedAmount, performOperation, map -> null);
    }

    /**
     * Stores one {@link MagicKey} entry per cache, starts a count reduction on node 0 in a
     * forked thread, runs {@code performOperation} on the calling thread, and finally asserts
     * the reduced count.
     *
     * @param deliveryGuarantee guarantee passed to the reduction
     * @param parallel          parallel flag passed to the reduction
     * @param isEntry           {@code true} to use {@code entryReduction}, {@code false} for
     *                          {@code keyReduction}
     * @param expectedAmount    count the reduction must produce
     * @param performOperation  test specific steps executed while the reduction is in flight
     * @param keysHandler       maps the inserted entries to the key filter for the reduction;
     *                          the result may be {@code null}
     * @throws Exception if the operation fails or either wait exceeds 10 seconds
     */
    private void runCommand(DeliveryGuarantee deliveryGuarantee, boolean parallel, boolean isEntry, int expectedAmount, ExceptionRunnable performOperation, Function<Map<MagicKey, Object>, Set<MagicKey>> keysHandler) throws Exception {
        // Must be Map<MagicKey, Object> to match the keysHandler parameter type.
        Map<MagicKey, Object> entries = new HashMap<>();
        for (Cache cache : this.caches()) {
            MagicKey key = new MagicKey(cache);
            String value = key.toString();
            cache.put((Object)key, (Object)value);
            entries.put(key, value);
        }
        Set<MagicKey> keys = keysHandler.apply(entries);
        Future<CompletionStage> future = this.fork(() -> {
            ClusterPublisherManager cpm = TestingUtil.extractComponent(this.cache(0), ClusterPublisherManager.class);
            CompletionStage stageCount = isEntry ? cpm.entryReduction(parallel, null, keys, null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : cpm.keyReduction(parallel, null, keys, null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
            return stageCount;
        });
        performOperation.run();
        Long actualCount = (Long)future.get(10L, TimeUnit.SECONDS).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(expectedAmount, actualCount.intValue());
    }
}

