/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.Random;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;

/**
 * Harness that drives a {@link SourceCoordinator} created with {@link WatermarkAlignmentParams}
 * by sending it one {@link ReportedWatermarkEvent} per subtask.
 *
 * <p>Expected usage: call {@link #setup(int)} once, call
 * {@link #sendReportedWatermarkToAllSubtasks()} any number of times, then call
 * {@link #teardown()}. The class keeps mutable state in plain fields and performs no
 * synchronization of its own.
 */
public class SourceCoordinatorAlignmentBenchmark {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);

    /** Amount added to the base watermark after every completed round of reports. */
    private static final long WATERMARK_STEP_PER_ROUND = 100000L;

    /** Exclusive upper bound of the per-subtask offset that is added to the base watermark. */
    private static final int SUBTASK_OFFSET_BOUND = 1000;

    /** Coordinator under test; {@code null} before {@link #setup(int)} and after {@link #teardown()}. */
    private SourceCoordinator<?, ?> sourceCoordinator;

    /** Number of subtasks that report a watermark in each round. */
    private int numSubtasks;

    /** Base watermark of the next round; starts at 0 and grows by a fixed step per round. */
    private long second;

    /** Per-subtask offsets, drawn once in {@link #setup(int)} and reused for every round. */
    private long[] randomMilliseconds;

    /**
     * Creates and starts the coordinator, resets the round state and sends one initial round of
     * reported watermarks.
     *
     * <p>A coordinator left over from an earlier {@code setup} call is closed first. If starting
     * the coordinator or the initial round fails, the new coordinator is closed before the
     * failure is rethrown, so no started coordinator is left behind.
     *
     * @param numSubtasks number of subtasks the mock coordinator context is created with and that
     *     report a watermark in each round
     * @throws Exception if creating, starting or (on failure) closing the coordinator fails
     */
    public void setup(int numSubtasks) throws Exception {
        // Do not leak a coordinator from a previous setup that was never torn down.
        this.closeCoordinator();

        // NOTE(review): the WatermarkAlignmentParams arguments are presumably
        // (max allowed drift, alignment group name, update interval) and the trailing null an
        // optional listening id - confirm against the Flink API.
        SourceCoordinatorProvider<?> provider =
                new SourceCoordinatorProvider<>(
                        "SourceCoordinatorProviderTest",
                        OPERATOR_ID,
                        new MockSource(Boundedness.BOUNDED, 2),
                        1,
                        new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
                        null);
        this.sourceCoordinator =
                (SourceCoordinator<?, ?>)
                        provider.getCoordinator(
                                new MockOperatorCoordinatorContext(OPERATOR_ID, numSubtasks));
        this.numSubtasks = numSubtasks;
        this.second = 0L;

        try {
            this.sourceCoordinator.start();
            this.randomMilliseconds = this.generateRandomMilliseconds(numSubtasks);
            // Give every subtask an initial reported watermark.
            this.sendReportedWatermarkToAllSubtasks();
        } catch (Throwable failure) {
            // Release the coordinator but keep the original failure as the primary error.
            try {
                this.closeCoordinator();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Closes the coordinator created by {@link #setup(int)}.
     *
     * <p>Does nothing when no coordinator is currently held, e.g. when {@code setup} was never
     * called, failed, or {@code teardown} already ran.
     *
     * @throws Exception if closing the coordinator fails
     */
    public void teardown() throws Exception {
        this.closeCoordinator();
    }

    /**
     * Sends one {@link ReportedWatermarkEvent} per subtask, waits until the coordinator has
     * processed the pending actions, and then advances the base watermark for the next round.
     *
     * <p>The watermark reported by a subtask is the current base watermark plus that subtask's
     * fixed offset. Must be called between {@link #setup(int)} and {@link #teardown()}.
     */
    public void sendReportedWatermarkToAllSubtasks() {
        final long baseWatermark = this.second;
        for (int subtaskIndex = 0; subtaskIndex < this.numSubtasks; ++subtaskIndex) {
            long reportedWatermark = baseWatermark + this.randomMilliseconds[subtaskIndex];
            // NOTE(review): the second argument (0) is presumably the attempt number - confirm.
            this.sourceCoordinator.handleEventFromOperator(
                    subtaskIndex, 0, new ReportedWatermarkEvent(reportedWatermark));
        }
        CoordinatorTestUtils.waitForCoordinatorToProcessActions(this.sourceCoordinator.getContext());
        this.second = baseWatermark + WATERMARK_STEP_PER_ROUND;
    }

    /**
     * Draws one offset in {@code [0, SUBTASK_OFFSET_BOUND)} per subtask.
     *
     * <p>The generator is unseeded, so the offsets differ between runs.
     *
     * @param numSubtasks number of offsets to generate
     * @return array of length {@code numSubtasks} holding the offsets
     */
    private long[] generateRandomMilliseconds(int numSubtasks) {
        Random random = new Random();
        long[] offsets = new long[numSubtasks];
        for (int subtaskIndex = 0; subtaskIndex < numSubtasks; ++subtaskIndex) {
            offsets[subtaskIndex] = random.nextInt(SUBTASK_OFFSET_BOUND);
        }
        return offsets;
    }

    /** Closes the currently held coordinator, if any, and clears the field. */
    private void closeCoordinator() throws Exception {
        SourceCoordinator<?, ?> coordinator = this.sourceCoordinator;
        if (coordinator == null) {
            return;
        }
        // Clear the field first so a failing close() is not attempted again later.
        this.sourceCoordinator = null;
        coordinator.close();
    }
}

