/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.v2.InternalValueState;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

class AsyncExecutionControllerTest {
    AsyncExecutionController aec;
    AtomicInteger output;
    TestValueState valueState;
    final Runnable userCode = () -> this.valueState.asyncValue().thenCompose(val -> {
        int updated = val == null ? 1 : val + 1;
        return this.valueState.asyncUpdate(updated).thenCompose(o -> StateFutureUtils.completedFuture((Object)updated));
    }).thenAccept(val -> this.output.set((int)val));

    AsyncExecutionControllerTest() {
    }

    void setup(int batchSize, long timeout, int maxInFlight, MailboxExecutor mailboxExecutor, StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler, CloseableRegistry closeableRegistry) throws IOException {
        AsyncKeyedStateBackend asyncKeyedStateBackend;
        TestStateExecutor stateExecutor = new TestStateExecutor();
        ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("test-value-state", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        Supplier<State> stateSupplier = () -> new TestValueState((StateRequestHandler)this.aec, new TestUnderlyingState(), (ValueStateDescriptor<Integer>)stateDescriptor);
        StateBackend testAsyncStateBackend = StateBackendTestUtils.buildAsyncStateBackend(stateSupplier, stateExecutor);
        AssertionsForClassTypes.assertThat((boolean)testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        try {
            asyncKeyedStateBackend = testAsyncStateBackend.createAsyncKeyedStateBackend(null);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        closeableRegistry.registerCloseable((AutoCloseable)asyncKeyedStateBackend);
        closeableRegistry.registerCloseable(() -> ((AsyncKeyedStateBackend)asyncKeyedStateBackend).dispose());
        this.aec = new AsyncExecutionController(mailboxExecutor, exceptionHandler, (StateExecutor)stateExecutor, 128, batchSize, timeout, maxInFlight);
        asyncKeyedStateBackend.setup((StateRequestHandler)this.aec);
        try {
            this.valueState = (TestValueState)asyncKeyedStateBackend.createState((StateDescriptor)stateDescriptor);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.output = new AtomicInteger();
    }

    @Test
    void testBasicRun() throws IOException {
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(100, 10000L, 1000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext1.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        String record2 = "key1-r2";
        String key2 = "key1";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        this.aec.setCurrentContext(recordContext2);
        this.userCode.run();
        String record3 = "key1-r3";
        String key3 = "key1";
        RecordContext recordContext3 = this.aec.buildContext((Object)record3, (Object)key3);
        this.aec.setCurrentContext(recordContext3);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(2);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(2);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)recordContext3.getReferenceCount()).isEqualTo(0);
        String record4 = "key3-r3";
        String key4 = "key3";
        RecordContext recordContext4 = this.aec.buildContext((Object)record4, (Object)key4);
        this.aec.setCurrentContext(recordContext4);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext4.getReferenceCount()).isEqualTo(0);
        resourceRegistry.close();
    }

    @Test
    void testRecordsRunInOrder() throws IOException {
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(100, 10000L, 1000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        String record2 = "key2-r1";
        String key2 = "key2";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        this.aec.setCurrentContext(recordContext2);
        this.userCode.run();
        String record3 = "key1-r2";
        String key3 = "key1";
        RecordContext recordContext3 = this.aec.buildContext((Object)record3, (Object)key3);
        this.aec.setCurrentContext(recordContext3);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext1.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)recordContext3.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        resourceRegistry.close();
    }

    @Test
    void testInFlightRecordControl() throws IOException {
        RecordContext recordContext;
        String key;
        int i;
        int batchSize = 5;
        int maxInFlight = 10;
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(batchSize, 10000L, maxInFlight, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        for (int round = 0; round < 10; ++round) {
            for (int i2 = 0; i2 < batchSize; ++i2) {
                String record = String.format("key%d-r%d", round * batchSize + i2, round * batchSize + i2);
                String key2 = String.format("key%d", round * batchSize + i2);
                RecordContext recordContext2 = this.aec.buildContext((Object)record, (Object)key2);
                this.aec.setCurrentContext(recordContext2);
                this.userCode.run();
            }
            AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        }
        for (i = 0; i < maxInFlight; ++i) {
            String record = String.format("sameKey-r%d", i, i);
            key = "sameKey";
            recordContext = this.aec.buildContext((Object)record, (Object)key);
            this.aec.setCurrentContext(recordContext);
            this.userCode.run();
        }
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(maxInFlight);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight - 1);
        for (i = maxInFlight; i < 10 * maxInFlight; ++i) {
            String record = String.format("sameKey-r%d", i, i);
            key = "sameKey";
            recordContext = this.aec.buildContext((Object)record, (Object)key);
            this.aec.setCurrentContext(recordContext);
            this.userCode.run();
            AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(maxInFlight + 1);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight);
        }
        resourceRegistry.close();
    }

    @Test
    public void testSyncPoint() throws IOException {
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(1000, 10000L, 6000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        AtomicInteger counter = new AtomicInteger(0);
        RecordContext recordContext = this.aec.buildContext((Object)"record", (Object)"key");
        this.aec.setCurrentContext(recordContext);
        recordContext.retain();
        this.aec.syncPointRequestWithCallback(counter::incrementAndGet);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        recordContext.release();
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        counter.set(0);
        RecordContext recordContext1 = this.aec.buildContext((Object)"record1", (Object)"occupied");
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        RecordContext recordContext2 = this.aec.buildContext((Object)"record2", (Object)"occupied");
        this.aec.setCurrentContext(recordContext2);
        this.aec.syncPointRequestWithCallback(counter::incrementAndGet);
        recordContext2.retain();
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        recordContext2.release();
        resourceRegistry.close();
    }

    @Test
    void testBufferTimeout() throws IOException {
        int batchSize = 5;
        int timeout = 1000;
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(batchSize, timeout, 1000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService();
        this.aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
        Runnable userCode = () -> this.valueState.asyncValue();
        for (int i = 0; i < batchSize - 1; ++i) {
            String record = String.format("key%d-r%d", i, i);
            String key = String.format("key%d", batchSize + i);
            RecordContext recordContext = this.aec.buildContext((Object)record, (Object)key);
            this.aec.setCurrentContext(recordContext);
            userCode.run();
        }
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        scheduledExecutor.triggerNonPeriodicScheduledTasks();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        String record5 = "key5-r5";
        String key5 = "key5";
        RecordContext recordContext5 = this.aec.buildContext((Object)record5, (Object)key5);
        this.aec.setCurrentContext(recordContext5);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
        ScheduledFuture scheduledFuture = this.aec.stateRequestsBuffer.currentScheduledFuture;
        String record6 = "key6-r6";
        String key6 = "key6";
        RecordContext recordContext6 = this.aec.buildContext((Object)record6, (Object)key6);
        this.aec.setCurrentContext(recordContext6);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)scheduledExecutor.getAllNonPeriodicScheduledTask().size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1L);
        scheduledExecutor.triggerNonPeriodicScheduledTasks();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((Object)scheduledFuture).isEqualTo((Object)this.aec.stateRequestsBuffer.currentScheduledFuture);
        AssertionsForClassTypes.assertThat((boolean)scheduledFuture.isDone()).isTrue();
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1L);
        resourceRegistry.close();
    }

