/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.state;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.asyncprocessing.AsyncFutureImpl;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StateFutureTest {
    static AsyncFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler = (message, exception) -> {
        throw new RuntimeException(message, exception);
    };

    StateFutureTest() {
    }

    @Test
    void basicSyncComplete() {
        int i;
        TestCallbackRunner runner = new TestCallbackRunner(null);
        AtomicInteger counter = new AtomicInteger(0);
        AsyncFutureImpl stateFuture1 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture1.thenAccept(counter::addAndGet);
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        stateFuture1.complete((Object)5);
        Assertions.assertThat((AtomicInteger)counter).hasValue(5);
        AsyncFutureImpl stateFuture2 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        InternalAsyncFuture stateFuture3 = stateFuture2.thenApply(v -> String.valueOf(counter.addAndGet((int)v)));
        Assertions.assertThat((AtomicInteger)counter).hasValue(5);
        stateFuture2.complete((Object)3);
        Assertions.assertThat((AtomicInteger)counter).hasValue(8);
        stateFuture3.thenAccept(v -> counter.addAndGet(-Integer.parseInt(v)));
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        AsyncFutureImpl stateFuture4 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        AsyncFutureImpl stateFuture5 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture4.thenCompose(v -> {
            counter.addAndGet((int)v);
            return stateFuture5;
        }).thenAccept(counter::addAndGet);
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        stateFuture4.complete((Object)6);
        Assertions.assertThat((AtomicInteger)counter).hasValue(6);
        stateFuture5.complete((Object)3);
        Assertions.assertThat((AtomicInteger)counter).hasValue(9);
        AsyncFutureImpl stateFuture6 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        AsyncFutureImpl stateFuture7 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture6.thenCombine((StateFuture)stateFuture7, (v1, v2) -> {
            counter.addAndGet(v1 - v2);
            return StateFutureUtils.completedVoidFuture();
        });
        Assertions.assertThat((AtomicInteger)counter).hasValue(9);
        stateFuture6.complete((Object)4);
        Assertions.assertThat((AtomicInteger)counter).hasValue(9);
        stateFuture7.complete((Object)13);
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        StateFutureUtils.completedFuture((Object)3).thenAccept(counter::addAndGet);
        Assertions.assertThat((AtomicInteger)counter).hasValue(3);
        counter.set(0);
        ArrayList<AsyncFutureImpl> futures = new ArrayList<AsyncFutureImpl>();
        for (i = 0; i < 5; ++i) {
            futures.add(new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler));
        }
        StateFutureUtils.combineAll(futures).thenAccept(c -> {
            int sum = 0;
            for (Integer v : c) {
                sum *= 10;
                sum += v.intValue();
            }
            counter.addAndGet(sum);
        });
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        for (i = 0; i < 5; ++i) {
            ((AsyncFutureImpl)futures.get(i)).complete((Object)(i + 1));
            if (i == 4) continue;
            Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        }
        Assertions.assertThat((AtomicInteger)counter).hasValue(12345);
    }

    @Test
    void testRunOnCorrectThread() throws Exception {
        AtomicInteger threadIdProvider = new AtomicInteger(0);
        ThreadLocal<Integer> threadId = ThreadLocal.withInitial(threadIdProvider::getAndIncrement);
        AtomicReference exception = new AtomicReference();
        CountDownLatch latch = new CountDownLatch(1);
        Assertions.assertThat((Integer)threadId.get()).isZero();
        ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorThreadFactory(this.getClass().getSimpleName()));
        executor.execute(() -> {
            try {
                Assertions.assertThat((Integer)((Integer)threadId.get())).isOne();
            }
            catch (Throwable e) {
                exception.set(e);
            }
            finally {
                latch.countDown();
            }
        });
        latch.await(20L, TimeUnit.SECONDS);
        Assertions.assertThat((long)latch.getCount()).isZero();
        Assertions.assertThat(exception).hasValue(null);
        MockValueState valueState = new MockValueState(executor);
        Runnable threadChecker = () -> {
            try {
                Assertions.assertThat((Integer)((Integer)threadId.get())).isOne();
            }
            catch (Throwable e) {
                exception.set(e);
            }
        };
        CountDownLatch latch2 = new CountDownLatch(2);
        ArrayList list = new ArrayList();
        executor.execute(() -> {
            ArrayList<StateFuture<Integer>> futures = new ArrayList<StateFuture<Integer>>();
            for (int i = 0; i < 5; ++i) {
                futures.add(valueState.get());
            }
            StateFutureUtils.combineAll(futures).thenCombine(valueState.get(), (c, v) -> {
                list.addAll(c);
                list.add(v);
                threadChecker.run();
                return 0;
            }).thenCompose(v -> {
                threadChecker.run();
                return valueState.get();
            }).thenApply(v -> {
                list.add(v);
                threadChecker.run();
                return 0;
            }).thenAccept(v -> {
                threadChecker.run();
                latch2.countDown();
            });
            latch2.countDown();
        });
        latch2.await(20L, TimeUnit.SECONDS);
        Assertions.assertThat((long)latch2.getCount()).isZero();
        Assertions.assertThat((Throwable)((Throwable)exception.get())).isNull();
        Assertions.assertThat(list).hasSize(7);
    }

    @Test
    void testConditionally() {
        TestCallbackRunner runner = new TestCallbackRunner(null);
        AtomicInteger counter = new AtomicInteger(0);
        AsyncFutureImpl stateFuture1 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture1.thenConditionallyAccept(e -> e > 0, counter::addAndGet, v -> counter.addAndGet(-v.intValue())).thenConditionallyAccept(e -> e == false, v -> counter.incrementAndGet(), v -> counter.decrementAndGet());
        Assertions.assertThat((AtomicInteger)counter).hasValue(0);
        stateFuture1.complete((Object)-5);
        Assertions.assertThat((AtomicInteger)counter).hasValue(6);
        AsyncFutureImpl stateFuture2 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture2.thenConditionallyApply(v -> v > 0, v -> String.valueOf(counter.addAndGet((int)v)), v -> String.valueOf(counter.addAndGet(-v.intValue()))).thenConditionallyApply(e -> (Boolean)e.f0 == false, e -> counter.addAndGet(Integer.parseInt((String)e.f1) * 2), e -> counter.addAndGet(Integer.parseInt((String)e.f1) * 3));
        Assertions.assertThat((AtomicInteger)counter).hasValue(6);
        stateFuture2.complete((Object)-3);
        Assertions.assertThat((AtomicInteger)counter).hasValue(27);
        AsyncFutureImpl stateFuture3 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        AsyncFutureImpl stateFuture4 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        AsyncFutureImpl stateFuture5 = new AsyncFutureImpl((AsyncFutureImpl.CallbackRunner)runner, exceptionHandler);
        stateFuture3.thenConditionallyCompose(v -> v > 0, v -> {
            counter.addAndGet((int)v);
            return stateFuture4;
        }, v -> {
            counter.addAndGet(-v.intValue());
            return stateFuture5;
        }).thenConditionallyCompose(t -> (Boolean)t.f0 == false, t -> {
            counter.addAndGet((Integer)t.f1 * 2);
            return StateFutureUtils.completedVoidFuture();
        }, t -> {
            counter.addAndGet((Integer)t.f1 * 3);
            return StateFutureUtils.completedVoidFuture();
        });
        Assertions.assertThat((AtomicInteger)counter).hasValue(27);
        counter.set(0);
        stateFuture3.complete((Object)3);
        Assertions.assertThat((AtomicInteger)counter).hasValue(3);
        stateFuture5.complete((Object)5);
        Assertions.assertThat((AtomicInteger)counter).hasValue(3);
        stateFuture4.complete((Object)4);
        Assertions.assertThat((AtomicInteger)counter).hasValue(15);
    }

    private static class TestCallbackRunner
    implements AsyncFutureImpl.CallbackRunner {
        private final ExecutorService stateExecutor;

        TestCallbackRunner(ExecutorService stateExecutor) {
            this.stateExecutor = stateExecutor;
        }

        public void submit(ThrowingRunnable task) {
            if (this.stateExecutor == null) {
                ThrowingRunnable.unchecked((ThrowingRunnable)task).run();
            } else {
                this.stateExecutor.submit(() -> ThrowingRunnable.unchecked((ThrowingRunnable)task).run());
            }
        }
    }

    private static class MockValueState {
        AtomicInteger value = new AtomicInteger(0);
        ExecutorService stateExecutor = Executors.newFixedThreadPool(3);
        AsyncFutureImpl.CallbackRunner runner;

        MockValueState(ExecutorService executor) {
            this.runner = new TestCallbackRunner(executor);
        }

        StateFuture<Integer> get() {
            AsyncFutureImpl ret = new AsyncFutureImpl(this.runner, exceptionHandler);
            this.stateExecutor.submit(() -> {
                int a = ThreadLocalRandom.current().nextInt();
                if (a > 0) {
                    try {
                        Thread.sleep(a % 1000);
                    }
                    catch (Throwable throwable) {
                        // empty catch block
                    }
                }
                ret.complete((Object)this.value.getAndIncrement());
            });
            return ret;
        }
    }
}

