/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class StreamSourceOperatorLatencyMetricsTest
extends TestLogger {
    private static final long maxProcessingTime = 100L;
    private static final long latencyMarkInterval = 10L;

    @Test
    public void testLatencyMarkEmissionDisabled() throws Exception {
        this.testLatencyMarkEmission(0, (operator, timeProvider) -> StreamSourceOperatorLatencyMetricsTest.setupSourceOperator(operator, new ExecutionConfig(), (Environment)MockEnvironment.builder().build(), (TimerService)timeProvider));
    }

    @Test
    public void testLatencyMarkEmissionEnabledViaExecutionConfig() throws Exception {
        this.testLatencyMarkEmission(11, (operator, timeProvider) -> {
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setLatencyTrackingInterval(10L);
            StreamSourceOperatorLatencyMetricsTest.setupSourceOperator(operator, executionConfig, (Environment)MockEnvironment.builder().build(), (TimerService)timeProvider);
        });
    }

    @Test
    public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
        this.testLatencyMarkEmission(11, (operator, timeProvider) -> {
            Configuration tmConfig = new Configuration();
            tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 10L);
            MockEnvironment env = MockEnvironment.builder().setTaskManagerRuntimeInfo((TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(tmConfig)).build();
            StreamSourceOperatorLatencyMetricsTest.setupSourceOperator(operator, new ExecutionConfig(), (Environment)env, (TimerService)timeProvider);
        });
    }

    @Test
    public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
        this.testLatencyMarkEmission(11, (operator, timeProvider) -> {
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setLatencyTrackingInterval(10L);
            Configuration tmConfig = new Configuration();
            tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);
            MockEnvironment env = MockEnvironment.builder().setTaskManagerRuntimeInfo((TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(tmConfig)).build();
            StreamSourceOperatorLatencyMetricsTest.setupSourceOperator(operator, executionConfig, (Environment)env, (TimerService)timeProvider);
        });
    }

    @Test
    public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
        this.testLatencyMarkEmission(0, (operator, timeProvider) -> {
            Configuration tmConfig = new Configuration();
            tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 10L);
            MockEnvironment env = MockEnvironment.builder().setTaskManagerRuntimeInfo((TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(tmConfig)).build();
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setLatencyTrackingInterval(0L);
            StreamSourceOperatorLatencyMetricsTest.setupSourceOperator(operator, executionConfig, (Environment)env, (TimerService)timeProvider);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOperation operatorSetup) throws Exception {
        ArrayList<StreamElement> output = new ArrayList<StreamElement>();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, 100L);
        StreamSource operator = new StreamSource((SourceFunction)new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
        operatorSetup.setupSourceOperator(operator, testProcessingTimeService);
        try (RegularOperatorChain operatorChain = new RegularOperatorChain(operator.getContainingTask(), StreamTask.createRecordWriterDelegate((StreamConfig)operator.getOperatorConfig(), (Environment)new MockEnvironmentBuilder().build()));){
            operator.run(new Object(), new CollectorOutput(output), (OperatorChain)operatorChain);
            operator.finish();
        }
        Assert.assertEquals((long)numberLatencyMarkers, (long)output.size());
        long timestamp = 0L;
        int expectedLatencyIndex = 0;
        for (int i = 0; i < numberLatencyMarkers; ++i) {
            StreamElement se = (StreamElement)output.get(i);
            Assert.assertTrue((boolean)se.isLatencyMarker());
            Assert.assertEquals((Object)operator.getOperatorID(), (Object)se.asLatencyMarker().getOperatorId());
            Assert.assertEquals((long)0L, (long)se.asLatencyMarker().getSubtaskIndex());
            while (timestamp > processingTimes.get(expectedLatencyIndex)) {
                ++expectedLatencyIndex;
            }
            Assert.assertEquals((long)processingTimes.get(expectedLatencyIndex), (long)se.asLatencyMarker().getMarkedTime());
            timestamp += 10L;
        }
    }

    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, ExecutionConfig executionConfig, Environment env, TimerService timerService) {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateBackend((StateBackend)new MemoryStateBackend());
        cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
        cfg.setOperatorID(new OperatorID());
        cfg.serializeAllConfigs();
        try {
            MockStreamTask mockTask = new MockStreamTaskBuilder(env).setConfig(cfg).setExecutionConfig(executionConfig).setTimerService(timerService).build();
            operator.setProcessingTimeService(mockTask.getProcessingTimeServiceFactory().createProcessingTimeService(null));
            operator.setup((StreamTask)mockTask, cfg, (Output)Mockito.mock(Output.class));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static final class ProcessingTimeServiceSource
    implements SourceFunction<Long> {
        private final TestProcessingTimeService processingTimeService;
        private final List<Long> processingTimes;
        private boolean cancelled = false;

        private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
            this.processingTimeService = processingTimeService;
            this.processingTimes = processingTimes;
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            for (Long processingTime : this.processingTimes) {
                if (this.cancelled) break;
                this.processingTimeService.setCurrentTime(processingTime.longValue());
            }
        }

        public void cancel() {
            this.cancelled = true;
        }
    }

    private static interface OperatorSetupOperation {
        public void setupSourceOperator(StreamSource<Long, ?> var1, TestProcessingTimeService var2);
    }
}

