/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.coordinator.SplitAssignmentTracker;
import org.apache.flink.runtime.source.coordinator.TestingSplitEnumerator;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

abstract class SourceCoordinatorTestBase {
    protected static final String OPERATOR_NAME = "TestOperator";
    protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
    protected static final int NUM_SUBTASKS = 3;
    protected boolean supportsConcurrentExecutionAttempts = false;
    protected EventReceivingTasks receivingTasks;
    protected MockOperatorCoordinatorContext operatorCoordinatorContext;
    protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    protected ScheduledExecutorService coordinatorExecutor;
    protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
    protected SourceCoordinatorContext<MockSourceSplit> context;
    protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> sourceCoordinator;
    private TestingSplitEnumerator<MockSourceSplit> enumerator;

    SourceCoordinatorTestBase() {
    }

    @BeforeEach
    void setup() {
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 3);
        this.splitSplitAssignmentTracker = new SplitAssignmentTracker();
        String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
        this.coordinatorThreadFactory = new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName, (OperatorCoordinator.Context)this.operatorCoordinatorContext);
        this.coordinatorExecutor = Executors.newScheduledThreadPool(1, (ThreadFactory)this.coordinatorThreadFactory);
        this.sourceCoordinator = this.getNewSourceCoordinator();
        this.context = this.sourceCoordinator.getContext();
    }

    @AfterEach
    void cleanUp() throws InterruptedException, TimeoutException {
        this.coordinatorExecutor.shutdown();
        if (!this.coordinatorExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
            throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout.");
        }
    }

    protected TestingSplitEnumerator<MockSourceSplit> getEnumerator() {
        if (this.enumerator == null) {
            this.enumerator = (TestingSplitEnumerator)this.sourceCoordinator.getEnumerator();
            ((ObjectAssert)Assertions.assertThat(this.enumerator).as("source was not started", new Object[0])).isNotNull();
        }
        return this.enumerator;
    }

    protected void sourceReady() throws Exception {
        this.sourceCoordinator.start();
        this.setAllReaderTasksReady(this.sourceCoordinator);
    }

    protected void setAllReaderTasksReady(SourceCoordinator<?, ?> sourceCoordinator) {
        for (int i = 0; i < 3; ++i) {
            this.setReaderTaskReady(sourceCoordinator, i, 0);
        }
    }

    protected void setReaderTaskReady(SourceCoordinator<?, ?> sourceCoordinator, int subtask, int attemptNumber) {
        sourceCoordinator.executionAttemptReady(subtask, attemptNumber, this.receivingTasks.createGatewayForSubtask(subtask, attemptNumber));
    }

    protected void addTestingSplitSet(int num) {
        ArrayList<MockSourceSplit> splits = new ArrayList<MockSourceSplit>();
        for (int i = 0; i < num; ++i) {
            splits.add(new MockSourceSplit(i));
        }
        this.getEnumerator().addNewSplits((Collection<MockSourceSplit>)splits);
    }

    protected void registerReader(int subtask) {
        this.registerReader(subtask, 0);
    }

    protected void registerReader(int subtask, int attemptNumber) {
        this.sourceCoordinator.handleEventFromOperator(subtask, attemptNumber, (OperatorEvent)new ReaderRegistrationEvent(subtask, SourceCoordinatorTestBase.createLocationFor(subtask, attemptNumber)));
    }

    static String createLocationFor(int subtask, int attemptNumber) {
        return String.format("location_%d_%d", subtask, attemptNumber);
    }

    protected void waitForCoordinatorToProcessActions() {
        CompletableFuture future = new CompletableFuture();
        this.context.runInCoordinatorThread(() -> future.complete(null));
        try {
            future.get();
        }
        catch (InterruptedException e) {
            throw new AssertionError((Object)"test interrupted");
        }
        catch (ExecutionException e) {
            ExceptionUtils.rethrow((Throwable)ExceptionUtils.stripExecutionException((Throwable)e));
        }
    }

    void waitForSentEvents(int expectedEventNumber) throws Exception {
        SourceCoordinatorTestBase.waitUtilNumberReached(() -> this.receivingTasks.getNumberOfSentEvents(), expectedEventNumber);
    }

    static void waitUtilNumberReached(Supplier<Integer> numberSupplier, int expectedNumber) throws Exception {
        CommonTestUtils.waitUtil(() -> (Integer)numberSupplier.get() == expectedNumber, (Duration)Duration.ofDays(1L), (String)"Not reach expected number within timeout.");
    }

    static <SplitT extends SourceSplit> void assertAddSplitEvent(OperatorEvent event, List<SplitT> expectedSplits) throws Exception {
        Assertions.assertThat((Object)event).isInstanceOf(AddSplitEvent.class);
        List splits = ((AddSplitEvent)event).splits((SimpleVersionedSerializer)new MockSourceSplitSerializer());
        Assertions.assertThat((List)splits).isEqualTo(expectedSplits);
    }

    protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator() {
        return this.getNewSourceCoordinator(WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
    }

    protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator(WatermarkAlignmentParams watermarkAlignmentParams) {
        Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = this.createMockSource();
        return new SourceCoordinator(OPERATOR_NAME, mockSource, this.getNewSourceCoordinatorContext(), (CoordinatorStore)new CoordinatorStoreImpl(), watermarkAlignmentParams, null);
    }

    Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
        return TestingSplitEnumerator.factorySource(new MockSourceSplitSerializer(), new MockSplitEnumeratorCheckpointSerializer());
    }

    protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorContext() {
        return new SourceCoordinatorContext(this.coordinatorExecutor, Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory(this.coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")), this.coordinatorThreadFactory, (OperatorCoordinator.Context)this.operatorCoordinatorContext, (SimpleVersionedSerializer)new MockSourceSplitSerializer(), this.splitSplitAssignmentTracker, this.supportsConcurrentExecutionAttempts);
    }
}

