/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryReader;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryWriter;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MergeManager<K, V> {
    private static final Log LOG = LogFactory.getLog(MergeManager.class);
    private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
    private final TaskAttemptID reduceId;
    private final JobConf jobConf;
    private final FileSystem localFS;
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    protected MapOutputFile mapOutputFile;
    Set<MapOutput<K, V>> inMemoryMergedMapOutputs = new TreeSet<MapOutput<K, V>>(new MapOutput.MapOutputComparator());
    private final IntermediateMemoryToMemoryMerger memToMemMerger;
    Set<MapOutput<K, V>> inMemoryMapOutputs = new TreeSet<MapOutput<K, V>>(new MapOutput.MapOutputComparator());
    private final InMemoryMerger inMemoryMerger;
    Set<Path> onDiskMapOutputs = new TreeSet<Path>();
    private final OnDiskMerger onDiskMerger;
    private final long memoryLimit;
    private long usedMemory;
    private final long maxSingleShuffleLimit;
    private final int memToMemMergeOutputsThreshold;
    private final long mergeThreshold;
    private final int ioSortFactor;
    private final Reporter reporter;
    private final ExceptionReporter exceptionReporter;
    private final Class<? extends Reducer> combinerClass;
    private final Task.CombineOutputCollector<K, V> combineCollector;
    private final Counters.Counter spilledRecordsCounter;
    private final Counters.Counter reduceCombineInputCounter;
    private final Counters.Counter mergedMapOutputsCounter;
    private final CompressionCodec codec;
    private final Progress mergePhase;
    private final MapOutput<K, V> stallShuffle = new MapOutput(null);

    public MergeManager(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, LocalDirAllocator localDirAllocator, Reporter reporter, CompressionCodec codec, Class<? extends Reducer> combinerClass, Task.CombineOutputCollector<K, V> combineCollector, Counters.Counter spilledRecordsCounter, Counters.Counter reduceCombineInputCounter, Counters.Counter mergedMapOutputsCounter, ExceptionReporter exceptionReporter, Progress mergePhase, MapOutputFile mapOutputFile) {
        this.reduceId = reduceId;
        this.jobConf = jobConf;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.reporter = reporter;
        this.codec = codec;
        this.combinerClass = combinerClass;
        this.combineCollector = combineCollector;
        this.reduceCombineInputCounter = reduceCombineInputCounter;
        this.spilledRecordsCounter = spilledRecordsCounter;
        this.mergedMapOutputsCounter = mergedMapOutputsCounter;
        this.mapOutputFile = mapOutputFile;
        this.mapOutputFile.setConf(jobConf);
        this.localFS = localFS;
        this.rfs = ((LocalFileSystem)localFS).getRaw();
        float maxInMemCopyUse = jobConf.getFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.input.buffer.percent: " + maxInMemCopyUse);
        }
        this.memoryLimit = (long)((float)jobConf.getLong("mapreduce.reduce.memory.totalbytes", Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
        this.ioSortFactor = jobConf.getInt("mapreduce.task.io.sort.factor", 100);
        this.maxSingleShuffleLimit = (long)((float)this.memoryLimit * 0.25f);
        this.memToMemMergeOutputsThreshold = jobConf.getInt("mapreduce.reduce.merge.memtomem.threshold", this.ioSortFactor);
        this.mergeThreshold = (long)((float)this.memoryLimit * jobConf.getFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f));
        LOG.info((Object)("MergerManager: memoryLimit=" + this.memoryLimit + ", " + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", " + "mergeThreshold=" + this.mergeThreshold + ", " + "ioSortFactor=" + this.ioSortFactor + ", " + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold));
        boolean allowMemToMemMerge = jobConf.getBoolean("mapreduce.reduce.merge.memtomem.enabled", false);
        if (allowMemToMemMerge) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold);
            this.memToMemMerger.start();
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = new InMemoryMerger(this);
        this.inMemoryMerger.start();
        this.onDiskMerger = new OnDiskMerger(this);
        this.onDiskMerger.start();
        this.mergePhase = mergePhase;
    }

    TaskAttemptID getReduceId() {
        return this.reduceId;
    }

    public void waitForInMemoryMerge() throws InterruptedException {
        this.inMemoryMerger.waitForMerge();
    }

    private boolean canShuffleToMemory(long requestedSize) {
        return requestedSize < this.maxSingleShuffleLimit;
    }

    public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher) throws IOException {
        if (!this.canShuffleToMemory(requestedSize)) {
            LOG.info((Object)(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")"));
            return new MapOutput(mapId, this, requestedSize, this.jobConf, this.localDirAllocator, fetcher, true, this.mapOutputFile);
        }
        if (this.usedMemory > this.memoryLimit) {
            LOG.debug((Object)(mapId + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + ")"));
            return this.stallShuffle;
        }
        LOG.debug((Object)(mapId + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ")"));
        return this.unconditionalReserve(mapId, requestedSize, true);
    }

    private synchronized MapOutput<K, V> unconditionalReserve(TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
        this.usedMemory += requestedSize;
        return new MapOutput(mapId, this, (int)requestedSize, primaryMapOutput);
    }

    synchronized void unreserve(long size) {
        this.usedMemory -= size;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void closeInMemoryFile(MapOutput<K, V> mapOutput) {
        this.inMemoryMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + this.inMemoryMapOutputs.size()));
        MergeThread mergeThread = this.inMemoryMerger;
        synchronized (mergeThread) {
            if (!this.inMemoryMerger.isInProgress() && this.usedMemory >= this.mergeThreshold) {
                LOG.info((Object)("Starting inMemoryMerger's merge since usedMemory=" + this.usedMemory + " > mergeThreshold=" + this.mergeThreshold));
                this.inMemoryMapOutputs.addAll(this.inMemoryMergedMapOutputs);
                this.inMemoryMergedMapOutputs.clear();
                this.inMemoryMerger.startMerge(this.inMemoryMapOutputs);
            }
        }
        if (this.memToMemMerger != null) {
            mergeThread = this.memToMemMerger;
            synchronized (mergeThread) {
                if (!this.memToMemMerger.isInProgress() && this.inMemoryMapOutputs.size() >= this.memToMemMergeOutputsThreshold) {
                    this.memToMemMerger.startMerge(this.inMemoryMapOutputs);
                }
            }
        }
    }

    public synchronized void closeInMemoryMergedFile(MapOutput<K, V> mapOutput) {
        this.inMemoryMergedMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + this.inMemoryMergedMapOutputs.size()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void closeOnDiskFile(Path file) {
        this.onDiskMapOutputs.add(file);
        OnDiskMerger onDiskMerger = this.onDiskMerger;
        synchronized (onDiskMerger) {
            if (!this.onDiskMerger.isInProgress() && this.onDiskMapOutputs.size() >= 2 * this.ioSortFactor - 1) {
                this.onDiskMerger.startMerge(this.onDiskMapOutputs);
            }
        }
    }

    public RawKeyValueIterator close() throws Throwable {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();
        ArrayList<MapOutput<K, V>> memory = new ArrayList<MapOutput<K, V>>(this.inMemoryMergedMapOutputs);
        memory.addAll(this.inMemoryMapOutputs);
        ArrayList<Path> disk = new ArrayList<Path>(this.onDiskMapOutputs);
        return this.finalMerge(this.jobConf, this.rfs, memory, disk);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void combineAndSpill(RawKeyValueIterator kvIter, Counters.Counter inCounter) throws IOException {
        JobConf job = this.jobConf;
        Reducer combiner = (Reducer)ReflectionUtils.newInstance(this.combinerClass, (Configuration)job);
        Class<?> keyClass = job.getMapOutputKeyClass();
        Class<?> valClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputKeyComparator();
        try {
            Task.CombineValuesIterator values = new Task.CombineValuesIterator(kvIter, comparator, keyClass, valClass, job, Reporter.NULL, inCounter);
            while (values.more()) {
                combiner.reduce(values.getKey(), values, this.combineCollector, Reporter.NULL);
                values.nextKey();
            }
        }
        finally {
            combiner.close();
        }
    }

    private long createInMemorySegments(List<MapOutput<K, V>> inMemoryMapOutputs, List<Merger.Segment<K, V>> inMemorySegments, long leaveBytes) throws IOException {
        long totalSize = 0L;
        long fullSize = 0L;
        for (MapOutput<K, V> mo : inMemoryMapOutputs) {
            fullSize += (long)mo.getMemory().length;
        }
        while (fullSize > leaveBytes) {
            MapOutput<K, V> mo = inMemoryMapOutputs.remove(0);
            byte[] data = mo.getMemory();
            long size = data.length;
            totalSize += size;
            fullSize -= size;
            InMemoryReader reader = new InMemoryReader(this, mo.getMapId(), data, 0, (int)size);
            inMemorySegments.add(new Merger.Segment(reader, true, mo.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null));
        }
        return totalSize;
    }

    private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List<MapOutput<K, V>> inMemoryMapOutputs, List<Path> onDiskMapOutputs) throws IOException {
        Path[] onDisk;
        LOG.info((Object)("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"));
        float maxRedPer = job.getFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new IOException("mapreduce.reduce.input.buffer.percent" + maxRedPer);
        }
        int maxInMemReduce = (int)Math.min((float)Runtime.getRuntime().maxMemory() * maxRedPer, 2.1474836E9f);
        Class<?> keyClass = job.getMapOutputKeyClass();
        Class<?> valueClass = job.getMapOutputValueClass();
        boolean keepInputs = job.getKeepFailedTaskFiles();
        Path tmpDir = new Path(this.reduceId.toString());
        RawComparator comparator = job.getOutputKeyComparator();
        ArrayList<Merger.Segment<K, V>> memDiskSegments = new ArrayList<Merger.Segment<K, V>>();
        long inMemToDiskBytes = 0L;
        boolean mergePhaseFinished = false;
        if (inMemoryMapOutputs.size() > 0) {
            TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
            inMemToDiskBytes = this.createInMemorySegments(inMemoryMapOutputs, memDiskSegments, maxInMemReduce);
            int numMemDiskSegments = memDiskSegments.size();
            if (numMemDiskSegments > 0 && this.ioSortFactor > onDiskMapOutputs.size()) {
                mergePhaseFinished = true;
                Path outputPath = this.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);
                RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, this.reporter, this.spilledRecordsCounter, null, this.mergePhase);
                IFile.Writer writer = new IFile.Writer(job, fs, outputPath, keyClass, valueClass, this.codec, null);
                try {
                    Merger.writeFile(rIter, writer, this.reporter, job);
                    onDiskMapOutputs.add(outputPath);
                }
                catch (IOException e) {
                    if (null != outputPath) {
                        try {
                            fs.delete(outputPath, true);
                        }
                        catch (IOException ie) {
                            // empty catch block
                        }
                    }
                    throw e;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
                LOG.info((Object)("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit"));
                inMemToDiskBytes = 0L;
                memDiskSegments.clear();
            } else if (inMemToDiskBytes != 0L) {
                LOG.info((Object)("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge"));
            }
        }
        ArrayList diskSegments = new ArrayList();
        long onDiskBytes = inMemToDiskBytes;
        for (Path file : onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()])) {
            onDiskBytes += fs.getFileStatus(file).getLen();
            LOG.debug((Object)("Disk file: " + file + " Length is " + fs.getFileStatus(file).getLen()));
            diskSegments.add(new Merger.Segment(job, fs, file, this.codec, keepInputs, file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter));
        }
        LOG.info((Object)("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk"));
        Collections.sort(diskSegments, new Comparator<Merger.Segment<K, V>>(){

            @Override
            public int compare(Merger.Segment<K, V> o1, Merger.Segment<K, V> o2) {
                if (o1.getLength() == o2.getLength()) {
                    return 0;
                }
                return o1.getLength() < o2.getLength() ? -1 : 1;
            }
        });
        ArrayList<Merger.Segment<K, V>> finalSegments = new ArrayList<Merger.Segment<K, V>>();
        long inMemBytes = this.createInMemorySegments(inMemoryMapOutputs, finalSegments, 0L);
        LOG.info((Object)("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce"));
        if (0L != onDiskBytes) {
            int numInMemSegments = memDiskSegments.size();
            diskSegments.addAll(0, memDiskSegments);
            memDiskSegments.clear();
            Progress thisPhase = mergePhaseFinished ? null : this.mergePhase;
            RawKeyValueIterator diskMerge = Merger.merge((Configuration)job, fs, keyClass, valueClass, diskSegments, this.ioSortFactor, numInMemSegments, tmpDir, comparator, (Progressable)this.reporter, false, this.spilledRecordsCounter, null, thisPhase);
            diskSegments.clear();
            if (0 == finalSegments.size()) {
                return diskMerge;
            }
            finalSegments.add(new Merger.Segment(new RawKVIteratorReader(diskMerge, onDiskBytes), true));
        }
        return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, this.reporter, this.spilledRecordsCounter, null, null);
    }

    class RawKVIteratorReader
    extends IFile.Reader<K, V> {
        private final RawKeyValueIterator kvIter;

        public RawKVIteratorReader(RawKeyValueIterator kvIter, long size) throws IOException {
            super(null, null, size, null, MergeManager.this.spilledRecordsCounter);
            this.kvIter = kvIter;
        }

        @Override
        public boolean nextRawKey(DataInputBuffer key) throws IOException {
            if (this.kvIter.next()) {
                DataInputBuffer kb = this.kvIter.getKey();
                int kp = kb.getPosition();
                int klen = kb.getLength() - kp;
                key.reset(kb.getData(), kp, klen);
                this.bytesRead += (long)klen;
                return true;
            }
            return false;
        }

        @Override
        public void nextRawValue(DataInputBuffer value) throws IOException {
            DataInputBuffer vb = this.kvIter.getValue();
            int vp = vb.getPosition();
            int vlen = vb.getLength() - vp;
            value.reset(vb.getData(), vp, vlen);
            this.bytesRead += (long)vlen;
        }

        @Override
        public long getPosition() throws IOException {
            return this.bytesRead;
        }

        @Override
        public void close() throws IOException {
            this.kvIter.close();
        }
    }

    private class OnDiskMerger
    extends MergeThread<Path, K, V> {
        public OnDiskMerger(MergeManager<K, V> manager) {
            super(manager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            this.setName("OnDiskMerger - Thread to merge on-disk map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<Path> inputs) throws IOException {
            if (inputs == null || inputs.isEmpty()) {
                LOG.info((Object)"No ondisk files to merge...");
                return;
            }
            long approxOutputSize = 0L;
            int bytesPerSum = MergeManager.this.jobConf.getInt("io.bytes.per.checksum", 512);
            LOG.info((Object)("OnDiskMerger: We have  " + inputs.size() + " map outputs on disk. Triggering merge..."));
            for (Path file : inputs) {
                approxOutputSize += MergeManager.this.localFS.getFileStatus(file).getLen();
            }
            approxOutputSize += ChecksumFileSystem.getChecksumLength((long)approxOutputSize, (int)bytesPerSum);
            Path outputPath = MergeManager.this.localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, (Configuration)MergeManager.this.jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = new IFile.Writer(MergeManager.this.jobConf, MergeManager.this.rfs, outputPath, MergeManager.this.jobConf.getMapOutputKeyClass(), MergeManager.this.jobConf.getMapOutputValueClass(), MergeManager.this.codec, null);
            RawKeyValueIterator iter = null;
            Path tmpDir = new Path(MergeManager.this.reduceId.toString());
            try {
                iter = Merger.merge((Configuration)MergeManager.this.jobConf, MergeManager.this.rfs, MergeManager.this.jobConf.getMapOutputKeyClass(), MergeManager.this.jobConf.getMapOutputValueClass(), MergeManager.this.codec, inputs.toArray(new Path[inputs.size()]), true, MergeManager.this.ioSortFactor, tmpDir, MergeManager.this.jobConf.getOutputKeyComparator(), (Progressable)MergeManager.this.reporter, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.mergedMapOutputsCounter, null);
                Merger.writeFile(iter, writer, MergeManager.this.reporter, MergeManager.this.jobConf);
                writer.close();
            }
            catch (IOException e) {
                MergeManager.this.localFS.delete(outputPath, true);
                throw e;
            }
            MergeManager.this.closeOnDiskFile(outputPath);
            LOG.info((Object)(MergeManager.this.reduceId + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + MergeManager.this.localFS.getFileStatus(outputPath).getLen()));
        }
    }

    private class InMemoryMerger
    extends MergeThread<MapOutput<K, V>, K, V> {
        public InMemoryMerger(MergeManager<K, V> manager) {
            super(manager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<MapOutput<K, V>> inputs) throws IOException {
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            TaskAttemptID mapId = inputs.get(0).getMapId();
            TaskID mapTaskId = mapId.getTaskID();
            ArrayList inMemorySegments = new ArrayList();
            long mergeOutputSize = MergeManager.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            Path outputPath = MergeManager.this.mapOutputFile.getInputFileForWrite(mapTaskId, mergeOutputSize).suffix(Task.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = new IFile.Writer(MergeManager.this.jobConf, MergeManager.this.rfs, outputPath, MergeManager.this.jobConf.getMapOutputKeyClass(), MergeManager.this.jobConf.getMapOutputValueClass(), MergeManager.this.codec, null);
            RawKeyValueIterator rIter = null;
            try {
                LOG.info((Object)("Initiating in-memory merge with " + noInMemorySegments + " segments..."));
                rIter = Merger.merge(MergeManager.this.jobConf, MergeManager.this.rfs, MergeManager.this.jobConf.getMapOutputKeyClass(), MergeManager.this.jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(MergeManager.this.reduceId.toString()), MergeManager.this.jobConf.getOutputKeyComparator(), MergeManager.this.reporter, MergeManager.this.spilledRecordsCounter, null, null);
                if (null == MergeManager.this.combinerClass) {
                    Merger.writeFile(rIter, writer, MergeManager.this.reporter, MergeManager.this.jobConf);
                } else {
                    MergeManager.this.combineCollector.setWriter(writer);
                    MergeManager.this.combineAndSpill(rIter, MergeManager.this.reduceCombineInputCounter);
                }
                writer.close();
                LOG.info((Object)(MergeManager.this.reduceId + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + MergeManager.this.localFS.getFileStatus(outputPath).getLen()));
            }
            catch (IOException e) {
                MergeManager.this.localFS.delete(outputPath, true);
                throw e;
            }
            MergeManager.this.closeOnDiskFile(outputPath);
        }
    }

    private class IntermediateMemoryToMemoryMerger
    extends MergeThread<MapOutput<K, V>, K, V> {
        public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager, int mergeFactor) {
            super(manager, mergeFactor, MergeManager.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to do in-memory merge of in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<MapOutput<K, V>> inputs) throws IOException {
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            TaskAttemptID dummyMapId = inputs.get(0).getMapId();
            ArrayList inMemorySegments = new ArrayList();
            long mergeOutputSize = MergeManager.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            MapOutput mergedMapOutputs = MergeManager.this.unconditionalReserve(dummyMapId, mergeOutputSize, false);
            InMemoryWriter writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
            LOG.info((Object)("Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize));
            RawKeyValueIterator rIter = Merger.merge(MergeManager.this.jobConf, MergeManager.this.rfs, MergeManager.this.jobConf.getMapOutputKeyClass(), MergeManager.this.jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(MergeManager.this.reduceId.toString()), MergeManager.this.jobConf.getOutputKeyComparator(), MergeManager.this.reporter, null, null, null);
            Merger.writeFile(rIter, writer, MergeManager.this.reporter, MergeManager.this.jobConf);
            ((IFile.Writer)writer).close();
            LOG.info((Object)(MergeManager.this.reduceId + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete."));
            MergeManager.this.closeInMemoryMergedFile(mergedMapOutputs);
        }
    }
}

