/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.lang.invoke.CallSite;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.infinispan.Cache;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.DelegatingStateConsumer;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.DistStateTransferOnLeaveConsistencyTest")
@CleanupAfterMethod
public class DistStateTransferOnLeaveConsistencyTest
extends MultipleCacheManagersTest {
    private static final Log log = LogFactory.getLog(DistStateTransferOnLeaveConsistencyTest.class);
    private ControlledConsistentHashFactory consistentHashFactory;

    @Override
    protected final void createCacheManagers() {
    }

    protected ConfigurationBuilder createConfigurationBuilder(boolean isOptimistic) {
        ConfigurationBuilder builder = DistStateTransferOnLeaveConsistencyTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true, true);
        builder.transaction().transactionMode(TransactionMode.TRANSACTIONAL).transactionManagerLookup((TransactionManagerLookup)new EmbeddedTransactionManagerLookup());
        if (isOptimistic) {
            builder.transaction().lockingMode(LockingMode.OPTIMISTIC).locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        } else {
            builder.transaction().lockingMode(LockingMode.PESSIMISTIC);
        }
        this.consistentHashFactory = new ControlledConsistentHashFactory.Default(new int[][]{{0, 1}, {1, 2}});
        builder.clustering().hash().numOwners(2).numSegments(2).consistentHashFactory((ConsistentHashFactory)this.consistentHashFactory);
        builder.clustering().stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(false);
        builder.clustering().l1().disable().locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis());
        return builder;
    }

    public void testRemoveOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.REMOVE, true);
    }

    public void testRemovePessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.REMOVE, false);
    }

    public void testClearOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.CLEAR, true);
    }

    public void testClearPessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.CLEAR, false);
    }

    public void testPutOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT, true);
    }

    public void testPutPessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT, false);
    }

    public void testPutMapOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT_MAP, true);
    }

    public void testPutMapPessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT_MAP, false);
    }

    public void testPutIfAbsentOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT_IF_ABSENT, true);
    }

    public void testPutIfAbsentPessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.PUT_IF_ABSENT, false);
    }

    public void testReplaceOptimistic() throws Exception {
        this.testOperationDuringLeave(Operation.REPLACE, true);
    }

    public void testReplacePessimistic() throws Exception {
        this.testOperationDuringLeave(Operation.REPLACE, false);
    }

    private void testOperationDuringLeave(Operation op, boolean isOptimistic) throws Exception {
        int i;
        int i2;
        int i3;
        ConfigurationBuilder builder = this.createConfigurationBuilder(isOptimistic);
        this.createCluster(builder, 3);
        this.waitForClusterToForm();
        int numKeys = 5;
        log.infof("Putting %d keys into cache ..", (Object)5);
        for (i3 = 0; i3 < 5; ++i3) {
            this.cache(0).put((Object)i3, (Object)("before_st_" + i3));
        }
        log.info((Object)"Finished putting keys");
        for (i3 = 0; i3 < 5; ++i3) {
            AssertJUnit.assertEquals((Object)("before_st_" + i3), (Object)this.cache(0).get((Object)i3));
            AssertJUnit.assertEquals((Object)("before_st_" + i3), (Object)this.cache(1).get((Object)i3));
            AssertJUnit.assertEquals((Object)("before_st_" + i3), (Object)this.cache(2).get((Object)i3));
        }
        CountDownLatch applyStateProceedLatch = new CountDownLatch(1);
        CountDownLatch applyStateStartedLatch1 = new CountDownLatch(1);
        DistStateTransferOnLeaveConsistencyTest.blockStateTransfer(this.advancedCache(0), applyStateStartedLatch1, applyStateProceedLatch);
        CountDownLatch applyStateStartedLatch2 = new CountDownLatch(1);
        DistStateTransferOnLeaveConsistencyTest.blockStateTransfer(this.advancedCache(2), applyStateStartedLatch2, applyStateProceedLatch);
        this.consistentHashFactory.setOwnerIndexes(new int[][]{{0, 1}, {1, 0}});
        log.info((Object)"Killing node 1 ..");
        TestingUtil.killCacheManagers(this.manager(1));
        log.info((Object)"Node 1 killed");
        DataContainer dc0 = this.advancedCache(0).getDataContainer();
        DataContainer dc2 = this.advancedCache(2).getDataContainer();
        if (!applyStateStartedLatch1.await(15L, TimeUnit.SECONDS)) {
            throw new TimeoutException();
        }
        if (!applyStateStartedLatch2.await(15L, TimeUnit.SECONDS)) {
            throw new TimeoutException();
        }
        if (op == Operation.CLEAR) {
            log.info((Object)"Clearing cache ..");
            this.cache(0).clear();
            log.info((Object)"Finished clearing cache");
            AssertJUnit.assertEquals((int)0, (int)dc0.size());
            AssertJUnit.assertEquals((int)0, (int)dc2.size());
        } else if (op == Operation.REMOVE) {
            log.info((Object)"Removing all keys one by one ..");
            for (i2 = 0; i2 < 5; ++i2) {
                this.cache(0).remove((Object)i2);
            }
            log.info((Object)"Finished removing keys");
            AssertJUnit.assertEquals((int)0, (int)dc0.size());
            AssertJUnit.assertEquals((int)0, (int)dc2.size());
        } else if (op == Operation.PUT || op == Operation.PUT_MAP || op == Operation.REPLACE || op == Operation.PUT_IF_ABSENT) {
            log.info((Object)"Updating all keys ..");
            if (op == Operation.PUT) {
                for (int i4 = 0; i4 < 5; ++i4) {
                    this.cache(0).put((Object)i4, (Object)("after_st_" + i4));
                }
            } else if (op == Operation.PUT_MAP) {
                HashMap<Integer, CallSite> toPut = new HashMap<Integer, CallSite>();
                for (i = 0; i < 5; ++i) {
                    toPut.put(i, (CallSite)((Object)("after_st_" + i)));
                }
                this.cache(0).putAll(toPut);
            } else if (op == Operation.REPLACE) {
                for (int i5 = 0; i5 < 5; ++i5) {
                    String expectedOldValue = "before_st_" + i5;
                    boolean replaced = this.cache(0).replace((Object)i5, (Object)expectedOldValue, (Object)("after_st_" + i5));
                    AssertJUnit.assertTrue((boolean)replaced);
                }
            } else {
                for (int i6 = 0; i6 < 5; ++i6) {
                    String expectedOldValue = "before_st_" + i6;
                    Object prevValue = this.cache(0).putIfAbsent((Object)i6, (Object)("after_st_" + i6));
                    AssertJUnit.assertEquals((Object)expectedOldValue, (Object)prevValue);
                }
            }
            log.info((Object)"Finished updating keys");
        }
        applyStateProceedLatch.countDown();
        TestingUtil.waitForNoRebalance(this.cache(0), this.cache(2));
        log.tracef("Data container of NodeA has %d keys: %s", dc0.size(), (Object)StreamSupport.stream(dc0.spliterator(), false).map(ice -> ice.getKey().toString()).collect(Collectors.joining(",")));
        log.tracef("Data container of NodeC has %d keys: %s", dc2.size(), (Object)StreamSupport.stream(dc2.spliterator(), false).map(ice -> ice.getKey().toString()).collect(Collectors.joining(",")));
        if (op == Operation.CLEAR || op == Operation.REMOVE) {
            for (i2 = 0; i2 < 5; ++i2) {
                AssertJUnit.assertNull((Object)dc0.get((Object)i2));
                AssertJUnit.assertNull((Object)dc2.get((Object)i2));
            }
        } else if (op == Operation.PUT || op == Operation.PUT_MAP || op == Operation.REPLACE) {
            LocalizedCacheTopology cacheTopology = this.advancedCache(0).getDistributionManager().getCacheTopology();
            for (i = 0; i < 5; ++i) {
                owners = 0;
                if (dc0.get((Object)i) != null) {
                    ++owners;
                }
                if (dc2.get((Object)i) != null) {
                    ++owners;
                }
                AssertJUnit.assertEquals((String)"Wrong number of owners", (int)cacheTopology.getDistribution((Object)i).readOwners().size(), (int)owners);
                String expected = "after_st_" + i;
                AssertJUnit.assertEquals((Object)expected, (Object)this.cache(0).get((Object)i));
                AssertJUnit.assertEquals((Object)("after_st_" + i), (Object)this.cache(2).get((Object)i));
            }
        } else {
            LocalizedCacheTopology cacheTopology = this.advancedCache(0).getDistributionManager().getCacheTopology();
            for (i = 0; i < 5; ++i) {
                owners = 0;
                if (dc0.get((Object)i) != null) {
                    ++owners;
                }
                if (dc2.get((Object)i) != null) {
                    ++owners;
                }
                AssertJUnit.assertEquals((String)"Wrong number of owners", (int)cacheTopology.getDistribution((Object)i).readOwners().size(), (int)owners);
                String expected = "before_st_" + i;
                AssertJUnit.assertEquals((Object)expected, (Object)this.cache(0).get((Object)i));
                AssertJUnit.assertEquals((Object)expected, (Object)this.cache(2).get((Object)i));
            }
        }
    }

    private static void blockStateTransfer(Cache<?, ?> cache, CountDownLatch started, CountDownLatch proceed) {
        TestingUtil.wrapComponent(cache, StateConsumer.class, current -> {
            BlockingStateConsumer stateConsumer = current instanceof BlockingStateConsumer ? (BlockingStateConsumer)current : new BlockingStateConsumer((StateConsumer)current);
            stateConsumer.startedLatch = started;
            stateConsumer.proceedLatch = proceed;
            return stateConsumer;
        });
    }

    private static enum Operation {
        REMOVE,
        CLEAR,
        PUT,
        PUT_MAP,
        PUT_IF_ABSENT,
        REPLACE;

    }

    @Scope(value=Scopes.NAMED_CACHE)
    public static class BlockingStateConsumer
    extends DelegatingStateConsumer {
        @Inject
        BlockingManager blockingManager;
        volatile CountDownLatch startedLatch;
        volatile CountDownLatch proceedLatch;

        BlockingStateConsumer(StateConsumer delegate) {
            super(delegate);
        }

        @Override
        public CompletionStage<?> applyState(Address sender, int topologyId, Collection<StateChunk> stateChunks) {
            return this.blockingManager.runBlocking(() -> {
                this.startedLatch.countDown();
                try {
                    if (!this.proceedLatch.await(15L, TimeUnit.SECONDS)) {
                        throw CompletableFutures.asCompletionException((Throwable)new TimeoutException());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                CompletionStages.join(super.applyState(sender, topologyId, stateChunks));
            }, (Object)("state-" + String.valueOf(sender)));
        }
    }
}

