/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ShuffleSchedulerImpl<K, V>
implements ShuffleScheduler<K, V> {
    /**
     * Per-thread timestamp (from {@code Time.monotonicNow()}) recorded when
     * {@link #getHost()} hands a host to the calling thread; read back in
     * {@link #freeHost(MapHost)} to log how long the host was held.
     */
    private static final ThreadLocal<Long> SHUFFLE_START = new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
            return 0L;
        }
    };
    private static final Log LOG = LogFactory.getLog(ShuffleSchedulerImpl.class);
    /** Upper bound on map outputs handed out by a single {@code getMapsForHost} call. */
    private static final int MAX_MAPS_AT_ONCE = 20;
    /** Base penalty, in milliseconds, applied to a host after a failed copy. */
    private static final long INITIAL_PENALTY = 10000L;
    /** The penalty is scaled by this factor raised to the map's failure count. */
    private static final float PENALTY_GROWTH_RATE = 1.3f;
    /** NOTE(review): not referenced anywhere in this class as shown. */
    private static final int REPORT_FAILURE_LIMIT = 10;
    /** Converts bytes per millisecond into MB/s (1000 / (1024 * 1024)). */
    private static final float BYTES_PER_MILLIS_TO_MBS = 9.536743E-4f;
    /** Indexed by map task id; true once that map's output was copied or its TIP failed. */
    private final boolean[] finishedMaps;
    private final int totalMaps;
    /** Maps not yet marked finished; waiters are notified when it reaches zero. */
    private int remainingMaps;
    /** Known hosts, keyed by the host name passed to {@code addKnownMapOutput}. */
    private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
    /** Hosts currently in the PENDING state, waiting to be picked up by {@code getHost}. */
    private Set<MapHost> pendingHosts = new HashSet<MapHost>();
    /** Map attempts whose output must be skipped when assigning work. */
    private Set<org.apache.hadoop.mapreduce.TaskAttemptID> obsoleteMaps = new HashSet<org.apache.hadoop.mapreduce.TaskAttemptID>();
    private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
    /** Used to pick a random pending host in {@code getHost}. */
    private final Random random = new Random();
    /** Penalised hosts, released by the referee thread once their delay expires. */
    private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
    private final Referee referee = new Referee();
    /** Failed-copy count per map attempt; an entry is removed when that attempt is copied. */
    private final Map<org.apache.hadoop.mapreduce.TaskAttemptID, IntWritable> failureCounts = new HashMap<org.apache.hadoop.mapreduce.TaskAttemptID, IntWritable>();
    /** Failure count per host name; an entry is removed on a successful copy from that host. */
    private final Map<String, IntWritable> hostFailures = new HashMap<String, IntWritable>();
    private final TaskStatus status;
    private final ExceptionReporter reporter;
    /** Failures of a single map attempt at which an exception is reported: max(30, totalMaps / 10). */
    private final int abortFailureLimit;
    private final Progress progress;
    private final Counters.Counter shuffledMapsCounter;
    private final Counters.Counter reduceShuffleBytes;
    private final Counters.Counter failedShuffleCounter;
    /** Monotonic time at construction. */
    private final long startTime;
    /** Monotonic time of the most recent successful copy (initially {@code startTime}). */
    private long lastProgressTime;
    private final CopyTimeTracker copyTimeTracker;
    /** Largest task run time seen on a SUCCEEDED completion event. */
    private volatile int maxMapRuntime = 0;
    /** min(totalMaps, 5); threshold on distinct failing map attempts used by the health check. */
    private final int maxFailedUniqueFetches;
    /** A fetch failure is reported every this-many failures of one map attempt. */
    private final int maxFetchFailuresBeforeReporting;
    private long totalBytesShuffledTillNow = 0L;
    private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    /** When true, a read error is reported to the application master on its first occurrence. */
    private final boolean reportReadErrorImmediately;
    /** Cap, in milliseconds, on the penalty delay; overwritten from the job configuration. */
    private long maxDelay = 60000L;
    /** Host failure count above which a failed copy is always reported. */
    private int maxHostFailures;

    public ShuffleSchedulerImpl(JobConf job, TaskStatus status, org.apache.hadoop.mapreduce.TaskAttemptID reduceId, ExceptionReporter reporter, Progress progress, Counters.Counter shuffledMapsCounter, Counters.Counter reduceShuffleBytes, Counters.Counter failedShuffleCounter) {
        this.totalMaps = job.getNumMapTasks();
        this.abortFailureLimit = Math.max(30, this.totalMaps / 10);
        this.copyTimeTracker = new CopyTimeTracker();
        this.remainingMaps = this.totalMaps;
        this.finishedMaps = new boolean[this.remainingMaps];
        this.reporter = reporter;
        this.status = status;
        this.reduceId = reduceId;
        this.progress = progress;
        this.shuffledMapsCounter = shuffledMapsCounter;
        this.reduceShuffleBytes = reduceShuffleBytes;
        this.failedShuffleCounter = failedShuffleCounter;
        this.lastProgressTime = this.startTime = Time.monotonicNow();
        this.referee.start();
        this.maxFailedUniqueFetches = Math.min(this.totalMaps, 5);
        this.maxFetchFailuresBeforeReporting = job.getInt("mapreduce.reduce.shuffle.maxfetchfailures", 10);
        this.reportReadErrorImmediately = job.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
        this.maxDelay = job.getLong("mapreduce.reduce.shuffle.retry-delay.max.ms", 60000L);
        this.maxHostFailures = job.getInt("mapreduce.reduce.shuffle.max-host-failures", 5);
    }

    /**
     * Applies a map task completion event to the scheduler state.
     * A succeeded map registers its output location, a failed, killed or
     * obsolete attempt is recorded as obsolete, and a failed TIP marks the
     * map as finished. Any other status is ignored.
     *
     * @param event the completion event to interpret
     */
    @Override
    public void resolve(TaskCompletionEvent event) {
        switch (event.getTaskStatus()) {
            case SUCCEEDED: {
                URI baseUri = getBaseURI(reduceId, event.getTaskTrackerHttp());
                String hostAndPort = baseUri.getHost() + ":" + baseUri.getPort();
                addKnownMapOutput(hostAndPort, baseUri.toString(), event.getTaskAttemptId());
                // Remember the slowest map seen; the health check uses it as a stall baseline.
                maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
                break;
            }
            case FAILED:
            case KILLED:
            case OBSOLETE: {
                obsoleteMapOutput(event.getTaskAttemptId());
                LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
                break;
            }
            case TIPFAILED: {
                tipFailed(event.getTaskAttemptId().getTaskID());
                LOG.info("Ignoring output of failed map TIP: '" + event.getTaskAttemptId() + "'");
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Builds the map-output fetch URI prefix for a reduce attempt on one host.
     * The result ends with {@code &map=}.
     *
     * @param reduceId attempt id of the reduce; supplies the job id and the reduce task number
     * @param url      base HTTP address of the host; a trailing slash is added when missing
     * @return the URI {@code <url>/mapOutput?job=<job>&reduce=<n>&map=}
     */
    static URI getBaseURI(org.apache.hadoop.mapreduce.TaskAttemptID reduceId, String url) {
        StringBuilder target = new StringBuilder(url);
        if (!url.endsWith("/")) {
            target.append("/");
        }
        target.append("mapOutput?job=")
                .append(reduceId.getJobID())
                .append("&reduce=")
                .append(reduceId.getTaskID().getId())
                .append("&map=");
        return URI.create(target.toString());
    }

    /**
     * Records a successful copy of one map output.
     * Failure bookkeeping for the map attempt and the host is cleared
     * unconditionally. The output is committed and counted only when the
     * map was not already marked finished.
     *
     * @param mapId       attempt whose output was fetched
     * @param host        host the output came from
     * @param bytes       number of bytes transferred
     * @param startMillis copy start time in milliseconds
     * @param endMillis   copy end time in milliseconds
     * @param output      fetched output, committed on first success for this map
     * @throws IOException if committing the output fails
     */
    public synchronized void copySucceeded(org.apache.hadoop.mapreduce.TaskAttemptID mapId, MapHost host, long bytes, long startMillis, long endMillis, MapOutput<K, V> output) throws IOException {
        failureCounts.remove(mapId);
        hostFailures.remove(host.getHostName());

        final int mapIndex = mapId.getTaskID().getId();
        if (finishedMaps[mapIndex]) {
            // Another attempt of this map was already copied; nothing more to do.
            return;
        }

        output.commit();
        finishedMaps[mapIndex] = true;
        shuffledMapsCounter.increment(1L);
        remainingMaps--;
        if (remainingMaps == 0) {
            notifyAll();
        }

        // Treat a zero-length copy as 1 ms so the rate is never a division by zero.
        final long elapsed = endMillis - startMillis;
        final long copyMillis = (elapsed == 0L) ? 1L : elapsed;
        final float transferRate = ((float) bytes / (float) copyMillis) * BYTES_PER_MILLIS_TO_MBS;
        final String individualProgress = "copy task(" + mapId + " succeeded" + " at " + mbpsFormat.format(transferRate) + " MB/s)";

        copyTimeTracker.add(startMillis, endMillis);
        totalBytesShuffledTillNow += bytes;
        updateStatus(individualProgress);
        reduceShuffleBytes.increment(bytes);
        lastProgressTime = Time.monotonicNow();
        LOG.debug("map " + mapId + " done " + status.getStateString());
    }

    /**
     * Refreshes the progress fraction, the task state string and the progress
     * status line from the current copy totals.
     *
     * @param individualProgress description of the copy that just finished, or
     *                           {@code null} to publish only the aggregate line
     */
    private synchronized void updateStatus(String individualProgress) {
        final int mapsDone = totalMaps - remainingMaps;

        // Guard against dividing by zero before any copy time has been recorded.
        long totalCopyMillis = copyTimeTracker.getCopyMillis();
        if (totalCopyMillis == 0L) {
            totalCopyMillis = 1L;
        }
        final float transferRate = ((float) totalBytesShuffledTillNow / (float) totalCopyMillis) * BYTES_PER_MILLIS_TO_MBS;

        progress.set((float) mapsDone / (float) totalMaps);
        status.setStateString(mapsDone + " / " + totalMaps + " copied.");

        if (individualProgress == null) {
            progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at " + mbpsFormat.format(transferRate) + " MB/s)");
            return;
        }
        progress.setStatus(individualProgress + " Aggregated copy rate(" + mapsDone + " of " + totalMaps + " at " + mbpsFormat.format(transferRate) + " MB/s)");
    }

    /** Publishes the aggregate copy status without a per-copy description. */
    private void updateStatus() {
        final String noIndividualProgress = null;
        updateStatus(noIndividualProgress);
    }

    /**
     * Adds one failure to the count kept for the given host, starting the
     * count at one when the host has no entry yet.
     *
     * @param hostname name of the host that failed
     */
    public synchronized void hostFailed(String hostname) {
        IntWritable count = hostFailures.get(hostname);
        if (count == null) {
            hostFailures.put(hostname, new IntWritable(1));
            return;
        }
        count.set(count.get() + 1);
    }

    /**
     * Records a failed copy attempt: penalises the host, counts the failure
     * against the map attempt, reports to the application master and the
     * exception reporter where thresholds are hit, and queues the host for
     * release after a back-off delay.
     *
     * @param mapId        attempt whose output could not be fetched
     * @param host         host the fetch was attempted from
     * @param readError    whether the failure was a read error
     * @param connectExcpt whether the failure was a connection error
     */
    public synchronized void copyFailed(org.apache.hadoop.mapreduce.TaskAttemptID mapId, MapHost host, boolean readError, boolean connectExcpt) {
        host.penalize();

        // Bump, or start, the failure count for this map attempt.
        final int failures;
        IntWritable mapFailureCount = failureCounts.get(mapId);
        if (mapFailureCount == null) {
            failureCounts.put(mapId, new IntWritable(1));
            failures = 1;
        } else {
            mapFailureCount.set(mapFailureCount.get() + 1);
            failures = mapFailureCount.get();
        }

        // The host entry may be absent (copySucceeded removes it); re-seed it at one.
        // The count itself is only incremented by hostFailed(), not here.
        final String hostname = host.getHostName();
        IntWritable hostFailureCount = hostFailures.get(hostname);
        if (hostFailureCount == null) {
            hostFailureCount = new IntWritable(1);
            hostFailures.put(hostname, hostFailureCount);
        }
        final boolean hostFail = hostFailureCount.get() > getMaxHostFailures();

        if (failures >= abortFailureLimit) {
            reporter.reportException(new IOException(failures + " failures downloading " + mapId));
        }

        checkAndInformMRAppMaster(failures, mapId, readError, connectExcpt, hostFail);
        // The health check runs before failedShuffleCounter is incremented below.
        checkReducerHealth();

        // Exponential back-off, capped at maxDelay.
        final long backoff = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures));
        penalties.add(new Penalty(host, Math.min(backoff, maxDelay)));

        failedShuffleCounter.increment(1L);
    }

    /**
     * Logs a shuffle failure caused by a local error and forwards the
     * exception to the reporter. The local host address is included in the
     * log line when it can be resolved.
     *
     * @param ioe the local error to report
     */
    public void reportLocalError(IOException ioe) {
        String message;
        try {
            message = "Shuffle failed : local error on this node: " + InetAddress.getLocalHost();
        } catch (UnknownHostException ignored) {
            // Local address is unavailable; log without it.
            message = "Shuffle failed : local error on this node";
        }
        LOG.error(message);
        reporter.reportException(ioe);
    }

    /**
     * Adds the map attempt to the task status as a fetch failure when any
     * reporting condition holds: a connection error, a read error with
     * immediate reporting enabled, the failure count reaching a multiple of
     * the reporting interval, or the host having failed too often.
     *
     * @param failures     failures recorded so far for {@code mapId}
     * @param mapId        attempt whose fetch failed
     * @param readError    whether the failure was a read error
     * @param connectExcpt whether the failure was a connection error
     * @param hostFailed   whether the host exceeded its failure limit
     */
    private void checkAndInformMRAppMaster(int failures, org.apache.hadoop.mapreduce.TaskAttemptID mapId, boolean readError, boolean connectExcpt, boolean hostFailed) {
        final boolean immediateReadError = reportReadErrorImmediately && readError;
        // Conditions are tested in the same order as the reporting rule is stated above.
        if (!connectExcpt
                && !immediateReadError
                && failures % maxFetchFailuresBeforeReporting != 0
                && !hostFailed) {
            return;
        }
        LOG.info("Reporting fetch failure for " + mapId + " to MRAppMaster.");
        status.addFetchFailedMap((TaskAttemptID) mapId);
    }

    /**
     * Reports a fatal shuffle error when too many distinct map attempts are
     * failing, the overall failure ratio is not healthy, and the reducer has
     * either made too little progress or has stalled.
     */
    private void checkReducerHealth() {
        final float maxAllowedFailedFetchAttemptPercent = 0.5f;
        final float minRequiredProgressPercent = 0.5f;
        final float maxAllowedStallTimePercent = 0.5f;

        final long totalFailures = failedShuffleCounter.getValue();
        final int doneMaps = totalMaps - remainingMaps;

        // Healthy: fewer than half of all fetch attempts so far have failed.
        final float failedFetchRatio = (float) totalFailures / (float) (totalFailures + (long) doneMaps);
        final boolean reducerHealthy = failedFetchRatio < maxAllowedFailedFetchAttemptPercent;

        // Progressed enough: at least half of the maps are done.
        final float progressRatio = (float) doneMaps / (float) totalMaps;
        final boolean reducerProgressedEnough = progressRatio >= minRequiredProgressPercent;

        // Stalled: time since the last progress is at least half of the longer of
        // the time spent making progress and the longest map run time.
        final int stallDuration = (int) (Time.monotonicNow() - lastProgressTime);
        final int shuffleProgressDuration = (int) (lastProgressTime - startTime);
        final int minShuffleRunDuration = Math.max(shuffleProgressDuration, maxMapRuntime);
        final float stallRatio = (float) stallDuration / (float) minShuffleRunDuration;
        final boolean reducerStalled = stallRatio >= maxAllowedStallTimePercent;

        final int failingMaps = failureCounts.size();
        final boolean tooManyUniqueFailures = failingMaps >= maxFailedUniqueFetches || failingMaps == totalMaps - doneMaps;
        final boolean insufficientProgress = !reducerProgressedEnough || reducerStalled;

        if (tooManyUniqueFailures && !reducerHealthy && insufficientProgress) {
            LOG.fatal("Shuffle failed with too many fetch failures and insufficient progress!");
            String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
            reporter.reportException(new IOException(errorMsg));
        }
    }

    /**
     * Marks a map task as finished because its TIP failed, so the shuffle
     * stops waiting for its output. Does nothing if the map is already
     * marked finished.
     *
     * @param taskId id of the failed map task
     */
    public synchronized void tipFailed(TaskID taskId) {
        final int mapIndex = taskId.getId();
        if (finishedMaps[mapIndex]) {
            return;
        }
        finishedMaps[mapIndex] = true;
        remainingMaps--;
        if (remainingMaps == 0) {
            notifyAll();
        }
        updateStatus();
    }

    /**
     * Registers a map output as available on a host, creating the host entry
     * on first sight. If the host is then in the PENDING state it is added
     * to the pending set and waiting threads are woken.
     *
     * @param hostName key identifying the host
     * @param hostUrl  base URL used when the host entry has to be created
     * @param mapId    attempt whose output is available on the host
     */
    public synchronized void addKnownMapOutput(String hostName, String hostUrl, org.apache.hadoop.mapreduce.TaskAttemptID mapId) {
        MapHost target = mapLocations.get(hostName);
        if (target == null) {
            target = new MapHost(hostName, hostUrl);
            mapLocations.put(hostName, target);
        }
        target.addKnownMap(mapId);

        if (target.getState() != MapHost.State.PENDING) {
            return;
        }
        pendingHosts.add(target);
        notifyAll();
    }

    /**
     * Remembers a map attempt as obsolete so that {@code getMapsForHost}
     * no longer hands it out.
     *
     * @param mapId the attempt to mark obsolete
     */
    public synchronized void obsoleteMapOutput(org.apache.hadoop.mapreduce.TaskAttemptID mapId) {
        obsoleteMaps.add(mapId);
    }

    /**
     * Returns a map attempt to the host's list of known outputs.
     *
     * @param host  host that serves the output
     * @param mapId attempt to add back to the host
     */
    public synchronized void putBackKnownMapOutput(MapHost host, org.apache.hadoop.mapreduce.TaskAttemptID mapId) {
        host.addKnownMap(mapId);
    }

    /**
     * Blocks until at least one host is pending, then removes a randomly
     * chosen pending host, marks it busy and returns it. The current time is
     * stored for the calling thread so {@code freeHost} can log the hold time.
     *
     * @return the host assigned to the calling thread
     * @throws InterruptedException if interrupted while waiting for a pending host
     */
    public synchronized MapHost getHost() throws InterruptedException {
        while (pendingHosts.isEmpty()) {
            wait();
        }

        // Walk the set to a random position; a Set offers no indexed access.
        Iterator<MapHost> candidates = pendingHosts.iterator();
        final int toSkip = random.nextInt(pendingHosts.size());
        for (int skipped = 0; skipped < toSkip; skipped++) {
            candidates.next();
        }
        final MapHost chosen = candidates.next();

        pendingHosts.remove(chosen);
        chosen.markBusy();
        LOG.debug("Assigning " + chosen + " with " + chosen.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName());
        SHUFFLE_START.set(Time.monotonicNow());
        return chosen;
    }

    /**
     * Takes the host's known map outputs and returns up to
     * {@code MAX_MAPS_AT_ONCE} of them that are neither obsolete nor already
     * finished. Eligible outputs beyond that limit are added back to the
     * host; obsolete and finished ones are dropped.
     *
     * @param host host whose known outputs are being assigned
     * @return the attempts the caller should fetch from {@code host}
     */
    public synchronized List<org.apache.hadoop.mapreduce.TaskAttemptID> getMapsForHost(MapHost host) {
        final List<org.apache.hadoop.mapreduce.TaskAttemptID> known = host.getAndClearKnownMaps();
        final int totalSize = known.size();
        final List<org.apache.hadoop.mapreduce.TaskAttemptID> assigned = new ArrayList<org.apache.hadoop.mapreduce.TaskAttemptID>();

        for (org.apache.hadoop.mapreduce.TaskAttemptID candidate : known) {
            boolean stale = obsoleteMaps.contains(candidate) || finishedMaps[candidate.getTaskID().getId()];
            if (stale) {
                continue;
            }
            if (assigned.size() < MAX_MAPS_AT_ONCE) {
                assigned.add(candidate);
            } else {
                // Over the per-call limit: leave it with the host for a later call.
                host.addKnownMap(candidate);
            }
        }

        LOG.debug("assigned " + assigned.size() + " of " + totalSize + " to " + host + " to " + Thread.currentThread().getName());
        return assigned;
    }

    /**
     * Releases a host held by the calling thread. A host that is not
     * penalised is marked available and, if that leaves it PENDING, is added
     * to the pending set. The time since the matching {@code getHost} call on
     * this thread is logged.
     *
     * @param host the host being released
     */
    public synchronized void freeHost(MapHost host) {
        if (host.getState() != MapHost.State.PENALIZED) {
            if (host.markAvailable() == MapHost.State.PENDING) {
                pendingHosts.add(host);
                notifyAll();
            }
        }
        LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + (Time.monotonicNow() - SHUFFLE_START.get()) + "ms");
    }

    /** Clears the known host locations, the pending hosts and the obsolete map set. */
    public synchronized void resetKnownMaps() {
        pendingHosts.clear();
        mapLocations.clear();
        obsoleteMaps.clear();
    }

    /**
     * Waits at most once, for up to {@code millis} milliseconds, for the
     * remaining map count to reach zero.
     *
     * @param millis maximum time to wait, as passed to {@link Object#wait(long)}
     * @return {@code true} if no maps remain when the method returns
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public synchronized boolean waitUntilDone(int millis) throws InterruptedException {
        if (remainingMaps <= 0) {
            return true;
        }
        wait(millis);
        return remainingMaps == 0;
    }

    /**
     * Stops the penalty referee thread and waits for it to exit.
     *
     * @throws InterruptedException if interrupted while waiting for the referee to finish
     */
    @Override
    public void close() throws InterruptedException {
        final Referee worker = referee;
        worker.interrupt();
        worker.join();
    }

    /**
     * @return the host failure count above which a failed copy is always reported
     */
    public int getMaxHostFailures() {
        return maxHostFailures;
    }

    /**
     * Tracks the total wall-clock time covered by copy intervals, counting
     * overlapping intervals only once. Intervals are kept as a list of
     * disjoint spans ordered by start time.
     */
    private static class CopyTimeTracker {
        List<Interval> intervals = Collections.emptyList();
        long copyMillis = 0L;

        /**
         * Merges the span {@code [s, e]} into the tracked intervals and
         * refreshes the total.
         */
        public void add(long s, long e) {
            copyMillis = getTotalCopyMillis(new Interval(s, e));
        }

        /** @return total length of all tracked intervals after the last {@code add} */
        public long getCopyMillis() {
            return copyMillis;
        }

        /**
         * Rebuilds the interval list with {@code newInterval} merged in and
         * returns the summed length of the resulting intervals.
         */
        private long getTotalCopyMillis(Interval newInterval) {
            if (newInterval == null) {
                return copyMillis;
            }

            final List<Interval> merged = new ArrayList<Interval>(intervals.size() + 1);
            long total = 0L;
            // The span still to be placed; it grows as it absorbs overlapping intervals.
            Interval pending = newInterval;

            for (Interval existing : intervals) {
                if (existing.end < pending.start) {
                    // Entirely before the pending span: keep as is.
                    merged.add(existing);
                    total += existing.getIntervalLength();
                } else if (existing.start > pending.end) {
                    // Entirely after: emit the pending span and carry this one forward.
                    merged.add(pending);
                    total += pending.getIntervalLength();
                    pending = existing;
                } else {
                    // Overlapping or touching: fold into the pending span.
                    long lo = Math.min(existing.start, pending.start);
                    long hi = Math.max(pending.end, existing.end);
                    pending = new Interval(lo, hi);
                }
            }

            merged.add(pending);
            total += pending.getIntervalLength();
            intervals = merged;
            return total;
        }

        /** An immutable {@code [start, end]} span. */
        private static class Interval {
            final long start;
            final long end;

            public Interval(long s, long e) {
                start = s;
                end = e;
            }

            public long getIntervalLength() {
                return end - start;
            }
        }
    }

    /**
     * Daemon thread that waits for host penalties to expire and then makes
     * the host available again. It exits when interrupted; any other
     * throwable is passed to the exception reporter and also ends the thread.
     */
    private class Referee extends Thread {
        public Referee() {
            setName("ShufflePenaltyReferee");
            setDaemon(true);
        }

        @Override
        public void run() {
            final ShuffleSchedulerImpl<K, V> scheduler = ShuffleSchedulerImpl.this;
            try {
                for (;;) {
                    // Blocks until the earliest penalty has expired.
                    MapHost released = scheduler.penalties.take().host;
                    synchronized (scheduler) {
                        if (released.markAvailable() == MapHost.State.PENDING) {
                            scheduler.pendingHosts.add(released);
                            scheduler.notifyAll();
                        }
                    }
                }
            } catch (InterruptedException ie) {
                // Interrupted (close() does this): leave the loop and finish.
            } catch (Throwable t) {
                scheduler.reporter.reportException(t);
            }
        }
    }

    /**
     * A penalised host together with the monotonic time at which its penalty
     * ends; ordered by that end time for use in a {@code DelayQueue}.
     */
    private static class Penalty implements Delayed {
        MapHost host;
        private long endTime;

        /**
         * @param host  the penalised host
         * @param delay penalty length in milliseconds, counted from now
         */
        Penalty(MapHost host, long delay) {
            this.host = host;
            this.endTime = Time.monotonicNow() + delay;
        }

        /** @return the time left until the penalty ends, converted to {@code unit} */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - Time.monotonicNow(), TimeUnit.MILLISECONDS);
        }

        /** Orders penalties by end time, earliest first. */
        @Override
        public int compareTo(Delayed o) {
            final long otherEnd = ((Penalty) o).endTime;
            if (endTime < otherEnd) {
                return -1;
            }
            if (endTime > otherEnd) {
                return 1;
            }
            return 0;
        }
    }
}

