/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleClientMetrics;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

class Fetcher<K, V>
extends Thread {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static final int DEFAULT_STALLED_COPY_TIMEOUT = 180000;
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    private static final int DEFAULT_READ_TIMEOUT = 180000;
    private final Progressable reporter;
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    private final Counters.Counter connectionErrs;
    private final Counters.Counter ioErrs;
    private final Counters.Counter wrongLengthErrs;
    private final Counters.Counter badIdErrs;
    private final Counters.Counter wrongMapErrs;
    private final Counters.Counter wrongReduceErrs;
    private final MergeManager<K, V> merger;
    private final ShuffleScheduler<K, V> scheduler;
    private final ShuffleClientMetrics metrics;
    private final ExceptionReporter exceptionReporter;
    private final int id;
    private static int nextId = 0;
    private final int reduce;
    private final int connectionTimeout;
    private final int readTimeout;
    private final CompressionCodec codec;
    private final Decompressor decompressor;
    private final SecretKey jobTokenSecret;

    public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler<K, V> scheduler, MergeManager<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
        this.reporter = reporter;
        this.scheduler = scheduler;
        this.merger = merger;
        this.metrics = metrics;
        this.exceptionReporter = exceptionReporter;
        this.id = ++nextId;
        this.reduce = reduceId.getTaskID().getId();
        this.jobTokenSecret = jobTokenSecret;
        this.ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());
        if (job.getCompressMapOutput()) {
            Class<? extends CompressionCodec> codecClass = job.getMapOutputCompressorClass(DefaultCodec.class);
            this.codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, (Configuration)job);
            this.decompressor = CodecPool.getDecompressor((CompressionCodec)this.codec);
        } else {
            this.codec = null;
            this.decompressor = null;
        }
        this.connectionTimeout = job.getInt("mapreduce.reduce.shuffle.connect.timeout", 180000);
        this.readTimeout = job.getInt("mapreduce.reduce.shuffle.read.timeout", 180000);
        this.setName("fetcher#" + this.id);
        this.setDaemon(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                MapHost host = null;
                try {
                    this.merger.waitForInMemoryMerge();
                    host = this.scheduler.getHost();
                    this.metrics.threadBusy();
                    this.copyFromHost(host);
                }
                finally {
                    if (host == null) continue;
                    this.scheduler.freeHost(host);
                    this.metrics.threadFree();
                }
            }
            return;
        }
        catch (InterruptedException ie) {
            return;
        }
        catch (Throwable t) {
            this.exceptionReporter.reportException(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void copyFromHost(MapHost host) throws IOException {
        DataInputStream input;
        List<TaskAttemptID> maps = this.scheduler.getMapsForHost(host);
        if (maps.size() == 0) {
            return;
        }
        LOG.debug((Object)("Fetcher " + this.id + " going to fetch from " + host));
        for (TaskAttemptID tmp : maps) {
            LOG.debug((Object)tmp);
        }
        HashSet<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
        boolean connectSucceeded = false;
        try {
            URL url = this.getMapOutputURL(host, maps);
            URLConnection connection = url.openConnection();
            String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
            String encHash = SecureShuffleUtils.hashFromString(msgToEncode, this.jobTokenSecret);
            connection.addRequestProperty("UrlHash", encHash);
            connection.setReadTimeout(this.readTimeout);
            this.connect(connection, this.connectionTimeout);
            connectSucceeded = true;
            input = new DataInputStream(connection.getInputStream());
            String replyHash = connection.getHeaderField("ReplyHash");
            if (replyHash == null) {
                throw new IOException("security validation of TT Map output failed");
            }
            LOG.debug((Object)("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash));
            SecureShuffleUtils.verifyReply(replyHash, encHash, this.jobTokenSecret);
            LOG.info((Object)("for url=" + msgToEncode + " sent hash and receievd reply"));
        }
        catch (IOException ie) {
            this.ioErrs.increment(1L);
            LOG.warn((Object)("Failed to connect to " + host + " with " + remaining.size() + " map outputs"), (Throwable)ie);
            if (!connectSucceeded) {
                for (TaskAttemptID left : remaining) {
                    this.scheduler.copyFailed(left, host, connectSucceeded);
                }
            } else {
                TaskAttemptID firstMap = maps.get(0);
                this.scheduler.copyFailed(firstMap, host, connectSucceeded);
            }
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(host, left);
            }
            return;
        }
        try {
            boolean good = true;
            while (!remaining.isEmpty() && good) {
                good = this.copyMapOutput(host, input, remaining);
            }
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input});
            if (good && !remaining.isEmpty()) {
                throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left.");
            }
        }
        finally {
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(host, left);
            }
        }
    }

    private boolean copyMapOutput(MapHost host, DataInputStream input, Set<TaskAttemptID> remaining) {
        MapOutput<K, V> mapOutput = null;
        TaskAttemptID mapId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int forReduce = -1;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                mapId = TaskAttemptID.forName(header.mapId);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                forReduce = header.forReduce;
            }
            catch (IllegalArgumentException e) {
                this.badIdErrs.increment(1L);
                LOG.warn((Object)"Invalid map id ", (Throwable)e);
                return false;
            }
            if (!this.verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) {
                return false;
            }
            LOG.debug((Object)("header: " + mapId + ", len: " + compressedLength + ", decomp len: " + decompressedLength));
            mapOutput = this.merger.reserve(mapId, decompressedLength, this.id);
            if (mapOutput.getType() == MapOutput.Type.WAIT) {
                LOG.info((Object)("fetcher#" + this.id + " - MergerManager returned Status.WAIT ..."));
                return false;
            }
            LOG.info((Object)("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)mapOutput.getType())));
            if (mapOutput.getType() == MapOutput.Type.MEMORY) {
                this.shuffleToMemory(host, mapOutput, input, (int)decompressedLength, (int)compressedLength);
            } else {
                this.shuffleToDisk(host, mapOutput, input, compressedLength);
            }
            long endTime = System.currentTimeMillis();
            this.scheduler.copySucceeded(mapId, host, compressedLength, endTime - startTime, mapOutput);
            remaining.remove(mapId);
            this.metrics.successFetch();
            return true;
        }
        catch (IOException ioe) {
            this.ioErrs.increment(1L);
            if (mapId == null || mapOutput == null) {
                LOG.info((Object)("fetcher#" + this.id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength), (Throwable)ioe);
                return false;
            }
            LOG.info((Object)("Failed to shuffle output of " + mapId + " from " + host.getHostName()), (Throwable)ioe);
            mapOutput.abort();
            this.scheduler.copyFailed(mapId, host, true);
            this.metrics.failedFetch();
            return false;
        }
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int forReduce, Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            this.wrongLengthErrs.increment(1L);
            LOG.warn((Object)(this.getName() + " invalid lengths in map output header: id: " + mapId + " len: " + compressedLength + ", decomp len: " + decompressedLength));
            return false;
        }
        if (forReduce != this.reduce) {
            this.wrongReduceErrs.increment(1L);
            LOG.warn((Object)(this.getName() + " data for the wrong reduce map: " + mapId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + forReduce));
            return false;
        }
        if (!remaining.contains(mapId)) {
            this.wrongMapErrs.increment(1L);
            LOG.warn((Object)("Invalid map-output! Received output for " + mapId));
            return false;
        }
        return true;
    }

    private URL getMapOutputURL(MapHost host, List<TaskAttemptID> maps) throws MalformedURLException {
        StringBuffer url = new StringBuffer(host.getBaseUrl());
        boolean first = true;
        for (TaskAttemptID mapId : maps) {
            if (!first) {
                url.append(",");
            }
            url.append(mapId);
            first = false;
        }
        LOG.debug((Object)("MapOutput URL for " + host + " -> " + url.toString()));
        return new URL(url.toString());
    }

    private void connect(URLConnection connection, int connectionTimeout) throws IOException {
        int unit = 0;
        if (connectionTimeout < 0) {
            throw new IOException("Invalid timeout [timeout = " + connectionTimeout + " ms]");
        }
        if (connectionTimeout > 0) {
            unit = Math.min(60000, connectionTimeout);
        }
        connection.setConnectTimeout(unit);
        while (true) {
            try {
                connection.connect();
            }
            catch (IOException ioe) {
                if ((connectionTimeout -= unit) == 0) {
                    throw ioe;
                }
                if (connectionTimeout >= unit) continue;
                unit = connectionTimeout;
                connection.setConnectTimeout(unit);
                continue;
            }
            break;
        }
    }

    private void shuffleToMemory(MapHost host, MapOutput<K, V> mapOutput, InputStream input, int decompressedLength, int compressedLength) throws IOException {
        IFileInputStream checksumIn = new IFileInputStream(input, compressedLength);
        input = checksumIn;
        if (this.codec != null) {
            this.decompressor.reset();
            input = this.codec.createInputStream(input, this.decompressor);
        }
        byte[] shuffleData = mapOutput.getMemory();
        try {
            IOUtils.readFully((InputStream)input, (byte[])shuffleData, (int)0, (int)shuffleData.length);
            this.metrics.inputBytes(shuffleData.length);
            this.reporter.progress();
            LOG.info((Object)("Read " + shuffleData.length + " bytes from map-output for " + mapOutput.getMapId()));
        }
        catch (IOException ioe) {
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input});
            throw ioe;
        }
    }

    private void shuffleToDisk(MapHost host, MapOutput<K, V> mapOutput, InputStream input, long compressedLength) throws IOException {
        long bytesLeft;
        OutputStream output = mapOutput.getDisk();
        try {
            int n;
            int BYTES_TO_READ = 65536;
            byte[] buf = new byte[65536];
            for (bytesLeft = compressedLength; bytesLeft > 0L; bytesLeft -= (long)n) {
                n = input.read(buf, 0, (int)Math.min(bytesLeft, 65536L));
                if (n < 0) {
                    throw new IOException("read past end of stream reading " + mapOutput.getMapId());
                }
                output.write(buf, 0, n);
                this.metrics.inputBytes(n);
                this.reporter.progress();
            }
            LOG.info((Object)("Read " + (compressedLength - bytesLeft) + " bytes from map-output for " + mapOutput.getMapId()));
            output.close();
        }
        catch (IOException ioe) {
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input, output});
            throw ioe;
        }
        if (bytesLeft != 0L) {
            throw new IOException("Incomplete map output received for " + mapOutput.getMapId() + " from " + host.getHostName() + " (" + bytesLeft + " bytes missing of " + compressedLength + ")");
        }
    }

    private static enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE;

    }
}

