/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stress;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Test;

/**
 * Stress test that treats {@code putIfAbsent} on a single shared key as a lock.
 *
 * <p>Each worker repeatedly calls {@code putIfAbsent(SHARED_KEY, ownValue)}. A {@code null}
 * return is counted as "lock acquired"; the worker then checks that it is the only owner,
 * removes the key again and counts the release. The same scenario is run against a plain
 * {@link ConcurrentHashMap}, a local cache and several clustered cache modes.
 *
 * <p>The checks use the {@code assert} keyword, so they are only evaluated when the JVM runs
 * with assertions enabled.
 */
@Test(groups={"stress"}, testName="stress.PutIfAbsentStressTest", description="Since this test is slow to run, it should be disabled by default and run by hand as necessary.", timeOut=900000L)
public class PutIfAbsentStressTest extends AbstractInfinispanTest {
    /** Number of cache managers started for the clustered scenarios; also the worker pool size. */
    private static final int NODES_NUM = 5;
    /** Number of workers created for each node's cache. */
    private static final int THREAD_PER_NODE = 12;
    /** How long the main thread waits on the pool before asking the workers to quit. */
    private static final long STRESS_TIME_MINUTES = 2L;
    /** The single key every worker competes for. */
    private static final String SHARED_KEY = "thisIsTheKeyForConcurrentAccess";

    /** Runs the scenario against a plain {@link ConcurrentHashMap}. */
    public void testonConcurrentHashMap() throws Exception {
        System.out.println("Running test on ConcurrentHashMap:");
        ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();
        testConcurrentLocking(map);
    }

    /** Runs the scenario against the default cache of a single cache manager. */
    public void testonInfinispanLocal() throws Exception {
        System.out.println("Running test on Infinispan, LOCAL:");
        EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(false);
        try {
            // Obtained inside the try so the manager is still killed if getCache() fails.
            Cache<String, String> cache = cm.getCache();
            testConcurrentLocking(cache);
        } finally {
            TestingUtil.killCacheManagers(cm);
        }
    }

