/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.yarn.batch.am;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.util.RackResolver;
import org.springframework.batch.core.StepExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.yarn.am.AbstractEventingAppmaster;
import org.springframework.yarn.am.AppmasterService;
import org.springframework.yarn.am.ContainerLauncherInterceptor;
import org.springframework.yarn.am.allocate.ContainerAllocateData;
import org.springframework.yarn.am.container.AbstractLauncher;
import org.springframework.yarn.am.container.ContainerRequestHint;
import org.springframework.yarn.batch.am.BatchYarnAppmaster;
import org.springframework.yarn.batch.event.PartitionedStepExecutionEvent;
import org.springframework.yarn.batch.listener.CompositePartitionedStepExecutionStateListener;
import org.springframework.yarn.batch.listener.PartitionedStepExecutionStateListener;
import org.springframework.yarn.batch.support.YarnJobLauncher;
import org.springframework.yarn.event.AbstractYarnEvent;

public abstract class AbstractBatchAppmaster
extends AbstractEventingAppmaster
implements BatchYarnAppmaster,
ContainerLauncherInterceptor {
    /** Class-wide logger. */
    private static final Log log = LogFactory.getLog(AbstractBatchAppmaster.class);
    /** Job launcher injected through {@link #setYarnJobLauncher}; optional, so it may be {@code null}. */
    private YarnJobLauncher yarnJobLauncher;
    /** List handed out by {@link #getStepExecutions()}; this class itself never adds to it. */
    private List<StepExecution> stepExecutions = new ArrayList<StepExecution>();
    /** Master step execution -> its split step executions whose containers have not completed yet. */
    private Map<StepExecution, Set<StepExecution>> masterExecutions = new HashMap<StepExecution, Set<StepExecution>>();
    /**
     * Step executions still waiting for a container -> their request hint (the hint may be {@code null}).
     * A LinkedHashMap, so iteration follows insertion order.
     */
    private Map<StepExecution, ContainerRequestHint> requestData = new LinkedHashMap<StepExecution, ContainerRequestHint>();
    /** Step execution -> remote step name given to {@code addStepSplits}; read when building the launch environment. */
    private Map<StepExecution, String> remoteStepNames = new HashMap<StepExecution, String>();
    /** Container id -> step execution assigned to that container when it was allocated. */
    private Map<ContainerId, StepExecution> containerToStepMap = new HashMap<ContainerId, StepExecution>();
    /** Composite of registered listeners; notified with COMPLETED once a master's splits are all done. */
    private CompositePartitionedStepExecutionStateListener stepExecutionStateListener = new CompositePartitionedStepExecutionStateListener();

    /**
     * Runs the parent initialization, registers this appmaster as a launch
     * interceptor when the configured launcher is an {@link AbstractLauncher},
     * and initializes the Hadoop {@link RackResolver} with the current configuration.
     *
     * @throws Exception propagated from the parent initialization or the setup steps
     */
    protected void onInit() throws Exception {
        super.onInit();
        boolean supportsInterceptors = getLauncher() instanceof AbstractLauncher;
        if (supportsInterceptors) {
            AbstractLauncher launcher = (AbstractLauncher) getLauncher();
            launcher.addInterceptor(this);
        }
        RackResolver.init(getConfiguration());
    }

    /**
     * Assigns a freshly allocated container to one of the pending step executions
     * and launches it, or releases the container when nothing is pending.
     * <p>
     * Pending step executions are matched in three passes, each in the insertion
     * order of the pending map: first by a hinted host equal to the container's
     * host, then by a hinted rack equal to the container's resolved rack, and
     * finally the first pending step execution regardless of hints.
     *
     * @param container the container handed to this appmaster by the allocator
     */
    protected void onContainerAllocated(Container container) {
        if (log.isDebugEnabled()) {
            log.debug("Container allocated: " + container);
        }
        String host = container.getNodeId().getHost();
        String rack = RackResolver.resolve(host).getNetworkLocation();
        if (log.isDebugEnabled()) {
            log.debug("Matching against host=" + host + " rack=" + rack);
        }

        StepExecution stepExecution = findPendingByHost(host);
        if (log.isDebugEnabled()) {
            log.debug("stepExecution after hosts match: " + stepExecution);
        }

        if (stepExecution == null) {
            stepExecution = findPendingByRack(rack);
        }
        if (log.isDebugEnabled()) {
            log.debug("stepExecution after racks match: " + stepExecution);
        }

        // No locality match: fall back to the oldest pending step execution, if any.
        if (stepExecution == null && !requestData.isEmpty()) {
            stepExecution = requestData.keySet().iterator().next();
        }

        if (stepExecution == null) {
            // Nothing is waiting for a container, so hand it back.
            getAllocator().releaseContainer(container.getId());
            return;
        }

        requestData.remove(stepExecution);
        // Must be recorded before launching: preLaunch() looks the step up by container id.
        containerToStepMap.put(container.getId(), stepExecution);
        getLauncher().launchContainer(container, getCommands());
    }

    /** Returns the first pending step execution whose hint lists the given host, or {@code null}. */
    private StepExecution findPendingByHost(String host) {
        for (Map.Entry<StepExecution, ContainerRequestHint> entry : requestData.entrySet()) {
            ContainerRequestHint hint = entry.getValue();
            if (hint == null || hint.getHosts() == null) {
                continue;
            }
            for (String candidate : hint.getHosts()) {
                if (candidate.equals(host)) {
                    return entry.getKey();
                }
            }
        }
        return null;
    }

    /** Returns the first pending step execution whose hint lists the given rack, or {@code null}. */
    private StepExecution findPendingByRack(String rack) {
        for (Map.Entry<StepExecution, ContainerRequestHint> entry : requestData.entrySet()) {
            ContainerRequestHint hint = entry.getValue();
            if (hint == null || hint.getRacks() == null) {
                continue;
            }
            for (String candidate : hint.getRacks()) {
                if (candidate.equals(rack)) {
                    return entry.getKey();
                }
            }
        }
        return null;
    }

    /**
     * Callback invoked once a container has been launched. This implementation
     * only writes a debug log line.
     *
     * @param container the container that was launched
     */
    protected void onContainerLaunched(Container container) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("Container launched: " + container);
    }

    /**
     * Handles a completed container: removes the step execution that ran in it
     * from every master's set of outstanding splits and, for each master whose
     * set is now empty, publishes a {@link PartitionedStepExecutionEvent} and
     * notifies the registered listeners with the COMPLETED state. The container
     * is released in all cases.
     * <p>
     * A finished master is removed from the tracking map so it is reported once
     * only; previously every later container completion re-published the event
     * for masters that had already finished. Notifications are sent after the
     * map iteration has ended, so a listener that registers new splits does not
     * modify the map while it is being traversed.
     *
     * @param status completion status reported for the container
     */
    @Override
    protected void onContainerCompleted(ContainerStatus status) {
        super.onContainerCompleted(status);
        ContainerId containerId = status.getContainerId();
        StepExecution stepExecution = containerToStepMap.get(containerId);
        if (stepExecution == null) {
            log.warn("No assigned step execution for containerId=" + containerId);
        } else {
            List<StepExecution> completedMasters = new ArrayList<StepExecution>();
            Iterator<Map.Entry<StepExecution, Set<StepExecution>>> masters = masterExecutions.entrySet().iterator();
            while (masters.hasNext()) {
                Map.Entry<StepExecution, Set<StepExecution>> entry = masters.next();
                Set<StepExecution> outstanding = entry.getValue();
                if (outstanding.remove(stepExecution) && log.isDebugEnabled()) {
                    log.debug("stepExecution=" + stepExecution + " removed");
                }
                if (outstanding.isEmpty()) {
                    // All splits consumed: remember the master and stop tracking it.
                    completedMasters.add(entry.getKey());
                    masters.remove();
                }
            }
            for (StepExecution master : completedMasters) {
                getYarnEventPublisher().publishEvent(new PartitionedStepExecutionEvent(this, master));
                stepExecutionStateListener.state(PartitionedStepExecutionStateListener.PartitionedStepExecutionState.COMPLETED, master);
            }
        }
        getAllocator().releaseContainer(containerId);
    }

    /**
     * Launch interceptor that adds appmaster and batch step information to the
     * container's environment before it is started.
     * <p>
     * When no appmaster service is available the context is returned untouched.
     * Otherwise the existing environment is copied and extended with the file
     * system address, the appmaster service host and port and, if a step
     * execution has been assigned to the container, the remote step name, the
     * step execution name and the job/step execution ids.
     *
     * @param container the container about to be launched
     * @param context the launch context prepared for the container
     * @return the same {@code context} instance, possibly with a replaced environment
     */
    public ContainerLaunchContext preLaunch(Container container, ContainerLaunchContext context) {
        if (log.isDebugEnabled()) {
            log.debug("Intercept launch context: " + context);
        }
        AppmasterService service = getAppmasterService();
        if (service == null) {
            return context;
        }

        Map<String, String> env = new HashMap<String, String>(context.getEnvironment());
        env.put("SHDP_HD_FS", getConfiguration().get("fs.defaultFS"));
        env.put("SHDP_AMSERVICE_PORT", Integer.toString(service.getPort()));
        env.put("SHDP_AMSERVICE_HOST", service.getHost());

        StepExecution stepExecution = containerToStepMap.get(container.getId());
        if (stepExecution == null) {
            // Container was not assigned through onContainerAllocated(); there is no
            // step information to export, so only the appmaster service is advertised.
            log.warn("No assigned step execution for containerId=" + container.getId()
                    + ", launching without batch step environment");
        } else {
            env.put("SHDP_AMSERVICE_BATCH_STEPNAME", remoteStepNames.get(stepExecution));
            env.put("SHDP_AMSERVICE_BATCH_STEPEXECUTIONNAME", stepExecution.getStepName());
            env.put("SHDP_AMSERVICE_BATCH_JOBEXECUTIONID", Long.toString(stepExecution.getJobExecutionId()));
            env.put("SHDP_AMSERVICE_BATCH_STEPEXECUTIONID", Long.toString(stepExecution.getId()));
        }
        context.setEnvironment(env);
        return context;
    }

    /**
     * Sets the job launcher. Autowiring is optional, so the launcher may stay
     * unset when no matching bean exists.
     *
     * @param yarnJobLauncher the job launcher to store
     */
    @Autowired(required=false)
    public void setYarnJobLauncher(YarnJobLauncher yarnJobLauncher) {
        this.yarnJobLauncher = yarnJobLauncher;
    }

    /**
     * Gets the job launcher.
     *
     * @return the launcher previously set, or {@code null} if none was set
     */
    public YarnJobLauncher getYarnJobLauncher() {
        return yarnJobLauncher;
    }

    /**
     * Registers a listener with the composite listener that is notified when a
     * partitioned step execution changes state.
     *
     * @param listener the listener to register
     */
    @Override
    public void addPartitionedStepExecutionStateListener(PartitionedStepExecutionStateListener listener) {
        stepExecutionStateListener.register(listener);
    }

    /**
     * Gets the step executions held by this appmaster.
     *
     * @return the internal list itself, not a copy
     */
    public List<StepExecution> getStepExecutions() {
        return stepExecutions;
    }

    /**
     * Registers the split step executions of a partitioned master step and
     * requests the containers needed to run them.
     * <p>
     * Every entry of {@code resourceRequests} is recorded as pending together
     * with its hint. One container is requested per hinted step execution,
     * spread round-robin over the union of all hinted hosts. Step executions
     * without an entry in {@code resourceRequests} are recorded as pending
     * without a hint and requested as a plain count.
     * <p>
     * If hints are present but none of them names a host, those requests are
     * added to the plain count. The previous implementation looped forever in
     * that situation and failed on a {@code null} hint or {@code null} host list.
     *
     * @param masterStepExecution the partitioned (master) step execution
     * @param remoteStepName name of the step to run remotely, exported to each container
     * @param stepExecutions all split step executions belonging to the master
     * @param resourceRequests optional per-split placement hints; values may be {@code null}
     */
    @Override
    public void addStepSplits(StepExecution masterStepExecution, String remoteStepName, Set<StepExecution> stepExecutions, Map<StepExecution, ContainerRequestHint> resourceRequests) {
        ContainerAllocateData containerAllocateData = new ContainerAllocateData();
        int hostRequestsNeeded = 0;
        Set<String> hostUnion = new HashSet<String>();
        for (Map.Entry<StepExecution, ContainerRequestHint> entry : resourceRequests.entrySet()) {
            StepExecution execution = entry.getKey();
            ContainerRequestHint hint = entry.getValue();
            requestData.put(execution, hint);
            remoteStepNames.put(execution, remoteStepName);
            hostRequestsNeeded++;
            if (hint == null || hint.getHosts() == null) {
                continue;
            }
            for (String host : hint.getHosts()) {
                hostUnion.add(host);
            }
        }

        // Spread the hinted requests round-robin over the known hosts. Skipped entirely
        // when there is no host to target, otherwise the outer loop would never end.
        if (!hostUnion.isEmpty()) {
            while (hostRequestsNeeded > 0) {
                for (String host : hostUnion) {
                    if (hostRequestsNeeded == 0) {
                        break;
                    }
                    containerAllocateData.addHosts(host, 1);
                    hostRequestsNeeded--;
                }
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("Adding " + stepExecutions.size() + " split steps into masterStepExecution=" + masterStepExecution);
        }
        Set<StepExecution> splits = new HashSet<StepExecution>(stepExecutions);
        masterExecutions.put(masterStepExecution, splits);

        for (StepExecution execution : splits) {
            if (!requestData.containsKey(execution)) {
                requestData.put(execution, null);
            }
            if (!remoteStepNames.containsKey(execution)) {
                remoteStepNames.put(execution, remoteStepName);
            }
        }

        // Splits without a hint, plus hinted requests that could not be tied to a host.
        int remaining = Math.max(0, stepExecutions.size() - resourceRequests.size()) + hostRequestsNeeded;
        getAllocator().allocateContainers(remaining);
        getAllocator().allocateContainers(containerAllocateData);
    }
}

