/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * An {@link InternalTimerServiceImpl} variant that fires timers through an
 * {@link AsyncExecutionController}.
 *
 * <p>For every due timer, a {@link RecordContext} is built for the timer's key and made the
 * controller's current context, the key is set on the {@link KeyContext}, and the trigger
 * callback is handed to {@link AsyncExecutionController#syncPointRequestWithCallback} instead of
 * being invoked directly by this class.
 *
 * @param <K> type of the timer key
 * @param <N> type of the timer namespace
 */
@Internal
public class InternalTimerServiceAsyncImpl<K, N>
extends InternalTimerServiceImpl<K, N> {
    /** Controller that receives the per-timer record context and the trigger callback. */
    private final AsyncExecutionController<K> asyncExecutionController;

    /**
     * Creates the service; all arguments except {@code asyncExecutionController} are forwarded
     * unchanged to the {@link InternalTimerServiceImpl} constructor.
     *
     * @param asyncExecutionController controller used to build record contexts and to submit
     *     timer callbacks
     */
    InternalTimerServiceAsyncImpl(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue, StreamTaskCancellationContext cancellationContext, AsyncExecutionController<K> asyncExecutionController) {
        super(taskIOMetricGroup, localKeyGroupRange, keyContext, processingTimeService, processingTimeTimersQueue, eventTimeTimersQueue, cancellationContext);
        this.asyncExecutionController = asyncExecutionController;
    }

    /**
     * Fires every processing-time timer at the head of the queue whose timestamp is
     * {@code <= time}, stopping early once the cancellation context reports cancellation, and
     * then re-registers a processing-time callback for the timer left at the head, if any.
     *
     * @param time the processing time up to which (inclusive) timers are fired
     * @throws Exception declared by the overridden method
     */
    @Override
    void onProcessingTime(long time) throws Exception {
        // Reset before firing so the check after the loop can tell whether something else
        // assigned nextTimer in the meantime (presumably a callback registering a new
        // timer -- confirm against InternalTimerServiceImpl).
        this.nextTimer = null;

        InternalTimer<K, N> timer;
        while ((timer = this.processingTimeTimersQueue.peek()) != null
                && timer.getTimestamp() <= time
                && !this.cancellationContext.isCancelled()) {
            this.processingTimeTimersQueue.poll();
            // The lambda needs an effectively final reference; `timer` is reassigned by the loop.
            final InternalTimer<K, N> timerToTrigger = timer;
            this.maintainContextAndProcess(timerToTrigger, () -> this.triggerTarget.onProcessingTime(timerToTrigger));
            this.taskIOMetricGroup.getNumFiredTimers().inc();
        }

        // `timer` is the head element that ended the loop (not yet due, or cancelled), or null
        // when the queue was drained.
        if (timer != null && this.nextTimer == null) {
            this.nextTimer = this.processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
        }
    }

    /**
     * Stores {@code time} as the current watermark and fires every event-time timer at the head
     * of the queue whose timestamp is {@code <= time}, stopping early once the cancellation
     * context reports cancellation.
     *
     * @param time the new watermark
     * @throws Exception declared by the overridden method
     */
    @Override
    public void advanceWatermark(long time) throws Exception {
        this.currentWatermark = time;

        InternalTimer<K, N> timer;
        while ((timer = this.eventTimeTimersQueue.peek()) != null
                && timer.getTimestamp() <= time
                && !this.cancellationContext.isCancelled()) {
            this.eventTimeTimersQueue.poll();
            // The lambda needs an effectively final reference; `timer` is reassigned by the loop.
            final InternalTimer<K, N> timerToTrigger = timer;
            this.maintainContextAndProcess(timerToTrigger, () -> this.triggerTarget.onEventTime(timerToTrigger));
            this.taskIOMetricGroup.getNumFiredTimers().inc();
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected void foreachTimer(BiConsumerWithException<N, Long, Exception> consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) throws Exception {
        throw new UnsupportedOperationException("Batch operation is not supported when using async state.");
    }

    /**
     * Makes the timer's key the current key/context and submits {@code runnable} to the
     * {@link AsyncExecutionController}.
     *
     * @param timer the timer being fired; only its key is read here
     * @param runnable the trigger callback passed to
     *     {@link AsyncExecutionController#syncPointRequestWithCallback}
     */
    private void maintainContextAndProcess(InternalTimer<K, N> timer, ThrowingRunnable<Exception> runnable) {
        RecordContext<K> recordCtx = this.asyncExecutionController.buildContext(null, timer.getKey());
        // retain()/release() bracket the submission -- presumably reference counting that keeps
        // the context alive while the callback is pending; confirm against RecordContext.
        recordCtx.retain();
        this.asyncExecutionController.setCurrentContext(recordCtx);
        this.keyContext.setCurrentKey(timer.getKey());
        this.asyncExecutionController.syncPointRequestWithCallback(runnable);
        // NOTE(review): not in a finally block, so release() is skipped if the submission above
        // throws -- confirm whether that matters on the failure path.
        recordCtx.release();
    }
}

