/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.util;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commons.TimeoutException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.AbstractDelegatingRpcManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;

@Scope(value=Scopes.NAMED_CACHE)
public class ControlledRpcManager
extends AbstractDelegatingRpcManager {
    private static final Log log = LogFactory.getLog(ControlledRpcManager.class);
    /** Nominal timeout, in seconds, for the blocking waits performed by this helper. */
    private static final int TIMEOUT_SECONDS = 10;
    /** Cache whose {@link RpcManager} component is wrapped by this instance. */
    @Inject
    Cache<?, ?> cache;
    /** Schedules the task that fails an intercepted request the test never unblocked. */
    @Inject
    @ComponentName(value="org.infinispan.executors.timeout")
    ScheduledExecutorService timeoutExecutor;
    /** Executor handed to each intercepted request and used for the caller-visible completion stage. */
    @Inject
    @ComponentName(value="org.infinispan.executors.non-blocking")
    ExecutorService nonBlockingExecutor;
    /** Set by {@link #stopBlocking()}; once true every command is passed straight through. */
    private volatile boolean stopped;
    /** Command classes that are never intercepted. */
    private final Set<Class<? extends ReplicableCommand>> excludedCommands = ConcurrentHashMap.newKeySet();
    /** Expectations registered by the test; each future is completed with the next intercepted request. */
    private final BlockingQueue<CompletableFuture<ControlledRequest<?>>> waiters = new LinkedBlockingDeque<>();
    /**
     * First error recorded by {@code addGlobalError}; later errors are attached as suppressed.
     * Volatile because it is written from RPC/timeout threads and read from test threads.
     */
    private volatile RuntimeException globalError;

    /**
     * Creates a controlling wrapper around an existing {@link RpcManager}.
     *
     * @param realOne the manager that requests are delegated to
     * @param cache   the cache this manager belongs to
     */
    protected ControlledRpcManager(RpcManager realOne, Cache<?, ?> cache) {
        super(realOne);
        // Assigned explicitly as well as being an @Inject target.
        this.cache = cache;
    }

    /**
     * Wraps the cache's current {@link RpcManager} in a {@code ControlledRpcManager} and installs
     * the wrapper as the cache's {@code RpcManager} component.
     *
     * @param cache           the cache whose RPC manager is replaced
     * @param excludedCommand command classes that must never be intercepted
     * @return the newly installed wrapper
     * @throws IllegalStateException if the cache already uses a {@code ControlledRpcManager}
     */
    @SafeVarargs
    public static ControlledRpcManager replaceRpcManager(Cache<?, ?> cache, Class<? extends ReplicableCommand> ... excludedCommand) {
        RpcManager current = TestingUtil.extractComponent(cache, RpcManager.class);
        if (current instanceof ControlledRpcManager) {
            throw new IllegalStateException("One ControlledRpcManager per cache should be enough");
        }

        ControlledRpcManager wrapper = new ControlledRpcManager(current, cache);
        wrapper.excludeCommands(excludedCommand);

        log.tracef("Installing ControlledRpcManager on %s", (Object)wrapper.getAddress());
        TestingUtil.replaceComponent(cache, RpcManager.class, wrapper, true);
        return wrapper;
    }

    /**
     * Stops intercepting and puts the wrapped {@link RpcManager} back into the cache.
     * Asserts that this instance is still the one installed in the cache.
     */
    public void revertRpcManager() {
        stopBlocking();
        log.tracef("Restoring regular RpcManager on %s", (Object)getAddress());

        RpcManager installed = TestingUtil.extractComponent(cache, RpcManager.class);
        AssertJUnit.assertSame((Object)this, (Object)installed);

        TestingUtil.replaceComponent(cache, RpcManager.class, realOne, true);
    }

    /**
     * Replaces the set of excluded command classes with the given ones.
     *
     * @param excluded command classes that should pass through without being intercepted
     * @throws IllegalStateException if interception has already been stopped
     */
    @SafeVarargs
    public final void excludeCommands(Class<? extends ReplicableCommand> ... excluded) {
        if (stopped) {
            throw new IllegalStateException("Trying to exclude commands but we already stopped intercepting");
        }
        // Previous exclusions are dropped before the new ones are registered.
        excludedCommands.clear();
        Collections.addAll(excludedCommands, excluded);
    }

    /**
     * Adds one more command class to the set of excluded (never intercepted) commands.
     *
     * @param excluded the command class to exclude
     * @throws IllegalStateException if interception has already been stopped
     */
    public final void addExcludedCommand(Class<? extends ReplicableCommand> excluded) {
        if (!stopped) {
            excludedCommands.add(excluded);
            return;
        }
        throw new IllegalStateException("Trying to exclude commands but we already stopped intercepting");
    }

    /**
     * Stops intercepting commands. Rethrows any recorded global error, and fails the test if
     * expectations are still queued.
     */
    public void stopBlocking() {
        log.debugf("Stopping intercepting RPC calls on %s", (Object)realOne.getAddress());
        // Flag is flipped first so that no further command gets blocked, even if the checks below throw.
        stopped = true;
        throwGlobalError();
        if (waiters.isEmpty()) {
            return;
        }
        AssertJUnit.fail((String)("Stopped intercepting RPCs on " + realOne.getAddress() + ", but there are " + waiters.size() + " waiters in the queue"));
    }

    /**
     * Blocks until the next command is intercepted and returns it as a {@link BlockedRequest}.
     *
     * @param expectedCommandClass the command class the test expects next
     * @return the blocked request
     */
    public <T extends ReplicableCommand> BlockedRequest<T> expectCommand(Class<T> expectedCommandClass) {
        CompletableFuture<BlockedRequest<T>> pending = expectCommandAsync(expectedCommandClass);
        return uncheckedGet(pending);
    }

    /**
     * Blocks until the next command is intercepted, then runs {@code checker} on it.
     *
     * @param expectedCommandClass the command class the test expects next
     * @param checker              callback invoked with the intercepted command, typically asserting on it
     * @return the blocked request
     * @throws ClassCastException if the intercepted command is not an instance of {@code expectedCommandClass}
     */
    public <T extends ReplicableCommand> BlockedRequest<T> expectCommand(Class<T> expectedCommandClass, Consumer<T> checker) {
        BlockedRequest<T> blockedRequest = ControlledRpcManager.uncheckedGet(this.expectCommandAsync(expectedCommandClass));
        // Keep the static type T so the value can be handed to Consumer<T> without an unchecked cast.
        T command = expectedCommandClass.cast(blockedRequest.request.getCommand());
        checker.accept(command);
        return blockedRequest;
    }

    /**
     * Varargs convenience for {@link #expectCommands(Class, Collection)}.
     *
     * @param expectedCommandClass the command class expected for every target
     * @param targets              the addresses that must each receive one command
     * @return the blocked requests, keyed by target
     */
    public <T extends ReplicableCommand> BlockedRequests<T> expectCommands(Class<T> expectedCommandClass, Address ... targets) {
        Collection<Address> targetList = Arrays.asList(targets);
        return expectCommands(expectedCommandClass, targetList);
    }

    /**
     * Expects one single-target command per entry of {@code targets} and asserts that the set of
     * intercepted targets equals the set of expected targets.
     *
     * @param expectedCommandClass the command class expected for every target
     * @param targets              the addresses that must each receive one command
     * @return the blocked requests, keyed by target
     */
    public <T extends ReplicableCommand> BlockedRequests<T> expectCommands(Class<T> expectedCommandClass, Collection<Address> targets) {
        Map<Address, BlockedRequest<T>> requests = new HashMap<>(targets.size());
        for (int i = 0; i < targets.size(); ++i) {
            BlockedRequest<T> request = this.expectCommand(expectedCommandClass);
            // getTarget() asserts that the request has exactly one target.
            requests.put(request.getTarget(), request);
        }
        AssertJUnit.assertEquals(new HashSet<Address>(targets), requests.keySet());
        return new BlockedRequests<>(requests);
    }

    /**
     * Registers an expectation for the next intercepted command without blocking.
     * <p>
     * If the intercepted command is not an instance of {@code expectedCommandClass}, the underlying
     * request is failed with an {@link IllegalStateException}; the returned future still completes
     * with the (failed) blocked request.
     *
     * @param expectedCommandClass the command class the test expects next
     * @return a future completed when a command is intercepted
     */
    public <T extends ReplicableCommand> CompletableFuture<BlockedRequest<T>> expectCommandAsync(Class<T> expectedCommandClass) {
        this.throwGlobalError();
        log.tracef("Waiting for command %s", expectedCommandClass);
        CompletableFuture<ControlledRequest<?>> future = new CompletableFuture<>();
        this.waiters.add(future);
        return future.thenApply(request -> {
            log.tracef("Blocked command %s", (Object)request.command);
            log.tracef("Expected command %s", (Object)expectedCommandClass);
            BlockedRequest<T> blocked = new BlockedRequest<>(request);
            if (!expectedCommandClass.isInstance(request.getCommand())) {
                blocked.fail(new IllegalStateException("Expected " + String.valueOf(expectedCommandClass) + " but got " + String.valueOf(request.getCommand())));
            }
            // Hand back the same wrapper that was (possibly) failed instead of building a second one.
            return blocked;
        });
    }

    /**
     * Rethrows any recorded global error, then polls the waiter queue once without waiting and
     * asserts that nothing was queued.
     */
    public void expectNoCommand() {
        throwGlobalError();
        CompletableFuture<ControlledRequest<?>> queued = waiters.poll();
        AssertJUnit.assertNull((String)"There should be no queued commands", queued);
    }

    /**
     * Rethrows any recorded global error, then polls the waiter queue for up to the given time and
     * asserts that nothing was queued.
     *
     * @param timeout  how long to wait for a queue entry
     * @param timeUnit unit of {@code timeout}
     * @throws InterruptedException if interrupted while polling
     */
    public void expectNoCommand(long timeout, TimeUnit timeUnit) throws InterruptedException {
        throwGlobalError();
        CompletableFuture<ControlledRequest<?>> queued = waiters.poll(timeout, timeUnit);
        AssertJUnit.assertNull((String)"There should be no queued commands", queued);
    }

    /**
     * Intercepts an outgoing request.
     * <p>
     * Excluded commands, and every command once interception is stopped, are invoked directly.
     * Otherwise the request is wrapped in a {@link ControlledRequest} and handed to the next waiting
     * expectation; if no expectation shows up within {@code TIMEOUT_SECONDS} the error is recorded
     * globally and a {@link TestException} is thrown. For requests with a collector, a task is
     * scheduled that fails the request if the test has not completed it within twice that timeout.
     *
     * @param targets    destination addresses
     * @param command    the command being sent; a {@link SingleRpcCommand} wrapper is unwrapped
     * @param collector  the caller's response collector, or {@code null} for fire-and-forget sends
     * @param invoker    performs the real invocation with a given collector
     * @param rpcOptions not used by this implementation
     * @return a stage completed on the non-blocking executor once the test completes the request
     */
    @Override
    protected <T> CompletionStage<T> performRequest(Collection<Address> targets, ReplicableCommand command, ResponseCollector<T> collector, Function<ResponseCollector<T>, CompletionStage<T>> invoker, RpcOptions rpcOptions) {
        if (this.stopped || this.isCommandExcluded(command)) {
            log.tracef("Not blocking excluded command %s", (Object)command);
            return invoker.apply(collector);
        }
        log.debugf("Intercepted command to %s: %s (excluded=%s)", targets, (Object)command, this.excludedCommands);
        // Tests expect the wrapped command, not the SingleRpcCommand envelope.
        if (command instanceof SingleRpcCommand) {
            command = ((SingleRpcCommand)command).getCommand();
        }
        Address excluded = this.realOne.getAddress();
        ControlledRequest<T> controlledRequest = new ControlledRequest<>(command, targets, collector, invoker, this.nonBlockingExecutor, excluded);
        try {
            CompletableFuture<ControlledRequest<?>> waiter = this.waiters.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (waiter == null) {
                TimeoutException timeout = new TimeoutException("Found no waiters for command " + String.valueOf(command));
                this.addGlobalError(timeout);
                // Caught by the generic handler below and rethrown wrapped in a TestException.
                throw timeout;
            }
            waiter.complete(controlledRequest);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TestException(e);
        }
        catch (Exception e) {
            throw new TestException(e);
        }
        if (collector != null) {
            ScheduledFuture<?> cancelTask = this.timeoutExecutor.schedule(() -> {
                TimeoutException e = new TimeoutException("Timed out waiting for test to unblock command " + String.valueOf(controlledRequest.getCommand()));
                this.addGlobalError(e);
                controlledRequest.fail(e);
            }, 2L * TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // No need for the watchdog once the request has a result.
            controlledRequest.resultFuture.whenComplete((ignored, throwable) -> cancelTask.cancel(false));
        }
        // resultFuture is completed by whoever unblocks the request; hop to the non-blocking executor for dependents.
        return controlledRequest.resultFuture.whenCompleteAsync((r, t) -> {}, (Executor)this.nonBlockingExecutor);
    }

    /**
     * Records an error that should fail the test the next time it interacts with this manager.
     * The first error is kept; later ones are attached to it as suppressed exceptions.
     * <p>
     * Synchronized because it is called both from threads performing requests and from the
     * timeout executor; without it two concurrent first errors could overwrite each other.
     *
     * @param t the error to record
     */
    private synchronized void addGlobalError(RuntimeException t) {
        if (this.globalError == null) {
            this.globalError = t;
        } else {
            this.globalError.addSuppressed(t);
        }
    }

    /**
     * Intercepts a fire-and-forget send by routing it through
     * {@code performRequest} with no collector and no RPC options; the returned stage is ignored.
     *
     * @param targets destination addresses
     * @param command the command being sent
     * @param invoker performs the real invocation
     */
    @Override
    protected <T> void performSend(Collection<Address> targets, ReplicableCommand command, Function<ResponseCollector<T>, CompletionStage<T>> invoker) {
        ResponseCollector<T> noCollector = null;
        performRequest(targets, command, noCollector, invoker, null);
    }

    /**
     * Component stop hook: stops intercepting first, then stops the wrapped manager.
     */
    @Stop
    void stop() {
        stopBlocking();
        TestingUtil.stopComponent(realOne);
    }

    /**
     * Tells whether the command is an instance of any excluded command class.
     *
     * @param command the command about to be sent
     * @return {@code true} if the command must not be intercepted
     */
    private boolean isCommandExcluded(ReplicableCommand command) {
        for (Class<? extends ReplicableCommand> excludedClass : excludedCommands) {
            if (excludedClass.isInstance(command)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Rethrows the recorded global error, if there is one; otherwise does nothing.
     */
    private void throwGlobalError() {
        if (globalError == null) {
            return;
        }
        throw globalError;
    }

    /**
     * Waits up to {@code TIMEOUT_SECONDS} for the stage to complete and returns its value.
     *
     * @param stage the stage to wait for
     * @return the value the stage completed with
     * @throws TestException wrapping any failure, timeout or interruption; on interruption the
     *                       thread's interrupt flag is restored before throwing
     */
    private static <T> T uncheckedGet(CompletionStage<T> stage) {
        try {
            return stage.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack, as performRequest does.
            Thread.currentThread().interrupt();
            throw new TestException(e);
        }
        catch (Exception e) {
            throw new TestException(e);
        }
    }

    /**
     * Test-side handle for a request that has been intercepted but not yet sent.
     * The test decides whether to really send it, skip sending, or fail it.
     *
     * @param <C> the expected command type
     */
    public static class BlockedRequest<C extends ReplicableCommand> {
        private final ControlledRequest<?> request;

        public BlockedRequest(ControlledRequest<?> request) {
            this.request = request;
        }

        /**
         * Performs the real invocation.
         *
         * @return a handle for the sent request, or {@code null} if the request has no collector
         */
        public SentRequest send() {
            log.tracef("Sending command %s", (Object)request.getCommand());
            request.send();
            return request.hasCollector() ? new SentRequest(request) : null;
        }

        /**
         * Skips the real invocation so the test can supply responses itself.
         *
         * @return a handle for faking responses, or {@code null} if the request has no collector
         */
        public FakeResponses skipSend() {
            log.tracef("Not sending request %s", (Object)request.getCommand());
            request.skipSend();
            return request.hasCollector() ? new FakeResponses(request) : null;
        }

        /** Fails the request with a generic {@link TestException}. */
        public void fail() {
            TestException induced = new TestException("Induced failure!");
            fail(induced);
        }

        /**
         * Fails the request with the given exception.
         *
         * @param e the failure cause
         */
        public void fail(Exception e) {
            request.fail(e);
        }

        /** @return the intercepted command, unchecked-cast to {@code C} */
        public C getCommand() {
            return (C)request.getCommand();
        }

        /** @return the request's targets */
        public Collection<Address> getTargets() {
            return request.getTargets();
        }

        /** @return the only target of the request; asserts that there is exactly one */
        public Address getTarget() {
            Collection<Address> all = request.getTargets();
            AssertJUnit.assertEquals((int)1, (int)all.size());
            return all.iterator().next();
        }
    }

    /**
     * State of one intercepted request: the command and targets, the caller's collector and the
     * real invoker, plus futures tracking the send, each remote response, the finish notification
     * and the final result.
     *
     * @param <T> result type produced by the caller's collector
     */
    static class ControlledRequest<T> {
        private final ReplicableCommand command;
        private final Collection<Address> targets;
        private final Function<ResponseCollector<T>, CompletionStage<T>> invoker;
        private final ExecutorService executor;
        /** Completed with the collector's result, or exceptionally on failure. */
        private final CompletableFuture<T> resultFuture = new CompletableFuture<>();
        /** One future per target (the excluded address is left out), in target iteration order. */
        private final LinkedHashMap<Address, CompletableFuture<Response>> responseFutures = new LinkedHashMap<>();
        /** Completed with the responses seen so far when the real invocation reports finish. */
        private final CompletableFuture<Map<Address, Response>> finishFuture = new CompletableFuture<>();
        /** Completed once the request was sent or sending was skipped; failed together with the request. */
        private final CompletableFuture<Void> sendFuture = new CompletableFuture<>();
        private final Lock collectLock = new ReentrantLock();
        @GuardedBy(value="collectLock")
        private final ResponseCollector<T> collector;
        @GuardedBy(value="collectLock")
        private final Set<Address> collectedResponses = new HashSet<Address>();
        @GuardedBy(value="collectLock")
        private boolean collectedFinish;

        /**
         * @param command   the intercepted command
         * @param targets   the request's destinations
         * @param collector the caller's collector; may be {@code null}
         * @param invoker   performs the real invocation
         * @param executor  executor exposed to the asynchronous test-side helpers
         * @param excluded  address for which no response future is created
         */
        ControlledRequest(ReplicableCommand command, Collection<Address> targets, ResponseCollector<T> collector, Function<ResponseCollector<T>, CompletionStage<T>> invoker, ExecutorService executor, Address excluded) {
            this.command = command;
            this.targets = targets;
            this.collector = collector;
            this.invoker = invoker;
            this.executor = executor;
            for (Address target : targets) {
                if (target.equals((Object)excluded)) continue;
                this.responseFutures.put(target, new CompletableFuture<>());
            }
        }

        /**
         * Performs the real invocation with a collector that only queues responses and the finish
         * notification, so the test can release them to the caller's collector later.
         */
        void send() {
            this.invoker.apply(new ResponseCollector<T>(){

                @Override
                public T addResponse(Address sender, Response response) {
                    // Qualified this: the queueing methods live on the enclosing request.
                    ControlledRequest.this.queueResponse(sender, response);
                    return null;
                }

                @Override
                public T finish() {
                    ControlledRequest.this.queueFinish();
                    return null;
                }
            });
            this.sendFuture.complete(null);
        }

        /** Marks the request as sent without invoking anything and completes every response future with {@code null}. */
        void skipSend() {
            this.sendFuture.complete(null);
            for (CompletableFuture<Response> responseFuture : this.responseFutures.values()) {
                responseFuture.complete(null);
            }
        }

        /** Waits until the request was sent or skipped. */
        void awaitSend() {
            ControlledRpcManager.uncheckedGet(this.sendFuture);
        }

        /**
         * Records a response delivered by the real invocation. Fails the request if the sender has
         * no response future or already delivered a response.
         */
        private void queueResponse(Address sender, Response response) {
            log.tracef("Queueing response from %s for command %s", (Object)sender, (Object)this.command);
            CompletableFuture<Response> responseFuture = this.responseFutures.get(sender);
            if (responseFuture == null) {
                // Unknown sender: report it on the request instead of throwing an NPE into the transport.
                this.fail(new IllegalStateException("Unexpected response received from " + String.valueOf(sender) + ": " + String.valueOf(response)));
                return;
            }
            boolean completedNow = responseFuture.complete(response);
            if (!completedNow) {
                this.fail(new IllegalStateException("Duplicate response received from " + String.valueOf(sender) + ": " + String.valueOf(response)));
            }
        }

        /**
         * Records the finish notification from the real invocation. Responses already received go
         * into the finish map; response futures still pending are completed with {@code null}.
         */
        private void queueFinish() {
            log.tracef("Queueing finish for command %s", (Object)this.command);
            LinkedHashMap<Address, Response> responseMap = new LinkedHashMap<Address, Response>();
            for (Map.Entry<Address, CompletableFuture<Response>> entry : this.responseFutures.entrySet()) {
                Address sender = entry.getKey();
                CompletableFuture<Response> responseFuture = entry.getValue();
                if (responseFuture.isDone()) {
                    responseMap.put(sender, ControlledRpcManager.uncheckedGet(responseFuture));
                    continue;
                }
                responseFuture.complete(null);
            }
            boolean completedNow = this.finishFuture.complete(responseMap);
            if (!completedNow) {
                this.fail(new IllegalStateException("Finish queued more than once"));
            }
        }

        /**
         * Feeds one response to the caller's collector. A non-null collector result completes the
         * request; any throwable raised here completes it exceptionally.
         */
        void collectResponse(Address sender, Response response) {
            try {
                T result;
                this.collectLock.lock();
                try {
                    this.throwIfFailed();
                    // Each sender may be collected only once.
                    AssertJUnit.assertTrue((boolean)this.collectedResponses.add(sender));
                    result = this.collector.addResponse(sender, response);
                    if (result != null) {
                        this.collectedFinish = true;
                    }
                }
                finally {
                    this.collectLock.unlock();
                }
                // Complete outside the lock.
                if (result != null) {
                    this.resultFuture.complete(result);
                }
            }
            catch (Throwable t) {
                this.resultFuture.completeExceptionally(t);
            }
        }

        /**
         * Calls {@code finish()} on the caller's collector and completes the request with its
         * result; any throwable raised here completes it exceptionally.
         */
        void collectFinish() {
            try {
                T result;
                this.collectLock.lock();
                try {
                    this.throwIfFailed();
                    AssertJUnit.assertFalse((boolean)this.collectedFinish);
                    this.collectedFinish = true;
                    result = this.collector.finish();
                }
                finally {
                    this.collectLock.unlock();
                }
                this.resultFuture.complete(result);
            }
            catch (Throwable t) {
                this.resultFuture.completeExceptionally(t);
            }
        }

        /** Asserts that the collector was not finished and that the request nevertheless has a result. */
        void skipFinish() {
            this.collectLock.lock();
            try {
                AssertJUnit.assertFalse((boolean)this.collectedFinish);
            }
            finally {
                this.collectLock.unlock();
            }
            AssertJUnit.assertTrue((boolean)this.resultFuture.isDone());
        }

        /** Completes both the result and the send future exceptionally (no effect on futures already completed). */
        void fail(Throwable t) {
            log.tracef("Failing execution of %s with %s", (Object)this.command, (Object)t);
            this.resultFuture.completeExceptionally(t);
            this.sendFuture.completeExceptionally(t);
        }

        /** Rethrows the failure (via {@code join()}) if the request completed exceptionally. */
        void throwIfFailed() {
            if (this.resultFuture.isCompletedExceptionally()) {
                this.resultFuture.join();
            }
        }

        boolean isDone() {
            return this.resultFuture.isDone();
        }

        ReplicableCommand getCommand() {
            return this.command;
        }

        Collection<Address> getTargets() {
            return this.targets;
        }

        boolean hasCollector() {
            return this.collector != null;
        }

        /** @return the response future for {@code sender}, or {@code null} if none was created for it */
        CompletableFuture<Response> responseFuture(Address sender) {
            return this.responseFutures.get(sender);
        }

        CompletableFuture<Map<Address, Response>> finishFuture() {
            return this.finishFuture;
        }
    }

    /**
     * A group of blocked single-target requests, addressed by their target.
     *
     * @param <T> the expected command type
     */
    public static class BlockedRequests<T extends ReplicableCommand> {
        private final Map<Address, BlockedRequest<T>> requests;

        public BlockedRequests(Map<Address, BlockedRequest<T>> requests) {
            this.requests = requests;
        }

        /** Really sends the request addressed to {@code target}. */
        public SentRequest send(Address target) {
            BlockedRequest<T> blocked = requests.get(target);
            return blocked.send();
        }

        /** Skips sending the request addressed to {@code target}. */
        public FakeResponses skipSend(Address target) {
            BlockedRequest<T> blocked = requests.get(target);
            return blocked.skipSend();
        }

        /** Skips sending the request addressed to {@code target} and delivers a fake response from it. */
        public void skipSendAndReceive(Address target, Response fakeResponse) {
            FakeResponses fake = requests.get(target).skipSend();
            fake.receive(target, fakeResponse);
        }

        /** Same as {@link #skipSendAndReceive(Address, Response)}, but the response is delivered asynchronously. */
        public void skipSendAndReceiveAsync(Address target, Response fakeResponse) {
            FakeResponses fake = requests.get(target).skipSend();
            fake.receiveAsync(target, fakeResponse);
        }
    }

    /**
     * Test-side handle for a request whose real invocation was skipped; the test supplies the
     * responses that are fed to the caller's collector.
     */
    public static class FakeResponses {
        private final ControlledRequest<?> request;

        public FakeResponses(ControlledRequest<?> request) {
            this.request = request;
        }

        /**
         * Feeds the given responses to the caller's collector, in map iteration order. If the
         * collector has not produced a result afterwards, asserts that a response was supplied for
         * every expected sender and finishes the collector.
         *
         * @param responses fake responses keyed by sender; every sender must be a target of the request
         */
        public void receive(Map<Address, Response> responses) {
            log.tracef("Faking responses for %s: %s", (Object)this.request.getCommand(), responses);
            responses.forEach((sender, response) -> {
                // Check the sender against the request, not against the map being iterated (which is always true).
                AssertJUnit.assertTrue("Fake response from " + sender + ", which is not a target of " + this.request.getCommand(), this.request.getTargets().contains(sender));
                this.request.collectResponse(sender, response);
            });
            if (!this.request.isDone()) {
                AssertJUnit.assertEquals(responses.keySet(), this.request.responseFutures.keySet());
                this.request.collectFinish();
            }
        }

        /** Delivers a single fake response. */
        public void receive(Address sender, Response response) {
            this.receive(Collections.singletonMap(sender, response));
        }

        /** Delivers two fake responses, in argument order. */
        public void receive(Address sender1, Response response1, Address sender2, Response response2) {
            LinkedHashMap<Address, Response> responses = new LinkedHashMap<Address, Response>();
            responses.put(sender1, response1);
            responses.put(sender2, response2);
            this.receive(responses);
        }

        /** Delivers three fake responses, in argument order. */
        public void receive(Address sender1, Response response1, Address sender2, Response response2, Address sender3, Response response3) {
            LinkedHashMap<Address, Response> responses = new LinkedHashMap<Address, Response>();
            responses.put(sender1, response1);
            responses.put(sender2, response2);
            responses.put(sender3, response3);
            this.receive(responses);
        }

        /** Runs {@link #receive(Map)} on the request's executor. */
        public CompletionStage<Void> receiveAsync(Map<Address, Response> responses) {
            return CompletableFuture.runAsync(() -> this.receive(responses), this.request.executor);
        }

        /** Runs {@link #receive(Address, Response)} on the request's executor. */
        public CompletionStage<Void> receiveAsync(Address sender, Response response) {
            return CompletableFuture.runAsync(() -> this.receive(sender, response), this.request.executor);
        }

        /** Runs {@link #receive(Address, Response, Address, Response)} on the request's executor. */
        public CompletionStage<Void> receiveAsync(Address sender1, Response response1, Address sender2, Response response2) {
            return CompletableFuture.runAsync(() -> this.receive(sender1, response1, sender2, response2), this.request.executor);
        }

        /** Fails the request with the exception produced by {@code log.requestTimedOut}; asserts it is not done yet. */
        public void forceTimeout() {
            this.fail(log.requestTimedOut(-1L, "Induced failure", "some time"));
        }

        private void fail(Throwable e) {
            AssertJUnit.assertFalse((boolean)this.request.resultFuture.isDone());
            this.request.fail(e);
        }

        /** @return the request's targets */
        public Collection<Address> getTargets() {
            return this.request.getTargets();
        }

        /** @return the only target of the request; asserts that there is exactly one */
        public Address getTarget() {
            Collection<Address> targets = this.request.getTargets();
            AssertJUnit.assertEquals((int)1, (int)targets.size());
            return targets.iterator().next();
        }
    }

    /**
     * All responses of a sent request, held back until the test releases or replaces them.
     */
    public static class BlockedResponseMap {
        private final ControlledRequest<?> request;
        private final Map<Address, Response> responseMap;

        private BlockedResponseMap(ControlledRequest<?> request, Map<Address, Response> responseMap) {
            this.request = request;
            this.responseMap = responseMap;
        }

        /** Feeds the held responses to the caller's collector and finishes it if still needed. */
        public void receive() {
            AssertJUnit.assertFalse((boolean)request.resultFuture.isDone());
            log.tracef("Unblocking responses for %s: %s", (Object)request.getCommand(), responseMap);
            deliver(responseMap);
        }

        /**
         * Feeds {@code newResponses} to the caller's collector instead of the held responses and
         * finishes it if still needed.
         */
        public void replace(Map<Address, Response> newResponses) {
            AssertJUnit.assertFalse((boolean)request.resultFuture.isDone());
            log.tracef("Replacing responses for %s: %s (was %s)", (Object)request.getCommand(), newResponses, responseMap);
            deliver(newResponses);
        }

        /** Collects every response; if that produced no result, waits for the queued finish and collects it. */
        private void deliver(Map<Address, Response> toDeliver) {
            toDeliver.forEach((sender, response) -> request.collectResponse(sender, response));
            if (request.isDone()) {
                return;
            }
            ControlledRpcManager.uncheckedGet(request.finishFuture());
            request.collectFinish();
        }

        /** Runs {@link #receive()} on the request's executor. */
        public CompletionStage<Void> receiveAsync() {
            return CompletableFuture.runAsync(() -> receive(), request.executor);
        }

        /** Runs {@link #replace(Map)} on the request's executor. */
        public CompletionStage<Void> replaceAsync(Map<Address, Response> newResponses) {
            return CompletableFuture.runAsync(() -> replace(newResponses), request.executor);
        }

        /** @return the held responses */
        public Map<Address, Response> getResponses() {
            return responseMap;
        }
    }

    /**
     * A single response of a sent request, held back until the test releases or replaces it.
     */
    public static class BlockedResponse {
        private final ControlledRequest<?> request;
        final SentRequest sentRequest;
        final Address sender;
        final Response response;

        private BlockedResponse(ControlledRequest<?> request, SentRequest sentRequest, Address sender, Response response) {
            this.request = request;
            this.sentRequest = sentRequest;
            this.sender = sender;
            this.response = response;
        }

        /**
         * Feeds the held response to the caller's collector.
         *
         * @return the originating sent request, for chaining
         */
        public SentRequest receive() {
            log.tracef("Unblocking response from %s: %s", (Object)sender, (Object)response);
            request.collectResponse(sender, response);
            return sentRequest;
        }

        /**
         * Feeds {@code newResponse} to the caller's collector in place of the held response.
         *
         * @return the originating sent request, for chaining
         */
        public SentRequest replace(Response newResponse) {
            log.tracef("Replacing response from %s: %s (was %s)", (Object)sender, (Object)newResponse, (Object)response);
            request.collectResponse(sender, newResponse);
            return sentRequest;
        }

        /** Runs {@link #receive()} on the request's executor. */
        public CompletionStage<SentRequest> receiveAsync() {
            return CompletableFuture.supplyAsync(() -> receive(), request.executor);
        }

        /** Runs {@link #replace(Response)} on the request's executor. */
        public CompletionStage<SentRequest> replaceAsync(Response newResponse) {
            return CompletableFuture.supplyAsync(() -> replace(newResponse), request.executor);
        }

        public Address getSender() {
            return sender;
        }

        public Response getResponse() {
            return response;
        }
    }

    /**
     * Test-side handle for a request that was really sent; lets the test wait for the remote
     * responses and decide when they reach the caller's collector.
     */
    public static class SentRequest {
        private final ControlledRequest<?> request;

        SentRequest(ControlledRequest<?> request) {
            this.request = request;
        }

        /** Fails the request with the exception produced by {@code log.requestTimedOut}; asserts it is not done yet. */
        public void forceTimeout() {
            AssertJUnit.assertFalse((boolean)request.isDone());
            request.fail(log.requestTimedOut(-1L, "Induced timeout failure", "some time"));
        }

        /** Waits for the response from {@code sender} and runs {@code checker} on it. */
        public BlockedResponse expectResponse(Address sender, Consumer<Response> checker) {
            BlockedResponse blocked = expectResponse(sender);
            checker.accept(blocked.response);
            return blocked;
        }

        /** Waits for the response from {@code sender}. */
        public BlockedResponse expectResponse(Address sender) {
            CompletionStage<BlockedResponse> pending = expectResponseAsync(sender);
            return ControlledRpcManager.uncheckedGet(pending);
        }

        /** Waits for the response from {@code sender} and asserts it equals {@code expectedResponse}. */
        public BlockedResponse expectResponse(Address sender, Response expectedResponse) {
            Consumer<Response> sameAsExpected = actual -> AssertJUnit.assertEquals((Object)expectedResponse, (Object)actual);
            return expectResponse(sender, sameAsExpected);
        }

        /** Waits for the response from {@code a} and asserts it is {@code CacheNotFoundResponse.INSTANCE}. */
        public BlockedResponse expectLeaver(Address a) {
            return expectResponse(a, (Response)CacheNotFoundResponse.INSTANCE);
        }

        /** Waits for an {@link ExceptionResponse} from {@code a} and checks its exception against {@code expectedException}. */
        public BlockedResponse expectException(Address a, Class<? extends Exception> expectedException) {
            Consumer<Response> carriesException = r -> {
                Exception exception = ((ExceptionResponse)r).getException();
                Exceptions.assertException((Class)expectedException, (Throwable)exception);
            };
            return expectResponse(a, carriesException);
        }

        /** Waits until the real invocation reported finish and returns the responses seen by then. */
        public BlockedResponseMap expectAllResponses() {
            CompletionStage<BlockedResponseMap> pending = expectAllResponsesAsync();
            return ControlledRpcManager.uncheckedGet(pending);
        }

        /** Like {@link #expectAllResponses()}, additionally running {@code checker} on every response. */
        public BlockedResponseMap expectAllResponses(BiConsumer<Address, Response> checker) {
            BlockedResponseMap all = expectAllResponses();
            all.responseMap.forEach(checker);
            return all;
        }

        /** Waits for all responses and immediately releases them to the caller's collector. */
        public void receiveAll() {
            BlockedResponseMap all = expectAllResponses();
            all.receive();
        }

        /** Releases all responses to the caller's collector once they are available, without blocking. */
        public void receiveAllAsync() {
            expectAllResponsesAsync().thenAccept(all -> all.receive());
        }

        /** Waits until the real invocation reported finish, then finishes the caller's collector. */
        public void finish() {
            ControlledRpcManager.uncheckedGet(request.finishFuture());
            request.collectFinish();
        }

        /** Asserts that the request completed without the collector being finished. */
        public void noFinish() {
            request.skipFinish();
        }

        /**
         * Non-blocking variant of {@link #expectResponse(Address)}. Rethrows the request's failure,
         * if any, and asserts that the request is not done yet.
         */
        public CompletionStage<BlockedResponse> expectResponseAsync(Address sender) {
            request.throwIfFailed();
            AssertJUnit.assertFalse((boolean)request.isDone());
            CompletableFuture<Response> responseFuture = request.responseFuture(sender);
            return responseFuture.thenApply(response -> {
                log.debugf("Got response for %s from %s: %s", (Object)request.getCommand(), (Object)sender, response);
                return new BlockedResponse(request, this, sender, response);
            });
        }

        /**
         * Non-blocking variant of {@link #expectAllResponses()}. Rethrows the request's failure,
         * if any, and asserts that the request is not done yet.
         */
        public CompletionStage<BlockedResponseMap> expectAllResponsesAsync() {
            request.throwIfFailed();
            AssertJUnit.assertFalse((boolean)request.isDone());
            CompletableFuture<Map<Address, Response>> finished = request.finishFuture();
            return finished.thenApply(responseMap -> new BlockedResponseMap(request, responseMap));
        }
    }
}

