/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.recover;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.recover.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Recovers the work of the previous application attempt by reading its staged job
 * history file and replaying the recorded task attempts through the normal event flow.
 *
 * <p>While {@code recoveryMode} is on, the dispatcher returned by {@link #getDispatcher()}
 * hands out an intercepting event handler: schedule events for tasks that did not
 * succeed in the previous attempt are parked, and container request / launch / cleanup
 * events for recovered attempts are answered directly from the history instead of being
 * forwarded. The clock returned by {@link #getClock()} is moved to the recorded launch,
 * start and finish times while the replay is in progress and is reset once every
 * recovered task has been replayed.
 */
public class RecoveryService extends CompositeService implements Recovery {
    private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    private static final Log LOG = LogFactory.getLog(RecoveryService.class);

    private final ApplicationAttemptId applicationAttemptId;
    private final OutputCommitter committer;
    private final Dispatcher dispatcher;
    private final ControlledClock clock;

    /** Parsed history of the previous attempt; stays {@code null} when parsing failed. */
    private JobHistoryParser.JobInfo jobInfo = null;

    /** Tasks recorded as SUCCEEDED in the history that still have attempts left to replay. */
    private final Map<TaskId, JobHistoryParser.TaskInfo> completedTasks = new HashMap<TaskId, JobHistoryParser.TaskInfo>();

    /** Schedule events of tasks that are not being recovered; replayed when recovery ends. */
    private final List<TaskEvent> pendingTaskScheduleEvents = new ArrayList<TaskEvent>();

    private volatile boolean recoveryMode = false;

    /**
     * @param applicationAttemptId id of the current attempt; the history of attempt
     *                             {@code getAttemptId() - 1} is the one that gets read
     * @param clock                clock wrapped by the {@link ControlledClock} exposed via {@link #getClock()}
     * @param committer            committer asked to recover the output of succeeded attempts
     */
    public RecoveryService(ApplicationAttemptId applicationAttemptId, Clock clock, OutputCommitter committer) {
        super("RecoveringDispatcher");
        this.applicationAttemptId = applicationAttemptId;
        this.committer = committer;
        this.dispatcher = new RecoveryDispatcher();
        this.clock = new ControlledClock(clock);
        this.addService((Service) this.dispatcher);
    }

    /**
     * Initializes the service and tries to load the previous attempt's history.
     * Recovery mode is switched on only when at least one succeeded task was read;
     * any parse failure is logged and the service starts without recovery.
     */
    public void init(Configuration conf) {
        super.init(conf);
        try {
            parse();
        } catch (Exception e) {
            LOG.warn("Could not parse the old history file. Aborting recovery. Starting afresh.", e);
            // Drop anything collected before the failure so a half-read history
            // cannot switch recovery mode on after we announced aborting it.
            completedTasks.clear();
        }
        if (!completedTasks.isEmpty()) {
            recoveryMode = true;
            LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS TO RECOVER " + completedTasks.size());
            LOG.info("Job launch time " + jobInfo.getLaunchTime());
            clock.setTime(jobInfo.getLaunchTime());
        }
    }

    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    public Clock getClock() {
        return this.clock;
    }

    /** Returns a live view of the ids of the tasks still waiting to be replayed. */
    @Override
    public Set<TaskId> getCompletedTasks() {
        return this.completedTasks.keySet();
    }

    /**
     * Converts the AM records found in the parsed history into {@link AMInfo} records.
     *
     * @return a new list, empty when no history was parsed or it holds no AM records
     */
    @Override
    public List<AMInfo> getAMInfos() {
        List<AMInfo> amInfos = new LinkedList<AMInfo>();
        if (jobInfo == null || jobInfo.getAMInfos() == null) {
            return amInfos;
        }
        for (JobHistoryParser.AMInfo jhAmInfo : jobInfo.getAMInfos()) {
            amInfos.add(MRBuilderUtils.newAMInfo(
                    jhAmInfo.getAppAttemptId(),
                    jhAmInfo.getStartTime(),
                    jhAmInfo.getContainerId(),
                    jhAmInfo.getNodeManagerHost(),
                    jhAmInfo.getNodeManagerPort(),
                    jhAmInfo.getNodeManagerHttpPort()));
        }
        return amInfos;
    }

    /**
     * Reads the staged history file of the previous attempt and records every task
     * whose status is SUCCEEDED in {@code completedTasks}.
     *
     * @throws IOException if the history file cannot be opened or parsed
     */
    private void parse() throws IOException {
        Configuration conf = getConfig();
        String jobName = TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
        String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
        Path histDirPath = FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
        FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
        Path historyFile = fc.makeQualified(
                JobHistoryUtils.getStagingJobHistoryFile(histDirPath, jobName, applicationAttemptId.getAttemptId() - 1));

        FSDataInputStream in = fc.open(historyFile);
        try {
            jobInfo = new JobHistoryParser(in).parse();
        } finally {
            // NOTE(review): the parser may already have closed the stream; a second
            // close is expected to be a no-op, and a failure here must not turn a
            // successful parse into an aborted recovery.
            try {
                in.close();
            } catch (IOException closeFailure) {
                LOG.debug("Ignoring failure while closing history file " + historyFile, closeFailure);
            }
        }

        String succeeded = TaskState.SUCCEEDED.toString();
        for (JobHistoryParser.TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
            if (!succeeded.equals(taskInfo.getTaskStatus())) {
                continue;
            }
            TaskId taskId = TypeConverter.toYarn(taskInfo.getTaskId());
            completedTasks.put(taskId, taskInfo);
            LOG.info("Read from history task " + taskId);
        }
        LOG.info("Read completed tasks from history " + completedTasks.size());
    }

    /**
     * Looks up the history record of an attempt that belongs to a task in {@code completedTasks}.
     *
     * <p>NOTE(review): throws {@link NullPointerException} when the task is not (or no
     * longer) in {@code completedTasks}; callers appear to rely on only being invoked for
     * tasks under recovery - confirm.
     */
    private JobHistoryParser.TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
        JobHistoryParser.TaskInfo taskInfo = this.completedTasks.get(id.getTaskId());
        return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
    }

    /**
     * Event handler handed out by {@link RecoveryDispatcher}. Outside recovery mode it is
     * a plain pass-through; in recovery mode it answers allocation, launch and cleanup
     * requests from the recorded history instead of forwarding them.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private class InterceptingEventHandler implements EventHandler {
        EventHandler actualHandler;

        InterceptingEventHandler(EventHandler actualHandler) {
            this.actualHandler = actualHandler;
        }

        public void handle(Event event) {
            if (!RecoveryService.this.recoveryMode) {
                actualHandler.handle(event);
                return;
            }

            Object type = event.getType();
            if (type == TaskEventType.T_SCHEDULE) {
                TaskEvent taskEvent = (TaskEvent) event;
                if (RecoveryService.this.completedTasks.get(taskEvent.getTaskID()) == null) {
                    // Not a recovered task: hold it back until the replay has finished.
                    LOG.debug("Adding to pending task events " + taskEvent.getTaskID());
                    RecoveryService.this.pendingTaskScheduleEvents.add(taskEvent);
                    return;
                }
                // Recovered task: falls through to the real handler below.
            } else if (type == ContainerAllocator.EventType.CONTAINER_REQ) {
                TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
                JobHistoryParser.TaskAttemptInfo attInfo = RecoveryService.this.getTaskAttemptInfo(aId);
                LOG.debug("CONTAINER_REQ " + aId);
                sendAssignedEvent(aId, attInfo);
                return;
            } else if (type == TaskCleaner.EventType.TASK_CLEAN) {
                TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
                LOG.debug("TASK_CLEAN");
                actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_CLEANUP_DONE));
                return;
            } else if (type == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
                TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event).getTaskAttemptID();
                replayLaunch(aId, RecoveryService.this.getTaskAttemptInfo(aId));
                return;
            } else if (type == ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
                TaskAttemptId aId = ((ContainerLauncherEvent) event).getTaskAttemptID();
                actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
                return;
            }
            actualHandler.handle(event);
        }

        /**
         * Answers a launch request from history: reports the container as launched,
         * sends the final status, then emits the terminal event matching the recorded
         * attempt state (done / kill / fail).
         */
        private void replayLaunch(TaskAttemptId aId, JobHistoryParser.TaskAttemptInfo attInfo) {
            actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, attInfo.getShufflePort()));
            sendStatusUpdateEvent(aId, attInfo);

            TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
            switch (state) {
                case SUCCEEDED: {
                    TaskAttemptContextImpl taskContext =
                            new TaskAttemptContextImpl(RecoveryService.this.getConfig(), attInfo.getAttemptId());
                    try {
                        RecoveryService.this.committer.recoverTask(taskContext);
                        // Only claim success when recoverTask actually returned normally.
                        LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
                    } catch (IOException e) {
                        LOG.error("Caught an exception while trying to recover task " + aId, e);
                        actualHandler.handle(new JobDiagnosticsUpdateEvent(
                                aId.getTaskId().getJobId(), "Error in recovering task output " + e.getMessage()));
                        actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(), JobEventType.INTERNAL_ERROR));
                    }
                    // The done event is sent in both cases, as before; a failed recovery
                    // has already been reported to the job as an internal error.
                    LOG.info("Sending done event to " + aId);
                    actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_DONE));
                    break;
                }
                case KILLED: {
                    LOG.info("Sending kill event to " + aId);
                    actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_KILL));
                    break;
                }
                default: {
                    LOG.info("Sending fail event to " + aId);
                    actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_FAILMSG));
                }
            }
        }

        /** Sends a status update carrying full progress and the counters recorded in history. */
        private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID, JobHistoryParser.TaskAttemptInfo attemptInfo) {
            LOG.info("Sending status update event to " + yarnAttemptID);
            TaskAttemptStatusUpdateEvent.TaskAttemptStatus taskAttemptStatus =
                    new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
            taskAttemptStatus.id = yarnAttemptID;
            taskAttemptStatus.progress = 1.0f;
            taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
            taskAttemptStatus.phase = Phase.CLEANUP;
            Counters cntrs = attemptInfo.getCounters();
            taskAttemptStatus.counters = cntrs == null ? null : TypeConverter.toYarn(cntrs);
            actualHandler.handle(new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, taskAttemptStatus));
        }

        /** Builds a container record from the history entry and reports it as assigned. */
        private void sendAssignedEvent(TaskAttemptId yarnAttemptID, JobHistoryParser.TaskAttemptInfo attemptInfo) {
            LOG.info("Sending assigned event to " + yarnAttemptID);
            ContainerId cId = attemptInfo.getContainerId();
            NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname());
            String nodeHttpAddress = attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort();
            Container container = BuilderUtils.newContainer(cId, nodeId, nodeHttpAddress, null, null, null);
            actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID, container, null));
        }
    }

    /**
     * Dispatcher that, while recovery mode is on, moves the controlled clock to the
     * recorded attempt times before dispatching and tracks which recovered attempts
     * have completed; it ends recovery once no recovered task is left.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    class RecoveryDispatcher extends AsyncDispatcher {
        private final EventHandler actualHandler = super.getEventHandler();
        private final EventHandler handler;

        RecoveryDispatcher() {
            this.handler = new InterceptingEventHandler(this.actualHandler);
        }

        public void dispatch(Event event) {
            if (RecoveryService.this.recoveryMode) {
                Object type = event.getType();
                if (type == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
                    JobHistoryParser.TaskAttemptInfo attInfo =
                            RecoveryService.this.getTaskAttemptInfo(((TaskAttemptEvent) event).getTaskAttemptID());
                    LOG.info("Attempt start time " + attInfo.getStartTime());
                    RecoveryService.this.clock.setTime(attInfo.getStartTime());
                } else if (type == TaskAttemptEventType.TA_DONE
                        || type == TaskAttemptEventType.TA_FAILMSG
                        || type == TaskAttemptEventType.TA_KILL) {
                    JobHistoryParser.TaskAttemptInfo attInfo =
                            RecoveryService.this.getTaskAttemptInfo(((TaskAttemptEvent) event).getTaskAttemptID());
                    LOG.info("Attempt finish time " + attInfo.getFinishTime());
                    RecoveryService.this.clock.setTime(attInfo.getFinishTime());
                } else if (type == TaskEventType.T_ATTEMPT_FAILED
                        || type == TaskEventType.T_ATTEMPT_KILLED
                        || type == TaskEventType.T_ATTEMPT_SUCCEEDED) {
                    markAttemptRecovered((TaskTAttemptEvent) event);
                }
            }
            super.dispatch(event);
        }

        /**
         * Removes the finished attempt from its task's history record; when the task has
         * no attempts left it is dropped from {@code completedTasks}, and when that map
         * becomes empty recovery mode ends and the parked schedule events are released.
         *
         * <p>NOTE(review): assumes the attempt's task is still in {@code completedTasks};
         * otherwise this throws {@link NullPointerException} - confirm that cannot happen.
         */
        private void markAttemptRecovered(TaskTAttemptEvent tEvent) {
            TaskAttemptId attemptId = tEvent.getTaskAttemptID();
            LOG.info("Recovered Task attempt " + attemptId);

            TaskId taskId = attemptId.getTaskId();
            JobHistoryParser.TaskInfo taskInfo = RecoveryService.this.completedTasks.get(taskId);
            taskInfo.getAllTaskAttempts().remove(TypeConverter.fromYarn(attemptId));
            if (taskInfo.getAllTaskAttempts().size() != 0) {
                return;
            }

            RecoveryService.this.completedTasks.remove(taskId);
            LOG.info("CompletedTasks() " + RecoveryService.this.completedTasks.size());
            if (RecoveryService.this.completedTasks.size() != 0) {
                return;
            }

            RecoveryService.this.recoveryMode = false;
            RecoveryService.this.clock.reset();
            LOG.info("Setting the recovery mode to false. Recovery is complete!");
            for (TaskEvent tEv : RecoveryService.this.pendingTaskScheduleEvents) {
                this.actualHandler.handle(tEv);
            }
            // The list is only consulted while recovery mode is on; release the
            // replayed events instead of retaining them for the life of the service.
            RecoveryService.this.pendingTaskScheduleEvents.clear();
        }

        public EventHandler getEventHandler() {
            return this.handler;
        }
    }
}