    /** Runs the scenario on a cluster configured with {@link CacheMode#DIST_SYNC}. */
    public void testonInfinispanDIST_SYNC() throws Exception {
        System.out.println("Running test on Infinispan, DIST_SYNC:");
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC);
        testConcurrentLockingOnMultipleManagers(c);
    }

    /** Runs the scenario on a {@link CacheMode#DIST_SYNC} cluster with L1 disabled. */
    public void testonInfinispanDIST_NOL1() throws Exception {
        System.out.println("Running test on Infinispan, DIST_SYNC, disabling L1:");
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC).l1().disable();
        testConcurrentLockingOnMultipleManagers(c);
    }

    /** Runs the scenario on a cluster configured with {@link CacheMode#REPL_SYNC}. */
    public void testonInfinispanREPL_SYNC() throws Exception {
        System.out.println("Running test on Infinispan, REPL_SYNC:");
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.REPL_SYNC);
        testConcurrentLockingOnMultipleManagers(c);
    }

    /** Runs the scenario on a cluster configured with {@link CacheMode#REPL_ASYNC}. */
    public void testonInfinispanREPL_ASYNC() throws Exception {
        System.out.println("Running test on Infinispan, REPL_ASYNC:");
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.REPL_ASYNC);
        testConcurrentLockingOnMultipleManagers(c);
    }

    /**
     * Starts {@link #NODES_NUM} clustered cache managers with the given configuration, waits
     * for the cluster view, and runs the scenario with {@link #THREAD_PER_NODE} workers per
     * node's default cache.
     *
     * <p>Every manager that was started is killed afterwards, including when starting a later
     * node, waiting for the view, or the scenario itself fails.
     *
     * @param cfg configuration used for every clustered cache manager
     * @throws InterruptedException if the calling thread is interrupted while the scenario runs
     */
    private void testConcurrentLockingOnMultipleManagers(ConfigurationBuilder cfg) throws InterruptedException {
        List<EmbeddedCacheManager> cacheContainers = new ArrayList<EmbeddedCacheManager>(NODES_NUM);
        List<Cache<String, String>> caches = new ArrayList<Cache<String, String>>(NODES_NUM);
        List<ConcurrentMap<String, String>> maps =
                new ArrayList<ConcurrentMap<String, String>>(NODES_NUM * THREAD_PER_NODE);
        try {
            for (int nodeNum = 0; nodeNum < NODES_NUM; ++nodeNum) {
                EmbeddedCacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(cfg);
                // Registered immediately so the finally block can kill it whatever happens next.
                cacheContainers.add(cm);
                Cache<String, String> cache = cm.getCache();
                caches.add(cache);
                // One list entry per worker; all workers of a node share that node's cache.
                for (int threadNum = 0; threadNum < THREAD_PER_NODE; ++threadNum) {
                    maps.add(cache);
                }
            }
            TestingUtil.blockUntilViewsReceived(10000, caches);
            testConcurrentLocking(maps);
        } finally {
            TestingUtil.killCacheManagers(cacheContainers);
        }
    }

    /**
     * Runs the scenario with {@code NODES_NUM * THREAD_PER_NODE} workers that all share the
     * same map.
     *
     * @param map the map every worker operates on
     * @throws InterruptedException if the calling thread is interrupted while the scenario runs
     */
    private void testConcurrentLocking(ConcurrentMap<String, String> map) throws InterruptedException {
        List<ConcurrentMap<String, String>> maps = Collections.nCopies(NODES_NUM * THREAD_PER_NODE, map);
        testConcurrentLocking(maps);
    }

    /**
     * Submits one {@link StressingThread} per entry of {@code maps}, lets them run, then asks
     * them to quit and verifies the collected statistics.
     *
     * <p>Workers only return once a failure or quit flag is set (or they are interrupted), so
     * the {@link #STRESS_TIME_MINUTES} wait on the pool effectively acts as the stress timer
     * unless a failure ends the workers early.
     *
     * <p>NOTE(review): the pool has {@link #NODES_NUM} threads while one worker is submitted per
     * list entry, and a fixed pool queues tasks beyond its size. Presumably only the first
     * {@code NODES_NUM} workers execute cycles during the stress window and the rest start
     * after the quit flag is set. Confirm whether the pool was meant to be sized to
     * {@code maps.size()}.
     *
     * @param maps the map each worker operates on, one entry per worker
     * @throws InterruptedException if the calling thread is interrupted while sleeping or
     *         waiting for the pool; the workers are still told to stop in that case
     */
    private void testConcurrentLocking(List<ConcurrentMap<String, String>> maps) throws InterruptedException {
        SharedStats stats = new SharedStats();
        ExecutorService executor = Executors.newFixedThreadPool(NODES_NUM);
        int putsAfter5Seconds;
        try {
            int threadId = 0;
            for (ConcurrentMap<String, String> map : maps) {
                executor.execute(new StressingThread(stats, map, threadId++));
            }
            // No further submissions; lets awaitTermination return once all workers are done.
            executor.shutdown();
            Thread.sleep(5000L);
            putsAfter5Seconds = stats.succesfullPutsCounter.get();
            System.out.println("\nSituation after 5 seconds:");
            System.out.println(stats);
            executor.awaitTermination(STRESS_TIME_MINUTES, TimeUnit.MINUTES);
            stats.globalQuit = true;
            // Give the workers some time to notice the flag and finish their current cycle.
            executor.awaitTermination(10L, TimeUnit.SECONDS);
        } finally {
            // Also reached when this thread is interrupted: never leave workers spinning.
            stats.globalQuit = true;
            executor.shutdownNow();
        }
        System.out.println("\nFinal situation:");
        System.out.println(stats);
        assert (!stats.seenFailures) : "at least one thread has seen unexpected state";
        assert (stats.succesfullPutsCounter.get() > 0) : "the lock should have been taken at least once";
        assert (stats.succesfullPutsCounter.get() > putsAfter5Seconds) : "the lock count didn't improve since the first 5 seconds. Deadlock?";
        assert (stats.succesfullPutsCounter.get() == stats.lockReleasedCounter.get()) : "there's a mismatch in acquires and releases count";
        assert (stats.lockOwnersCounter.get() == 0) : "the lock is still held at test finish";
    }

    /** Counters and flags shared by all workers of one scenario run. */
    public static class SharedStats {
        /** Incremented when {@code putIfAbsent} found an existing value. */
        final AtomicInteger canceledPutsCounter = new AtomicInteger(0);
        /** Incremented when {@code putIfAbsent} returned {@code null}. */
        final AtomicInteger succesfullPutsCounter = new AtomicInteger(0);
        /** Incremented after the shared key has been removed again. */
        final AtomicInteger lockReleasedCounter = new AtomicInteger(0);
        /** Toggled 0 -> 1 -> 0 by the worker that currently holds the lock. */
        final AtomicInteger lockOwnersCounter = new AtomicInteger(0);
        /** Not read or written anywhere in this class. */
        Throwable throwable = null;
        /** Set by the main thread to make every worker leave its loop. */
        volatile boolean globalQuit = false;
        /** Set by a worker that detected an inconsistent lock state; also stops all workers. */
        volatile boolean seenFailures = false;

        /** Returns a multi-line summary of the put/remove counters and the failure flag. */
        @Override
        public String toString() {
            return "\n\tCanceled puts count:\t" + canceledPutsCounter.get()
                    + "\n\tSuccesfull puts count:\t" + succesfullPutsCounter.get()
                    + "\n\tRemoved count:\t" + lockReleasedCounter.get()
                    + "\n\tIllegal state detected:\t" + seenFailures;
        }
    }

    /** Worker that keeps trying to take and release the lock until told to stop. */
    private static class StressingThread implements Runnable {
        private final SharedStats stats;
        private final ConcurrentMap<String, String> cache;
        /** Value written by this worker, derived from its id so owners can be told apart. */
        private final String ourValue;

        /**
         * @param stats statistics shared with the other workers and the main thread
         * @param cache map this worker operates on
         * @param threadId id used to build this worker's value ({@code "v" + threadId})
         */
        public StressingThread(SharedStats stats, ConcurrentMap<String, String> cache, int threadId) {
            this.stats = stats;
            this.cache = cache;
            this.ourValue = "v" + threadId;
        }

        /** Repeats {@link #doCycle()} until a failure or quit flag is set or the thread is interrupted. */
        @Override
        public void run() {
            while (!stats.seenFailures && !stats.globalQuit && !Thread.interrupted()) {
                doCycle();
            }
        }

        /**
         * Performs one acquire attempt. When {@code putIfAbsent} returns {@code null} the
         * worker verifies that it is the sole owner and that the map holds its own value,
         * then releases the lock by removing the key. Otherwise only the canceled-puts
         * counter is incremented.
         */
        private void doCycle() {
            String beforePut = cache.putIfAbsent(SHARED_KEY, ourValue);
            if (beforePut != null) {
                stats.canceledPutsCounter.incrementAndGet();
                return;
            }
            String currentCacheValue = cache.get(SHARED_KEY);
            // The owner counter must go 0 -> 1 and the stored value must be ours.
            boolean lockIsFine = stats.lockOwnersCounter.compareAndSet(0, 1) && ourValue.equals(currentCacheValue);
            stats.succesfullPutsCounter.incrementAndGet();
            checkIsTrue(lockIsFine, "I got the lock, some other thread is owning the lock AS WELL.");
            lockIsFine = stats.lockOwnersCounter.compareAndSet(1, 0);
            checkIsTrue(lockIsFine, "Some other thread changed the lock count while I was having it!");
            cache.remove(SHARED_KEY);
            stats.lockReleasedCounter.incrementAndGet();
        }

        /** Flags a failure in the shared stats and prints {@code message} when {@code assertion} is false. */
        private void checkIsTrue(boolean assertion, String message) {
            if (!assertion) {
                stats.seenFailures = true;
                System.out.println(message);
            }
        }
    }
}