    @Test
    void testBufferTimeoutSkip() throws IOException {
        int batchSize = 3;
        int timeout = 1000;
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(batchSize, timeout, 1000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService();
        this.aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
        Runnable userCode = () -> this.valueState.asyncValue();
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(-1L);
        RecordContext recordContext = this.aec.buildContext((Object)"record1", (Object)"key1");
        this.aec.setCurrentContext(recordContext);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        RecordContext recordContext2 = this.aec.buildContext((Object)"record2", (Object)"key2");
        this.aec.setCurrentContext(recordContext2);
        userCode.run();
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        RecordContext recordContext3 = this.aec.buildContext((Object)"record3", (Object)"key3");
        this.aec.setCurrentContext(recordContext3);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        AssertionsForClassTypes.assertThat((int)scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)scheduledExecutor.getAllNonPeriodicScheduledTask().size()).isEqualTo(1);
        scheduledExecutor.triggerNonPeriodicScheduledTask();
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0L);
        RecordContext recordContext4 = this.aec.buildContext((Object)"record4", (Object)"key4");
        this.aec.setCurrentContext(recordContext4);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isCancelled()).isFalse();
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1L);
        scheduledExecutor.triggerNonPeriodicScheduledTask();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2L);
        AssertionsForClassTypes.assertThat((long)this.aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1L);
        AssertionsForClassTypes.assertThat((boolean)this.aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();
        resourceRegistry.close();
    }

    @Test
    void testUserCodeException() throws IOException {
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();
        TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(false);
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(1000, 10000L, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
        Runnable userCode = () -> this.valueState.asyncValue().thenAccept(val -> {
            throw new FlinkRuntimeException("Artificial exception in user code");
        });
        String record = "record";
        String key = "key";
        RecordContext recordContext = this.aec.buildContext((Object)record, (Object)key);
        this.aec.setCurrentContext(recordContext);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((Throwable)exceptionHandler.exception).isNull();
        AssertionsForClassTypes.assertThat((String)exceptionHandler.message).isNull();
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((Throwable)testMailboxExecutor.lastException).isInstanceOf(FlinkRuntimeException.class);
        AssertionsForClassTypes.assertThat((String)testMailboxExecutor.lastException.getMessage()).isEqualTo("Artificial exception in user code");
        AssertionsForClassTypes.assertThat((Throwable)exceptionHandler.exception).isNull();
        AssertionsForClassTypes.assertThat((String)exceptionHandler.message).isNull();
        resourceRegistry.close();
    }

    @Test
    void testFrameworkException() throws IOException {
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();
        TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(true);
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(1000, 10000L, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
        Runnable userCode = () -> this.valueState.asyncValue().thenAccept(val -> {});
        String record = "record";
        String key = "key";
        RecordContext recordContext = this.aec.buildContext((Object)record, (Object)key);
        this.aec.setCurrentContext(recordContext);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((Throwable)exceptionHandler.exception).isNull();
        AssertionsForClassTypes.assertThat((String)exceptionHandler.message).isNull();
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((Throwable)testMailboxExecutor.lastException).isNull();
        AssertionsForClassTypes.assertThat((Throwable)exceptionHandler.exception).isInstanceOf(RuntimeException.class);
        AssertionsForClassTypes.assertThat((String)exceptionHandler.exception.getMessage()).isEqualTo("java.lang.RuntimeException: Fail to execute.");
        AssertionsForClassTypes.assertThat((String)exceptionHandler.message).isEqualTo("Caught exception when submitting StateFuture's callback.");
        resourceRegistry.close();
    }

    @Test
    void testEpochManager() throws Exception {
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(1000, 10000L, 6000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        AtomicInteger output = new AtomicInteger(0);
        Runnable userCode = () -> this.valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        EpochManager.Epoch epoch1 = recordContext1.getEpoch();
        this.aec.setCurrentContext(recordContext1);
        userCode.run();
        String record2 = "key2-r2";
        String key2 = "key2";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        EpochManager.Epoch epoch2 = recordContext2.getEpoch();
        this.aec.setCurrentContext(recordContext2);
        userCode.run();
        AssertionsForClassTypes.assertThat((Object)epoch1).isEqualTo((Object)epoch2);
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(2);
        this.aec.processNonRecord(() -> output.incrementAndGet());
        AssertionsForClassTypes.assertThat((int)output.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(0);
        resourceRegistry.close();
    }

    @Test
    void testMixEpochMode() throws Exception {
        CloseableRegistry resourceRegistry = new CloseableRegistry();
        this.setup(1000, 10000L, 6000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry);
        AtomicInteger output = new AtomicInteger(0);
        Runnable userCode = () -> this.valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        EpochManager.Epoch epoch1 = recordContext1.getEpoch();
        this.aec.setCurrentContext(recordContext1);
        userCode.run();
        this.aec.epochManager.onNonRecord(() -> output.incrementAndGet(), EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(1);
        String record2 = "key2-r2";
        String key2 = "key2";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        EpochManager.Epoch epoch2 = recordContext2.getEpoch();
        this.aec.setCurrentContext(recordContext2);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)epoch2.ongoingRecordCount).isEqualTo(1);
        this.aec.epochManager.onNonRecord(() -> output.incrementAndGet(), EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)epoch2.ongoingRecordCount).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)output.get()).isEqualTo(0);
        String record3 = "key3-r3";
        String key3 = "key3";
        RecordContext recordContext3 = this.aec.buildContext((Object)record3, (Object)key3);
        EpochManager.Epoch epoch3 = recordContext3.getEpoch();
        this.aec.setCurrentContext(recordContext3);
        userCode.run();
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)epoch2.ongoingRecordCount).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)epoch3.ongoingRecordCount).isEqualTo(1);
        this.aec.epochManager.onNonRecord(() -> output.incrementAndGet(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH);
        AssertionsForClassTypes.assertThat((int)epoch1.ongoingRecordCount).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)epoch2.ongoingRecordCount).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)epoch3.ongoingRecordCount).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)output.get()).isEqualTo(6);
        resourceRegistry.close();
    }

    static class TestMailboxExecutor
    implements MailboxExecutor {
        Exception lastException = null;
        boolean failWhenExecute = false;

        public TestMailboxExecutor(boolean fail) {
            this.failWhenExecute = fail;
        }

        public void execute(MailboxExecutor.MailOptions mailOptions, ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object ... descriptionArgs) {
            if (this.failWhenExecute) {
                throw new RuntimeException("Fail to execute.");
            }
            try {
                command.run();
            }
            catch (Exception e) {
                this.lastException = e;
            }
        }

        public void yield() throws InterruptedException, FlinkRuntimeException {
        }

        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        public boolean shouldInterrupt() {
            return false;
        }
    }

    static class TestAsyncFrameworkExceptionHandler
    implements StateFutureImpl.AsyncFrameworkExceptionHandler {
        String message = null;
        Throwable exception = null;

        TestAsyncFrameworkExceptionHandler() {
        }

        public void handleException(String message, Throwable exception) {
            this.message = message;
            this.exception = exception;
        }
    }

    static class TestStateExecutor
    implements StateExecutor {
        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Preconditions.checkArgument((boolean)(stateRequestContainer instanceof MockStateRequestContainer));
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            for (StateRequest<?, ?, ?> request : ((MockStateRequestContainer)stateRequestContainer).getStateRequestList()) {
                TestValueState state;
                if (request.getRequestType() == StateRequestType.VALUE_GET) {
                    Preconditions.checkState((request.getState() != null ? 1 : 0) != 0);
                    state = (TestValueState)request.getState();
                    Integer val = state.underlyingState.get((String)request.getRecordContext().getKey());
                    request.getFuture().complete((Object)val);
                    continue;
                }
                if (request.getRequestType() == StateRequestType.VALUE_UPDATE) {
                    Preconditions.checkState((request.getState() != null ? 1 : 0) != 0);
                    state = (TestValueState)request.getState();
                    state.underlyingState.update((String)request.getRecordContext().getKey(), (Integer)request.getPayload());
                    request.getFuture().complete(null);
                    continue;
                }
                throw new UnsupportedOperationException("Unsupported request type");
            }
            future.complete(null);
            return future;
        }

        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        public void shutdown() {
        }
    }

    static class TestValueState
    extends InternalValueState<String, Integer> {
        private final TestUnderlyingState underlyingState;

        public TestValueState(StateRequestHandler stateRequestHandler, TestUnderlyingState underlyingState, ValueStateDescriptor<Integer> stateDescriptor) {
            super(stateRequestHandler, stateDescriptor);
            this.underlyingState = underlyingState;
            AssertionsForClassTypes.assertThat((Object)this.getValueSerializer()).isEqualTo((Object)IntSerializer.INSTANCE);
        }
    }

    static class TestUnderlyingState {
        private final HashMap<String, Integer> hashMap = new HashMap();

        public Integer get(String key) {
            return this.hashMap.get(key);
        }

        public void update(String key, Integer val) {
            this.hashMap.put(key, val);
        }
    }
}

