/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * ResourceManager-side implementation of {@link AMRMProtocol}: the RPC service
 * that ApplicationMasters use to register, heartbeat/allocate and unregister.
 *
 * <p>The service keeps, per application attempt, the last {@link AMResponse}
 * handed out ({@link #responseMap}). That object is also used as the monitor
 * on which register / finish / allocate / unregister calls for the same
 * attempt synchronize.
 */
@InterfaceAudience.Private
public class ApplicationMasterService
extends AbstractService
implements AMRMProtocol {
    private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);

    /** User name written to the audit log when the application cannot be found in the RM context. */
    private static final String UNKNOWN_USER = "UNKNOWN";

    private final AMLivelinessMonitor amLivelinessMonitor;
    private final YarnScheduler rScheduler;
    private final ApplicationTokenSecretManager appTokenManager;
    /** Resolved in {@link #init(Configuration)}. */
    private InetSocketAddress masterServiceAddress;
    /** Created in {@link #start()}; {@code null} until then. */
    private Server server;
    private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    /** Last response sent to each registered attempt; the value doubles as the per-attempt lock. */
    private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap = new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
    /** Shared response with the reboot flag set, returned when a request cannot be matched to a known attempt. */
    private final AMResponse reboot = this.recordFactory.newRecordInstance(AMResponse.class);
    private final RMContext rmContext;

    /**
     * Creates the service.
     *
     * @param rmContext       RM context; supplies the AM liveliness monitor, dispatcher and application map
     * @param appTokenManager secret manager handed to the RPC server
     * @param scheduler       scheduler that serves allocation requests
     */
    public ApplicationMasterService(RMContext rmContext, ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
        super(ApplicationMasterService.class.getName());
        this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
        this.appTokenManager = appTokenManager;
        this.rScheduler = scheduler;
        this.reboot.setReboot(true);
        this.rmContext = rmContext;
    }

    /**
     * Resolves the bind address from {@code yarn.resourcemanager.scheduler.address}
     * (default {@code 0.0.0.0:8030}) and then initializes the parent service.
     *
     * @param conf service configuration
     */
    public void init(Configuration conf) {
        String bindAddress = conf.get("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030");
        this.masterServiceAddress = NetUtils.createSocketAddr(bindAddress, 8030, "yarn.resourcemanager.scheduler.address");
        super.init(conf);
    }

    /**
     * Creates and starts the RPC server for {@link AMRMProtocol}. When
     * {@code hadoop.security.authorization} is enabled, service ACLs are loaded
     * before the server starts accepting calls.
     */
    public void start() {
        Configuration conf = this.getConfig();
        YarnRPC rpc = YarnRPC.create(conf);
        int handlerCount = conf.getInt("yarn.resourcemanager.scheduler.client.thread-count", 10);
        this.server = rpc.getServer(AMRMProtocol.class, this, this.masterServiceAddress, conf, this.appTokenManager, handlerCount);
        if (conf.getBoolean("hadoop.security.authorization", false)) {
            this.refreshServiceAcls(conf, new RMPolicyProvider());
        }
        this.server.start();
        super.start();
    }

    /**
     * Verifies, when security is enabled, that the calling user's name equals
     * the string form of the attempt id named in the request. No-op when
     * security is disabled.
     *
     * @param appAttemptID attempt id taken from the incoming request
     * @throws YarnRemoteException if the current user cannot be determined or does not match
     */
    private void authorizeRequest(ApplicationAttemptId appAttemptID) throws YarnRemoteException {
        if (!UserGroupInformation.isSecurityEnabled()) {
            return;
        }
        String appAttemptIDStr = appAttemptID.toString();
        UserGroupInformation remoteUgi;
        try {
            remoteUgi = UserGroupInformation.getCurrentUser();
        }
        catch (IOException e) {
            String msg = "Cannot obtain the user-name for ApplicationAttemptID: " + appAttemptIDStr + ". Got exception: " + StringUtils.stringifyException(e);
            LOG.warn(msg);
            throw RPCUtil.getRemoteException(msg);
        }
        if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
            String msg = "Unauthorized request from ApplicationMaster. Expected ApplicationAttemptID: " + remoteUgi.getUserName() + " Found: " + appAttemptIDStr;
            LOG.warn(msg);
            throw RPCUtil.getRemoteException(msg);
        }
    }

    /**
     * Returns the user of the given application for audit logging, or
     * {@link #UNKNOWN_USER} when the application is not present in the RM
     * context, so that audit logging never fails with a
     * {@link NullPointerException}.
     */
    private String getAuditUser(ApplicationId appId) {
        RMApp app = this.rmContext.getRMApps().get(appId);
        return app == null ? UNKNOWN_USER : app.getUser();
    }

    /**
     * Handles the first call of an ApplicationMaster: records a liveliness
     * ping, dispatches a registration event for the attempt and returns the
     * scheduler's minimum and maximum resource capabilities.
     *
     * @param request registration request carrying attempt id, host, RPC port and tracking URL
     * @return response holding the scheduler's resource capability bounds
     * @throws YarnRemoteException if the caller is not authorized or the attempt is not registered with this service
     */
    public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
        ApplicationAttemptId applicationAttemptId = request.getApplicationAttemptId();
        this.authorizeRequest(applicationAttemptId);
        ApplicationId appID = applicationAttemptId.getApplicationId();
        AMResponse lastResponse = this.responseMap.get(applicationAttemptId);
        if (lastResponse == null) {
            String message = "Application doesn't exist in cache " + applicationAttemptId;
            LOG.error(message);
            // The application itself may be unknown as well; look the user up null-safely so the
            // caller receives the remote exception below rather than a NullPointerException.
            RMAuditLogger.logFailure(this.getAuditUser(appID), "Register App Master", message, "ApplicationMasterService", "Error in registering application master", appID, applicationAttemptId);
            throw RPCUtil.getRemoteException(message);
        }
        // Serialize with other calls made on behalf of the same attempt.
        synchronized (lastResponse) {
            LOG.info("AM registration " + applicationAttemptId);
            this.amLivelinessMonitor.receivedPing(applicationAttemptId);
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptRegistrationEvent(applicationAttemptId, request.getHost(), request.getRpcPort(), request.getTrackingUrl()));
            RMAuditLogger.logSuccess(this.getAuditUser(appID), "Register App Master", "ApplicationMasterService", appID, applicationAttemptId);
            RegisterApplicationMasterResponse response = this.recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
            response.setMinimumResourceCapability(this.rScheduler.getMinimumResourceCapability());
            response.setMaximumResourceCapability(this.rScheduler.getMaximumResourceCapability());
            return response;
        }
    }

    /**
     * Handles the final call of an ApplicationMaster: records a liveliness
     * ping and dispatches an unregistration event carrying the tracking URL,
     * final status and diagnostics from the request.
     *
     * @param request finish request
     * @return an empty response record
     * @throws YarnRemoteException if the caller is not authorized or the attempt is not registered with this service
     */
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
        ApplicationAttemptId applicationAttemptId = request.getApplicationAttemptId();
        this.authorizeRequest(applicationAttemptId);
        AMResponse lastResponse = this.responseMap.get(applicationAttemptId);
        if (lastResponse == null) {
            String message = "Application doesn't exist in cache " + applicationAttemptId;
            LOG.error(message);
            throw RPCUtil.getRemoteException(message);
        }
        // Serialize with other calls made on behalf of the same attempt.
        synchronized (lastResponse) {
            this.amLivelinessMonitor.receivedPing(applicationAttemptId);
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptUnregistrationEvent(applicationAttemptId, request.getTrackingUrl(), request.getFinalApplicationStatus(), request.getDiagnostics()));
            return this.recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
        }
    }

    /**
     * Handles an ApplicationMaster heartbeat / allocation request.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>unknown attempt: the reboot response is returned;</li>
     *   <li>request id + 1 equals the last response id (a repeat of the
     *       previous request): the stored last response is returned again;</li>
     *   <li>request id + 1 is smaller than the last response id: the reboot
     *       response is returned;</li>
     *   <li>otherwise the progress is dispatched, the scheduler is asked to
     *       allocate, and a new response with the next response id is stored
     *       and returned.</li>
     * </ul>
     *
     * @param request allocate request carrying attempt id, response id, progress, asks and releases
     * @return the allocate response wrapping the {@link AMResponse} chosen as described above
     * @throws YarnRemoteException if the caller is not authorized
     */
    public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
        ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
        this.authorizeRequest(appAttemptId);
        this.amLivelinessMonitor.receivedPing(appAttemptId);
        AllocateResponse allocateResponse = this.recordFactory.newRecordInstance(AllocateResponse.class);
        AMResponse lastResponse = this.responseMap.get(appAttemptId);
        if (lastResponse == null) {
            LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
            allocateResponse.setAMResponse(this.reboot);
            return allocateResponse;
        }
        if (request.getResponseId() + 1 == lastResponse.getResponseId()) {
            // Repeat of the previous request: hand back the response already computed for it.
            allocateResponse.setAMResponse(lastResponse);
            return allocateResponse;
        }
        if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
            LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
            allocateResponse.setAMResponse(this.reboot);
            return allocateResponse;
        }
        // Serialize with other calls made on behalf of the same attempt.
        synchronized (lastResponse) {
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
            Allocation allocation = this.rScheduler.allocate(appAttemptId, request.getAskList(), request.getReleaseList());
            RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
            RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
            AMResponse response = this.recordFactory.newRecordInstance(AMResponse.class);
            response.setAllocatedContainers(allocation.getContainers());
            response.setCompletedContainersStatuses(appAttempt.pullJustFinishedContainers());
            response.setResponseId(lastResponse.getResponseId() + 1);
            response.setAvailableResources(allocation.getResourceLimit());
            // lastResponse was read before the lock was taken, so unregisterAttempt() may have
            // removed the entry in the meantime. replace() only stores the new response while
            // the attempt is still mapped; a plain put() would resurrect the removed entry and
            // leave it in the map with nothing left to remove it.
            if (this.responseMap.replace(appAttemptId, response) == null) {
                LOG.error("AppAttemptId removed from cache during allocate " + appAttemptId);
                allocateResponse.setAMResponse(this.reboot);
                return allocateResponse;
            }
            allocateResponse.setAMResponse(response);
            return allocateResponse;
        }
    }

    /**
     * Makes an attempt known to this service by storing an initial response
     * with response id 0 for it.
     *
     * @param attemptId attempt to register
     */
    public void registerAppAttempt(ApplicationAttemptId attemptId) {
        AMResponse response = this.recordFactory.newRecordInstance(AMResponse.class);
        response.setResponseId(0);
        LOG.info("Registering " + attemptId);
        this.responseMap.put(attemptId, response);
    }

    /**
     * Forgets an attempt. The removal is performed while holding the monitor
     * of the attempt's last response, so it waits for a register / finish /
     * allocate call that currently holds that same monitor. Does nothing if
     * the attempt is not known.
     *
     * @param attemptId attempt to remove
     */
    public void unregisterAttempt(ApplicationAttemptId attemptId) {
        AMResponse lastResponse = this.responseMap.get(attemptId);
        if (lastResponse == null) {
            return;
        }
        synchronized (lastResponse) {
            this.responseMap.remove(attemptId);
        }
    }

    /**
     * Reloads the service-level ACLs on the RPC server. The server is created
     * in {@link #start()}, so this must not be called before the service has
     * been started.
     *
     * @param configuration  configuration holding the ACLs
     * @param policyProvider provider describing the protected protocols
     */
    public void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) {
        this.server.refreshServiceAcl(configuration, policyProvider);
    }

    /**
     * Stops the RPC server, if it was ever started, and then the parent service.
     */
    public void stop() {
        if (this.server != null) {
            this.server.stop();
        }
        super.stop();
    }
}

