/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.AbstractService;

@InterfaceStability.Unstable
@InterfaceStability.Evolving
public class AMRMClientAsync
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
    private final AMRMClient client;
    private final int intervalMs;
    private final HeartbeatThread heartbeatThread;
    private final CallbackHandlerThread handlerThread;
    private final CallbackHandler handler;
    private final BlockingQueue<AllocateResponse> responseQueue;
    private volatile boolean keepRunning;
    private volatile float progress;

    public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, CallbackHandler callbackHandler) {
        this(new AMRMClientImpl(id), intervalMs, callbackHandler);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    AMRMClientAsync(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) {
        super(AMRMClientAsync.class.getName());
        this.client = client;
        this.intervalMs = intervalMs;
        this.handler = callbackHandler;
        this.heartbeatThread = new HeartbeatThread();
        this.handlerThread = new CallbackHandlerThread();
        this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
        this.keepRunning = true;
    }

    public void setProgress(float progress) {
        this.progress = progress;
    }

    public void init(Configuration conf) {
        super.init(conf);
        this.client.init(conf);
    }

    public void start() {
        this.handlerThread.start();
        this.client.start();
        super.start();
    }

    public void stop() {
        if (Thread.currentThread() == this.handlerThread) {
            throw new YarnException("Cannot call stop from callback handler thread!");
        }
        this.keepRunning = false;
        try {
            this.heartbeatThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error((Object)"Error joining with heartbeat thread", (Throwable)ex);
        }
        this.client.stop();
        try {
            this.handlerThread.interrupt();
            this.handlerThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error((Object)"Error joining with hander thread", (Throwable)ex);
        }
        super.stop();
    }

    public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnRemoteException {
        RegisterApplicationMasterResponse response = this.client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
        this.heartbeatThread.start();
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnRemoteException {
        AMRMClient aMRMClient = this.client;
        synchronized (aMRMClient) {
            this.keepRunning = false;
            this.client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
        }
    }

    public void addContainerRequest(AMRMClient.ContainerRequest req) {
        this.client.addContainerRequest(req);
    }

    public void removeContainerRequest(AMRMClient.ContainerRequest req) {
        this.client.removeContainerRequest(req);
    }

    public void releaseAssignedContainer(ContainerId containerId) {
        this.client.releaseAssignedContainer(containerId);
    }

    public Resource getClusterAvailableResources() {
        return this.client.getClusterAvailableResources();
    }

    public int getClusterNodeCount() {
        return this.client.getClusterNodeCount();
    }

    public static interface CallbackHandler {
        public void onContainersCompleted(List<ContainerStatus> var1);

        public void onContainersAllocated(List<Container> var1);

        public void onRebootRequest();

        public void onNodesUpdated(List<NodeReport> var1);
    }

    private class CallbackHandlerThread
    extends Thread {
        public CallbackHandlerThread() {
            super("AMRM Callback Handler Thread");
        }

        @Override
        public void run() {
            while (AMRMClientAsync.this.keepRunning) {
                List allocated;
                List completed;
                List updatedNodes;
                AllocateResponse response;
                try {
                    response = (AllocateResponse)AMRMClientAsync.this.responseQueue.take();
                }
                catch (InterruptedException ex) {
                    LOG.info((Object)"Interrupted while waiting for queue");
                    continue;
                }
                if (response.getReboot()) {
                    AMRMClientAsync.this.handler.onRebootRequest();
                }
                if (!(updatedNodes = response.getUpdatedNodes()).isEmpty()) {
                    AMRMClientAsync.this.handler.onNodesUpdated(updatedNodes);
                }
                if (!(completed = response.getCompletedContainersStatuses()).isEmpty()) {
                    AMRMClientAsync.this.handler.onContainersCompleted(completed);
                }
                if ((allocated = response.getAllocatedContainers()).isEmpty()) continue;
                AMRMClientAsync.this.handler.onContainersAllocated(allocated);
            }
        }
    }

    private class HeartbeatThread
    extends Thread {
        public HeartbeatThread() {
            super("AMRM Heartbeater thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                AllocateResponse response = null;
                AMRMClient aMRMClient = AMRMClientAsync.this.client;
                synchronized (aMRMClient) {
                    if (!AMRMClientAsync.this.keepRunning) {
                        break;
                    }
                    try {
                        response = AMRMClientAsync.this.client.allocate(AMRMClientAsync.this.progress);
                    }
                    catch (YarnRemoteException ex) {
                        LOG.error((Object)"Failed to heartbeat", (Throwable)ex);
                    }
                }
                if (response != null) {
                    while (true) {
                        try {
                            AMRMClientAsync.this.responseQueue.put(response);
                        }
                        catch (InterruptedException ex) {
                            LOG.warn((Object)"Interrupted while waiting to put on response queue", (Throwable)ex);
                            continue;
                        }
                        break;
                    }
                }
                try {
                    Thread.sleep(AMRMClientAsync.this.intervalMs);
                }
                catch (InterruptedException ex) {
                    LOG.warn((Object)"Heartbeater interrupted", (Throwable)ex);
                }
            }
        }
    }
}

