/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.AbstractService;

@InterfaceStability.Unstable
@InterfaceStability.Evolving
public class AMRMClientAsync<T extends AMRMClient.ContainerRequest>
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
    private final AMRMClient<T> client;
    private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
    private final HeartbeatThread heartbeatThread;
    private final CallbackHandlerThread handlerThread;
    private final CallbackHandler handler;
    private final BlockingQueue<AllocateResponse> responseQueue;
    private final Object unregisterHeartbeatLock = new Object();
    private volatile boolean keepRunning;
    private volatile float progress;
    private volatile Exception savedException;

    public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, CallbackHandler callbackHandler) {
        this(new AMRMClientImpl(id), intervalMs, callbackHandler);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public AMRMClientAsync(AMRMClient<T> client, int intervalMs, CallbackHandler callbackHandler) {
        super(AMRMClientAsync.class.getName());
        this.client = client;
        this.heartbeatIntervalMs.set(intervalMs);
        this.handler = callbackHandler;
        this.heartbeatThread = new HeartbeatThread();
        this.handlerThread = new CallbackHandlerThread();
        this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
        this.keepRunning = true;
        this.savedException = null;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.client.init(conf);
    }

    protected void serviceStart() throws Exception {
        this.handlerThread.start();
        this.client.start();
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        if (Thread.currentThread() == this.handlerThread) {
            throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
        }
        this.keepRunning = false;
        try {
            this.heartbeatThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error((Object)"Error joining with heartbeat thread", (Throwable)ex);
        }
        this.client.stop();
        try {
            this.handlerThread.interrupt();
            this.handlerThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error((Object)"Error joining with hander thread", (Throwable)ex);
        }
        super.serviceStop();
    }

    public void setHeartbeatInterval(int interval) {
        this.heartbeatIntervalMs.set(interval);
    }

    public List<? extends Collection<T>> getMatchingRequests(Priority priority, String resourceName, Resource capability) {
        return this.client.getMatchingRequests(priority, resourceName, capability);
    }

    public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
        RegisterApplicationMasterResponse response = this.client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
        this.heartbeatThread.start();
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException {
        Object object = this.unregisterHeartbeatLock;
        synchronized (object) {
            this.keepRunning = false;
            this.client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
        }
    }

    public void addContainerRequest(T req) {
        this.client.addContainerRequest(req);
    }

    public void removeContainerRequest(T req) {
        this.client.removeContainerRequest(req);
    }

    public void releaseAssignedContainer(ContainerId containerId) {
        this.client.releaseAssignedContainer(containerId);
    }

    public Resource getClusterAvailableResources() {
        return this.client.getClusterAvailableResources();
    }

    public int getClusterNodeCount() {
        return this.client.getClusterNodeCount();
    }

    public static interface CallbackHandler {
        public void onContainersCompleted(List<ContainerStatus> var1);

        public void onContainersAllocated(List<Container> var1);

        public void onShutdownRequest();

        public void onNodesUpdated(List<NodeReport> var1);

        public float getProgress();

        public void onError(Exception var1);
    }

    private class CallbackHandlerThread
    extends Thread {
        public CallbackHandlerThread() {
            super("AMRM Callback Handler Thread");
        }

        @Override
        public void run() {
            while (AMRMClientAsync.this.keepRunning) {
                List allocated;
                List completed;
                List updatedNodes;
                AllocateResponse response;
                try {
                    if (AMRMClientAsync.this.savedException != null) {
                        LOG.error((Object)"Stopping callback due to: ", (Throwable)AMRMClientAsync.this.savedException);
                        AMRMClientAsync.this.handler.onError(AMRMClientAsync.this.savedException);
                        break;
                    }
                    response = (AllocateResponse)AMRMClientAsync.this.responseQueue.take();
                }
                catch (InterruptedException ex) {
                    LOG.info((Object)"Interrupted while waiting for queue", (Throwable)ex);
                    continue;
                }
                if (response.getAMCommand() != null) {
                    boolean stop = false;
                    switch (response.getAMCommand()) {
                        case AM_RESYNC: 
                        case AM_SHUTDOWN: {
                            AMRMClientAsync.this.handler.onShutdownRequest();
                            LOG.info((Object)"Shutdown requested. Stopping callback.");
                            stop = true;
                            break;
                        }
                        default: {
                            String msg = "Unhandled value of AMCommand: " + response.getAMCommand();
                            LOG.error((Object)msg);
                            throw new YarnRuntimeException(msg);
                        }
                    }
                    if (stop) break;
                }
                if (!(updatedNodes = response.getUpdatedNodes()).isEmpty()) {
                    AMRMClientAsync.this.handler.onNodesUpdated(updatedNodes);
                }
                if (!(completed = response.getCompletedContainersStatuses()).isEmpty()) {
                    AMRMClientAsync.this.handler.onContainersCompleted(completed);
                }
                if (!(allocated = response.getAllocatedContainers()).isEmpty()) {
                    AMRMClientAsync.this.handler.onContainersAllocated(allocated);
                }
                AMRMClientAsync.this.progress = AMRMClientAsync.this.handler.getProgress();
            }
        }
    }

    private class HeartbeatThread
    extends Thread {
        public HeartbeatThread() {
            super("AMRM Heartbeater thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                AllocateResponse response = null;
                Object object = AMRMClientAsync.this.unregisterHeartbeatLock;
                synchronized (object) {
                    if (!AMRMClientAsync.this.keepRunning) {
                        break;
                    }
                    try {
                        response = AMRMClientAsync.this.client.allocate(AMRMClientAsync.this.progress);
                    }
                    catch (YarnException ex) {
                        LOG.error((Object)"Yarn exception on heartbeat", (Throwable)ex);
                        AMRMClientAsync.this.savedException = (Exception)((Object)ex);
                        AMRMClientAsync.this.handlerThread.interrupt();
                        break;
                    }
                    catch (IOException e) {
                        LOG.error((Object)"IO exception on heartbeat", (Throwable)e);
                        AMRMClientAsync.this.savedException = e;
                        AMRMClientAsync.this.handlerThread.interrupt();
                        break;
                    }
                }
                if (response != null) {
                    while (true) {
                        try {
                            AMRMClientAsync.this.responseQueue.put(response);
                        }
                        catch (InterruptedException ex) {
                            LOG.info((Object)"Interrupted while waiting to put on response queue", (Throwable)ex);
                            continue;
                        }
                        break;
                    }
                }
                try {
                    Thread.sleep(AMRMClientAsync.this.heartbeatIntervalMs.get());
                }
                catch (InterruptedException ex) {
                    LOG.info((Object)"Heartbeater interrupted", (Throwable)ex);
                }
            }
        }
    }
}

