/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.asyncprocessing.BatchCallbackRunner;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class BatchCallbackRunnerTest {
    private static final ThrowingRunnable<? extends Exception> DUMMY = () -> {};

    @Test
    void testSingleSubmit() {
        ManualMailboxExecutor executor = new ManualMailboxExecutor();
        AtomicBoolean notified = new AtomicBoolean(false);
        BatchCallbackRunner runner = new BatchCallbackRunner((MailboxExecutor)executor, () -> notified.set(true));
        runner.submit(DUMMY);
        Assertions.assertThat((boolean)runner.isHasMail()).isTrue();
        Assertions.assertThat((boolean)notified.get()).isTrue();
        executor.runOne();
        Assertions.assertThat((boolean)runner.isHasMail()).isFalse();
    }

    @Test
    void testHugeBatch() {
        ManualMailboxExecutor executor = new ManualMailboxExecutor();
        AtomicBoolean notified = new AtomicBoolean(false);
        BatchCallbackRunner runner = new BatchCallbackRunner((MailboxExecutor)executor, () -> notified.set(true));
        for (int i = 0; i < 3001; ++i) {
            runner.submit(DUMMY);
        }
        Assertions.assertThat((boolean)runner.isHasMail()).isTrue();
        Assertions.assertThat((boolean)notified.get()).isTrue();
        executor.runOne();
        Assertions.assertThat((boolean)runner.isHasMail()).isTrue();
        notified.set(false);
        runner.submit(DUMMY);
        Assertions.assertThat((boolean)notified.get()).isFalse();
        executor.runOne();
        Assertions.assertThat((boolean)runner.isHasMail()).isFalse();
        runner.submit(DUMMY);
        Assertions.assertThat((boolean)runner.isHasMail()).isTrue();
        Assertions.assertThat((boolean)notified.get()).isTrue();
    }

    public static class ManualMailboxExecutor
    implements MailboxExecutor {
        Deque<ThrowingRunnable<? extends Exception>> commands = new ArrayDeque<ThrowingRunnable<? extends Exception>>();

        public void execute(MailboxExecutor.MailOptions mailOptions, ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object ... descriptionArgs) {
            this.commands.push(command);
        }

        public void runOne() {
            ThrowingRunnable<? extends Exception> command = this.commands.pop();
            if (command != null) {
                try {
                    command.run();
                }
                catch (Exception e) {
                    throw new FlinkRuntimeException("Cannot execute mail", (Throwable)e);
                }
            }
        }

        public void yield() throws FlinkRuntimeException {
        }

        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        public boolean shouldInterrupt() {
            return false;
        }
    }
}

