/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.mocks;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.core.io.InputStatus;

/**
 * Mock {@link SourceReader} that emits {@code Integer} records taken from the
 * {@link MockSourceSplit}s assigned to it.
 *
 * <p>Besides producing records, the reader records the calls it receives (assigned splits,
 * source events, completed/aborted checkpoint ids, number of {@code close()} calls) and exposes
 * them through getters so they can be inspected afterwards.
 *
 * <p>Only {@code availableFuture} is guarded by the instance monitor; all other state is accessed
 * without synchronization.
 */
public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> {
    private final List<MockSourceSplit> assignedSplits = new ArrayList<>();
    private final List<SourceEvent> receivedSourceEvents = new ArrayList<>();
    private final List<Long> completedCheckpoints = new ArrayList<>();
    private final List<Long> abortedCheckpoints = new ArrayList<>();
    /** When true, the output is marked idle whenever a poll finds nothing to emit. */
    private final boolean markIdleOnNoSplits;
    /** When true, records are emitted through a per-split output instead of the main output. */
    private final boolean usePerSplitOutputs;
    private int currentSplitIndex = 0;
    private boolean started = false;
    private int timesClosed = 0;
    private final WaitingForSplits waitingForSplitsBehaviour;
    private SplitsAssignmentState splitsAssignmentState = SplitsAssignmentState.NO_SPLITS_ASSIGNED;
    /** True while the output has been marked idle and not yet re-activated. */
    private boolean idle = false;
    @GuardedBy("this")
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    /** Creates a reader that does not wait for splits and never marks its output idle. */
    public MockSourceReader() {
        this(false, false);
    }

    /**
     * Creates a reader using a boolean shortcut for the waiting behaviour.
     *
     * @param waitingForMoreSplits {@code true} selects
     *     {@link WaitingForSplits#WAIT_UNTIL_ALL_SPLITS_ASSIGNED}, {@code false} selects
     *     {@link WaitingForSplits#DO_NOT_WAIT_FOR_SPLITS}
     * @param markIdleOnNoSplits whether to mark the output idle when a poll finds no record
     */
    public MockSourceReader(boolean waitingForMoreSplits, boolean markIdleOnNoSplits) {
        this(
                waitingForMoreSplits
                        ? WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED
                        : WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
                markIdleOnNoSplits);
    }

    /**
     * Creates a reader that emits through the main output.
     *
     * @param waitingForSplitsBehaviour how the reader reacts while splits may still arrive
     * @param markIdleOnNoSplits whether to mark the output idle when a poll finds no record
     */
    public MockSourceReader(WaitingForSplits waitingForSplitsBehaviour, boolean markIdleOnNoSplits) {
        this(waitingForSplitsBehaviour, markIdleOnNoSplits, false);
    }

    /**
     * Creates a fully configured reader.
     *
     * @param waitingForSplitsBehaviour how the reader reacts while splits may still arrive
     * @param markIdleOnNoSplits whether to mark the output idle when a poll finds no record
     * @param usePerSplitOutputs whether each record is emitted via
     *     {@code createOutputForSplit(splitId)} rather than the main output
     */
    public MockSourceReader(
            WaitingForSplits waitingForSplitsBehaviour,
            boolean markIdleOnNoSplits,
            boolean usePerSplitOutputs) {
        this.waitingForSplitsBehaviour = waitingForSplitsBehaviour;
        this.markIdleOnNoSplits = markIdleOnNoSplits;
        this.usePerSplitOutputs = usePerSplitOutputs;
    }

    /** Records that the reader has been started. */
    public void start() {
        started = true;
    }

    /**
     * Emits at most one record from the first assigned split that reports itself available.
     *
     * @param sourceOutput output that receives the record and the idle/active signals
     * @return {@code MORE_AVAILABLE} after emitting a record; {@code END_OF_INPUT} when nothing was
     *     emitted, every assigned split is finished, and either no more splits will come or the
     *     reader is configured not to wait for splits; {@code NOTHING_AVAILABLE} otherwise
     * @throws Exception declared by the method signature; this implementation does not throw
     *     explicitly
     */
    public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
        if (waitingForSplitsBehaviour == WaitingForSplits.WAIT_FOR_INITIAL
                && splitsAssignmentState == SplitsAssignmentState.NO_SPLITS_ASSIGNED) {
            markUnavailable();
            return InputStatus.NOTHING_AVAILABLE;
        }

        boolean finished =
                splitsAssignmentState == SplitsAssignmentState.NO_MORE_SPLITS
                        || waitingForSplitsBehaviour == WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS;

        // Scan for the first split with an available record. Every split skipped on the way
        // must itself be finished for the reader as a whole to count as finished.
        currentSplitIndex = 0;
        while (currentSplitIndex < assignedSplits.size()
                && !assignedSplits.get(currentSplitIndex).isAvailable()) {
            finished &= assignedSplits.get(currentSplitIndex).isFinished();
            ++currentSplitIndex;
        }

        if (currentSplitIndex < assignedSplits.size()) {
            if (idle) {
                sourceOutput.markActive();
                // Reset the flag so markActive() is signalled once per idle period instead of
                // on every subsequent record.
                idle = false;
            }
            MockSourceSplit sourceSplit = assignedSplits.get(currentSplitIndex);
            int record = sourceSplit.getNext(false)[0];
            // The int is boxed to Integer to match the output's element type.
            if (usePerSplitOutputs) {
                sourceOutput.createOutputForSplit(sourceSplit.splitId()).collect(record);
            } else {
                sourceOutput.collect(record);
            }
            return InputStatus.MORE_AVAILABLE;
        }

        if (finished) {
            return InputStatus.END_OF_INPUT;
        }

        if (markIdleOnNoSplits) {
            idle = true;
            sourceOutput.markIdle();
        }
        markUnavailable();
        return InputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Returns the currently assigned splits as the reader state.
     *
     * <p>The returned list is the reader's live internal list, not a copy.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return the internal list of assigned splits
     */
    public List<MockSourceSplit> snapshotState(long checkpointId) {
        return assignedSplits;
    }

    /**
     * Returns the current availability future.
     *
     * @return a future that is already complete unless a previous poll marked the reader
     *     unavailable and nothing has made it available again since
     */
    public synchronized CompletableFuture<Void> isAvailable() {
        return availableFuture;
    }

    /**
     * Appends the given splits to the assigned splits and marks the reader available.
     *
     * @param splits splits to add
     */
    public void addSplits(List<MockSourceSplit> splits) {
        if (splitsAssignmentState == SplitsAssignmentState.NO_SPLITS_ASSIGNED) {
            splitsAssignmentState = SplitsAssignmentState.INITIAL_SPLITS_ASSIGNED;
        }
        assignedSplits.addAll(splits);
        markAvailable();
    }

    /** Records that no further splits will be assigned and marks the reader available. */
    public void notifyNoMoreSplits() {
        splitsAssignmentState = SplitsAssignmentState.NO_MORE_SPLITS;
        markAvailable();
    }

    /**
     * Counts the call; no resources are released.
     *
     * @throws Exception declared by the method signature; never thrown here
     */
    public void close() throws Exception {
        ++timesClosed;
    }

    /**
     * Records the id of a completed checkpoint.
     *
     * @param checkpointId id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) {
        completedCheckpoints.add(checkpointId);
    }

    /**
     * Records the id of an aborted checkpoint.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) {
        abortedCheckpoints.add(checkpointId);
    }

    /** Replaces a completed availability future with a fresh, incomplete one. */
    private synchronized void markUnavailable() {
        if (availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /**
     * Records a received source event.
     *
     * @param sourceEvent the event to record
     */
    public void handleSourceEvents(SourceEvent sourceEvent) {
        receivedSourceEvents.add(sourceEvent);
    }

    /** Completes the current availability future if it is still pending. */
    public void markAvailable() {
        final CompletableFuture<Void> toNotify;
        synchronized (this) {
            toNotify = availableFuture.isDone() ? null : availableFuture;
        }
        // Complete outside the synchronized block so that dependent actions triggered by
        // complete() do not run while this reader's monitor is held.
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    /** @return whether {@link #start()} has been called */
    public boolean isStarted() {
        return started;
    }

    /** @return whether {@link #close()} has been called at least once */
    public boolean isClosed() {
        return timesClosed > 0;
    }

    /** @return how many times {@link #close()} has been called */
    public int getTimesClosed() {
        return timesClosed;
    }

    /** @return the live internal list of assigned splits */
    public List<MockSourceSplit> getAssignedSplits() {
        return assignedSplits;
    }

    /** @return the live internal list of received source events, in arrival order */
    public List<SourceEvent> getReceivedSourceEvents() {
        return receivedSourceEvents;
    }

    /** @return the live internal list of completed checkpoint ids, in notification order */
    public List<Long> getCompletedCheckpoints() {
        return completedCheckpoints;
    }

    /** @return the live internal list of aborted checkpoint ids, in notification order */
    public List<Long> getAbortedCheckpoints() {
        return abortedCheckpoints;
    }

    /** Tracks how far split assignment has progressed for this reader. */
    private enum SplitsAssignmentState {
        /** {@code addSplits} has not been called yet. */
        NO_SPLITS_ASSIGNED,
        /** {@code addSplits} has been called at least once. */
        INITIAL_SPLITS_ASSIGNED,
        /** {@code notifyNoMoreSplits} has been called. */
        NO_MORE_SPLITS
    }

    /** Controls how {@code pollNext} behaves while further splits may still be assigned. */
    public enum WaitingForSplits {
        /** Report nothing available until the first splits are assigned. */
        WAIT_FOR_INITIAL,
        /** Do not report end of input before {@code notifyNoMoreSplits} has been called. */
        WAIT_UNTIL_ALL_SPLITS_ASSIGNED,
        /** Report end of input as soon as all currently assigned splits are finished. */
        DO_NOT_WAIT_FOR_SPLITS
    }
}

