/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultJobGraphStore<R extends ResourceVersion<R>>
implements JobGraphStore,
JobGraphStore.JobGraphListener {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobGraphStore.class);
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<JobID> addedJobGraphs = new HashSet<JobID>();
    private final StateHandleStore<JobGraph, R> jobGraphStateHandleStore;
    @GuardedBy(value="lock")
    private final JobGraphStoreWatcher jobGraphStoreWatcher;
    private final JobGraphStoreUtil jobGraphStoreUtil;
    @GuardedBy(value="lock")
    private JobGraphStore.JobGraphListener jobGraphListener;
    @GuardedBy(value="lock")
    private volatile boolean running;

    public DefaultJobGraphStore(StateHandleStore<JobGraph, R> stateHandleStore, JobGraphStoreWatcher jobGraphStoreWatcher, JobGraphStoreUtil jobGraphStoreUtil) {
        this.jobGraphStateHandleStore = (StateHandleStore)Preconditions.checkNotNull(stateHandleStore);
        this.jobGraphStoreWatcher = (JobGraphStoreWatcher)Preconditions.checkNotNull((Object)jobGraphStoreWatcher);
        this.jobGraphStoreUtil = (JobGraphStoreUtil)Preconditions.checkNotNull((Object)jobGraphStoreUtil);
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (!this.running) {
                this.jobGraphListener = (JobGraphStore.JobGraphListener)Preconditions.checkNotNull((Object)jobGraphListener);
                this.jobGraphStoreWatcher.start(this);
                this.running = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                LOG.info("Stopping DefaultJobGraphStore.");
                Exception exception = null;
                try {
                    this.jobGraphStateHandleStore.releaseAll();
                }
                catch (Exception e) {
                    exception = e;
                }
                try {
                    this.jobGraphStoreWatcher.stop();
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
                }
                if (exception != null) {
                    throw new FlinkException("Could not properly stop the DefaultJobGraphStore.", (Throwable)exception);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    @Nullable
    public JobGraph recoverJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        LOG.debug("Recovering job graph {} from {}.", (Object)jobId, this.jobGraphStateHandleStore);
        String name = this.jobGraphStoreUtil.jobIDToName(jobId);
        Object object = this.lock;
        synchronized (object) {
            JobGraph jobGraph;
            boolean success;
            block13: {
                RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;
                this.verifyIsRunning();
                success = false;
                try {
                    jobGraphRetrievableStateHandle = this.jobGraphStateHandleStore.getAndLock(name);
                }
                catch (StateHandleStore.NotExistException ignored) {
                    success = true;
                    JobGraph jobGraph2 = null;
                    if (success) return jobGraph2;
                    this.jobGraphStateHandleStore.release(name);
                    return jobGraph2;
                }
                catch (Exception e) {
                    throw new FlinkException("Could not retrieve the submitted job graph state handle for " + name + " from the submitted job graph store.", (Throwable)e);
                }
                jobGraph = jobGraphRetrievableStateHandle.retrieveState();
                break block13;
                catch (ClassNotFoundException cnfe) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", (Throwable)cnfe);
                }
                catch (IOException ioe) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", (Throwable)ioe);
                }
            }
            this.addedJobGraphs.add(jobGraph.getJobID());
            LOG.info("Recovered {}.", (Object)jobGraph);
            success = true;
            return jobGraph;
            finally {
                if (!success) {
                    this.jobGraphStateHandleStore.release(name);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putJobGraph(JobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull((Object)jobGraph, (String)"Job graph");
        JobID jobID = jobGraph.getJobID();
        String name = this.jobGraphStoreUtil.jobIDToName(jobID);
        LOG.debug("Adding job graph {} to {}.", (Object)jobID, this.jobGraphStateHandleStore);
        boolean success = false;
        while (!success) {
            Object object = this.lock;
            synchronized (object) {
                this.verifyIsRunning();
                R currentVersion = this.jobGraphStateHandleStore.exists(name);
                if (!currentVersion.isExisting()) {
                    try {
                        this.jobGraphStateHandleStore.addAndLock(name, jobGraph);
                        this.addedJobGraphs.add(jobID);
                        success = true;
                    }
                    catch (StateHandleStore.AlreadyExistException ignored) {
                        LOG.warn("{} already exists in {}.", (Object)jobGraph, this.jobGraphStateHandleStore);
                    }
                } else if (this.addedJobGraphs.contains(jobID)) {
                    try {
                        this.jobGraphStateHandleStore.replace(name, currentVersion, jobGraph);
                        LOG.info("Updated {} in {}.", (Object)jobGraph, (Object)this.getClass().getSimpleName());
                        success = true;
                    }
                    catch (StateHandleStore.NotExistException ignored) {
                        LOG.warn("{} does not exists in {}.", (Object)jobGraph, this.jobGraphStateHandleStore);
                    }
                } else {
                    throw new IllegalStateException("Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                }
            }
        }
        LOG.info("Added {} to {}.", (Object)jobGraph, this.jobGraphStateHandleStore);
    }

    @Override
    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        return this.runAsyncWithLockAssertRunning((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            LOG.debug("Removing job graph {} from {}.", (Object)jobId, this.jobGraphStateHandleStore);
            String name = this.jobGraphStoreUtil.jobIDToName(jobId);
            this.releaseAndRemoveOrThrowCompletionException(jobId, name);
            this.addedJobGraphs.remove(jobId);
            LOG.info("Removed job graph {} from {}.", (Object)jobId, this.jobGraphStateHandleStore);
        }), executor);
    }

    @GuardedBy(value="lock")
    private void releaseAndRemoveOrThrowCompletionException(JobID jobId, String jobName) {
        boolean success;
        try {
            success = this.jobGraphStateHandleStore.releaseAndTryRemove(jobName);
        }
        catch (Exception e) {
            throw new CompletionException(e);
        }
        if (!success) {
            throw new CompletionException((Throwable)new FlinkException(String.format("Could not remove job graph with job id %s from %s.", jobId, this.jobGraphStateHandleStore)));
        }
    }

    @Override
    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        return this.runAsyncWithLockAssertRunning((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            LOG.debug("Releasing job graph {} from {}.", (Object)jobId, this.jobGraphStateHandleStore);
            this.jobGraphStateHandleStore.release(this.jobGraphStoreUtil.jobIDToName(jobId));
            this.addedJobGraphs.remove(jobId);
            LOG.info("Released job graph {} from {}.", (Object)jobId, this.jobGraphStateHandleStore);
        }), executor);
    }

    private CompletableFuture<Void> runAsyncWithLockAssertRunning(ThrowingRunnable<Exception> runnable, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            Object object = this.lock;
            synchronized (object) {
                this.verifyIsRunning();
                try {
                    runnable.run();
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }
        }, executor);
    }

    @Override
    public Collection<JobID> getJobIds() throws Exception {
        Collection<String> names;
        LOG.debug("Retrieving all stored job ids from {}.", this.jobGraphStateHandleStore);
        try {
            names = this.jobGraphStateHandleStore.getAllHandles();
        }
        catch (Exception e) {
            throw new Exception("Failed to retrieve all job ids from " + this.jobGraphStateHandleStore + ".", e);
        }
        ArrayList<JobID> jobIds = new ArrayList<JobID>(names.size());
        for (String name : names) {
            try {
                jobIds.add(this.jobGraphStoreUtil.nameToJobID(name));
            }
            catch (Exception exception) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed name.", (Object)name, (Object)exception);
            }
        }
        LOG.info("Retrieved job ids {} from {}", jobIds, this.jobGraphStateHandleStore);
        return jobIds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onAddedJobGraph(JobID jobId) {
        Object object = this.lock;
        synchronized (object) {
            if (this.running && !this.addedJobGraphs.contains(jobId)) {
                try {
                    this.jobGraphListener.onAddedJobGraph(jobId);
                }
                catch (Throwable t) {
                    LOG.error("Failed to notify job graph listener onAddedJobGraph event for {}", (Object)jobId, (Object)t);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onRemovedJobGraph(JobID jobId) {
        Object object = this.lock;
        synchronized (object) {
            if (this.running && this.addedJobGraphs.contains(jobId)) {
                try {
                    this.jobGraphListener.onRemovedJobGraph(jobId);
                }
                catch (Throwable t) {
                    LOG.error("Failed to notify job graph listener onRemovedJobGraph event for {}", (Object)jobId, (Object)t);
                }
            }
        }
    }

    private void verifyIsRunning() {
        Preconditions.checkState((boolean)this.running, (Object)"Not running. Forgot to call start()?");
    }
}

