/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.util.concurrent;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractObjectAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectArrayAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class FutureUtilsTest {
    /**
     * Executor shared by the tests in this class, registered as a JUnit 5 extension.
     *
     * <p>NOTE(review): creation and shutdown are presumably handled by
     * {@link TestExecutorExtension} — confirm against its implementation.
     */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    // Explicit package-private no-arg constructor; it performs no initialization.
    FutureUtilsTest() {
    }

    /**
     * Checks that {@code FutureUtils.retry} keeps re-running a failing operation and completes
     * with the operation's value once an attempt succeeds.
     */
    @Test
    void testRetrySuccess() {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);
        // Every attempt fails until the attempt counter reaches the retry limit.
        final Supplier<CompletableFuture<Boolean>> flakyOperation =
                () -> CompletableFuture.supplyAsync(
                        () -> {
                            if (attempts.incrementAndGet() != retries) {
                                throw new CompletionException(new FlinkException("Test exception"));
                            }
                            return true;
                        },
                        EXECUTOR_RESOURCE.getExecutor());

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retry(flakyOperation, retries, EXECUTOR_RESOURCE.getExecutor());

        FlinkAssertions.assertThatFuture(retryFuture).eventuallySucceeds().isEqualTo(true);
        Assertions.assertThat(attempts).hasValue(retries);
    }

    /**
     * Checks that an operation which fails on every attempt makes the retry future fail with a
     * {@code FutureUtils.RetryException} as cause.
     */
    @Test
    void testRetryFailureFixedRetries() {
        final int retries = 3;
        final CompletableFuture<?> retryFuture =
                FutureUtils.retry(
                        () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
                        retries,
                        EXECUTOR_RESOURCE.getExecutor());

        FlinkAssertions.assertThatFuture(retryFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(FutureUtils.RetryException.class);
    }

    /**
     * Cancels the retry future while the second attempt is parked on a latch and checks that the
     * future reports cancellation, that no third attempt is started and that the parked attempt
     * was not interrupted.
     */
    @Test
    void testRetryCancellation() throws InterruptedException {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);
        final OneShotLatch secondAttemptStarted = new OneShotLatch();
        final OneShotLatch resumeSecondAttempt = new OneShotLatch();
        final AtomicReference<Throwable> interruption = new AtomicReference<>(null);

        final CompletableFuture<?> retryFuture =
                FutureUtils.retry(
                        () -> CompletableFuture.supplyAsync(
                                () -> {
                                    if (attempts.incrementAndGet() == 2) {
                                        // Signal the test thread, then block until it lets us go on.
                                        secondAttemptStarted.trigger();
                                        try {
                                            resumeSecondAttempt.await();
                                        } catch (InterruptedException e) {
                                            // Recorded so the test can assert that no interrupt happened.
                                            interruption.compareAndSet(null, e);
                                        }
                                    }
                                    throw new CompletionException(new FlinkException("Test exception"));
                                },
                                EXECUTOR_RESOURCE.getExecutor()),
                        retries,
                        EXECUTOR_RESOURCE.getExecutor());

        // Wait until the second attempt is running.
        secondAttemptStarted.await();
        Assertions.assertThat(retryFuture).isNotDone();

        // Cancel without interrupting, then release the parked attempt.
        retryFuture.cancel(false);
        resumeSecondAttempt.trigger();

        Assertions.assertThat(retryFuture).isCancelled();
        Assertions.assertThat(attempts).hasValue(2);
        Assertions.assertThat(interruption.get()).isNull();
    }

    /**
     * Checks that retrying stops as soon as the operation fails with an exception rejected by the
     * retry predicate, and that this exception is the last element in the cause chain of the
     * resulting failure.
     */
    @Test
    void testStopAtNonRetryableException() {
        final int retries = 10;
        // Attempt number on which the operation fails with the non-retryable exception.
        final int notRetry = 3;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final FlinkRuntimeException nonRetryableException = new FlinkRuntimeException("Non-retryable exception");

        // A parameterized future keeps the assertion chain below typed, so the method reference
        // and the list assertions resolve without any casts.
        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retry(
                        () -> CompletableFuture.supplyAsync(
                                () -> {
                                    if (atomicInteger.incrementAndGet() == notRetry) {
                                        throw new CompletionException(nonRetryableException);
                                    }
                                    throw new CompletionException(new FlinkException("Test exception"));
                                },
                                EXECUTOR_RESOURCE.getExecutor()),
                        retries,
                        // Only failures that contain a FlinkException are retried.
                        throwable -> ExceptionUtils.findThrowable(throwable, FlinkException.class).isPresent(),
                        EXECUTOR_RESOURCE.getExecutor());

        FlinkAssertions.assertThatFuture(retryFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .last()
                .isEqualTo(nonRetryableException);
        Assertions.assertThat(atomicInteger).hasValue(notRetry);
    }

    /**
     * Checks that {@code FutureUtils.retryWithDelay} fails with a {@code FutureUtils.RetryException}
     * as cause when the operation fails on every attempt.
     */
    @Test
    void testRetryWithDelayRetryStrategyFailure() {
        final RetryStrategy retryStrategy = new FixedRetryStrategy(3, Duration.ofMillis(1L));
        final ScheduledExecutor scheduledExecutor =
                new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());

        final CompletableFuture<?> retryFuture =
                FutureUtils.retryWithDelay(
                        () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
                        retryStrategy,
                        scheduledExecutor);

        FlinkAssertions.assertThatFuture(retryFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(FutureUtils.RetryException.class);
    }

    /**
     * Checks that {@code FutureUtils.retryWithDelay} eventually succeeds and that the retries
     * took at least the accumulated back-off delay.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()} because it is monotonic;
     * {@link System#currentTimeMillis()} follows the wall clock, which may be adjusted or have a
     * coarse granularity and could therefore under-report the elapsed time.
     */
    @Test
    void testRetryWithDelayRetryStrategy() {
        final int retries = 4;
        // The operation fails while the counter is non-zero, i.e. for the first 'retries' attempts.
        final AtomicInteger countDown = new AtomicInteger(retries);
        final long startNanos = System.nanoTime();

        final CompletableFuture<Boolean> retryFuture =
                FutureUtils.retryWithDelay(
                        () -> {
                            if (countDown.getAndDecrement() == 0) {
                                return CompletableFuture.completedFuture(true);
                            }
                            return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
                        },
                        new ExponentialBackoffRetryStrategy(retries, Duration.ofMillis(2L), Duration.ofMillis(5L)),
                        new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));

        FlinkAssertions.assertThatFuture(retryFuture).eventuallySucceeds().isEqualTo(true);

        final long completionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // NOTE(review): 16 ms is presumably 2 + 4 + 5 + 5 (delay doubling from 2 ms, capped at
        // 5 ms) — confirm against ExponentialBackoffRetryStrategy.
        Assertions.assertThat(completionTime)
                .as("The completion time should be at least retries times delay between retries.")
                .isGreaterThanOrEqualTo(16L);
    }

    /**
     * Checks that cancelling the future returned by {@code FutureUtils.retryWithDelay} also
     * cancels the retry task that is waiting in the scheduled executor.
     */
    @Test
    void testRetryWithDelayRetryStrategyCancellation() {
        final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();
        final RetryStrategy retryStrategy = new FixedRetryStrategy(1, TestingUtils.infiniteDuration());

        final CompletableFuture<?> retryFuture =
                FutureUtils.retryWithDelay(
                        () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
                        retryStrategy,
                        manualExecutor);
        Assertions.assertThat(retryFuture).isNotDone();

        // The first failure must have scheduled exactly the retry we are about to inspect.
        final Collection<ScheduledFuture<?>> pendingTasks = manualExecutor.getActiveScheduledTasks();
        Assertions.assertThat(pendingTasks).isNotEmpty();
        final ScheduledFuture<?> pendingRetry = pendingTasks.iterator().next();
        Assertions.assertThat(pendingRetry.isDone()).isFalse();

        retryFuture.cancel(false);

        Assertions.assertThat(retryFuture).isCancelled();
        Assertions.assertThat(pendingRetry.isCancelled()).isTrue();
    }

    /**
     * Checks that {@code FutureUtils.orTimeout} fails a never-completed future with a
     * {@link TimeoutException} and that the failure message contains the supplied text.
     */
    @Test
    void testOrTimeout() {
        final long timeout = 10L;
        final String expectedErrorMessage = "testOrTimeout";
        final CompletableFuture<String> future = new CompletableFuture<>();

        FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS, expectedErrorMessage);

        FlinkAssertions.assertThatFuture(future)
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(TimeoutException.class)
                .withMessageContaining(expectedErrorMessage);
    }

    /**
     * Checks that {@code FutureUtils.retryWithDelay} retries only failures accepted by the retry
     * predicate: the first failure matches the predicate, the second one does not and must
     * surface in the resulting future.
     */
    @Test
    void testRetryWithDelayRetryStrategyAndPredicate() {
        final ScheduledExecutorService retryExecutor = EXECUTOR_RESOURCE.getExecutor();
        final String retryableExceptionMessage = "first exception";
        final String expectedErrorMessage = "should propagate";

        // First invocation fails with the retryable message, every later one with the other.
        final AtomicInteger invocations = new AtomicInteger();
        final Supplier<CompletableFuture<String>> operation =
                () -> {
                    final String message =
                            invocations.getAndIncrement() == 0
                                    ? retryableExceptionMessage
                                    : expectedErrorMessage;
                    return FutureUtils.completedExceptionally(new RuntimeException(message));
                };

        final CompletableFuture<String> resultFuture =
                FutureUtils.retryWithDelay(
                        operation,
                        new FixedRetryStrategy(1, Duration.ZERO),
                        throwable ->
                                throwable instanceof RuntimeException
                                        && throwable.getMessage().contains(retryableExceptionMessage),
                        new ScheduledExecutorServiceAdapter(retryExecutor));

        FlinkAssertions.assertThatFuture(resultFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withMessageContaining(expectedErrorMessage);
    }

    /**
     * Checks that the action passed to {@code FutureUtils.runAfterwards} runs only after the
     * input future has completed and that the returned future then succeeds.
     */
    @Test
    void testRunAfterwards() {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionRan = new OneShotLatch();

        final CompletableFuture<Void> runFuture = FutureUtils.runAfterwards(inputFuture, actionRan::trigger);

        // Nothing may happen before the input completes.
        Assertions.assertThat(actionRan.isTriggered()).isFalse();
        Assertions.assertThat(runFuture).isNotDone();

        inputFuture.complete(null);

        Assertions.assertThat(actionRan.isTriggered()).isTrue();
        FlinkAssertions.assertThatFuture(runFuture).eventuallySucceeds();
    }

    /**
     * Checks that the action passed to {@code FutureUtils.runAfterwards} also runs when the input
     * future fails and that the returned future fails with the input's exception.
     */
    @Test
    void testRunAfterwardsExceptional() {
        final FlinkException testException = new FlinkException("Test exception");
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionRan = new OneShotLatch();

        final CompletableFuture<Void> runFuture = FutureUtils.runAfterwards(inputFuture, actionRan::trigger);

        Assertions.assertThat(actionRan.isTriggered()).isFalse();
        Assertions.assertThat(runFuture).isNotDone();

        inputFuture.completeExceptionally(testException);

        Assertions.assertThat(actionRan.isTriggered()).isTrue();
        Assertions.assertThat(runFuture).isDone();
        FlinkAssertions.assertThatFuture(runFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(testException);
    }

    /**
     * Checks that the supplier passed to {@code FutureUtils.composeAfterwards} is invoked only
     * after the input future has completed and that the composed future then succeeds.
     */
    @Test
    void testComposeAfterwards() {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeInvoked = new OneShotLatch();

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            composeInvoked.trigger();
                            return CompletableFuture.completedFuture(null);
                        });

        Assertions.assertThat(composeInvoked.isTriggered()).isFalse();
        Assertions.assertThat(composeFuture).isNotDone();

        inputFuture.complete(null);

        Assertions.assertThat(composeInvoked.isTriggered()).isTrue();
        FlinkAssertions.assertThatFuture(composeFuture).eventuallySucceeds();
    }

    /**
     * Checks that a failing input future still triggers the compose supplier and that the
     * composed future fails with the input's exception.
     */
    @Test
    void testComposeAfterwardsFirstExceptional() {
        final FlinkException testException = new FlinkException("Test exception");
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeInvoked = new OneShotLatch();

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            composeInvoked.trigger();
                            return CompletableFuture.completedFuture(null);
                        });

        Assertions.assertThat(composeInvoked.isTriggered()).isFalse();
        Assertions.assertThat(composeFuture).isNotDone();

        inputFuture.completeExceptionally(testException);

        Assertions.assertThat(composeInvoked.isTriggered()).isTrue();
        Assertions.assertThat(composeFuture).isDone();
        FlinkAssertions.assertThatFuture(composeFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(testException);
    }

    /**
     * Checks that a failure of the future returned by the compose supplier is propagated to the
     * composed future when the input future itself succeeded.
     */
    @Test
    void testComposeAfterwardsSecondExceptional() {
        final FlinkException testException = new FlinkException("Test exception");
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeInvoked = new OneShotLatch();

        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            composeInvoked.trigger();
                            return FutureUtils.completedExceptionally(testException);
                        });

        Assertions.assertThat(composeInvoked.isTriggered()).isFalse();
        Assertions.assertThat(composeFuture).isNotDone();

        inputFuture.complete(null);

        Assertions.assertThat(composeInvoked.isTriggered()).isTrue();
        Assertions.assertThat(composeFuture).isDone();
        FlinkAssertions.assertThatFuture(composeFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(testException);
    }

    /**
     * Checks that when both the input future and the composed future fail, the result fails with
     * the input's exception and carries the composed future's exception as a suppressed exception.
     */
    @Test
    void testComposeAfterwardsBothExceptional() {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final FlinkException testException1 = new FlinkException("Test exception1");
        final FlinkException testException2 = new FlinkException("Test exception2");
        final OneShotLatch composeLatch = new OneShotLatch();

        // Parameterized futures keep the assertion chain typed, so the lambda below receives a
        // Throwable rather than an Object.
        final CompletableFuture<Void> composeFuture =
                FutureUtils.composeAfterwards(
                        inputFuture,
                        () -> {
                            composeLatch.trigger();
                            return FutureUtils.completedExceptionally(testException2);
                        });

        Assertions.assertThat(composeLatch.isTriggered()).isFalse();
        Assertions.assertThat(composeFuture).isNotDone();

        inputFuture.completeExceptionally(testException1);

        Assertions.assertThat(composeLatch.isTriggered()).isTrue();
        Assertions.assertThat(composeFuture).isDone();
        FlinkAssertions.assertThatFuture(composeFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(Throwable::getCause)
                .isEqualTo(testException1)
                .satisfies(cause -> Assertions.assertThat(cause.getSuppressed()).containsExactly(testException2));
    }

    /**
     * Checks that the future returned by {@code FutureUtils.completeAll} completes only after all
     * input futures have completed and that it tracks the number of completed futures.
     */
    @Test
    void testCompleteAll() {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        // A wildcard element type (instead of a raw one) lets futures of different result types
        // share one list without unchecked conversions.
        final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);

        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeFuture.getNumFuturesTotal()).isEqualTo(futuresToComplete.size());

        inputFuture2.complete(42);

        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isOne();

        inputFuture1.complete("foobar");

        Assertions.assertThat(completeFuture).isDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeFuture).eventuallySucceeds();
    }

    /**
     * Checks that {@code FutureUtils.completeAll} waits for all input futures even if one of them
     * fails, and that the result then fails with that input's exception.
     */
    @Test
    void testCompleteAllPartialExceptional() {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        // A wildcard element type (instead of a raw one) lets futures of different result types
        // share one list without unchecked conversions.
        final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);

        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeFuture.getNumFuturesTotal()).isEqualTo(futuresToComplete.size());

        final FlinkException testException1 = new FlinkException("Test exception 1");
        inputFuture2.completeExceptionally(testException1);

        // One failed input must not complete the conjunction early.
        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isOne();

        inputFuture1.complete("foobar");

        Assertions.assertThat(completeFuture).isDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(testException1);
    }

    /**
     * Checks that when every input of {@code FutureUtils.completeAll} fails, the result fails with
     * one of the input exceptions and carries the other one as a suppressed exception.
     */
    @Test
    void testCompleteAllExceptional() {
        final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
        final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
        // A wildcard element type (instead of a raw one) keeps the list and the assertion chain
        // below fully typed.
        final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1, inputFuture2);

        final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);

        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeFuture.getNumFuturesTotal()).isEqualTo(futuresToComplete.size());

        final FlinkException testException1 = new FlinkException("Test exception 1");
        inputFuture1.completeExceptionally(testException1);

        Assertions.assertThat(completeFuture).isNotDone();
        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isOne();

        final FlinkException testException2 = new FlinkException("Test exception 2");
        inputFuture2.completeExceptionally(testException2);

        Assertions.assertThat(completeFuture.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeFuture)
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(FlinkException.class)
                .extracting(Throwable::getCause)
                .satisfies(
                        e -> {
                            // Whichever exception became the cause, the other one must be suppressed.
                            final Throwable[] actualSuppressedExceptions = e.getSuppressed();
                            final FlinkException expectedSuppressedException =
                                    e.equals(testException1) ? testException2 : testException1;
                            Assertions.assertThat(actualSuppressedExceptions)
                                    .containsExactly(expectedSuppressedException);
                        });
    }

    /**
     * Checks that an exception thrown by the supplier given to {@code FutureUtils.supplyAsync}
     * becomes the cause of the returned future's failure.
     */
    @Test
    void testSupplyAsyncFailure() {
        final String exceptionMessage = "Test exception";
        final FlinkException testException = new FlinkException(exceptionMessage);

        final CompletableFuture<Object> future =
                FutureUtils.supplyAsync(
                        () -> {
                            throw testException;
                        },
                        EXECUTOR_RESOURCE.getExecutor());

        FlinkAssertions.assertThatFuture(future)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(testException);
    }

    /** Checks that {@code FutureUtils.supplyAsync} completes with the value the supplier returns. */
    @Test
    void testSupplyAsync() {
        final Object expectedResult = new Object();

        final CompletableFuture<Object> future =
                FutureUtils.supplyAsync(() -> expectedResult, EXECUTOR_RESOURCE.getExecutor());

        FlinkAssertions.assertThatFuture(future).eventuallySucceeds().isEqualTo(expectedResult);
    }

    /** Runs the shared continuation scenario against {@code FutureUtils.handleAsyncIfNotDone}. */
    @Test
    void testHandleAsyncIfNotDone() {
        this.testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.handleAsyncIfNotDone(future, executor, (o, t) -> null));
    }

    /** Runs the shared continuation scenario against {@code FutureUtils.thenApplyAsyncIfNotDone}. */
    @Test
    void testApplyAsyncIfNotDone() {
        this.testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenApplyAsyncIfNotDone(future, executor, o -> null));
    }

    /**
     * Runs the shared continuation scenario against {@code FutureUtils.thenComposeAsyncIfNotDone}.
     *
     * <p>The compose function returns an already completed future. Returning {@code null} instead
     * would violate the compose contract: the continuation would only count as "done" because
     * the resulting {@link NullPointerException} completes it exceptionally.
     */
    @Test
    void testComposeAsyncIfNotDone() {
        this.testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenComposeAsyncIfNotDone(
                                future, executor, o -> CompletableFuture.completedFuture(null)));
    }

    /** Runs the shared continuation scenario against {@code FutureUtils.whenCompleteAsyncIfNotDone}. */
    @Test
    void testWhenCompleteAsyncIfNotDone() {
        this.testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.whenCompleteAsyncIfNotDone(future, executor, (o, throwable) -> {}));
    }

    /** Runs the shared continuation scenario against {@code FutureUtils.thenAcceptAsyncIfNotDone}. */
    @Test
    void testThenAcceptAsyncIfNotDone() {
        this.testFutureContinuation(
                (CompletableFuture<?> future, Executor executor) ->
                        FutureUtils.thenAcceptAsyncIfNotDone(future, executor, o -> {}));
    }

    /**
     * Shared scenario for the {@code *AsyncIfNotDone} tests.
     *
     * <p>First the continuation is attached to a pending future and the test expects the supplied
     * executor to be used once that future completes. Then the continuation is attached to the
     * already completed future and the test expects the executor to stay unused.
     *
     * @param testFunctionGenerator attaches the continuation under test to the given future using
     *     the given executor and returns the resulting future
     */
    private void testFutureContinuation(BiFunction<CompletableFuture<?>, Executor, CompletableFuture<?>> testFunctionGenerator) {
        final CompletableFuture<Object> startFuture = new CompletableFuture<>();
        final AtomicBoolean executorUsed = new AtomicBoolean(false);
        // Runs the task inline and afterwards records that the executor was involved.
        final Executor recordingExecutor =
                task -> {
                    task.run();
                    executorUsed.set(true);
                };

        // Phase 1: the source future is still pending when the continuation is attached.
        CompletableFuture<?> continuation = testFunctionGenerator.apply(startFuture, recordingExecutor);
        Assertions.assertThat(continuation).isNotDone();
        startFuture.complete(null);
        Assertions.assertThat(executorUsed).isTrue();
        Assertions.assertThat(continuation).isDone();

        // Phase 2: the source future is already done when the continuation is attached.
        executorUsed.set(false);
        continuation = testFunctionGenerator.apply(startFuture, recordingExecutor);
        Assertions.assertThat(executorUsed).isFalse();
        Assertions.assertThat(continuation).isDone();
    }

    /**
     * Checks that {@code FutureUtils.handleException} passes through the value of a future that
     * is already completed normally.
     */
    @Test
    void testHandleExceptionWithCompletedFuture() {
        final CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture("foobar");

        final CompletableFuture<String> handled =
                FutureUtils.handleException(alreadyCompleted, Exception.class, exception -> "handled");

        FlinkAssertions.assertThatFuture(handled).eventuallySucceeds().isEqualTo("foobar");
    }

    /**
     * Checks that {@code FutureUtils.handleException} passes through the value when the future
     * completes normally after the handler has been attached.
     */
    @Test
    void testHandleExceptionWithNormalCompletion() {
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletableFuture<String> handled =
                FutureUtils.handleException(pending, Exception.class, exception -> "handled");

        pending.complete("foobar");

        Assertions.assertThat(handled).isCompletedWithValue("foobar");
    }

    /**
     * Checks that {@code FutureUtils.handleException} replaces a failure of the handled exception
     * type with the handler's result.
     */
    @Test
    void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletableFuture<String> handled =
                FutureUtils.handleException(
                        pending, UnsupportedOperationException.class, exception -> "handled");

        pending.completeExceptionally(new UnsupportedOperationException("foobar"));

        Assertions.assertThat(handled).isCompletedWithValue("handled");
    }

    /**
     * Checks that {@code FutureUtils.handleException} forwards a failure whose type does not
     * match the handled exception type.
     */
    @Test
    void testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
        final IllegalArgumentException futureException = new IllegalArgumentException("foobar");
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletableFuture<String> handled =
                FutureUtils.handleException(
                        pending, UnsupportedOperationException.class, exception -> "handled");

        pending.completeExceptionally(futureException);

        FlinkAssertions.assertThatFuture(handled)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(futureException);
    }

    /**
     * Checks that an exception thrown by the handler passed to {@code FutureUtils.handleException}
     * becomes the cause of the handled future's failure.
     */
    @Test
    void testHandleExceptionWithThrowingExceptionHandler() {
        final IllegalStateException handlerException =
                new IllegalStateException("something went terribly wrong");
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletableFuture<String> handled =
                FutureUtils.handleException(
                        pending,
                        UnsupportedOperationException.class,
                        exception -> {
                            throw handlerException;
                        });

        pending.completeExceptionally(new UnsupportedOperationException("foobar"));

        FlinkAssertions.assertThatFuture(handled)
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(handlerException);
    }

    /**
     * Checks that the uncaught-exception handler is not called for a future that is already
     * completed normally.
     */
    @Test
    void testHandleUncaughtExceptionWithCompletedFuture() {
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();
        final CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture("foobar");

        FutureUtils.handleUncaughtException(alreadyCompleted, handler);

        Assertions.assertThat(handler.hasBeenCalled()).isFalse();
    }

    /**
     * Checks that the uncaught-exception handler is not called when the future completes normally
     * after the handler has been attached.
     */
    @Test
    void testHandleUncaughtExceptionWithNormalCompletion() {
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();
        final CompletableFuture<String> pending = new CompletableFuture<>();

        FutureUtils.handleUncaughtException(pending, handler);
        pending.complete("barfoo");

        Assertions.assertThat(handler.hasBeenCalled()).isFalse();
    }

    /**
     * Checks that the uncaught-exception handler is called for a future that has already failed
     * when the handler is attached.
     */
    @Test
    void testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();
        final CompletableFuture<String> alreadyFailed =
                FutureUtils.completedExceptionally(new FlinkException("foobar"));

        FutureUtils.handleUncaughtException(alreadyFailed, handler);

        Assertions.assertThat(handler.hasBeenCalled()).isTrue();
    }

    /**
     * Checks that the uncaught-exception handler is called only once the future actually fails,
     * not when the handler is attached.
     */
    @Test
    void testHandleUncaughtExceptionWithExceptionallyCompletion() {
        final TestingUncaughtExceptionHandler handler = new TestingUncaughtExceptionHandler();
        final CompletableFuture<String> pending = new CompletableFuture<>();

        FutureUtils.handleUncaughtException(pending, handler);
        Assertions.assertThat(handler.hasBeenCalled()).isFalse();

        pending.completeExceptionally(new FlinkException("barfoo"));
        Assertions.assertThat(handler.hasBeenCalled()).isTrue();
    }

    /**
     * Checks the fallback path of {@code FutureUtils.handleUncaughtException}: when the primary
     * handler itself throws, the fallback handler must receive an {@link IllegalStateException}
     * whose root cause is the handler's exception and which carries the original error as a
     * suppressed exception.
     */
    @Test
    void testHandleUncaughtExceptionWithBuggyErrorHandlingCode() {
        final Exception actualProductionCodeError =
                new Exception("Actual production code error that should be caught by the error handler.");
        final RuntimeException errorHandlingException =
                new RuntimeException("Expected test error in error handling code.");

        // Primary handler that fails while handling the production error.
        final Thread.UncaughtExceptionHandler buggyActualExceptionHandler =
                (thread, ignoredActualException) -> {
                    throw errorHandlingException;
                };

        // A parameterized reference keeps the assertion lambdas below typed as Throwable.
        final AtomicReference<Throwable> caughtErrorHandlingException = new AtomicReference<>();
        final Thread.UncaughtExceptionHandler fallbackExceptionHandler =
                (thread, errorHandlingEx) -> caughtErrorHandlingException.set(errorHandlingEx);

        FutureUtils.handleUncaughtException(
                FutureUtils.completedExceptionally(actualProductionCodeError),
                buggyActualExceptionHandler,
                fallbackExceptionHandler);

        Assertions.assertThat(caughtErrorHandlingException)
                .hasValueSatisfying(
                        actualError ->
                                Assertions.assertThat(actualError)
                                        .isInstanceOf(IllegalStateException.class)
                                        .hasRootCause(errorHandlingException)
                                        // satisfies(...) inspects the same throwable again.
                                        .satisfies(
                                                reported ->
                                                        Assertions.assertThat(reported.getSuppressed())
                                                                .containsExactly(actualProductionCodeError)));
    }

    /**
     * Checks that {@code FutureUtils.forward} completes the target future with the source
     * future's value once the source completes.
     */
    @Test
    void testForwardNormal() throws Exception {
        final CompletableFuture<String> source = new CompletableFuture<>();
        final CompletableFuture<String> target = new CompletableFuture<>();

        FutureUtils.forward(source, target);

        Assertions.assertThat(source).isNotDone();
        Assertions.assertThat(target).isNotDone();

        source.complete("foobar");

        Assertions.assertThat(source).isDone();
        Assertions.assertThat(target).isDone();
        Assertions.assertThat(source.get()).isEqualTo(target.get());
    }

    /**
     * Checks that {@code FutureUtils.forward} fails the target future with the same exception
     * that failed the source future.
     */
    @Test
    void testForwardExceptionally() {
        // Parameterized futures keep the assertion chains typed, so Throwable::getCause resolves
        // against a Throwable instead of an erased Object.
        final CompletableFuture<String> source = new CompletableFuture<>();
        final CompletableFuture<String> target = new CompletableFuture<>();

        FutureUtils.forward(source, target);

        Assertions.assertThat(source).isNotDone();
        Assertions.assertThat(target).isNotDone();

        final FlinkException expectedCause = new FlinkException("Expected exception");
        source.completeExceptionally(expectedCause);

        Assertions.assertThat(source).isDone();
        Assertions.assertThat(target).isDone();
        FlinkAssertions.assertThatFuture(source)
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(Throwable::getCause)
                .isEqualTo(expectedCause);
        FlinkAssertions.assertThatFuture(target)
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(Throwable::getCause)
                .isEqualTo(expectedCause);
    }

    /**
     * Checks that {@code FutureUtils.forwardAsync} completes the target only after the supplied
     * executor has run the forwarding task.
     */
    @Test
    void testForwardAsync() {
        final String expectedValue = "foobar";
        final CompletableFuture<String> source = new CompletableFuture<>();
        final CompletableFuture<String> target = new CompletableFuture<>();
        final ManuallyTriggeredScheduledExecutor manualExecutor = new ManuallyTriggeredScheduledExecutor();

        FutureUtils.forwardAsync(source, target, manualExecutor);
        source.complete(expectedValue);

        // The forwarding task is queued but has not been executed yet.
        Assertions.assertThat(target).isNotDone();

        manualExecutor.triggerAll();

        FlinkAssertions.assertThatFuture(target).eventuallySucceeds().isEqualTo(expectedValue);
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} returns the value of a normally
     * completed future.
     */
    @Test
    void testGetWithoutException() {
        // The expected value is declared once and used for both completing and asserting.
        final int expectedValue = 1;
        final CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        completableFuture.complete(expectedValue);

        Assertions.assertThat(FutureUtils.getWithoutException(completableFuture)).isEqualTo(expectedValue);
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} returns {@code null} for a future that
     * completed exceptionally.
     */
    @Test
    void testGetWithoutExceptionWithAnException() {
        final CompletableFuture<Integer> failedFuture = new CompletableFuture<>();
        failedFuture.completeExceptionally(new RuntimeException("expected"));

        final Integer result = FutureUtils.getWithoutException(failedFuture);

        Assertions.assertThat(result).isNull();
    }

    /**
     * Checks that {@code FutureUtils.getWithoutException} returns {@code null} for a future that
     * has not completed yet.
     */
    @Test
    void testGetWithoutExceptionWithoutFinishing() {
        final CompletableFuture<Integer> pendingFuture = new CompletableFuture<>();

        final Integer result = FutureUtils.getWithoutException(pendingFuture);

        Assertions.assertThat(result).isNull();
    }

    /**
     * Checks that a continuation attached to the future returned by
     * {@code FutureUtils.switchExecutor} sees the source's value and runs on the thread of the
     * supplied executor.
     */
    @Test
    void testSwitchExecutorForNormallyCompletedFuture() {
        final String expectedValue = "foobar";
        final CompletableFuture<String> source = new CompletableFuture<>();
        final ExecutorService singleThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        final CompletableFuture<String> resultFuture = FutureUtils.switchExecutor(source, singleThreadExecutor);

        // Ask the executor for its thread name so the continuation's thread can be compared to it.
        final String expectedThreadName =
                FutureUtils.supplyAsync(() -> Thread.currentThread().getName(), singleThreadExecutor).join();

        final CompletableFuture<Void> assertionFuture =
                resultFuture.handle(
                        (value, throwable) -> {
                            Assertions.assertThat(value).isEqualTo(expectedValue);
                            Assertions.assertThat(Thread.currentThread().getName()).isEqualTo(expectedThreadName);
                            return null;
                        });

        source.complete(expectedValue);

        // join() rethrows any assertion failure raised inside the continuation.
        assertionFuture.join();
    }

    /**
     * Checks that a continuation attached to the future returned by
     * {@code FutureUtils.switchExecutor} sees the source's failure wrapped in a
     * {@link CompletionException} and runs on the thread of the supplied executor.
     */
    @Test
    void testSwitchExecutorForExceptionallyCompletedFuture() {
        // Parameterized futures give the handle(...) lambda a typed Throwable parameter, so the
        // assertion chain and Throwable::getCause resolve without casts.
        final CompletableFuture<String> source = new CompletableFuture<>();
        final ExecutorService singleThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        final CompletableFuture<String> resultFuture = FutureUtils.switchExecutor(source, singleThreadExecutor);

        // Ask the executor for its thread name so the continuation's thread can be compared to it.
        final String expectedThreadName =
                FutureUtils.supplyAsync(() -> Thread.currentThread().getName(), singleThreadExecutor).join();
        final Exception expectedException = new Exception("foobar");

        final CompletableFuture<Void> assertionFuture =
                resultFuture.handle(
                        (s, throwable) -> {
                            Assertions.assertThat(throwable)
                                    .isInstanceOf(CompletionException.class)
                                    .extracting(Throwable::getCause)
                                    .isEqualTo(expectedException);
                            Assertions.assertThat(Thread.currentThread().getName()).isEqualTo(expectedThreadName);
                            return null;
                        });

        source.completeExceptionally(expectedException);

        // join() rethrows any assertion failure raised inside the continuation.
        assertionFuture.join();
    }

    /**
     * Uncaught-exception handler that stores the throwable it receives so tests can check
     * whether it was invoked with an exception.
     */
    private static class TestingUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        /** Throwable from the most recent invocation; {@code null} until one has been received. */
        private Throwable recorded;

        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            recorded = throwable;
        }

        /** Returns {@code true} once a non-null throwable has been recorded. */
        private boolean hasBeenCalled() {
            return null != recorded;
        }
    }
}

