/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Function;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.SimpleClusterPublisherManagerTest;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.util.concurrent.BlockingManager;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="reactive.publisher.impl.SimpleLocalPublisherManagerTest")
@InCacheMode(value={CacheMode.REPL_SYNC, CacheMode.DIST_SYNC})
public class SimpleLocalPublisherManagerTest
extends MultipleCacheManagersTest {
    private static final int SEGMENT_COUNT = 128;

    ConfigurationBuilder cacheConfiguration() {
        ConfigurationBuilder builder = SimpleLocalPublisherManagerTest.getDefaultClusteredCacheConfig(this.cacheMode, false);
        builder.clustering().hash().numSegments(128);
        return builder;
    }

    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder builder = this.cacheConfiguration();
        this.createCluster(builder, 3);
        this.waitForClusterToForm();
    }

    private Map<Integer, String> insert(Cache<Integer, String> cache) {
        int amount = 100;
        HashMap<Integer, String> values = new HashMap<Integer, String>(amount);
        IntStream.range(0, amount).forEach(i -> values.put(i, "value-" + i));
        cache.putAll(values);
        return values;
    }

    private LocalPublisherManager<Integer, String> lpm(Cache<Integer, String> cache) {
        return TestingUtil.extractComponent(cache, LocalPublisherManager.class);
    }

    @DataProvider(name="GuaranteeEntry")
    public Object[][] deliveryGuaranteeAndEntryProvider() {
        return (Object[][])Arrays.stream(DeliveryGuarantee.values()).flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE).map(entry -> new Object[]{dg, entry})).toArray(x$0 -> new Object[x$0][]);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testNoIntermediateOps(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Consumer<Object> assertConsumer;
        SegmentAwarePublisherSupplier publisher;
        Cache cache = this.cache(0);
        Map<Integer, String> inserted = this.insert(cache);
        LocalPublisherManager<Integer, String> lpm = this.lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet((int)128);
        if (isEntry) {
            publisher = lpm.keyPublisher(allSegments, null, null, 0L, deliveryGuarantee, java.util.function.Function.identity());
            assertConsumer = obj -> AssertJUnit.assertTrue((boolean)inserted.containsKey(obj));
        } else {
            publisher = lpm.entryPublisher(allSegments, null, null, 0L, deliveryGuarantee, java.util.function.Function.identity());
            assertConsumer = obj -> {
                Map.Entry entry = (Map.Entry)obj;
                Object value = inserted.get(entry.getKey());
                AssertJUnit.assertEquals(value, entry.getValue());
            };
        }
        DistributionManager dm = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = dm.getCacheTopology().getLocalReadSegments();
        int expected = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, TestingUtil.extractComponent(cache, KeyPartitioner.class));
        Set results = (Set)Flowable.fromPublisher((Publisher)publisher.publisherWithoutSegments()).collectInto(new HashSet(), HashSet::add).blockingGet();
        AssertJUnit.assertEquals((int)expected, (int)results.size());
        results.forEach(assertConsumer);
    }

    @Test(dataProvider="GuaranteeEntry")
    public void testProperOrderingGuarantees(DeliveryGuarantee deliveryGuarantee, boolean isEntry) {
        Cache cache = this.cache(0);
        Map<Integer, String> inserted = this.insert(cache);
        LocalPublisherManager<Integer, String> lpm = this.lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet((int)128);
        SegmentAwarePublisherSupplier publisher = isEntry ? lpm.keyPublisher(allSegments, null, null, 0L, deliveryGuarantee, java.util.function.Function.identity()) : lpm.entryPublisher(allSegments, null, null, 0L, deliveryGuarantee, java.util.function.Function.identity());
        DistributionManager dm = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = dm.getCacheTopology().getLocalReadSegments();
        int expected = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, TestingUtil.extractComponent(cache, KeyPartitioner.class));
        List list = (List)Flowable.fromPublisher((Publisher)publisher.publisherWithLostSegments()).collect(Collectors.toList()).blockingGet();
        AssertJUnit.assertEquals((int)(expected + 128), (int)list.size());
        int currentSegment = -1;
        for (Object obj : list) {
            SegmentAwarePublisherSupplier.NotificationWithLost notification = (SegmentAwarePublisherSupplier.NotificationWithLost)obj;
            if (notification.isValue()) {
                if (currentSegment == -1) {
                    currentSegment = notification.valueSegment();
                    continue;
                }
                AssertJUnit.assertEquals((int)currentSegment, (int)notification.valueSegment());
                continue;
            }
            if (notification.isSegmentComplete()) {
                segment = notification.completedSegment();
                if (!localSegments.contains(segment)) {
                    AssertJUnit.assertEquals((String)"Only at most once can say the segment is complete without having it", (Object)deliveryGuarantee, (Object)DeliveryGuarantee.AT_MOST_ONCE);
                }
            } else {
                segment = notification.lostSegment();
                AssertJUnit.assertFalse((boolean)localSegments.contains(segment));
            }
            if (currentSegment == -1) continue;
            AssertJUnit.assertEquals((int)currentSegment, (int)notification.completedSegment());
            currentSegment = -1;
        }
    }

    @DataProvider(name="GuaranteeParallelEntry")
    public Object[][] deliveryGuaranteeParallelEntryProvider() {
        return (Object[][])Arrays.stream(DeliveryGuarantee.values()).flatMap(dg -> Stream.of(Boolean.TRUE, Boolean.FALSE).flatMap(parallel -> Stream.of(Boolean.TRUE, Boolean.FALSE).map(entry -> new Object[]{dg, parallel, entry}))).toArray(x$0 -> new Object[x$0][]);
    }

    @Test(dataProvider="GuaranteeParallelEntry")
    public void testWithAsyncOperation(DeliveryGuarantee deliveryGuarantee, boolean isParallel, boolean isEntry) {
        Consumer<Object> assertConsumer;
        CompletionStage stage;
        Cache cache = this.cache(0);
        Map<Integer, String> inserted = this.insert(cache);
        BlockingManager blockingManager = TestingUtil.extractComponent(cache, BlockingManager.class);
        LocalPublisherManager<Integer, String> lpm = this.lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet((int)128);
        Collector collector = Collectors.toSet();
        BiFunction reduceBiFunction = (left, right) -> {
            left.addAll(right);
            return left;
        };
        Function sleepOnBlockingPoolFunction = value -> Single.fromCompletionStage((CompletionStage)blockingManager.supplyBlocking(() -> value, (Object)"test-blocking-thread"));
        if (isEntry) {
            stage = lpm.keyReduction(isParallel, allSegments, null, null, 0L, deliveryGuarantee, publisher -> Flowable.fromPublisher((Publisher)publisher).concatMapSingle(sleepOnBlockingPoolFunction).collect(collector).toCompletionStage(), publisher -> Flowable.fromPublisher((Publisher)publisher).reduce(reduceBiFunction).toCompletionStage(Collections.emptySet()));
            assertConsumer = obj -> AssertJUnit.assertTrue((boolean)inserted.containsKey(obj));
        } else {
            stage = lpm.entryReduction(isParallel, allSegments, null, null, 0L, deliveryGuarantee, publisher -> Flowable.fromPublisher((Publisher)publisher).concatMapSingle(sleepOnBlockingPoolFunction).collect(collector).toCompletionStage(), publisher -> Flowable.fromPublisher((Publisher)publisher).reduce(reduceBiFunction).toCompletionStage(Collections.emptySet()));
            assertConsumer = obj -> {
                Map.Entry entry = (Map.Entry)obj;
                Object value = inserted.get(entry.getKey());
                AssertJUnit.assertEquals(value, entry.getValue());
            };
        }
        DistributionManager dm = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = dm.getCacheTopology().getLocalReadSegments();
        int expected = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, TestingUtil.extractComponent(cache, KeyPartitioner.class));
        Set results = (Set)((PublisherResult)CompletionStages.join((CompletionStage)stage)).getResult();
        AssertJUnit.assertEquals((int)expected, (int)results.size());
        results.forEach(assertConsumer);
    }
}

