/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.jgroups.protocols.DISCARD;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test that kills a cluster member at the moment it is asked to send a
 * {@link StateResponseCommand}, then checks that the surviving node and a newly
 * added node both end up holding every entry.
 *
 * <p>The second cache manager (index 1) is started with a {@link MockTransport}, whose
 * hook runs instead of actually sending the {@code StateResponseCommand}.
 */
@Test(groups={"functional"}, testName="statetransfer.StateTransferRestartTest")
@CleanupAfterMethod
public class StateTransferRestartTest
extends MultipleCacheManagersTest {
    private ConfigurationBuilder cfgBuilder;
    private GlobalConfigurationBuilder gcfgBuilder;
    private final MockTransport mockTransport = new MockTransport();

    /**
     * Prepares the shared cache and global configuration builders. No cache manager is
     * created here; the test method adds them one by one.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        cfgBuilder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        cfgBuilder.transaction().transactionManagerLookup(new EmbeddedTransactionManagerLookup());
        cfgBuilder.clustering().hash().numOwners(2);
        cfgBuilder.clustering().stateTransfer().fetchInMemoryState(true);
        cfgBuilder.clustering().stateTransfer().timeout(20000L);

        // Global configuration used only for the node that gets the intercepting transport.
        gcfgBuilder = new GlobalConfigurationBuilder();
        gcfgBuilder.transport().transport(mockTransport);
    }

    @Override
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager cm) {
        NoOpGlobalConfigurationManager.amendCacheManager(cm);
    }

    /**
     * Starts c0 and c1 (c1 uses the mock transport), loads {@code numKeys} entries, installs
     * a hook that kills c1 when it is asked to send a {@code StateResponseCommand}, adds c2,
     * and finally waits until both c0 and c2 report {@code numKeys} entries.
     */
    public void testStateTransferRestart() throws Throwable {
        final int numKeys = 100;

        addClusterEnabledCacheManager(cfgBuilder, new TransportFlags().withFD(true));
        // Only this manager is built from gcfgBuilder, i.e. only c1 uses MockTransport.
        addClusterEnabledCacheManager(gcfgBuilder, cfgBuilder, new TransportFlags().withFD(true));
        log.info("waiting for cluster { c0, c1 }");
        waitForClusterToForm();

        log.info("putting in data");
        final Cache<Object, Object> c0 = cache(0);
        final Cache<Object, Object> c1 = cache(1);
        for (int k = 0; k < numKeys; ++k) {
            c0.put(k, k);
        }
        TestingUtil.waitForNoRebalance(c0, c1);
        AssertJUnit.assertEquals(numKeys, c0.entrySet().size());
        AssertJUnit.assertEquals(numKeys, c1.entrySet().size());

        mockTransport.callOnStateResponseCommand = () -> {
            // Kill c1 from a separate thread so the calling thread can be parked below.
            fork(() -> {
                log.info("KILLING the c1 cache");
                try {
                    DISCARD d3 = TestingUtil.getDiscardForCache(c1.getCacheManager());
                    d3.discardAll(true);
                    TestingUtil.killCacheManagers(manager(c1));
                }
                catch (Exception e) {
                    // Best effort: keep going, but record the cause so a failed kill can be diagnosed.
                    log.info("there was some exception while killing cache", e);
                }
                return null;
            });
            try {
                // Hold the caller for 25s (longer than the 20s state transfer timeout configured
                // above) unless it gets interrupted first.
                Thread.sleep(25000L);
            }
            catch (InterruptedException e) {
                log.info("Interrupted as expected.");
                Thread.currentThread().interrupt();
            }
            return null;
        };

        log.info("adding cache c2");
        addClusterEnabledCacheManager(cfgBuilder, new TransportFlags().withFD(true));
        log.info("get c2");
        final Cache<Object, Object> c2 = cache(2);

        log.info("waiting for cluster { c0, c2 }");
        TestingUtil.blockUntilViewsChanged(10000L, 2, c0, c2);

        log.infof("c0 entrySet size before : %d", c0.entrySet().size());
        log.infof("c2 entrySet size before : %d", c2.entrySet().size());

        eventuallyEquals(numKeys, () -> c0.entrySet().size());
        eventuallyEquals(numKeys, () -> c2.entrySet().size());

        log.info("Ending the test");
    }

    /**
     * Transport that, once a hook is installed, runs the hook instead of sending any
     * {@link StateResponseCommand}; every other command is delegated to the superclass.
     */
    static class MockTransport
    extends JGroupsTransport {
        /** Hook run in place of sending a {@code StateResponseCommand}; {@code null} disables interception. */
        volatile Callable<Void> callOnStateResponseCommand;

        MockTransport() {
        }

        /**
         * Runs the installed hook and returns an already-completed stage with a {@code null}
         * value when {@code command} is exactly a {@code StateResponseCommand}; otherwise
         * delegates to {@code super.invokeCommand}.
         */
        public <T> CompletionStage<T> invokeCommand(Address target, ReplicableCommand command, ResponseCollector<T> collector, DeliverOrder deliverOrder, long timeout, TimeUnit unit) {
            // Read the volatile field once so the null check and the call see the same value.
            Callable<Void> hook = callOnStateResponseCommand;
            if (hook != null && command.getClass() == StateResponseCommand.class) {
                log.trace("Ignoring StateResponseCommand");
                try {
                    hook.call();
                }
                catch (Exception e) {
                    log.error("Error in callOnStateResponseCommand", e);
                }
                return CompletableFuture.completedFuture(null);
            }
            return super.invokeCommand(target, command, collector, deliverOrder, timeout, unit);
        }
    }
}

