/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.YieldingOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={TestLoggerExtension.class})
class UnalignedCheckpointsInterruptibleTimersTest {
    UnalignedCheckpointsInterruptibleTimersTest() {
    }

    @Test
    void testSingleWatermarkHoldingOperatorInTheChain() throws Exception {
        Instant firstWindowEnd = Instant.ofEpochMilli(1000L);
        int numFirstWindowTimers = 2;
        Instant secondWindowEnd = Instant.ofEpochMilli(2000L);
        int numSecondWindowTimers = 2;
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, Types.STRING).modifyStreamConfig(UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig).addInput(Types.STRING).setupOperatorChain((StreamOperatorFactory<?>)SimpleOperatorFactory.of((StreamOperator)new MultipleTimersAtTheSameTimestamp().withTimers(firstWindowEnd, 2).withTimers(secondWindowEnd, 2))).name("first").finishForSingletonOperatorChain(StringSerializer.INSTANCE).build();){
            harness.processElement(new StreamRecord((Object)"register timers"));
            harness.processAll();
            harness.processElement(UnalignedCheckpointsInterruptibleTimersTest.asWatermark(firstWindowEnd));
            harness.processElement(UnalignedCheckpointsInterruptibleTimersTest.asWatermark(secondWindowEnd));
            Assertions.assertThat(harness.getOutput()).containsExactly(new Object[]{UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord("key-0"), UnalignedCheckpointsInterruptibleTimersTest.asMailRecord("key-0"), UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord("key-1"), UnalignedCheckpointsInterruptibleTimersTest.asMailRecord("key-1"), UnalignedCheckpointsInterruptibleTimersTest.asWatermark(firstWindowEnd), UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord("key-0"), UnalignedCheckpointsInterruptibleTimersTest.asMailRecord("key-0"), UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord("key-1"), UnalignedCheckpointsInterruptibleTimersTest.asMailRecord("key-1"), UnalignedCheckpointsInterruptibleTimersTest.asWatermark(secondWindowEnd)});
        }
    }

    @Test
    void testWatermarkProgressWithNoTimers() throws Exception {
        Instant firstWindowEnd = Instant.ofEpochMilli(1000L);
        Instant secondWindowEnd = Instant.ofEpochMilli(2000L);
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, Types.STRING).modifyStreamConfig(UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig).addInput(Types.STRING).setupOperatorChain((StreamOperatorFactory<?>)SimpleOperatorFactory.of((StreamOperator)new MultipleTimersAtTheSameTimestamp())).name("first").finishForSingletonOperatorChain(StringSerializer.INSTANCE).build();){
            harness.setAutoProcess(false);
            harness.processElement(new StreamRecord((Object)"impulse"));
            harness.processAll();
            harness.processElement(UnalignedCheckpointsInterruptibleTimersTest.asWatermark(firstWindowEnd));
            harness.processElement(UnalignedCheckpointsInterruptibleTimersTest.asWatermark(secondWindowEnd));
            ArrayList<Watermark> seenWatermarks = new ArrayList<Watermark>();
            while (seenWatermarks.size() < 2) {
                Object outputElement;
                harness.processSingleStep();
                while ((outputElement = harness.getOutput().poll()) != null) {
                    if (!(outputElement instanceof Watermark)) continue;
                    seenWatermarks.add((Watermark)outputElement);
                }
            }
            Assertions.assertThat(seenWatermarks).containsExactly((Object[])new Watermark[]{UnalignedCheckpointsInterruptibleTimersTest.asWatermark(firstWindowEnd), UnalignedCheckpointsInterruptibleTimersTest.asWatermark(secondWindowEnd)});
        }
    }

    private static Watermark asWatermark(Instant timestamp) {
        return new Watermark(timestamp.toEpochMilli());
    }

    private static StreamRecord<String> asFiredRecord(String key) {
        return new StreamRecord((Object)("fired-" + key));
    }

    private static StreamRecord<String> asMailRecord(String key) {
        return new StreamRecord((Object)("mail-" + key));
    }

    private static void setupStreamConfig(StreamConfig cfg) {
        cfg.setUnalignedCheckpointsEnabled(true);
        cfg.setUnalignedCheckpointsSplittableTimersEnabled(true);
        cfg.setStateKeySerializer((TypeSerializer)StringSerializer.INSTANCE);
    }

    private static class MultipleTimersAtTheSameTimestamp
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>,
    Triggerable<String, String>,
    YieldingOperator<String> {
        private final Map<Instant, Integer> timersToRegister;
        @Nullable
        private transient MailboxExecutor mailboxExecutor;
        @Nullable
        private transient MailboxWatermarkProcessor watermarkProcessor;

        MultipleTimersAtTheSameTimestamp() {
            this(Collections.emptyMap());
        }

        MultipleTimersAtTheSameTimestamp(Map<Instant, Integer> timersToRegister) {
            this.timersToRegister = timersToRegister;
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public void open() throws Exception {
            super.open();
            if (this.getTimeServiceManager().isPresent()) {
                this.watermarkProcessor = new MailboxWatermarkProcessor(this.output, this.mailboxExecutor, (InternalTimeServiceManager)this.getTimeServiceManager().get());
            }
        }

        public void processElement(StreamRecord<String> element) {
            if (!this.timersToRegister.isEmpty()) {
                InternalTimerService timers = this.getInternalTimerService("timers", (TypeSerializer)StringSerializer.INSTANCE, this);
                for (Map.Entry<Instant, Integer> entry : this.timersToRegister.entrySet()) {
                    for (int keyIdx = 0; keyIdx < entry.getValue(); ++keyIdx) {
                        String key = String.format("key-%d", keyIdx);
                        this.setCurrentKey(key);
                        timers.registerEventTimeTimer((Object)String.format("window-%s", entry.getKey()), entry.getKey().toEpochMilli());
                    }
                }
            }
        }

        public void processWatermark(Watermark mark) throws Exception {
            if (this.watermarkProcessor == null) {
                super.processWatermark(mark);
            } else {
                this.watermarkProcessor.emitWatermarkInsideMailbox(mark);
            }
        }

        public void onEventTime(InternalTimer<String, String> timer) throws Exception {
            this.mailboxExecutor.execute(() -> this.output.collect((Object)UnalignedCheckpointsInterruptibleTimersTest.asMailRecord((String)timer.getKey())), "mail-" + (String)timer.getKey());
            this.output.collect((Object)UnalignedCheckpointsInterruptibleTimersTest.asFiredRecord((String)timer.getKey()));
        }

        public void onProcessingTime(InternalTimer<String, String> timer) throws Exception {
        }

        MultipleTimersAtTheSameTimestamp withTimers(Instant timestamp, int count) {
            HashMap<Instant, Integer> copy = new HashMap<Instant, Integer>(this.timersToRegister);
            copy.put(timestamp, count);
            return new MultipleTimersAtTheSameTimestamp(copy);
        }
    }
}

