/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.notifications.cachelistener.cluster;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.InCacheMode;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Stress test for clustered cache listeners.
 *
 * <p>Several threads concurrently create, modify and remove two keys through
 * randomly chosen nodes of a {@value #NUM_NODES}-node cluster. Each writer
 * tallies the operations that actually took effect (judged by the return value
 * of each cache call), and the test finally checks that every registered
 * clustered listener saw exactly that many created/modified/removed events.
 */
@Test(groups={"stress"}, testName="notifications.cachelistener.cluster.ClusterListenerStressTest", timeOut=900000L)
@InCacheMode(value={CacheMode.DIST_SYNC})
public class ClusterListenerStressTest
extends MultipleCacheManagersTest {
    protected static final String CACHE_NAME = "cluster-listener";
    protected static final String KEY = "ClusterListenerStressTestKey";
    private static final int NUM_NODES = 3;
    /** Second key exercised by the writers, so that two entries are contended. */
    private static final String SECOND_KEY = KEY + "2";
    /** Number of concurrent writer threads. */
    private static final int THREAD_COUNT = 10;
    /** Number of random cache operations each writer performs. */
    private static final int OPERATIONS_PER_THREAD = 1000;
    /** Distinct (operation, key) pairs per node: 4 operations x 2 keys. */
    private static final int COMBINATIONS_PER_NODE = 8;
    protected ConfigurationBuilder builderUsed;

    /**
     * Starts {@value #NUM_NODES} cache managers, defines {@link #CACHE_NAME} on
     * each of them and waits for the cluster to form.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        Configuration distConfig = getDefaultClusteredCacheConfig(this.cacheMode, false).build();
        for (int i = 0; i < NUM_NODES; ++i) {
            GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
            gcb.transport().defaultTransport().nodeName(TestResourceTracker.getNameForIndex(i));
            // NOTE(review): arguments are presumably (maxThreads, coreThreads, queueLength, keepAlive)
            // - confirm against the BlockingThreadPoolExecutorFactory constructor.
            BlockingThreadPoolExecutorFactory remoteExecutorFactory = new BlockingThreadPoolExecutorFactory(10, 1, 0, 60000L);
            gcb.transport().remoteCommandThreadPool().threadPoolFactory(remoteExecutorFactory);
            DefaultCacheManager cm = new DefaultCacheManager(gcb.build());
            this.registerCacheManager(new CacheContainer[]{cm});
            cm.defineConfiguration(CACHE_NAME, distConfig);
            log.infof("Started cache manager %s", cm.getAddress());
        }
        this.waitForClusterToForm(CACHE_NAME);
    }

    /**
     * Runs {@value #THREAD_COUNT} concurrent writers and verifies that the
     * shared listener received one event per effective operation for each of
     * its registrations.
     *
     * @throws ExecutionException if a writer thread failed
     * @throws InterruptedException if the test thread is interrupted while waiting for the writers
     */
    @Test
    public void runStressTestMultipleWriters() throws ExecutionException, InterruptedException {
        Cache<String, Integer> cache0 = this.cache(0, CACHE_NAME);
        Cache<String, Integer> cache1 = this.cache(1, CACHE_NAME);
        Cache<String, Integer> cache2 = this.cache(2, CACHE_NAME);
        ClusterListenerAggregator listener = new ClusterListenerAggregator();
        // The same listener instance is registered 3 + 2 + 1 = 6 times in total;
        // listenerCount below must match the number of addListener calls here.
        cache0.addListener(listener);
        cache0.addListener(listener);
        cache0.addListener(listener);
        cache1.addListener(listener);
        cache1.addListener(listener);
        cache2.addListener(listener);
        int listenerCount = 6;

        long begin = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        Callable<CreateModifyRemovals> callable = new Callable<CreateModifyRemovals>() {
            @Override
            public CreateModifyRemovals call() throws Exception {
                return ClusterListenerStressTest.this.runRandomOperations(latch);
            }
        };

        // Generic array creation is not allowed, so a raw array is created; it
        // only ever holds the futures returned by fork(callable).
        @SuppressWarnings("unchecked")
        Future<CreateModifyRemovals>[] futures = new Future[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; ++i) {
            futures[i] = this.fork(callable);
        }

        int creationCount = 0;
        int modifyCount = 0;
        int removalCount = 0;
        for (Future<CreateModifyRemovals> future : futures) {
            CreateModifyRemovals cmr = future.get();
            creationCount += cmr.creationCount;
            modifyCount += cmr.modifyCount;
            removalCount += cmr.removalCount;
        }

        Assert.assertEquals(listener.creationCount.get(), creationCount * listenerCount);
        Assert.assertEquals(listener.modifyCount.get(), modifyCount * listenerCount);
        Assert.assertEquals(listener.removalCount.get(), removalCount * listenerCount);
        log.infof("Took %d milliseconds", System.currentTimeMillis() - begin);
    }

    /**
     * Body of a single writer thread: waits until all writers are ready, then
     * performs {@value #OPERATIONS_PER_THREAD} random operations and tallies
     * the ones that took effect.
     *
     * <p>Each draw is decomposed into three independent fields so that every
     * (node, key, operation) combination is equally likely: bits 0-1 select the
     * operation, bit 2 selects the key and the remaining value selects the node.
     *
     * @param startLatch latch shared by all writers, used as a start barrier
     * @return the number of creations, modifications and removals this writer caused
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    private CreateModifyRemovals runRandomOperations(CountDownLatch startLatch) throws InterruptedException {
        // Start barrier: nobody writes until every writer thread has arrived.
        startLatch.countDown();
        startLatch.await();

        int creationCount = 0;
        int modifyCount = 0;
        int removalCount = 0;
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < OPERATIONS_PER_THREAD; ++i) {
            int draw = random.nextInt(NUM_NODES * COMBINATIONS_PER_NODE);
            int operation = draw & 3;
            // Bit 2 is used for the key so it does not overlap the operation bits.
            boolean useFirstKey = (draw & 4) != 0;
            int cacheIndex = draw / COMBINATIONS_PER_NODE;

            Cache<String, Integer> cacheToUse = this.cache(cacheIndex, CACHE_NAME);
            String keyToUse = useFirstKey ? KEY : SECOND_KEY;
            switch (operation) {
                case 0: {
                    // Unconditional put: the previous value tells create from modify.
                    Integer prevValue = cacheToUse.put(keyToUse, i);
                    if (prevValue != null) {
                        ++modifyCount;
                    } else {
                        ++creationCount;
                    }
                    break;
                }
                case 1: {
                    // Unconditional remove: only count it when an entry was really removed.
                    if (cacheToUse.remove(keyToUse) != null) {
                        ++removalCount;
                    }
                    break;
                }
                case 2: {
                    // Conditional write: may lose a race against another writer, in
                    // which case nothing changed and nothing is counted.
                    Integer prevValue = cacheToUse.get(keyToUse);
                    if (prevValue != null) {
                        if (cacheToUse.replace(keyToUse, prevValue, i)) {
                            ++modifyCount;
                        }
                    } else if (cacheToUse.putIfAbsent(keyToUse, i) == null) {
                        ++creationCount;
                    }
                    break;
                }
                case 3: {
                    // Conditional remove: the entry may have changed since the get,
                    // so only a successful removal is counted.
                    Integer prevValue = cacheToUse.get(keyToUse);
                    if (prevValue != null && cacheToUse.remove(keyToUse, prevValue)) {
                        ++removalCount;
                    }
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unsupported case!, provided " + operation);
                }
            }
        }
        return new CreateModifyRemovals(creationCount, modifyCount, removalCount);
    }

    /**
     * Clustered listener that counts the created, modified and removed events
     * it receives. Counters are atomic because one instance is registered on
     * several caches.
     */
    @Listener(clustered=true)
    private static class ClusterListenerAggregator {
        final AtomicInteger creationCount = new AtomicInteger();
        final AtomicInteger modifyCount = new AtomicInteger();
        final AtomicInteger removalCount = new AtomicInteger();

        private ClusterListenerAggregator() {
        }

        /** Counts one entry-created event. */
        @CacheEntryCreated
        public void created(CacheEntryEvent<String, Integer> event) {
            this.creationCount.incrementAndGet();
        }

        /** Counts one entry-modified event. */
        @CacheEntryModified
        public void modified(CacheEntryEvent<String, Integer> event) {
            this.modifyCount.incrementAndGet();
        }

        /** Counts one entry-removed event. */
        @CacheEntryRemoved
        public void removed(CacheEntryEvent<String, Integer> event) {
            this.removalCount.incrementAndGet();
        }
    }

    /** Immutable tally of the effective operations performed by one writer thread. */
    private static class CreateModifyRemovals {
        private final int creationCount;
        private final int modifyCount;
        private final int removalCount;

        public CreateModifyRemovals(int creationCount, int modifyCount, int removalCount) {
            this.creationCount = creationCount;
            this.modifyCount = modifyCount;
            this.removalCount = removalCount;
        }
    }
}

