/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A finite test source that emits its configured elements in two rounds.
 *
 * <p>Each round emits every element once while holding the checkpoint lock and then waits until
 * two further checkpoint-complete notifications have been counted, or until the source has been
 * cancelled. After the second round, if an exit condition was supplied, the source keeps polling
 * that condition and fails with a {@link TimeoutException} once the configured timeout elapses.
 *
 * @param <T> type of the emitted elements
 */
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    private static final long serialVersionUID = 1L;

    /** Timeout applied when an exit condition is given without an explicit timeout. */
    private static final long DEFAULT_WAIT_TIMEOUT_MS = 30000L;

    /** Number of checkpoint-complete notifications awaited after each emission round. */
    private static final int CHECKPOINTS_PER_ROUND = 2;

    /** Poll interval, in milliseconds, while waiting for the exit condition. */
    private static final long EXIT_POLL_INTERVAL_MS = 10L;

    /** Poll interval, in milliseconds, while waiting for checkpoint notifications. */
    private static final long CHECKPOINT_POLL_INTERVAL_MS = 1L;

    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final Iterable<T> elements;

    /** Cleared by {@link #cancel()}; every wait loop re-checks it. */
    private volatile boolean running = true;

    /**
     * Count of checkpoint-complete notifications received so far.
     *
     * <p>NOTE(review): this counter is neither volatile nor atomic; it is read while holding the
     * checkpoint lock. Presumably notifications are delivered under that same lock - confirm
     * against the runtime that drives this source.
     */
    private transient int numCheckpointsComplete;

    /** Optional condition that must hold before {@link #run} may return; {@code null} disables the wait. */
    @Nullable
    private final BooleanSupplier couldExit;

    /** Maximum time, in milliseconds, to wait for {@link #couldExit} to become true. */
    private final long waitTimeOut;

    /**
     * Creates a source without an exit condition.
     *
     * @param elements elements emitted in each round
     */
    @SafeVarargs
    public FiniteTestSource(T ... elements) {
        this(null, 0L, Arrays.asList(elements));
    }

    /**
     * Creates a source without an exit condition.
     *
     * @param elements elements emitted in each round
     */
    public FiniteTestSource(Iterable<T> elements) {
        this(null, 0L, elements);
    }

    /**
     * Creates a source with an optional exit condition and an explicit timeout.
     *
     * @param couldExit condition polled after the second round, or {@code null} to skip the wait
     * @param waitTimeOut maximum time in milliseconds to wait for {@code couldExit}; must not be
     *     negative
     * @param elements elements emitted in each round
     * @throws IllegalStateException if {@code waitTimeOut} is negative
     */
    public FiniteTestSource(@Nullable BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements) {
        Preconditions.checkState(waitTimeOut >= 0L, "waitTimeOut must not be negative.");
        this.couldExit = couldExit;
        this.waitTimeOut = waitTimeOut;
        this.elements = elements;
    }

    /**
     * Creates a source with an optional exit condition and the default timeout of 30000 ms.
     *
     * @param couldExit condition polled after the second round, or {@code null} to skip the wait
     * @param elements elements emitted in each round
     */
    public FiniteTestSource(@Nullable BooleanSupplier couldExit, Iterable<T> elements) {
        this(couldExit, DEFAULT_WAIT_TIMEOUT_MS, elements);
    }

    /**
     * Runs two emit-and-wait rounds and then, if an exit condition was supplied, blocks until it
     * holds, the source is cancelled, or the timeout elapses.
     *
     * @param ctx context used to emit elements and to obtain the checkpoint lock
     * @throws TimeoutException if the exit condition does not hold within {@code waitTimeOut}
     *     milliseconds
     * @throws Exception if waiting is interrupted or emission fails
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        emitElementsAndWaitForCheckpoints(ctx, CHECKPOINTS_PER_ROUND);
        emitElementsAndWaitForCheckpoints(ctx, CHECKPOINTS_PER_ROUND);

        if (couldExit == null) {
            return;
        }

        // nanoTime is monotonic, so the timeout is unaffected by wall-clock adjustments.
        final long startNanos = System.nanoTime();
        final Object lock = ctx.getCheckpointLock();
        synchronized (lock) {
            while (running && !couldExit.getAsBoolean()) {
                // Wait on the very monitor we hold; wait() releases it while sleeping.
                lock.wait(EXIT_POLL_INTERVAL_MS);
                final long elapsedMillis = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
                if (elapsedMillis > waitTimeOut) {
                    throw new TimeoutException("Wait source exit time out " + waitTimeOut + "ms.");
                }
            }
        }
    }

    /**
     * Emits every element once under the checkpoint lock, then waits until the given number of
     * additional checkpoint-complete notifications has been counted or the source is cancelled.
     *
     * @param ctx context used to emit elements and to obtain the checkpoint lock
     * @param checkpointsToWaitFor number of further completed checkpoints to wait for
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void emitElementsAndWaitForCheckpoints(SourceFunction.SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
        final Object lock = ctx.getCheckpointLock();

        final int checkpointToAwait;
        synchronized (lock) {
            // The target is fixed before emitting, while still holding the lock.
            checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor;
            for (T element : elements) {
                ctx.collect(element);
            }
        }

        // The lock is released between emitting and waiting, exactly as two separate sections.
        synchronized (lock) {
            while (running && numCheckpointsComplete < checkpointToAwait) {
                lock.wait(CHECKPOINT_POLL_INTERVAL_MS);
            }
        }
    }

    /** Requests termination; the wait loops in {@link #run} observe the flag and exit. */
    public void cancel() {
        running = false;
    }

    /**
     * Counts one more completed checkpoint.
     *
     * @param checkpointId id of the completed checkpoint (not used)
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        ++numCheckpointsComplete;
    }

    /**
     * Intentionally does nothing: aborted checkpoints are not counted.
     *
     * @param checkpointId id of the aborted checkpoint (not used)
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }
}

