/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.ExecutorDiskUtils;
import org.apache.spark.network.shuffle.MergedBlockMeta;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.shuffle.ShuffleIndexInformation;
import org.apache.spark.network.shuffle.ShuffleIndexRecord;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Objects;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.cache.CacheBuilder;
import org.sparkproject.guava.cache.CacheLoader;
import org.sparkproject.guava.cache.LoadingCache;
import org.sparkproject.guava.collect.Maps;
import org.sparkproject.guava.primitives.Ints;
import org.sparkproject.guava.primitives.Longs;

public class RemoteBlockPushResolver
implements MergedShuffleFileManager {
    /** Class-wide SLF4J logger; also used by the nested classes. */
    private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
    /** Directory name resolved against the parent of each executor local dir (see AppPathsInfo). */
    @VisibleForTesting
    static final String MERGE_MANAGER_DIR = "merge_manager";
    /** Merge directory layout per application id; entries are added by registerExecutor. */
    private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
    /** Partitions currently tracked for merging, keyed by (appId, shuffleId) and then by reduce id. */
    private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions;
    /** Single-thread executor on which the merge directories of removed applications are deleted. */
    private final Executor directoryCleaner;
    /** Transport configuration this resolver was created with. */
    private final TransportConf conf;
    /** Value of {@code conf.minChunkSizeInMergedShuffleFile()} captured at construction. */
    private final int minChunkSize;
    /** A partition whose IOException count exceeds this value makes further pushes to it abort. */
    private final int ioExceptionsThresholdDuringMerge;
    /** Error handler instance created at construction. */
    private final ErrorHandler.BlockPushErrorHandler errorHandler;
    /** Cache from merged index file to its ShuffleIndexInformation, bounded by total weight. */
    private final LoadingCache<File, ShuffleIndexInformation> indexCache;

    /**
     * Creates a resolver configured from {@code conf}.
     *
     * <p>Initialises the empty application and partition registries, the single-thread
     * directory cleaner, the thresholds read from {@code conf}, and an index-file cache whose
     * total weight is capped at {@code conf.mergedIndexCacheSize()}.
     *
     * @param conf transport configuration supplying the merge settings
     */
    public RemoteBlockPushResolver(TransportConf conf) {
        this.conf = conf;
        this.partitions = Maps.newConcurrentMap();
        this.appsPathInfo = Maps.newConcurrentMap();
        this.directoryCleaner = Executors.newSingleThreadExecutor(
            NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
        this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
        this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
        // On a cache miss, build the index information from the requested index file.
        CacheLoader<File, ShuffleIndexInformation> loader = new CacheLoader<File, ShuffleIndexInformation>(){

            public ShuffleIndexInformation load(File indexFile) throws IOException {
                return new ShuffleIndexInformation(indexFile);
            }
        };
        // Entries are weighed by the size each index information reports about itself.
        this.indexCache = CacheBuilder.newBuilder()
            .maximumWeight(conf.mergedIndexCacheSize())
            .weigher((file, indexInfo) -> indexInfo.getSize())
            .build((CacheLoader)loader);
        this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
    }

    /**
     * Looks up, or lazily creates, the merge state of one reduce partition of a shuffle.
     *
     * <p>Returns {@code null} when the partition's merged data file already exists on disk
     * while no state is tracked for it: either the shuffle is absent from {@code partitions},
     * or the reduce id has no entry yet. {@code receiveBlockDataAsStream} treats that result as
     * a block arriving too late.
     *
     * @param appShuffleId application and shuffle the partition belongs to
     * @param reduceId     reduce partition id
     * @return the tracked partition state, or {@code null} as described above
     * @throws RuntimeException if the partition files cannot be opened
     */
    private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId) {
        File mergedData = this.getMergedShuffleDataFile(appShuffleId, reduceId);
        if (!this.partitions.containsKey(appShuffleId)) {
            if (mergedData.exists()) {
                return null;
            }
        }
        Map<Integer, AppShufflePartitionInfo> perReducer =
            this.partitions.computeIfAbsent(appShuffleId, unusedId -> Maps.newConcurrentMap());
        return perReducer.computeIfAbsent(reduceId, unusedKey -> {
            File mergedIndex = this.getMergedShuffleIndexFile(appShuffleId, reduceId);
            File mergedMeta = this.getMergedShuffleMetaFile(appShuffleId, reduceId);
            // A null result leaves the reduce id unmapped, so nothing is tracked for it.
            if (mergedData.exists()) {
                return null;
            }
            try {
                return this.newAppShufflePartitionInfo(appShuffleId, reduceId, mergedData, mergedIndex, mergedMeta);
            }
            catch (IOException e) {
                logger.error("Cannot create merged shuffle partition with data file {}, index file {}, and meta file {}", mergedData.getAbsolutePath(), mergedIndex.getAbsolutePath(), mergedMeta.getAbsolutePath());
                String message = String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId);
                throw new RuntimeException(message, e);
            }
        });
    }

    /**
     * Opens the index and meta files for writing and wraps them, together with the data file,
     * in a new partition state object.
     *
     * @param appShuffleId application and shuffle the partition belongs to
     * @param reduceId     reduce partition id
     * @param dataFile     merged data file
     * @param indexFile    merged index file
     * @param metaFile     merged meta file
     * @return the newly created partition state
     * @throws IOException if any of the files cannot be opened or initialised
     */
    @VisibleForTesting
    AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, File dataFile, File indexFile, File metaFile) throws IOException {
        MergeShuffleFile mergedIndex = new MergeShuffleFile(indexFile);
        MergeShuffleFile mergedMeta = new MergeShuffleFile(metaFile);
        return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndex, mergedMeta);
    }

    /**
     * Describes a merged reduce partition: how many chunks it has, plus a buffer over the whole
     * meta file.
     *
     * @param appId     application id
     * @param shuffleId shuffle id
     * @param reduceId  reduce partition id
     * @return chunk count and a buffer over the meta file
     * @throws RuntimeException if the index or the meta file does not exist
     */
    @Override
    public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
        AppShuffleId id = new AppShuffleId(appId, shuffleId);
        File index = this.getMergedShuffleIndexFile(id, reduceId);
        if (!index.exists()) {
            throw new RuntimeException(String.format("Merged shuffle index file %s not found", index.getPath()));
        }
        // The index stores 8-byte offsets, one more than there are chunks (it starts with offset 0).
        int indexBytes = (int)index.length();
        int numChunks = indexBytes / Long.BYTES - 1;
        File meta = this.getMergedShuffleMetaFile(id, reduceId);
        if (!meta.exists()) {
            throw new RuntimeException(String.format("Merged shuffle meta file %s not found", meta.getPath()));
        }
        ManagedBuffer chunkBitMaps = new FileSegmentManagedBuffer(this.conf, meta, 0L, meta.length());
        logger.trace("{} shuffleId {} reduceId {} num chunks {}", appId, shuffleId, reduceId, numChunks);
        return new MergedBlockMeta(numChunks, chunkBitMaps);
    }

    /**
     * Returns a buffer over one chunk of a merged reduce partition's data file.
     *
     * @param appId     application id
     * @param shuffleId shuffle id
     * @param reduceId  reduce partition id
     * @param chunkId   index of the chunk inside the merged partition
     * @return buffer covering the chunk's offset and length in the data file
     * @throws RuntimeException if the data file is missing or the index file cannot be loaded
     */
    @Override
    public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId) {
        AppShuffleId id = new AppShuffleId(appId, shuffleId);
        File data = this.getMergedShuffleDataFile(id, reduceId);
        if (!data.exists()) {
            throw new RuntimeException(String.format("Merged shuffle data file %s not found", data.getPath()));
        }
        File index = this.getMergedShuffleIndexFile(id, reduceId);
        ShuffleIndexRecord chunk;
        try {
            // The chunk's offset and length come from the cached index information.
            chunk = this.indexCache.get(index).getIndex(chunkId);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(String.format("Failed to open merged shuffle index file %s", index.getPath()), e);
        }
        return new FileSegmentManagedBuffer(this.conf, data, chunk.getOffset(), chunk.getLength());
    }

    /**
     * Resolves {@code filename} inside the merge directories registered for {@code appId}.
     *
     * @param appId    application id; must have been registered
     * @param filename name of the merged shuffle file
     * @return the file location chosen by {@code ExecutorDiskUtils.getFile}
     * @throws NullPointerException if the application has no registered path information
     */
    private File getFile(String appId, String filename) {
        AppPathsInfo paths = this.appsPathInfo.get(appId);
        Preconditions.checkNotNull(paths, (Object)("application " + appId + " is not registered or NM was restarted."));
        File resolved = ExecutorDiskUtils.getFile(paths.activeLocalDirs, paths.subDirsPerLocalDir, filename);
        logger.debug("Get merged file {}", (Object)resolved.getAbsolutePath());
        return resolved;
    }

    /** Returns the location of the {@code .data} file of the given merged reduce partition. */
    private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
        String baseName = RemoteBlockPushResolver.generateFileName(appShuffleId, reduceId);
        return this.getFile(appShuffleId.appId, baseName + ".data");
    }

    /** Returns the location of the {@code .index} file of the given merged reduce partition. */
    private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
        String baseName = RemoteBlockPushResolver.generateFileName(appShuffleId, reduceId);
        return this.getFile(appShuffleId.appId, baseName + ".index");
    }

    /** Returns the location of the {@code .meta} file of the given merged reduce partition. */
    private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
        String baseName = RemoteBlockPushResolver.generateFileName(appShuffleId, reduceId);
        return this.getFile(appShuffleId.appId, baseName + ".meta");
    }

    /**
     * Returns the merge directories registered for an application.
     *
     * @param appId application id; must have been registered
     * @return the application's active merge directories
     * @throws NullPointerException if the application is not registered or has no directories
     */
    @Override
    public String[] getMergedBlockDirs(String appId) {
        AppPathsInfo paths = this.appsPathInfo.get(appId);
        Preconditions.checkNotNull(paths, (Object)("application " + appId + " is not registered or NM was restarted."));
        String[] dirs = paths.activeLocalDirs;
        Preconditions.checkNotNull((Object)dirs, (Object)("application " + appId + " active local dirs list has not been updated by any executor registration"));
        return dirs;
    }

    /**
     * Forgets an application: drops its path information, stops tracking all of its shuffle
     * partitions, closes their files and optionally schedules deletion of its merge directories.
     *
     * @param appId            application being removed; must have been registered
     * @param cleanupLocalDirs whether the application's merge directories should be deleted
     *                         (asynchronously, on the directory cleaner executor)
     * @throws NullPointerException if the application is not registered
     */
    @Override
    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
        logger.info("Application {} removed, cleanupLocalDirs = {}", (Object)appId, (Object)cleanupLocalDirs);
        AppPathsInfo appPathsInfo = (AppPathsInfo)Preconditions.checkNotNull(this.appsPathInfo.remove(appId), (Object)("application " + appId + " is not registered or NM was restarted."));
        Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator = this.partitions.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
            if (!appId.equals(entry.getKey().appId)) {
                continue;
            }
            // Untrack the shuffle first so stream callbacks that acquire the partition monitor
            // afterwards see it as gone and do not touch the files.
            iterator.remove();
            for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
                // Stream callbacks and finalizeShuffleMerge write/close a partition only while
                // holding its monitor; take the same monitor here so the channels are not closed
                // and nulled underneath an in-flight write.
                synchronized (partitionInfo) {
                    partitionInfo.closeAllFiles();
                }
            }
        }
        if (cleanupLocalDirs) {
            Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs).map(dir -> Paths.get(dir, new String[0])).toArray(Path[]::new);
            this.directoryCleaner.execute(() -> this.deleteExecutorDirs(dirs));
        }
    }

    /**
     * Recursively deletes each of the given directories that exists. A failure on one directory
     * is logged and does not stop the remaining ones from being processed.
     *
     * @param dirs directories to delete
     */
    @VisibleForTesting
    void deleteExecutorDirs(Path[] dirs) {
        for (Path dir : dirs) {
            try {
                if (Files.exists(dir)) {
                    JavaUtils.deleteRecursively(dir.toFile());
                    logger.debug("Successfully cleaned up directory: {}", (Object)dir);
                }
            }
            catch (Exception e) {
                logger.error("Failed to delete directory: {}", (Object)dir, (Object)e);
            }
        }
    }

    /**
     * Creates the stream callback that consumes one pushed block.
     *
     * <p>A merging callback is returned when the target partition is tracked and the block's map
     * index has not been merged yet. Otherwise a callback that discards all data is returned;
     * it fails in {@code onComplete} only when the partition could not be obtained (block
     * arrived too late), and completes silently for an already-merged map index.
     *
     * @param msg header of the pushed block
     * @return callback to feed the block's bytes into
     */
    @Override
    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
        AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
        AppShufflePartitionInfo candidate = this.getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
        final boolean isTooLate = candidate == null;
        boolean alreadyMerged = !isTooLate && candidate.mapTracker.contains(msg.mapIndex);
        final String streamId = String.format("%s_%d_%d_%d", "shufflePush", appShuffleId.shuffleId, msg.mapIndex, msg.reduceId);
        if (!isTooLate && !alreadyMerged) {
            return new PushBlockStreamCallback(this, streamId, candidate, msg.mapIndex);
        }
        // Nothing to merge: swallow the bytes and only report the "too late" case.
        return new StreamCallbackWithID(){

            public String getID() {
                return streamId;
            }

            public void onData(String id, ByteBuffer buf) {
            }

            public void onComplete(String id) {
                if (!isTooLate) {
                    return;
                }
                throw new RuntimeException(String.format("Block %s %s", id, "received after merged shuffle is finalized"));
            }

            public void onFailure(String id, Throwable cause) {
            }
        };
    }

    /**
     * Finalizes every partition currently tracked for the given shuffle and reports the result.
     *
     * <p>Each partition is finalized, closed and removed from the shuffle's partition map while
     * holding that partition's monitor. A partition whose finalization throws an
     * {@code IOException} is logged and left out of the returned statuses, but is still closed
     * and removed. Afterwards the shuffle itself is removed from {@code partitions}.
     *
     * <p>Note: this method was produced by a decompiler, which flagged the synchronized
     * try/finally structure as a possible behaviour change relative to the original bytecode.
     *
     * @param msg identifies the application and shuffle to finalize
     * @return per-partition map bitmaps, reduce ids and last chunk offsets of the partitions that
     *         were finalized successfully; empty arrays when nothing is tracked for the shuffle
     * @throws IOException declared by the interface; IOExceptions raised while finalizing a
     *         partition are caught and logged here
     */
    @Override
    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
        MergeStatuses mergeStatuses;
        logger.info("Finalizing shuffle {} from Application {}.", (Object)msg.shuffleId, (Object)msg.appId);
        AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
        Map shufflePartitions = (Map)this.partitions.get(appShuffleId);
        if (shufflePartitions == null || shufflePartitions.isEmpty()) {
            // Nothing was merged for this shuffle: report an empty status.
            mergeStatuses = new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]);
        } else {
            Collection partitionsToFinalize = shufflePartitions.values();
            ArrayList<RoaringBitmap> bitmaps = new ArrayList<RoaringBitmap>(partitionsToFinalize.size());
            ArrayList<Integer> reduceIds = new ArrayList<Integer>(partitionsToFinalize.size());
            ArrayList<Long> sizes = new ArrayList<Long>(partitionsToFinalize.size());
            Iterator partitionsIter = partitionsToFinalize.iterator();
            while (partitionsIter.hasNext()) {
                AppShufflePartitionInfo partition;
                AppShufflePartitionInfo appShufflePartitionInfo = partition = (AppShufflePartitionInfo)partitionsIter.next();
                // Same monitor the stream callbacks hold while writing to this partition.
                synchronized (appShufflePartitionInfo) {
                    try {
                        partition.finalizePartition();
                        // Recorded only when finalization succeeded, keeping the three lists aligned.
                        bitmaps.add(partition.mapTracker);
                        reduceIds.add(partition.reduceId);
                        sizes.add(partition.getLastChunkOffset());
                    }
                    catch (IOException ioe) {
                        logger.warn("Exception while finalizing shuffle partition {} {} {}", new Object[]{msg.appId, msg.shuffleId, partition.reduceId, ioe});
                    }
                    finally {
                        // Close and untrack the partition whether or not finalization succeeded.
                        partition.closeAllFiles();
                        partitionsIter.remove();
                    }
                }
            }
            mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes));
        }
        this.partitions.remove(appShuffleId);
        logger.info("Finalized shuffle {} from Application {}.", (Object)msg.shuffleId, (Object)msg.appId);
        return mergeStatuses;
    }

    /**
     * Records the merge directory layout of an application the first time one of its executors
     * registers; later registrations for the same application leave the stored layout untouched.
     *
     * @param appId        application id
     * @param executorInfo local directories and sub-directory count reported by the executor
     */
    @Override
    public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
        if (logger.isDebugEnabled()) {
            String localDirs = Arrays.toString(executorInfo.localDirs);
            logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} num sub-dirs {}", appId, localDirs, executorInfo.subDirsPerLocalDir);
        }
        this.appsPathInfo.computeIfAbsent(
            appId,
            id -> new AppPathsInfo(id, executorInfo.localDirs, executorInfo.subDirsPerLocalDir));
    }

    /**
     * Builds the extension-less base name shared by a merged partition's data, index and meta
     * files: {@code mergedShuffle_<appId>_<shuffleId>_<reduceId>}.
     */
    private static String generateFileName(AppShuffleId appShuffleId, int reduceId) {
        String appId = appShuffleId.appId;
        int shuffleId = appShuffleId.shuffleId;
        return String.format("mergedShuffle_%s_%d_%d", appId, shuffleId, reduceId);
    }

    /**
     * An output file (index or meta) of a merged partition: its channel, a data stream writing
     * to it, and a position counter that is advanced explicitly through {@code updatePos}.
     */
    @VisibleForTesting
    static class MergeShuffleFile {
        /** Channel of the underlying file; {@code null} once closed. */
        private FileChannel channel;
        /** Stream used for writes; {@code null} once closed. */
        private DataOutputStream dos;
        /** Sum of all byte counts passed to {@link #updatePos(long)}. */
        private long pos;

        /**
         * Opens {@code file} for writing.
         *
         * @throws IOException if the file cannot be opened
         */
        @VisibleForTesting
        MergeShuffleFile(File file) throws IOException {
            FileOutputStream fos = new FileOutputStream(file);
            this.channel = fos.getChannel();
            this.dos = new DataOutputStream(fos);
        }

        /** Wraps an already opened channel/stream pair. */
        @VisibleForTesting
        MergeShuffleFile(FileChannel channel, DataOutputStream dos) {
            this.channel = channel;
            this.dos = dos;
        }

        /** Advances the recorded position by {@code numBytes}. */
        private void updatePos(long numBytes) {
            this.pos += numBytes;
        }

        /**
         * Closes the stream and drops the references to stream and channel.
         *
         * <p>Safe to call repeatedly: the references are cleared on the first call, and later
         * calls return without doing anything instead of failing on the cleared stream.
         *
         * @throws IOException if closing the stream fails
         */
        void close() throws IOException {
            DataOutputStream stream = this.dos;
            // Clear the references up front so they are dropped even if close() throws.
            this.dos = null;
            this.channel = null;
            if (stream != null) {
                stream.close();
            }
        }

        /** Returns the data stream, or {@code null} after {@link #close()}. */
        @VisibleForTesting
        DataOutputStream getDos() {
            return this.dos;
        }

        /** Returns the file channel, or {@code null} after {@link #close()}. */
        @VisibleForTesting
        FileChannel getChannel() {
            return this.channel;
        }

        /** Returns the recorded position. */
        @VisibleForTesting
        long getPos() {
            return this.pos;
        }
    }

    /** Merge directory layout of one application, derived from an executor's local dirs. */
    private static class AppPathsInfo {
        /** One merge directory per executor local dir: {@code <parent of local dir>/merge_manager}. */
        private final String[] activeLocalDirs;
        /** Sub-directory count passed to ExecutorDiskUtils when resolving files. */
        private final int subDirsPerLocalDir;

        /**
         * @param appId              application id, used for logging only
         * @param localDirs          executor local directories
         * @param subDirsPerLocalDir number of sub-directories per local directory
         */
        private AppPathsInfo(String appId, String[] localDirs, int subDirsPerLocalDir) {
            String[] mergeDirs = new String[localDirs.length];
            for (int i = 0; i < localDirs.length; i++) {
                // The merge directory is a sibling of the executor's local directory.
                Path mergeDir = Paths.get(localDirs[i]).getParent().resolve(RemoteBlockPushResolver.MERGE_MANAGER_DIR);
                mergeDirs[i] = mergeDir.toFile().getPath();
            }
            this.activeLocalDirs = mergeDirs;
            this.subDirsPerLocalDir = subDirsPerLocalDir;
            if (logger.isInfoEnabled()) {
                logger.info("Updated active local dirs {} and sub dirs {} for application {}", Arrays.toString(this.activeLocalDirs), subDirsPerLocalDir, appId);
            }
        }
    }

    public static class AppShufflePartitionInfo {
        private final AppShuffleId appShuffleId;
        private final int reduceId;
        public FileChannel dataChannel;
        private long dataFilePos;
        private boolean encounteredFailure;
        private int currentMapIndex;
        private RoaringBitmap mapTracker;
        private MergeShuffleFile indexFile;
        private MergeShuffleFile metaFile;
        private long lastChunkOffset;
        private int lastMergedMapIndex = -1;
        private RoaringBitmap chunkTracker;
        private int numIOExceptions = 0;
        private boolean indexMetaUpdateFailed;

        AppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, File dataFile, MergeShuffleFile indexFile, MergeShuffleFile metaFile) throws IOException {
            this.appShuffleId = (AppShuffleId)Preconditions.checkNotNull((Object)appShuffleId, (Object)"app shuffle id");
            this.reduceId = reduceId;
            this.dataChannel = new FileOutputStream(dataFile).getChannel();
            this.indexFile = indexFile;
            this.metaFile = metaFile;
            this.currentMapIndex = -1;
            this.updateChunkInfo(0L, -1);
            this.dataFilePos = 0L;
            this.encounteredFailure = false;
            this.mapTracker = new RoaringBitmap();
            this.chunkTracker = new RoaringBitmap();
        }

        public long getDataFilePos() {
            return this.dataFilePos;
        }

        public void setDataFilePos(long dataFilePos) {
            logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId, this.dataFilePos, dataFilePos});
            this.dataFilePos = dataFilePos;
        }

        boolean isEncounteredFailure() {
            return this.encounteredFailure;
        }

        void setEncounteredFailure(boolean encounteredFailure) {
            this.encounteredFailure = encounteredFailure;
        }

        int getCurrentMapIndex() {
            return this.currentMapIndex;
        }

        void setCurrentMapIndex(int mapIndex) {
            logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId, this.currentMapIndex, mapIndex});
            this.currentMapIndex = mapIndex;
        }

        long getLastChunkOffset() {
            return this.lastChunkOffset;
        }

        void blockMerged(int mapIndex) {
            logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId, mapIndex});
            this.mapTracker.add(mapIndex);
            this.chunkTracker.add(mapIndex);
            this.lastMergedMapIndex = mapIndex;
        }

        void resetChunkTracker() {
            this.chunkTracker.clear();
        }

        void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
            try {
                logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId, this.lastChunkOffset, chunkOffset});
                if (this.indexMetaUpdateFailed) {
                    this.indexFile.getChannel().position(this.indexFile.getPos());
                }
                this.indexFile.getDos().writeLong(chunkOffset);
                this.writeChunkTracker(mapIndex);
                this.indexFile.updatePos(8L);
                this.lastChunkOffset = chunkOffset;
                this.indexMetaUpdateFailed = false;
            }
            catch (IOException ioe) {
                logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId});
                this.indexMetaUpdateFailed = true;
                throw ioe;
            }
        }

        private void writeChunkTracker(int mapIndex) throws IOException {
            if (mapIndex == -1) {
                return;
            }
            this.chunkTracker.add(mapIndex);
            logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId, mapIndex});
            if (this.indexMetaUpdateFailed) {
                this.metaFile.getChannel().position(this.metaFile.getPos());
            }
            this.chunkTracker.serialize((DataOutput)this.metaFile.getDos());
            this.metaFile.updatePos(this.metaFile.getChannel().position() - this.metaFile.getPos());
        }

        private void incrementIOExceptions() {
            ++this.numIOExceptions;
        }

        private boolean shouldAbort(int ioExceptionsThresholdDuringMerge) {
            return this.numIOExceptions > ioExceptionsThresholdDuringMerge;
        }

        private void finalizePartition() throws IOException {
            if (this.dataFilePos != this.lastChunkOffset) {
                try {
                    this.updateChunkInfo(this.dataFilePos, this.lastMergedMapIndex);
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            this.dataChannel.truncate(this.lastChunkOffset);
            this.indexFile.getChannel().truncate(this.indexFile.getPos());
            this.metaFile.getChannel().truncate(this.metaFile.getPos());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void closeAllFiles() {
            if (this.dataChannel != null) {
                try {
                    this.dataChannel.close();
                }
                catch (IOException ioe) {
                    logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId});
                }
                finally {
                    this.dataChannel = null;
                }
            }
            if (this.metaFile != null) {
                try {
                    this.metaFile.close();
                }
                catch (IOException ioe) {
                    logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId});
                }
                finally {
                    this.metaFile = null;
                }
            }
            if (this.indexFile != null) {
                try {
                    this.indexFile.close();
                }
                catch (IOException ioe) {
                    logger.warn("Error closing index file for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, this.appShuffleId.shuffleId, this.reduceId});
                }
                finally {
                    this.indexFile = null;
                }
            }
        }

        protected void finalize() throws Throwable {
            this.closeAllFiles();
        }

        @VisibleForTesting
        MergeShuffleFile getIndexFile() {
            return this.indexFile;
        }

        @VisibleForTesting
        MergeShuffleFile getMetaFile() {
            return this.metaFile;
        }

        @VisibleForTesting
        FileChannel getDataChannel() {
            return this.dataChannel;
        }

        @VisibleForTesting
        int getNumIOExceptions() {
            return this.numIOExceptions;
        }
    }

    /** Immutable (application id, shuffle id) pair with value semantics, used as a map key. */
    public static class AppShuffleId {
        public final String appId;
        public final int shuffleId;

        AppShuffleId(String appId, int shuffleId) {
            this.appId = appId;
            this.shuffleId = shuffleId;
        }

        /** Two ids are equal when they are of the same class and both components match. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != this.getClass()) {
                return false;
            }
            AppShuffleId other = (AppShuffleId)o;
            if (this.shuffleId != other.shuffleId) {
                return false;
            }
            return Objects.equal((Object)this.appId, (Object)other.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.appId, this.shuffleId);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper((Object)this)
                .add("appId", (Object)this.appId)
                .add("shuffleId", this.shuffleId)
                .toString();
        }
    }

    static class PushBlockStreamCallback
    implements StreamCallbackWithID {
        private final RemoteBlockPushResolver mergeManager;
        private final String streamId;
        private final int mapIndex;
        private final AppShufflePartitionInfo partitionInfo;
        private int length = 0;
        private boolean isWriting = false;
        private List<ByteBuffer> deferredBufs;

        private PushBlockStreamCallback(RemoteBlockPushResolver mergeManager, String streamId, AppShufflePartitionInfo partitionInfo, int mapIndex) {
            this.mergeManager = (RemoteBlockPushResolver)Preconditions.checkNotNull((Object)mergeManager);
            this.streamId = streamId;
            this.partitionInfo = (AppShufflePartitionInfo)Preconditions.checkNotNull((Object)partitionInfo);
            this.mapIndex = mapIndex;
            this.abortIfNecessary();
        }

        public String getID() {
            return this.streamId;
        }

        private void writeBuf(ByteBuffer buf) throws IOException {
            while (buf.hasRemaining()) {
                if (this.partitionInfo.isEncounteredFailure()) {
                    long updatedPos = this.partitionInfo.getDataFilePos() + (long)this.length;
                    logger.debug("{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", new Object[]{((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.appId, ((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.shuffleId, this.partitionInfo.reduceId, this.partitionInfo.getDataFilePos(), updatedPos});
                    this.length += this.partitionInfo.dataChannel.write(buf, updatedPos);
                    continue;
                }
                this.length += this.partitionInfo.dataChannel.write(buf);
            }
        }

        private boolean allowedToWrite() {
            return this.partitionInfo.getCurrentMapIndex() < 0 || this.partitionInfo.getCurrentMapIndex() == this.mapIndex;
        }

        private boolean isDuplicateBlock() {
            return this.partitionInfo.getCurrentMapIndex() == this.mapIndex && this.length == 0 || this.partitionInfo.mapTracker.contains(this.mapIndex);
        }

        private void writeDeferredBufs() throws IOException {
            for (ByteBuffer deferredBuf : this.deferredBufs) {
                this.writeBuf(deferredBuf);
            }
            this.deferredBufs = null;
        }

        private void abortIfNecessary() {
            if (this.partitionInfo.shouldAbort(this.mergeManager.ioExceptionsThresholdDuringMerge)) {
                this.deferredBufs = null;
                throw new RuntimeException(String.format("%s when merging %s", "IOExceptions exceeded the threshold", this.streamId));
            }
        }

        private void incrementIOExceptionsAndAbortIfNecessary() {
            this.partitionInfo.incrementIOExceptions();
            this.abortIfNecessary();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onData(String streamId, ByteBuffer buf) throws IOException {
            AppShufflePartitionInfo appShufflePartitionInfo = this.partitionInfo;
            synchronized (appShufflePartitionInfo) {
                Map shufflePartitions = (Map)this.mergeManager.partitions.get(this.partitionInfo.appShuffleId);
                if (shufflePartitions == null || !shufflePartitions.containsKey(this.partitionInfo.reduceId)) {
                    this.deferredBufs = null;
                    return;
                }
                if (this.allowedToWrite()) {
                    this.isWriting = true;
                    if (this.isDuplicateBlock()) {
                        this.deferredBufs = null;
                        return;
                    }
                    this.abortIfNecessary();
                    logger.trace("{} shuffleId {} reduceId {} onData writable", new Object[]{((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.appId, ((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.shuffleId, this.partitionInfo.reduceId});
                    if (this.partitionInfo.getCurrentMapIndex() < 0) {
                        this.partitionInfo.setCurrentMapIndex(this.mapIndex);
                    }
                    try {
                        if (this.deferredBufs != null && !this.deferredBufs.isEmpty()) {
                            this.writeDeferredBufs();
                        }
                        this.writeBuf(buf);
                    }
                    catch (IOException ioe) {
                        this.incrementIOExceptionsAndAbortIfNecessary();
                        throw ioe;
                    }
                    if (this.partitionInfo.isEncounteredFailure()) {
                        this.partitionInfo.dataChannel.position(this.partitionInfo.getDataFilePos() + (long)this.length);
                        this.partitionInfo.setEncounteredFailure(false);
                    }
                } else {
                    logger.trace("{} shuffleId {} reduceId {} onData deferred", new Object[]{((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.appId, ((AppShufflePartitionInfo)this.partitionInfo).appShuffleId.shuffleId, this.partitionInfo.reduceId});
                    if (this.deferredBufs == null) {
                        this.deferredBufs = new LinkedList<ByteBuffer>();
                    }
                    ByteBuffer deferredBuf = ByteBuffer.allocate(buf.remaining());
                    deferredBuf.put(buf);
                    deferredBuf.flip();
                    this.deferredBufs.add(deferredBuf);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Invoked once the stream identified by {@code streamId} has been fully received.
         *
         * <p>While holding the monitor of the partition info, this method:
         * <ol>
         *   <li>rejects the block if the shuffle partition is no longer registered with the merge manager;</li>
         *   <li>rejects the block if {@code allowedToWrite()} reports that it cannot be written;</li>
         *   <li>returns without writing anything if {@code isDuplicateBlock()} reports a duplicate;</li>
         *   <li>flushes any deferred buffers when no map index is currently recorded on the partition;</li>
         *   <li>advances the data file position, records the block as merged and, when enough bytes
         *       have accumulated since the last chunk offset, updates the chunk info.</li>
         * </ol>
         *
         * <p>Note: the duplicate-block path and both rejection paths leave without reaching the final
         * {@code isWriting = false} assignment.
         *
         * @param streamId identifier of the completed stream, used in error messages
         * @throws IOException if writing the deferred buffers fails
         * @throws RuntimeException if the block arrives after the partition was removed, or if the
         *         block is not allowed to write
         */
        public void onComplete(String streamId) throws IOException {
            final AppShufflePartitionInfo info = this.partitionInfo;
            synchronized (info) {
                logger.trace("{} shuffleId {} reduceId {} onComplete invoked", new Object[]{info.appShuffleId.appId, info.appShuffleId.shuffleId, info.reduceId});

                // Guard 1: the partition must still be tracked by the merge manager.
                Map<?, ?> activePartitions = (Map<?, ?>)this.mergeManager.partitions.get(info.appShuffleId);
                boolean partitionStillTracked = activePartitions != null && activePartitions.containsKey(info.reduceId);
                if (!partitionStillTracked) {
                    this.deferredBufs = null;
                    throw new RuntimeException(String.format("Block %s %s", streamId, "received after merged shuffle is finalized"));
                }

                // Guard 2: this stream must be the one currently permitted to write.
                if (!this.allowedToWrite()) {
                    this.deferredBufs = null;
                    throw new RuntimeException(String.format("%s %s to merged shuffle", "Couldn't find an opportunity to write block", streamId));
                }

                this.isWriting = true;
                if (this.isDuplicateBlock()) {
                    // Nothing is written for a duplicate; just drop whatever was buffered.
                    this.deferredBufs = null;
                    return;
                }

                // Flush buffered data only when no map index is recorded on the partition.
                if (info.getCurrentMapIndex() < 0 && this.deferredBufs != null && !this.deferredBufs.isEmpty()) {
                    try {
                        this.abortIfNecessary();
                        this.writeDeferredBufs();
                    }
                    catch (IOException writeFailure) {
                        this.incrementIOExceptionsAndAbortIfNecessary();
                        throw writeFailure;
                    }
                }

                long newDataFilePos = info.getDataFilePos() + (long)this.length;
                long bytesSinceLastChunk = newDataFilePos - info.getLastChunkOffset();
                boolean chunkRecorded = false;
                if (bytesSinceLastChunk >= (long)this.mergeManager.minChunkSize) {
                    try {
                        info.updateChunkInfo(newDataFilePos, this.mapIndex);
                        chunkRecorded = true;
                    }
                    catch (IOException chunkUpdateFailure) {
                        // Counted but not rethrown: the block is still recorded as merged below.
                        this.incrementIOExceptionsAndAbortIfNecessary();
                    }
                }

                info.setDataFilePos(newDataFilePos);
                info.setCurrentMapIndex(-1);
                info.blockMerged(this.mapIndex);
                if (chunkRecorded) {
                    info.resetChunkTracker();
                }
            }
            this.isWriting = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Invoked when receiving the stream identified by {@code streamId} fails.
         *
         * <p>The failure is logged at error level when the merge manager's error handler says it
         * should be, and at debug level otherwise. Partition state is touched only if this callback
         * had already set {@code isWriting}; in that case, under the partition info's monitor and
         * only if the partition is still registered with the merge manager, the current map index
         * is reset to {@code -1} and the partition is flagged as having encountered a failure.
         *
         * @param streamId identifier of the failed stream, used in the log message
         * @param throwable the cause of the failure
         * @throws IOException declared by the signature; nothing in this body throws it directly
         */
        public void onFailure(String streamId, Throwable throwable) throws IOException {
            boolean logAsError = this.mergeManager.errorHandler.shouldLogError(throwable);
            if (logAsError) {
                logger.error("Encountered issue when merging {}", streamId, throwable);
            } else {
                logger.debug("Encountered issue when merging {}", streamId, throwable);
            }

            // A stream that never started writing has no partition state to mark as failed.
            if (!this.isWriting) {
                return;
            }

            final AppShufflePartitionInfo info = this.partitionInfo;
            synchronized (info) {
                Map<?, ?> activePartitions = (Map<?, ?>)this.mergeManager.partitions.get(info.appShuffleId);
                if (activePartitions == null || !activePartitions.containsKey(info.reduceId)) {
                    return;
                }
                logger.debug("{} shuffleId {} reduceId {} set encountered failure", new Object[]{info.appShuffleId.appId, info.appShuffleId.shuffleId, info.reduceId});
                info.setCurrentMapIndex(-1);
                info.setEncounteredFailure(true);
            }
        }

        /**
         * Exposes the partition info this callback operates on; visible for tests only.
         *
         * @return the partition info held by this callback
         */
        @VisibleForTesting
        AppShufflePartitionInfo getPartitionInfo() {
            return partitionInfo;
        }
    }
}

