/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.mocks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.ThrowableCatchingRunnable;

/**
 * A mock {@link SplitEnumeratorContext} for tests.
 *
 * <p>The context owns two single-threaded executors: a "main" executor that stands in for the
 * coordinator thread and a "worker" executor on which async callables run. Async calls registered
 * through {@code callAsync} are not executed automatically; they are stored and only run when the
 * test triggers them via {@link #runNextOneTimeCallable()} or {@link #runPeriodicCallable(int)}.
 *
 * <p>Errors raised in either executor thread are collected and rethrown by the {@code run...}
 * methods after the triggered call has completed.
 *
 * @param <SplitT> the type of the splits handled by the enumerator under test
 */
public class MockSplitEnumeratorContext<SplitT extends SourceSplit>
        implements SplitEnumeratorContext<SplitT>,
                SupportsIntermediateNoMoreSplits,
                AutoCloseable {

    /** Capacity of the queue holding one-time async calls that have not been triggered yet. */
    private static final int ONE_TIME_CALLABLES_CAPACITY = 100;

    // Only mutated from the main executor thread; see sendEventToSourceReader.
    private final Map<Integer, List<SourceEvent>> sentSourceEvent = new HashMap<>();
    private final ConcurrentMap<Integer, ReaderInfo> registeredReaders = new ConcurrentHashMap<>();
    private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence = new ArrayList<>();
    private final ExecutorService workerExecutor;
    private final ExecutorService mainExecutor;
    private final TestingExecutorThreadFactory mainThreadFactory;
    private final AtomicReference<Throwable> errorInWorkerThread;
    private final AtomicReference<Throwable> errorInMainThread;
    private final BlockingQueue<Callable<Future<?>>> oneTimeCallables;
    private final List<Callable<Future<?>>> periodicCallables;
    private final AtomicBoolean stoppedAcceptAsyncCalls;
    private final boolean[] subtaskHasNoMoreSplits;
    private final int parallelism;

    /**
     * Creates a mock context.
     *
     * @param parallelism the value reported by {@link #currentParallelism()}; also the number of
     *     subtasks for which the "no more splits" flag is tracked
     */
    public MockSplitEnumeratorContext(int parallelism) {
        this.parallelism = parallelism;
        this.errorInWorkerThread = new AtomicReference<>();
        this.errorInMainThread = new AtomicReference<>();
        this.oneTimeCallables = new ArrayBlockingQueue<>(ONE_TIME_CALLABLES_CAPACITY);
        this.periodicCallables = Collections.synchronizedList(new ArrayList<>());
        this.mainThreadFactory = getThreadFactory("SplitEnumerator-main", errorInMainThread);
        this.workerExecutor =
                getExecutor(getThreadFactory("SplitEnumerator-worker", errorInWorkerThread));
        this.mainExecutor = getExecutor(mainThreadFactory);
        this.stoppedAcceptAsyncCalls = new AtomicBoolean(false);
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    /** Returns an unregistered (no-op) metric group. */
    public SplitEnumeratorMetricGroup metricGroup() {
        return UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup();
    }

    /**
     * Records {@code event} as sent to the given subtask.
     *
     * <p>The record is always written from the main executor thread. If the caller is not that
     * thread, the write is submitted to the main executor and this method blocks until it is done.
     *
     * @param subtaskId the subtask the event is addressed to
     * @param event the event to record
     * @throws RuntimeException if recording the event fails or the calling thread is interrupted
     *     while waiting; in the latter case the interrupt flag is restored
     */
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        try {
            if (mainThreadFactory.isCurrentThreadMainExecutorThread()) {
                recordSourceEvent(subtaskId, event);
            } else {
                mainExecutor.submit(() -> recordSourceEvent(subtaskId, event)).get();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption; let the caller observe it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(
                    "Failed to send source event to source reader " + subtaskId, e);
        }
    }

    /** Returns the parallelism passed to the constructor. */
    public int currentParallelism() {
        return parallelism;
    }

    /** Returns the live map of readers registered via {@link #registerReader(ReaderInfo)}. */
    public Map<Integer, ReaderInfo> registeredReaders() {
        return registeredReaders;
    }

    /** Appends the assignment to the sequence returned by {@link #getSplitsAssignmentSequence()}. */
    public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) {
        splitsAssignmentSequence.add(newSplitAssignments);
    }

    /** Marks the given subtask as having no more splits. */
    public void signalNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = true;
    }

    /** No-op in this mock. */
    public void signalIntermediateNoMoreSplits(int subtask) {}

    /** Clears the "no more splits" flag of the given subtask. */
    public void resetNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = false;
    }

    /**
     * Registers a one-time async call. The call is not executed until the test invokes {@link
     * #runNextOneTimeCallable()}. Calls registered after {@link #close()} are ignored.
     *
     * @param callable the work to run on the worker thread
     * @param handler invoked on the main thread with either the result or the failure
     * @throws IllegalStateException if the queue of pending one-time calls is full
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        if (stoppedAcceptAsyncCalls.get()) {
            return;
        }
        oneTimeCallables.add(newAsyncCall(callable, handler));
    }

    /**
     * Registers a periodic async call. The call is not scheduled; {@code initialDelay} and {@code
     * period} are ignored and the call only runs when the test invokes {@link
     * #runPeriodicCallable(int)}. Calls registered after {@link #close()} are ignored.
     *
     * @param callable the work to run on the worker thread
     * @param handler invoked on the main thread with either the result or the failure
     * @param initialDelay ignored by this mock
     * @param period ignored by this mock
     */
    public <T> void callAsync(
            Callable<T> callable,
            BiConsumer<T, Throwable> handler,
            long initialDelay,
            long period) {
        if (stoppedAcceptAsyncCalls.get()) {
            return;
        }
        periodicCallables.add(newAsyncCall(callable, handler));
    }

    /** Executes the runnable on the main executor without waiting for it to finish. */
    public void runInCoordinatorThread(Runnable runnable) {
        mainExecutor.execute(runnable);
    }

    /** No-op in this mock. */
    public void setIsProcessingBacklog(boolean isProcessingBacklog) {}

    /** Stops accepting new async calls and shuts down both executors immediately. */
    @Override
    public void close() throws Exception {
        stoppedAcceptAsyncCalls.set(true);
        workerExecutor.shutdownNow();
        mainExecutor.shutdownNow();
    }

    // ------------------------------------------------------------------------
    //  Test helpers
    // ------------------------------------------------------------------------

    /**
     * Runs the oldest pending one-time async call and waits for it, blocking if none is pending.
     *
     * @throws Throwable the first error recorded in the main or worker thread, if any
     */
    public void runNextOneTimeCallable() throws Throwable {
        oneTimeCallables.take().call().get();
        checkError();
    }

    /**
     * Runs the periodic async call registered at position {@code index} and waits for it.
     *
     * @throws Throwable the first error recorded in the main or worker thread, if any
     */
    public void runPeriodicCallable(int index) throws Throwable {
        periodicCallables.get(index).call().get();
        checkError();
    }

    /**
     * Returns a shallow copy of the events recorded per subtask.
     *
     * <p>The copy is taken on the main executor thread, which is the only thread that mutates the
     * underlying map. When called from that thread the copy is taken directly to avoid waiting on
     * itself.
     */
    public Map<Integer, List<SourceEvent>> getSentSourceEvent() throws Exception {
        if (mainThreadFactory.isCurrentThreadMainExecutorThread()) {
            return new HashMap<>(sentSourceEvent);
        }
        Callable<Map<Integer, List<SourceEvent>>> snapshot = () -> new HashMap<>(sentSourceEvent);
        return mainExecutor.submit(snapshot).get();
    }

    /** Registers the reader under its subtask id, replacing any previous entry. */
    public void registerReader(ReaderInfo readerInfo) {
        registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
    }

    /** Removes the reader registered under {@code readerId}, if any. */
    public void unregisterReader(int readerId) {
        registeredReaders.remove(readerId);
    }

    /** Returns the live list of registered periodic async calls. */
    public List<Callable<Future<?>>> getPeriodicCallables() {
        return periodicCallables;
    }

    /** Returns the live queue of one-time async calls that have not been triggered yet. */
    public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() {
        return oneTimeCallables;
    }

    /** Returns the live list of assignments passed to {@link #assignSplits(SplitsAssignment)}. */
    public List<SplitsAssignment<SplitT>> getSplitsAssignmentSequence() {
        return splitsAssignmentSequence;
    }

    /** Returns whether the given subtask is currently marked as having no more splits. */
    public boolean hasNoMoreSplits(int subtaskIndex) {
        return subtaskHasNoMoreSplits[subtaskIndex];
    }

    // ------------------------------------------------------------------------
    //  Internals
    // ------------------------------------------------------------------------

    /** Appends the event to the list kept for the subtask, creating the list on first use. */
    private void recordSourceEvent(int subtaskId, SourceEvent event) {
        sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event);
    }

    /**
     * Builds the deferred form of an async call: when invoked, it submits the callable to the
     * worker executor and returns the future of that submission.
     */
    private <T> Callable<Future<?>> newAsyncCall(
            Callable<T> callable, BiConsumer<T, Throwable> handler) {
        return () ->
                workerExecutor.submit(
                        wrap(
                                errorInWorkerThread,
                                () -> {
                                    T result = null;
                                    Throwable failure = null;
                                    try {
                                        result = callable.call();
                                    } catch (Throwable t) {
                                        failure = t;
                                    }
                                    // Kept outside the try block so that a problem while handing
                                    // over to the main thread cannot invoke the handler twice.
                                    notifyHandlerInMainThread(handler, result, failure);
                                }));
    }

    /**
     * Invokes the handler on the main executor and waits for it to return. Errors thrown by the
     * handler are recorded in {@code errorInMainThread}.
     */
    private <T> void notifyHandlerInMainThread(
            BiConsumer<T, Throwable> handler, T result, Throwable failure) {
        try {
            mainExecutor
                    .submit(wrap(errorInMainThread, () -> handler.accept(result, failure)))
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "Interrupted while waiting for the async call handler to finish", e);
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IllegalStateException(
                    "The async call handler failed unexpectedly", e.getCause());
        }
    }

    /** Rethrows the first recorded error, checking the main thread before the worker thread. */
    private void checkError() throws Throwable {
        Throwable mainError = errorInMainThread.get();
        if (mainError != null) {
            throw mainError;
        }
        Throwable workerError = errorInWorkerThread.get();
        if (workerError != null) {
            throw workerError;
        }
    }

    private static TestingExecutorThreadFactory getThreadFactory(
            String threadName, AtomicReference<Throwable> error) {
        return new TestingExecutorThreadFactory(threadName, error);
    }

    private static ExecutorService getExecutor(TestingExecutorThreadFactory threadFactory) {
        return Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    /**
     * Wraps the runnable so that anything it throws is stored in {@code error}; if an error is
     * already stored, the new one is attached to it as suppressed.
     */
    private static ThrowableCatchingRunnable wrap(AtomicReference<Throwable> error, Runnable r) {
        return new ThrowableCatchingRunnable(
                t -> {
                    if (!error.compareAndSet(null, t)) {
                        error.get().addSuppressed(t);
                    }
                },
                r);
    }

    /**
     * A thread factory that creates exactly one named thread and records uncaught exceptions of
     * that thread in the given error reference.
     */
    public static class TestingExecutorThreadFactory implements ThreadFactory {
        private final String coordinatorThreadName;
        private final AtomicReference<Throwable> error;
        private Thread t;

        TestingExecutorThreadFactory(
                String coordinatorThreadName, AtomicReference<Throwable> error) {
            this.coordinatorThreadName = coordinatorThreadName;
            this.error = error;
            this.t = null;
        }

        /**
         * Creates the single thread of this factory.
         *
         * @throws IllegalStateException if a thread has already been created
         */
        @Override
        public Thread newThread(@Nonnull Runnable r) {
            if (t != null) {
                throw new IllegalStateException(
                        "Should never happen. This factory should only be used by a SingleThreadExecutor.");
            }
            t = new Thread(r, coordinatorThreadName);
            t.setUncaughtExceptionHandler(
                    (t1, e) -> {
                        if (!error.compareAndSet(null, e)) {
                            error.get().addSuppressed(e);
                        }
                    });
            return t;
        }

        /** Returns whether the calling thread is the thread created by this factory. */
        boolean isCurrentThreadMainExecutorThread() {
            return Thread.currentThread() == t;
        }
    }
}

