/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.service.AbstractService;

public class LocalContainerLauncher
extends AbstractService
implements ContainerLauncher {
    private static final File curDir = new File(".");
    private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
    private FileContext curFC = null;
    private final HashSet<File> localizedFiles;
    private final AppContext context;
    private final TaskUmbilicalProtocol umbilical;
    private Thread eventHandlingThread;
    private BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>();

    public LocalContainerLauncher(AppContext context, TaskUmbilicalProtocol umbilical) {
        super(LocalContainerLauncher.class.getName());
        this.context = context;
        this.umbilical = umbilical;
        try {
            this.curFC = FileContext.getFileContext((URI)curDir.toURI());
        }
        catch (UnsupportedFileSystemException ufse) {
            LOG.error((Object)("Local filesystem " + curDir.toURI().toString() + " is unsupported?? (should never happen)"));
        }
        File[] curLocalFiles = curDir.listFiles();
        this.localizedFiles = new HashSet(curLocalFiles.length);
        for (int j = 0; j < curLocalFiles.length; ++j) {
            this.localizedFiles.add(curLocalFiles[j]);
        }
    }

    public void start() {
        this.eventHandlingThread = new Thread((Runnable)new SubtaskRunner(), "uber-SubtaskRunner");
        this.eventHandlingThread.start();
        super.start();
    }

    public void stop() {
        this.eventHandlingThread.interrupt();
        super.stop();
    }

    public void handle(ContainerLauncherEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnException((Throwable)e);
        }
    }

    private class SubtaskRunner
    implements Runnable {
        private boolean doneWithMaps = false;
        private int finishedSubMaps = 0;

        SubtaskRunner() {
        }

        @Override
        public void run() {
            ContainerLauncherEvent event = null;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    event = (ContainerLauncherEvent)((Object)LocalContainerLauncher.this.eventQueue.take());
                }
                catch (InterruptedException e) {
                    LOG.error((Object)("Returning, interrupted : " + e));
                    return;
                }
                LOG.info((Object)("Processing the event " + event.toString()));
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
                    ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event;
                    TaskAttemptId attemptID = launchEv.getTaskAttemptID();
                    Job job = LocalContainerLauncher.this.context.getAllJobs().get(attemptID.getTaskId().getJobId());
                    int numMapTasks = job.getTotalMaps();
                    int numReduceTasks = job.getTotalReduces();
                    org.apache.hadoop.mapreduce.v2.app.job.Task ytask = job.getTask(attemptID.getTaskId());
                    Task remoteTask = launchEv.getRemoteTask();
                    LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptContainerLaunchedEvent(attemptID, -1));
                    if (numMapTasks == 0) {
                        this.doneWithMaps = true;
                    }
                    try {
                        if (remoteTask.isMapOrReduce()) {
                            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                            jce.addCounterUpdate((Enum<?>)JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1L);
                            if (remoteTask.isMapTask()) {
                                jce.addCounterUpdate((Enum<?>)JobCounter.NUM_UBER_SUBMAPS, 1L);
                            } else {
                                jce.addCounterUpdate((Enum<?>)JobCounter.NUM_UBER_SUBREDUCES, 1L);
                            }
                            LocalContainerLauncher.this.context.getEventHandler().handle((Event)jce);
                        }
                        this.runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, numReduceTasks > 0);
                    }
                    catch (RuntimeException re) {
                        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                        jce.addCounterUpdate((Enum<?>)JobCounter.NUM_FAILED_UBERTASKS, 1L);
                        LocalContainerLauncher.this.context.getEventHandler().handle((Event)jce);
                        LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
                    }
                    catch (IOException ioe) {
                        LOG.fatal((Object)("oopsie...  this can never happen: " + StringUtils.stringifyException((Throwable)ioe)));
                        System.exit(-1);
                    }
                    continue;
                }
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
                    LocalContainerLauncher.this.context.getEventHandler().handle((Event)new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED));
                    continue;
                }
                LOG.warn((Object)("Ignoring unexpected event " + event.toString()));
            }
        }

        private void runSubtask(Task task, TaskType taskType, TaskAttemptId attemptID, int numMapTasks, boolean renameOutputs) throws RuntimeException, IOException {
            TaskAttemptID classicAttemptID = TypeConverter.fromYarn((TaskAttemptId)attemptID);
            try {
                JobConf conf = new JobConf(LocalContainerLauncher.this.getConfig());
                conf.setBoolean("mapreduce.task.uberized", true);
                if (taskType == TaskType.MAP) {
                    if (this.doneWithMaps) {
                        LOG.error((Object)("CONTAINER_REMOTE_LAUNCH contains a map task (" + attemptID + "), but should be finished with maps"));
                    }
                    MapTask map = (MapTask)task;
                    map.run(conf, LocalContainerLauncher.this.umbilical);
                    if (renameOutputs) {
                        this.renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
                    }
                    this.relocalize();
                    if (++this.finishedSubMaps == numMapTasks) {
                        this.doneWithMaps = true;
                    }
                } else {
                    if (!this.doneWithMaps) {
                        LOG.error((Object)("CONTAINER_REMOTE_LAUNCH contains a reduce task (" + attemptID + "), but not yet finished with maps"));
                    }
                    ReduceTask reduce = (ReduceTask)task;
                    conf.set("mapreduce.framework.name", "local");
                    conf.set("mapreduce.jobtracker.address", "local");
                    reduce.run(conf, LocalContainerLauncher.this.umbilical);
                }
            }
            catch (FSError e) {
                LOG.fatal((Object)"FSError from child", (Throwable)e);
                LocalContainerLauncher.this.umbilical.fsError(classicAttemptID, e.getMessage());
                throw new RuntimeException();
            }
            catch (Exception exception) {
                LOG.warn((Object)("Exception running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)exception)));
                try {
                    if (task != null) {
                        task.taskCleanup(LocalContainerLauncher.this.umbilical);
                    }
                }
                catch (Exception e) {
                    LOG.info((Object)("Exception cleaning up: " + StringUtils.stringifyException((Throwable)e)));
                }
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                exception.printStackTrace(new PrintStream(baos));
                LocalContainerLauncher.this.umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
                throw new RuntimeException();
            }
            catch (Throwable throwable) {
                LOG.fatal((Object)("Error running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)throwable)));
                Throwable tCause = throwable.getCause();
                String cause = tCause == null ? throwable.getMessage() : StringUtils.stringifyException((Throwable)tCause);
                LocalContainerLauncher.this.umbilical.fatalError(classicAttemptID, cause);
                throw new RuntimeException();
            }
        }

        private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
            LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
            Path mapOut = subMapOutputFile.getOutputFile();
            Path reduceIn = subMapOutputFile.getInputFileForWrite((TaskID)TypeConverter.fromYarn((TaskAttemptId)mapId).getTaskID(), localFs.getLength(mapOut));
            if (!localFs.mkdirs(reduceIn.getParent())) {
                throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString());
            }
            if (!localFs.rename(mapOut, reduceIn)) {
                throw new IOException("Couldn't rename " + mapOut);
            }
        }

        private void relocalize() {
            File[] curLocalFiles = curDir.listFiles();
            for (int j = 0; j < curLocalFiles.length; ++j) {
                if (LocalContainerLauncher.this.localizedFiles.contains(curLocalFiles[j])) continue;
                boolean deleted = false;
                try {
                    if (LocalContainerLauncher.this.curFC != null) {
                        deleted = LocalContainerLauncher.this.curFC.delete(new Path(curLocalFiles[j].getName()), true);
                    }
                }
                catch (IOException e) {
                    deleted = false;
                }
                if (deleted) continue;
                LOG.warn((Object)("Unable to delete unexpected local file/dir " + curLocalFiles[j].getName() + ": insufficient permissions?"));
            }
        }
    }
}

