/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;

public class TestingSourceOperator<T>
extends SourceOperator<T, MockSourceSplit> {
    private static final long serialVersionUID = 1L;
    private final int subtaskIndex;
    private final int parallelism;

    public TestingSourceOperator(SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, boolean emitProgressiveWatermarks) {
        this(reader, watermarkStrategy, timeService, (OperatorEventGateway)new MockOperatorEventGateway(), 1, 5, emitProgressiveWatermarks);
    }

    public TestingSourceOperator(SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, OperatorEventGateway eventGateway, int subtaskIndex, int parallelism, boolean emitProgressiveWatermarks) {
        super(context -> reader, eventGateway, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), watermarkStrategy, timeService, new Configuration(), "localhost", emitProgressiveWatermarks, () -> false);
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        this.initSourceMetricGroup();
        try {
            this.initReader();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public StreamingRuntimeContext getRuntimeContext() {
        return new MockStreamingRuntimeContext(false, this.parallelism, this.subtaskIndex);
    }

    public ExecutionConfig getExecutionConfig() {
        ExecutionConfig cfg = new ExecutionConfig();
        cfg.setAutoWatermarkInterval(100L);
        return cfg;
    }

    public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        MockEnvironment env = new MockEnvironmentBuilder().build();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateStore = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl((Environment)env, "test-operator", Collections.emptyList(), cancelStreamRegistry));
        StateInitializationContextImpl stateContext = new StateInitializationContextImpl(null, (OperatorStateStore)operatorStateStore, null, null, null);
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        timeService.setCurrentTime(Integer.MAX_VALUE);
        TestingSourceOperator<T> sourceOperator = new TestingSourceOperator<T>(reader, watermarkStrategy, (ProcessingTimeService)timeService, emitProgressiveWatermarks);
        sourceOperator.setup((StreamTask)new SourceOperatorStreamTask((Environment)new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager())), new MockStreamConfig(new Configuration(), 1), new MockOutput(new ArrayList()));
        sourceOperator.initializeState((StateInitializationContext)stateContext);
        sourceOperator.open();
        return sourceOperator;
    }
}

