/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SourceOperatorSplitWatermarkAlignmentTest {
    private static final WatermarkGenerator<Integer> WATERMARK_GENERATOR = new WatermarkGenerator<Integer>(){
        private long maxWatermark = Long.MIN_VALUE;

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            if (eventTimestamp > this.maxWatermark) {
                this.maxWatermark = eventTimestamp;
                output.emitWatermark(new Watermark(this.maxWatermark));
            }
        }

        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(this.maxWatermark));
        }
    };

    SourceOperatorSplitWatermarkAlignmentTest() {
    }

    @Test
    void testSplitWatermarkAlignment() throws Exception {
        SplitAligningSourceReader sourceReader = new SplitAligningSourceReader();
        TestingSourceOperator operator = new TestingSourceOperator(sourceReader, WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> WATERMARK_GENERATOR).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, l) -> r.intValue()).withWatermarkAlignment("group-1", Duration.ofMillis(1L)), (ProcessingTimeService)new TestProcessingTimeService(), (OperatorEventGateway)new MockOperatorEventGateway(), 1, 5, true);
        Environment env = this.getTestingEnvironment();
        operator.setup((StreamTask)new SourceOperatorStreamTask(env), new MockStreamConfig(new Configuration(), 1), new MockOutput(new ArrayList()));
        operator.initializeState((StreamTaskStateInitializer)new StreamTaskStateInitializerImpl(env, (StateBackend)new MemoryStateBackend()));
        operator.open();
        MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
        MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
        split1.addRecord(5);
        split1.addRecord(11);
        split2.addRecord(3);
        split2.addRecord(12);
        operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(split1, split2), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        operator.emitNext(dataOutput);
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(4L));
        Assertions.assertThat(sourceReader.pausedSplits).containsExactly((Object[])new String[]{"0"});
        operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent(5L));
        Assertions.assertThat(sourceReader.pausedSplits).isEmpty();
        operator.emitNext(dataOutput);
        operator.emitNext(dataOutput);
        Assertions.assertThat(sourceReader.pausedSplits).containsExactly((Object[])new String[]{"0"});
        operator.emitNext(dataOutput);
        Assertions.assertThat(sourceReader.pausedSplits).containsExactly((Object[])new String[]{"0", "1"});
    }

    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager());
    }

    private static class SplitAligningSourceReader
    extends MockSourceReader {
        Set<String> pausedSplits = new HashSet<String>();

        public SplitAligningSourceReader() {
            super(MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
        }

        public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
            this.pausedSplits.removeAll(splitsToResume);
            this.pausedSplits.addAll(splitsToPause);
        }
    }
}

