/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.partitionhandling.AvailabilityException;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.ClusterStreamManagerImpl;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.TerminalOperation;
import org.infinispan.stream.impl.intops.IntermediateOperation;

/**
 * Cluster stream manager that refuses to run stream operations while the local
 * partition is considered degraded.
 *
 * <p>The partition counts as degraded when the last availability mode observed by
 * the registered {@link PartitionListener} is anything other than
 * {@link AvailabilityMode#AVAILABLE} and the configured partition handling
 * strategy is {@link PartitionHandling#DENY_READ_WRITES}. Every overridden
 * operation performs that check first and then delegates to the superclass.
 *
 * @param <Original> type parameter forwarded to {@link ClusterStreamManagerImpl}
 * @param <K> key type
 */
public class PartitionAwareClusterStreamManager<Original, K>
        extends ClusterStreamManagerImpl<Original, K> {

    /** Listener registered on the cache in {@link #start()}; holds the last observed availability mode. */
    protected final PartitionListener listener = new PartitionListener();

    /** Injected cache on which {@link #listener} is registered. */
    @Inject
    protected Cache<?, ?> cache;

    /** Injected configuration from which the partition handling strategy is read. */
    @Inject
    protected Configuration configuration;

    /** Partition handling strategy, assigned in {@link #start()}. */
    private PartitionHandling partitionHandling;

    /**
     * Starts the superclass, reads the {@code whenSplit} partition handling strategy
     * from the configuration and registers the partition listener on the cache.
     */
    @Override
    @Start
    public void start() {
        super.start();
        partitionHandling = configuration.clustering().partitionHandling().whenSplit();
        cache.addListener(listener);
    }

    /**
     * Checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public boolean awaitCompletion(Object id, long time, TimeUnit unit) throws InterruptedException {
        checkPartitionStatus();
        return super.awaitCompletion(id, time, unit);
    }

    /**
     * Checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public <R> Object remoteStreamOperation(
            boolean parallelDistribution,
            boolean parallelStream,
            ConsistentHash ch,
            Set<Integer> segments,
            Set<K> keysToInclude,
            Map<Integer, Set<K>> keysToExclude,
            boolean includeLoader,
            boolean entryStream,
            TerminalOperation<Original, R> operation,
            ClusterStreamManager.ResultsCallback<R> callback,
            Predicate<? super R> earlyTerminatePredicate) {
        checkPartitionStatus();
        return super.remoteStreamOperation(
                parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude,
                includeLoader, entryStream, operation, callback, earlyTerminatePredicate);
    }

    /**
     * Key-tracking variant: checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public <R> Object remoteStreamOperation(
            boolean parallelDistribution,
            boolean parallelStream,
            ConsistentHash ch,
            Set<Integer> segments,
            Set<K> keysToInclude,
            Map<Integer, Set<K>> keysToExclude,
            boolean includeLoader,
            boolean entryStream,
            KeyTrackingTerminalOperation<Original, K, R> operation,
            ClusterStreamManager.ResultsCallback<Collection<R>> callback) {
        checkPartitionStatus();
        return super.remoteStreamOperation(
                parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude,
                includeLoader, entryStream, operation, callback);
    }

    /**
     * Checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public <R> Object remoteStreamOperationRehashAware(
            boolean parallelDistribution,
            boolean parallelStream,
            ConsistentHash ch,
            Set<Integer> segments,
            Set<K> keysToInclude,
            Map<Integer, Set<K>> keysToExclude,
            boolean includeLoader,
            boolean entryStream,
            TerminalOperation<Original, R> operation,
            ClusterStreamManager.ResultsCallback<R> callback,
            Predicate<? super R> earlyTerminatePredicate) {
        checkPartitionStatus();
        return super.remoteStreamOperationRehashAware(
                parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude,
                includeLoader, entryStream, operation, callback, earlyTerminatePredicate);
    }

    /**
     * Key-tracking variant: checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public Object remoteStreamOperationRehashAware(
            boolean parallelDistribution,
            boolean parallelStream,
            ConsistentHash ch,
            Set<Integer> segments,
            Set<K> keysToInclude,
            Map<Integer, Set<K>> keysToExclude,
            boolean includeLoader,
            boolean entryStream,
            KeyTrackingTerminalOperation<Original, K, ?> operation,
            ClusterStreamManager.ResultsCallback<Collection<K>> callback) {
        checkPartitionStatus();
        return super.remoteStreamOperationRehashAware(
                parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude,
                includeLoader, entryStream, operation, callback);
    }

    /**
     * Checks the partition status, then delegates to the superclass.
     * Throws the exception produced by {@code log.partitionDegraded()} when the
     * partition is degraded.
     */
    @Override
    public <E> ClusterStreamManager.RemoteIteratorPublisher<E> remoteIterationPublisher(
            boolean parallelStream,
            Supplier<Map.Entry<Address, IntSet>> targets,
            Set<K> keysToInclude,
            IntFunction<Set<K>> keysToExclude,
            boolean includeLoader,
            boolean entryStream,
            Iterable<IntermediateOperation> intermediateOperations) {
        checkPartitionStatus();
        return super.remoteIterationPublisher(
                parallelStream, targets, keysToInclude, keysToExclude,
                includeLoader, entryStream, intermediateOperations);
    }

    /** Throws the exception built by {@code log.partitionDegraded()} if the partition is degraded. */
    private void checkPartitionStatus() {
        if (!isPartitionDegraded()) {
            return;
        }
        throw log.partitionDegraded();
    }

    /**
     * @return {@code true} when the last observed availability mode is not
     *         {@code AVAILABLE} and the strategy is {@code DENY_READ_WRITES}
     */
    private boolean isPartitionDegraded() {
        // The listener's mode is consulted first; the strategy only matters once
        // the mode is something other than AVAILABLE.
        if (listener.currentMode == AvailabilityMode.AVAILABLE) {
            return false;
        }
        return partitionHandling == PartitionHandling.DENY_READ_WRITES;
    }

    /** Cache listener that records the availability mode reported by partition status events. */
    @Listener
    private class PartitionListener {
        /** Last availability mode seen in a post-change event; starts as {@code AVAILABLE}. */
        volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;

        /**
         * Handles partition status events. Pre-change events are ignored. For
         * post-change events the new mode is stored and, if the partition is then
         * degraded, every tracker in {@code currentlyRunning} is marked with a new
         * {@link AvailabilityException}.
         */
        @PartitionStatusChanged
        public void onPartitionChange(PartitionStatusChangedEvent<K, ?> event) {
            if (event.isPre()) {
                return;
            }
            currentMode = event.getAvailabilityMode();
            if (!isPartitionDegraded()) {
                return;
            }
            currentlyRunning.values().forEach(tracker -> {
                // One exception instance per tracker.
                Throwable cause = new AvailabilityException();
                ClusterStreamManagerImpl.markTrackerWithException(tracker, null, cause, null);
            });
        }
    }
}

