/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import jakarta.transaction.TransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.BackupAckCommand;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.ControlledRpcManager;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"})
public abstract class BaseTxStateTransferOverwriteTest
extends BaseDistFunctionalTest<Object, Object> {
    /**
     * Sets the inherited test parameters used by every test in this class:
     * a three-node initial cluster, two owners per key, transactional mode,
     * rehashing enabled, and cleanup after each test method.
     */
    public BaseTxStateTransferOverwriteTest() {
        INIT_CLUSTER_SIZE = 3;
        numOwners = 2;
        performRehashing = true;
        transactional = true;
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Reports whether L1 is enabled, as read from the configuration of the
     * test cache on the manager at index 0.
     *
     * @return {@code true} when the clustering L1 configuration is enabled
     */
    protected boolean l1Enabled() {
        Cache<Object, Object> firstCache = cache(0, cacheName);
        return firstCache.getCacheConfiguration()
                .clustering()
                .l1()
                .enabled();
    }

    /**
     * Hands the cache manager to {@link NoOpGlobalConfigurationManager#amendCacheManager}
     * before it is started.
     *
     * @param cm the cache manager about to be started
     */
    @Override
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager cm) {
        NoOpGlobalConfigurationManager.amendCacheManager(cm);
    }

    /**
     * Returns the command class that the blocking interceptors in this test
     * should intercept. This implementation ignores {@code op} and always
     * returns {@link PrepareCommand}.
     *
     * @param op the write operation under test (unused here)
     * @return {@code PrepareCommand.class}
     */
    protected Class<? extends VisitableCommand> getVisitableCommand(TestWriteOperation op) {
        return PrepareCommand.class;
    }

    /**
     * Wraps {@code callable} so that, when invoked, it is executed through
     * {@link TestingUtil#withTx} using the given transaction manager.
     *
     * @param tm the transaction manager passed to {@code TestingUtil.withTx}
     * @param callable the work to execute
     * @return a callable that defers to {@code TestingUtil.withTx(tm, callable)}
     */
    protected Callable<?> runWithTx(TransactionManager tm, Callable<?> callable) {
        return new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return TestingUtil.withTx(tm, callable);
            }
        };
    }

    /** PUT_OVERWRITE with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithPut() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_OVERWRITE, true);
    }

    /** PUT_OVERWRITE with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPut() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_OVERWRITE, false);
    }

    /** PUT_CREATE with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithPutCreate() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_CREATE, true);
    }

    /** PUT_CREATE with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPutCreate() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_CREATE, false);
    }

    /** PUT_IF_ABSENT with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithPutIfAbsent() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_IF_ABSENT, true);
    }

    /** PUT_IF_ABSENT with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPutIfAbsent() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_IF_ABSENT, false);
    }

    /** REMOVE_EXACT with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithRemoveExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE_EXACT, true);
    }

    /** REMOVE_EXACT with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithRemoveExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE_EXACT, false);
    }

    /** REMOVE with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithRemove() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE, true);
    }

    /** REMOVE with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithRemove() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE, false);
    }

    /** REPLACE with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithReplace() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE, true);
    }

    /** REPLACE with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithReplace() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE, false);
    }

    /** REPLACE_EXACT with {@code additionalValueOnNonOwner = true}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitWithReplaceExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE_EXACT, true);
    }

    /** REPLACE_EXACT with {@code additionalValueOnNonOwner = false}. */
    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithReplaceExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE_EXACT, false);
    }

    /** Runs the commit-during-state-transfer scenario with PUT_CREATE. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringPut() throws Exception {
        doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation.PUT_CREATE);
    }

    /** Runs the commit-during-state-transfer scenario with PUT_IF_ABSENT. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutIfAbsent() throws Exception {
        doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation.PUT_IF_ABSENT);
    }

    /** Runs the old-topology-after-rebalance scenario with PUT_CREATE. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringPut2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_CREATE);
    }

    /** Runs the old-topology-after-rebalance scenario with PUT_OVERWRITE. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutOverwrite2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_OVERWRITE);
    }

    /** Runs the old-topology-after-rebalance scenario with PUT_IF_ABSENT. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutIfAbsent2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_IF_ABSENT);
    }

    /** Runs the old-topology-after-rebalance scenario with REPLACE. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringReplace2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REPLACE);
    }

    /** Runs the old-topology-after-rebalance scenario with REPLACE_EXACT. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringReplaceWithPreviousValue2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REPLACE_EXACT);
    }

    /** Runs the old-topology-after-rebalance scenario with REMOVE. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringRemove2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REMOVE);
    }

    /** Runs the old-topology-after-rebalance scenario with REMOVE_EXACT. */
    @Test
    public void testNonCoordinatorOwnerLeavingDuringRemoveWithPreviousValue2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REMOVE_EXACT);
    }

    /**
     * Performs {@code op} from cache 0 inside a transaction, parks it on a barrier via a
     * {@link BlockingInterceptor}, stops the cache manager of cache 1, and only lets the operation
     * finish once cache 2 has reached (and is held inside) {@code StateConsumer.applyState}.
     * Afterwards the held state application is released and the final value is checked on
     * caches 0 and 2.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method; compare against the original source if exact try/finally
     * semantics matter.
     *
     * @param op the write operation to perform; also supplies the previous, expected and
     *        return values that are asserted
     * @param additionalValueOnNonOwner when {@code true}, the transaction first writes an extra
     *        {@code "placeholder"} key (created from cache 2) before performing {@code op}
     * @throws Exception if a barrier/checkpoint wait times out, the forked operation fails, or an
     *         assertion fails
     */
    protected void doStateTransferInBetweenPrepareCommit(TestWriteOperation op, boolean additionalValueOnNonOwner) throws Exception {
        // The local names describe the intended role of each node for the key created below.
        AdvancedCache primaryOwnerCache = this.advancedCache(0, this.cacheName);
        AdvancedCache backupOwnerCache = this.advancedCache(1, this.cacheName);
        AdvancedCache nonOwnerCache = this.advancedCache(2, this.cacheName);
        // Key is built from caches 0 and 1 — presumably making them its owners; confirm against MagicKey.
        MagicKey key = new MagicKey(String.valueOf(op) + "-key", this.cache(0, this.cacheName), this.cache(1, this.cacheName));
        Object previousValue = op.getPreviousValue();
        if (previousValue != null) {
            // Seed the value the operation expects to find, and verify it is readable from caches 0 and 2.
            primaryOwnerCache.put((Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)primaryOwnerCache.get((Object)key));
            log.tracef("Previous value inserted: %s = %s", (Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)nonOwnerCache.get((Object)key));
            if (this.l1Enabled()) {
                this.assertIsInL1((Cache<?, ?>)nonOwnerCache, key);
            }
        }
        // Two-party barrier shared between this test thread and the BlockingInterceptor installed below.
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        try {
            TransactionManager tm = primaryOwnerCache.getTransactionManager();
            Future<?> future = this.fork(this.runWithTx(tm, () -> {
                if (additionalValueOnNonOwner) {
                    MagicKey mk = new MagicKey("placeholder", (Cache<?, ?>)nonOwnerCache);
                    String value = "somevalue";
                    primaryOwnerCache.put((Object)mk, (Object)value);
                    log.tracef("Adding additional value on nonOwner value inserted: %s = %s", (Object)mk, (Object)value);
                }
                // Install the interceptor ahead of StateTransferInterceptor for the command class chosen by
                // getVisitableCommand(op). NOTE(review): the meaning of the two boolean flags is defined by
                // BlockingInterceptor — confirm there.
                TestingUtil.extractInterceptorChain(primaryOwnerCache).addInterceptorBefore(new BlockingInterceptor<VisitableCommand>(cyclicBarrier, this.getVisitableCommand(op), true, false), StateTransferInterceptor.class);
                return op.perform((AdvancedCache<Object, Object>)primaryOwnerCache, key);
            }));
            // First rendezvous: wait (up to 10s) for the other barrier party to arrive.
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            // Drop the blocking interceptors from cache 0's chain; the second await below is still performed.
            BaseTxStateTransferOverwriteTest.removeAllBlockingInterceptorsFromCache(primaryOwnerCache);
            CheckPoint checkPoint = new CheckPoint();
            log.trace((Object)"Adding proxy to state transfer");
            // Swap cache 2's StateConsumer for a mock that signals and then waits on checkPoint in applyState.
            this.waitUntilStateBeingTransferred((Cache<?, ?>)nonOwnerCache, checkPoint);
            // Stop the cache manager hosting cache 1.
            backupOwnerCache.getCacheManager().stop();
            // Wait until applyState has been invoked on cache 2 (it stays held until the release trigger below).
            checkPoint.awaitStrict("pre_state_apply_invoked_for_" + String.valueOf(nonOwnerCache), 10L, TimeUnit.SECONDS);
            // Second rendezvous on the same barrier, then wait for the forked operation's result.
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals((Object)op.getReturnValue(), future.get(10L, TimeUnit.SECONDS));
            // Release the held applyState call on cache 2.
            checkPoint.trigger("pre_state_apply_release_for_" + String.valueOf(nonOwnerCache));
            TestingUtil.waitForNoRebalance(new Cache[]{primaryOwnerCache, nonOwnerCache});
            switch (op) {
                // Remove operations skip the container check; only the final value is asserted below.
                case REMOVE: 
                case REMOVE_EXACT: {
                    break;
                }
                default: {
                    this.assertIsInContainerImmortal((Cache<?, ?>)primaryOwnerCache, key);
                    this.assertIsInContainerImmortal((Cache<?, ?>)nonOwnerCache, key);
                }
            }
            AssertJUnit.assertEquals((Object)op.getValue(), (Object)primaryOwnerCache.get((Object)key));
            AssertJUnit.assertEquals((Object)op.getValue(), (Object)nonOwnerCache.get((Object)key));
        }
        finally {
            // Always remove blocking interceptors from cache 0, even when the test fails part-way.
            BaseTxStateTransferOverwriteTest.removeAllBlockingInterceptorsFromCache(primaryOwnerCache);
        }
    }

    /**
     * Stops the cache manager of cache 1, steps the resulting state transfer manually through
     * {@link ControlledRpcManager} instances on caches 0 and 2, and lets {@code op} (performed
     * from cache 0) complete on cache 2 only after state responses have been delivered but before
     * the rebalance confirmation for the expected topology id is processed. Finally releases
     * the confirmations and checks the resulting value on caches 0 and 2.
     *
     * <p>Fails immediately when L1 is enabled and {@code op} requires a previous value.
     *
     * @param op the write operation to perform; also supplies the command class to block on and
     *        the previous, expected and return values that are asserted
     * @throws Exception if a barrier/checkpoint wait times out, an expected command does not
     *         arrive, the forked operation fails, or an assertion fails
     */
    protected void doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation op) throws Exception {
        if (this.l1Enabled() && op.getPreviousValue() != null) {
            AssertJUnit.fail((String)"This test cannot be ran with L1 when a previous value is set");
        }
        // The local names describe the intended role of each node for the key created below.
        AdvancedCache primaryOwnerCache = this.cache(0, this.cacheName).getAdvancedCache();
        AdvancedCache backupOwnerCache = this.cache(1, this.cacheName).getAdvancedCache();
        AdvancedCache nonOwnerCache = this.cache(2, this.cacheName).getAdvancedCache();
        // Key is built from caches 0 and 1 — presumably making them its owners; confirm against MagicKey.
        MagicKey key = new MagicKey((Cache<?, ?>)primaryOwnerCache, (Cache<?, ?>[])new Cache[]{backupOwnerCache});
        Object previousValue = op.getPreviousValue();
        if (previousValue != null) {
            // Seed the value the operation expects to find, and verify it is readable from caches 0 and 2.
            primaryOwnerCache.put((Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)primaryOwnerCache.get((Object)key));
            log.tracef("Previous value inserted: %s = %s", (Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)nonOwnerCache.get((Object)key));
            if (this.l1Enabled()) {
                this.assertIsInL1((Cache<?, ?>)nonOwnerCache, key);
            }
        }
        int preJoinTopologyId = primaryOwnerCache.getDistributionManager().getCacheTopology().getTopologyId();
        CheckPoint checkPoint = new CheckPoint();
        // Route outgoing RPCs of caches 0 and 2 through controllable managers so individual commands
        // can be held and released with expectCommand(...).send().
        ControlledRpcManager blockingRpcManager0 = ControlledRpcManager.replaceRpcManager(primaryOwnerCache, new Class[0]);
        ControlledRpcManager blockingRpcManager2 = ControlledRpcManager.replaceRpcManager(nonOwnerCache, new Class[0]);
        // These command types are excluded from the RPC-level control; the write itself is held with
        // the BlockingInterceptor installed further down.
        blockingRpcManager0.excludeCommands(BackupWriteCommand.class, PrepareCommand.class, CommitCommand.class, TxCompletionNotificationCommand.class);
        blockingRpcManager2.excludeCommands(BackupAckCommand.class);
        // NOTE(review): assumes the rebalance caused by the leave below gets topology id current + 2 — confirm.
        int rebalanceTopologyId = preJoinTopologyId + 2;
        this.blockRebalanceConfirmation(primaryOwnerCache.getCacheManager(), checkPoint, rebalanceTopologyId);
        // The confirmation mock is installed on cache 0's manager, so that node must be the coordinator.
        AssertJUnit.assertEquals((Object)primaryOwnerCache.getCacheManager().getCoordinator(), (Object)primaryOwnerCache.getCacheManager().getAddress());
        log.trace((Object)"Stopping the cache");
        backupOwnerCache.getCacheManager().stop();
        // Wait until both remaining nodes see exactly two members.
        this.eventuallyEquals(2, () -> primaryOwnerCache.getRpcManager().getMembers().size());
        this.eventuallyEquals(2, () -> nonOwnerCache.getRpcManager().getMembers().size());
        AssertJUnit.assertEquals((Object)primaryOwnerCache.getCacheManager().getCoordinator(), (Object)primaryOwnerCache.getCacheManager().getAddress());
        if (this.transactional.booleanValue()) {
            // In transactional mode each node first sends a get-transactions command; let both through.
            blockingRpcManager0.expectCommand(StateTransferGetTransactionsCommand.class).send().receiveAll();
            blockingRpcManager2.expectCommand(StateTransferGetTransactionsCommand.class).send().receiveAll();
        }
        // Capture the state transfer start requests from both nodes without sending them yet.
        ControlledRpcManager.BlockedRequest<StateTransferStartCommand> blockedStateRequest0 = blockingRpcManager0.expectCommand(StateTransferStartCommand.class);
        ControlledRpcManager.BlockedRequest<StateTransferStartCommand> blockedStateRequest2 = blockingRpcManager2.expectCommand(StateTransferStartCommand.class);
        // Send cache 2's request without waiting for the responses on this thread.
        blockedStateRequest2.send().receiveAllAsync();
        // Hold the state response that cache 0 produces for that request.
        ControlledRpcManager.BlockedRequest<StateResponseCommand> blockedStateResponse0 = blockingRpcManager0.expectCommand(StateResponseCommand.class);
        // Two-party barrier shared with the interceptor below. Despite the "Cache1" in the names, the
        // interceptor is installed on nonOwnerCache (index 2), after EntryWrappingInterceptor.
        // NOTE(review): the meaning of the two boolean flags is defined by BlockingInterceptor — confirm there.
        CyclicBarrier beforeCommitCache1Barrier = new CyclicBarrier(2);
        BlockingInterceptor<? extends VisitableCommand> blockingInterceptor1 = new BlockingInterceptor<VisitableCommand>(beforeCommitCache1Barrier, op.getCommandClass(), true, false);
        TestingUtil.extractInterceptorChain(nonOwnerCache).addInterceptorAfter(blockingInterceptor1, EntryWrappingInterceptor.class);
        Future<Object> future = this.fork(() -> op.perform((AdvancedCache<Object, Object>)primaryOwnerCache, key));
        // First rendezvous: wait (up to 10s) for the other barrier party to arrive.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);
        // Drop the blocking interceptors from cache 2's chain; the second await below is still performed.
        BaseTxStateTransferOverwriteTest.removeAllBlockingInterceptorsFromCache(nonOwnerCache);
        // Deliver the held state response from cache 0 and wait for it to be received.
        blockedStateResponse0.send().receiveAll();
        // Now send cache 0's own state request (not waiting on this thread) and let cache 2's response through.
        blockedStateRequest0.send().receiveAllAsync();
        blockingRpcManager2.expectCommand(StateResponseCommand.class).send().receiveAll();
        // Wait until cache 0's rebalance confirmation has reached the mock (it stays held there).
        checkPoint.awaitStrict("pre_rebalance_confirmation_" + rebalanceTopologyId + "_from_" + String.valueOf(primaryOwnerCache.getCacheManager().getAddress()), 10L, TimeUnit.SECONDS);
        // Second rendezvous on the same barrier, then wait for the forked operation's result.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);
        Object result = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)op.getReturnValue(), (Object)result);
        log.tracef("%s operation is done", (Object)op);
        // Release the held rebalance confirmations from both nodes and wait for the rebalance to finish.
        checkPoint.trigger("resume_rebalance_confirmation_" + rebalanceTopologyId + "_from_" + String.valueOf(primaryOwnerCache.getCacheManager().getAddress()));
        checkPoint.trigger("resume_rebalance_confirmation_" + rebalanceTopologyId + "_from_" + String.valueOf(nonOwnerCache.getCacheManager().getAddress()));
        TestingUtil.waitForNoRebalance(new Cache[]{primaryOwnerCache, nonOwnerCache});
        switch (op) {
            // Remove operations skip the container check; only the final value is asserted below.
            case REMOVE: 
            case REMOVE_EXACT: {
                break;
            }
            default: {
                this.assertIsInContainerImmortal((Cache<?, ?>)primaryOwnerCache, key);
                this.assertIsInContainerImmortal((Cache<?, ?>)nonOwnerCache, key);
            }
        }
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)primaryOwnerCache.get((Object)key));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)nonOwnerCache.get((Object)key));
    }

    /**
     * Performs {@code op} from cache 0, parks it on a barrier via a {@link BlockingInterceptor}
     * installed after {@link StateTransferInterceptor}, stops the cache manager of cache 1, waits
     * for the remaining two nodes to finish rebalancing, and only then lets the parked operation
     * continue. Finally checks the operation's return value and the resulting value on
     * caches 0 and 2.
     *
     * @param op the write operation to perform; also supplies the previous, expected and return
     *        values that are asserted
     * @throws Exception if a barrier wait times out, the forked operation fails, or an assertion
     *         fails
     */
    private void doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation op) throws Exception {
        // The local names describe the intended role of each node for the key created below.
        AdvancedCache primaryOwnerCache = this.advancedCache(0, this.cacheName);
        AdvancedCache backupOwnerCache = this.advancedCache(1, this.cacheName);
        AdvancedCache nonOwnerCache = this.advancedCache(2, this.cacheName);
        // Key is built from caches 0 and 1 — presumably making them its owners; confirm against MagicKey.
        MagicKey key = new MagicKey(String.valueOf(op) + "-key", this.cache(0, this.cacheName), this.cache(1, this.cacheName));
        Object previousValue = op.getPreviousValue();
        if (previousValue != null) {
            // Seed the value the operation expects to find, and verify it is readable from caches 0 and 2.
            primaryOwnerCache.put((Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)primaryOwnerCache.get((Object)key));
            log.tracef("Previous value inserted: %s = %s", (Object)key, previousValue);
            AssertJUnit.assertEquals((Object)previousValue, (Object)nonOwnerCache.get((Object)key));
            if (this.l1Enabled()) {
                this.assertIsInL1((Cache<?, ?>)nonOwnerCache, key);
            }
        }
        // Two-party barrier shared with the interceptor below. Despite the "Cache1" in the names, the
        // interceptor is installed on primaryOwnerCache (index 0).
        // NOTE(review): the meaning of the two boolean flags is defined by BlockingInterceptor — confirm there.
        CyclicBarrier beforeCommitCache1Barrier = new CyclicBarrier(2);
        BlockingInterceptor<? extends VisitableCommand> blockingInterceptor1 = new BlockingInterceptor<VisitableCommand>(beforeCommitCache1Barrier, this.getVisitableCommand(op), false, false);
        TestingUtil.extractInterceptorChain(primaryOwnerCache).addInterceptorAfter(blockingInterceptor1, StateTransferInterceptor.class);
        Future<Object> future = this.fork(() -> {
            try {
                Object object = op.perform((AdvancedCache<Object, Object>)primaryOwnerCache, key);
                return object;
            }
            finally {
                // Logged from the forked thread whether the operation succeeded or threw.
                log.tracef("%s operation is done", (Object)op);
            }
        });
        // First rendezvous: wait (up to 10s) for the other barrier party to arrive.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);
        // Drop the blocking interceptors from cache 0's chain; the second await below is still performed.
        BaseTxStateTransferOverwriteTest.removeAllBlockingInterceptorsFromCache(primaryOwnerCache);
        log.tracef("Stopping the cache", new Object[0]);
        backupOwnerCache.getCacheManager().stop();
        // Wait until both remaining nodes see exactly two members, then until no rebalance is in progress.
        BaseTxStateTransferOverwriteTest.eventually(() -> primaryOwnerCache.getRpcManager().getMembers().size() == 2 && nonOwnerCache.getRpcManager().getMembers().size() == 2);
        TestingUtil.waitForNoRebalance(new Cache[]{primaryOwnerCache, nonOwnerCache});
        // Second rendezvous on the same barrier, then wait for the forked operation's result.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((Object)op.getReturnValue(), (Object)future.get(10L, TimeUnit.SECONDS));
        log.tracef("%s operation is done", (Object)op);
        switch (op) {
            // Remove operations skip the container check; only the final value is asserted below.
            case REMOVE: 
            case REMOVE_EXACT: {
                break;
            }
            default: {
                this.assertIsInContainerImmortal((Cache<?, ?>)primaryOwnerCache, key);
                this.assertIsInContainerImmortal((Cache<?, ?>)nonOwnerCache, key);
            }
        }
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)primaryOwnerCache.get((Object)key));
        AssertJUnit.assertEquals((Object)op.getValue(), (Object)nonOwnerCache.get((Object)key));
    }

    /**
     * Replaces the {@link ClusterTopologyManager} of {@code manager} with a Mockito mock that
     * forwards every call to the real component, except that
     * {@code handleRebalancePhaseConfirm} calls for {@code rebalanceTopologyId} are held:
     * the mock triggers {@code pre_rebalance_confirmation_<id>_from_<source>} on the checkpoint
     * and forwards the call only after
     * {@code resume_rebalance_confirmation_<id>_from_<source>} is triggered (20 second limit).
     *
     * @param manager the cache manager whose topology manager is replaced
     * @param checkPoint the checkpoint used to signal and resume held confirmations
     * @param rebalanceTopologyId the topology id whose confirmations are held
     * @throws Exception declared for callers; propagated from the component replacement
     */
    private void blockRebalanceConfirmation(EmbeddedCacheManager manager, CheckPoint checkPoint, int rebalanceTopologyId) throws Exception {
        ClusterTopologyManager realManager = TestingUtil.extractGlobalComponent((CacheContainer)manager, ClusterTopologyManager.class);
        Answer delegate = AdditionalAnswers.delegatesTo((Object)realManager);
        MockSettings settings = Mockito.withSettings().defaultAnswer(delegate);
        ClusterTopologyManager forwardingMock = (ClusterTopologyManager)Mockito.mock(ClusterTopologyManager.class, settings);

        Answer confirmationGate = invocation -> {
            Object[] args = invocation.getArguments();
            Address sender = (Address)args[1];
            int confirmedTopologyId = (Integer)args[2];
            // Confirmations for any other topology go straight to the real component.
            if (confirmedTopologyId != rebalanceTopologyId) {
                return delegate.answer(invocation);
            }
            String eventSuffix = confirmedTopologyId + "_from_" + sender;
            checkPoint.trigger("pre_rebalance_confirmation_" + eventSuffix);
            return checkPoint
                    .future("resume_rebalance_confirmation_" + eventSuffix, 20L, TimeUnit.SECONDS, testExecutor())
                    .thenCompose(ignored -> (CompletionStage)Mocks.callAnotherAnswer(delegate, invocation));
        };

        ((ClusterTopologyManager)Mockito.doAnswer(confirmationGate).when((Object)forwardingMock))
                .handleRebalancePhaseConfirm(ArgumentMatchers.anyString(), (Address)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Throwable)ArgumentMatchers.isNull(), ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent((CacheContainer)manager, ClusterTopologyManager.class, forwardingMock, true);
    }

    /**
     * Replaces the {@link StateConsumer} of {@code cache} with a Mockito mock that forwards every
     * call to the real component, except that each {@code applyState} call first triggers
     * {@code pre_state_apply_invoked_for_<cache>} on the checkpoint and then waits up to
     * 20 seconds for {@code pre_state_apply_release_for_<cache>} before forwarding.
     *
     * @param cache the cache whose state consumer is replaced
     * @param checkPoint the checkpoint used to signal and release held {@code applyState} calls
     */
    protected void waitUntilStateBeingTransferred(Cache<?, ?> cache, CheckPoint checkPoint) {
        StateConsumer realConsumer = TestingUtil.extractComponent(cache, StateConsumer.class);
        Answer delegate = AdditionalAnswers.delegatesTo((Object)realConsumer);
        MockSettings settings = Mockito.withSettings().defaultAnswer(delegate);
        StateConsumer forwardingMock = (StateConsumer)Mockito.mock(StateConsumer.class, settings);

        Answer applyStateGate = invocation -> {
            // Announce the call, then hold it until the test releases it.
            checkPoint.trigger("pre_state_apply_invoked_for_" + cache);
            checkPoint.awaitStrict("pre_state_apply_release_for_" + cache, 20L, TimeUnit.SECONDS);
            return delegate.answer(invocation);
        };

        ((StateConsumer)Mockito.doAnswer(applyStateGate).when((Object)forwardingMock))
                .applyState((Address)ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyCollection());
        TestingUtil.replaceComponent(cache, StateConsumer.class, forwardingMock, true);
    }
}

