/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;

class BlockReceiver
implements Closeable {
    // Shared with DataNode: this class logs through the datanode's own loggers.
    public static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    // 0x800000 bytes = 8 MiB. NOTE(review): the decompiled dropOsCacheBehindWriter
    // uses the inlined literal 0x800000L rather than referencing this constant.
    private static final long CACHE_DROP_LAG_BYTES = 0x800000L;
    // Stream the incoming packets are read from (see readToBuf).
    private DataInputStream in = null;
    // Checksum requested by the sender; used by verifyChunks to validate incoming data.
    private DataChecksum clientChecksum;
    // Checksum obtained from the replica's write streams; used for what is written to the meta file.
    private DataChecksum diskChecksum;
    // True when clientChecksum and diskChecksum are not equal (set in the constructor).
    private boolean needsChecksumTranslation;
    // Block data output stream (streams.dataOut).
    private OutputStream out = null;
    // File descriptor of 'out' when it is a FileOutputStream; otherwise left null.
    private FileDescriptor outFd;
    // Raw checksum output stream (streams.checksumOut); used for force() on close.
    private OutputStream cout = null;
    // Buffered DataOutputStream wrapper around the checksum stream; all checksum writes go through it.
    private DataOutputStream checksumOut = null;
    // Both values are taken from diskChecksum in the constructor.
    private int bytesPerChecksum;
    private int checksumSize;
    // Packet receive buffer, lazily allocated by readNextPacket.
    private ByteBuffer buf;
    // Number of bytes of 'buf' that currently hold data read from 'in'.
    private int bufRead;
    // Largest packet size (header + payload) seen so far; bounds speculative reads in readToBuf.
    private int maxPacketReadLen;
    // Addresses used in log and error messages.
    protected final String inAddr;
    protected final String myAddr;
    // Mirror address and stream, both assigned in receiveBlock; mirrorOut may be null.
    private String mirrorAddr;
    private DataOutputStream mirrorOut;
    // Daemon thread running a PacketResponder; only created for non-transfer client writes.
    private Daemon responder = null;
    // Optional throttler applied after each received packet.
    private DataTransferThrottler throttler;
    // Output streams handed out by replicaInfo.createStreams.
    private FSDatasetInterface.BlockWriteStreams streams;
    // Datanode named in the bad-block report when checksum verification fails; may be null.
    private DatanodeInfo srcDataNode = null;
    // Running CRC of a partially written chunk; set by computePartialChunkCrc, cleared once written out.
    private Checksum partialCrc = null;
    private final DataNode datanode;
    // Set when writing to, or reading acks from, the mirror fails; volatile because the
    // PacketResponder thread also reads and writes it.
    private volatile boolean mirrorError;
    // Copied from the datanode configuration (DnConf) in the constructor.
    private boolean dropCacheBehindWrites;
    private boolean syncBehindWrites;
    // Offset up to which dropOsCacheBehindWriter has already processed the file.
    private long lastCacheDropOffset = 0L;
    private final String clientname;
    // isDatanode is true when clientname is empty; isClient is its negation.
    private final boolean isClient;
    private final boolean isDatanode;
    private final ExtendedBlock block;
    // Replica being written; obtained from the dataset according to the pipeline stage.
    private final ReplicaInPipelineInterface replicaInfo;
    private final BlockConstructionStage stage;
    // True for the TRANSFER_RBW and TRANSFER_FINALIZED stages.
    private final boolean isTransfer;

    /**
     * Prepares to receive one block: obtains the replica to write into from the
     * datanode's dataset (which call is used depends on the sender and pipeline
     * stage), opens its data and checksum output streams, and writes the
     * metadata header when a new replica is being created.
     *
     * @param block the block to receive; its generation stamp is updated to
     *        {@code newGs} for the recovery and append stages
     * @param in stream the packets will be read from
     * @param inAddr address of the sender, used in log and error messages
     * @param myAddr address of this node, used in log messages
     * @param stage pipeline stage; selects how the replica is obtained
     * @param newGs generation stamp passed to the dataset for recovery/append stages
     * @param minBytesRcvd passed through to the dataset recovery/append calls
     * @param maxBytesRcvd passed through to {@code recoverRbw}
     * @param clientname name of the writing client; an empty string means the
     *        sender is another datanode
     * @param srcDataNode source datanode, reported to the namenode on checksum
     *        mismatch; may be null
     * @param datanode the owning datanode
     * @param requestedChecksum checksum the sender uses for the incoming data
     * @throws IOException if the stage is unsupported or the replica/streams
     *         cannot be set up; {@code ReplicaAlreadyExistsException} and
     *         {@code ReplicaNotFoundException} are rethrown without cleanup
     */
    BlockReceiver(ExtendedBlock block, DataInputStream in, String inAddr, String myAddr, BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String clientname, DatanodeInfo srcDataNode, DataNode datanode, DataChecksum requestedChecksum) throws IOException {
        try {
            this.block = block;
            this.in = in;
            this.inAddr = inAddr;
            this.myAddr = myAddr;
            this.srcDataNode = srcDataNode;
            this.datanode = datanode;
            this.clientname = clientname;
            // An empty client name identifies a datanode-to-datanode write.
            this.isDatanode = clientname.length() == 0;
            this.isClient = !this.isDatanode;
            this.stage = stage;
            // 'bl' is never read; it is a leftover of decompilation.
            boolean bl = this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(this.getClass().getSimpleName() + ": " + block + "\n  isClient  =" + this.isClient + ", clientname=" + clientname + "\n  isDatanode=" + this.isDatanode + ", srcDataNode=" + srcDataNode + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr));
            }
            if (this.isDatanode) {
                // Writes coming from another datanode always go to a temporary replica.
                this.replicaInfo = datanode.data.createTemporary(block);
            } else {
                switch (stage) {
                    case PIPELINE_SETUP_CREATE: {
                        this.replicaInfo = datanode.data.createRbw(block);
                        break;
                    }
                    case PIPELINE_SETUP_STREAMING_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND: {
                        this.replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
                        // The block is removed from the block scanner (when one exists) before being appended to.
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock());
                        }
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock());
                        }
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case TRANSFER_RBW: 
                    case TRANSFER_FINALIZED: {
                        this.replicaInfo = datanode.data.createTemporary(block);
                        break;
                    }
                    default: {
                        throw new IOException("Unsupported stage " + (Object)((Object)stage) + " while receiving block " + block + " from " + inAddr);
                    }
                }
            }
            this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
            this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
            // A new replica is being created for datanode writes, transfers and pipeline creation;
            // the remaining stages write into an existing replica.
            boolean isCreate = this.isDatanode || this.isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
            this.streams = this.replicaInfo.createStreams(isCreate, requestedChecksum);
            assert (this.streams != null) : "null streams!";
            this.clientChecksum = requestedChecksum;
            this.diskChecksum = this.streams.getChecksum();
            // When the two checksums differ, incoming checksums are recomputed before being written (see translateChunks).
            this.needsChecksumTranslation = !this.clientChecksum.equals((Object)this.diskChecksum);
            this.bytesPerChecksum = this.diskChecksum.getBytesPerChecksum();
            this.checksumSize = this.diskChecksum.getChecksumSize();
            this.out = this.streams.dataOut;
            if (this.out instanceof FileOutputStream) {
                this.outFd = ((FileOutputStream)this.out).getFD();
            } else {
                // outFd stays null, which disables dropOsCacheBehindWriter.
                LOG.warn((Object)("Could not get file descriptor for outputstream of class " + this.out.getClass()));
            }
            this.cout = this.streams.checksumOut;
            this.checksumOut = new DataOutputStream(new BufferedOutputStream(this.streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
            if (isCreate) {
                // A freshly created replica needs the metadata header at the start of its checksum file.
                BlockMetadataHeader.writeHeader(this.checksumOut, this.diskChecksum);
            }
        }
        catch (ReplicaAlreadyExistsException bae) {
            // Rethrown as-is: no streams are closed and no cleanup is attempted.
            throw bae;
        }
        catch (ReplicaNotFoundException bne) {
            throw bne;
        }
        catch (IOException ioe) {
            // Any other I/O failure: release whatever was opened and undo the replica if applicable.
            IOUtils.closeStream((Closeable)this);
            this.cleanupBlock();
            IOException cause = FSDataset.getCauseIfDiskError(ioe);
            // NOTE(review): 'cause' may be null here (see the null check below), in which case
            // the original exception is not included in this log entry.
            DataNode.LOG.warn((Object)"IOException in BlockReceiver constructor. Cause is ", (Throwable)cause);
            if (cause != null) {
                // A non-null cause replaces the original exception and is passed to the datanode's disk check.
                ioe = cause;
                datanode.checkDiskError(ioe);
            }
            throw ioe;
        }
    }

    /**
     * Returns the datanode this receiver was constructed with.
     *
     * @return the owning {@code DataNode}
     */
    DataNode getDataNode() {
        final DataNode owner = this.datanode;
        return owner;
    }

    /**
     * Flushes and closes the checksum stream and then the data stream.
     *
     * <p>When the datanode configuration has {@code syncOnClose} set and the
     * underlying stream is a {@link FileOutputStream}, its channel is forced
     * to disk before the stream is closed. Both streams are always attempted;
     * a failure on the first does not prevent closing the second. If either
     * step fails, the most recently caught exception is passed to the
     * datanode's disk check and then rethrown.
     *
     * @throws IOException if flushing, syncing or closing either stream fails
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        // Checksum (metadata) stream first.
        try {
            if (this.checksumOut != null) {
                this.checksumOut.flush();
                if (this.datanode.getDnConf().syncOnClose && this.cout instanceof FileOutputStream) {
                    FileOutputStream metaFile = (FileOutputStream) this.cout;
                    metaFile.getChannel().force(true);
                }
                this.checksumOut.close();
                this.checksumOut = null;
            }
        } catch (IOException e) {
            failure = e;
        } finally {
            // No-op after a clean close (field is null); otherwise a quiet best-effort close.
            IOUtils.closeStream(this.checksumOut);
        }

        // Then the block data stream.
        try {
            if (this.out != null) {
                this.out.flush();
                if (this.datanode.getDnConf().syncOnClose && this.out instanceof FileOutputStream) {
                    FileOutputStream dataFile = (FileOutputStream) this.out;
                    dataFile.getChannel().force(true);
                }
                this.out.close();
                this.out = null;
            }
        } catch (IOException e) {
            // A data-stream failure replaces any earlier checksum-stream failure.
            failure = e;
        } finally {
            IOUtils.closeStream(this.out);
        }

        if (failure == null) {
            return;
        }
        this.datanode.checkDiskError(failure);
        throw failure;
    }

    /**
     * Flushes the checksum stream and then the data stream, skipping any
     * stream that is currently null.
     *
     * @throws IOException if either flush fails
     */
    void flush() throws IOException {
        final DataOutputStream meta = this.checksumOut;
        if (meta != null) {
            meta.flush();
        }
        final OutputStream data = this.out;
        if (data != null) {
            data.flush();
        }
    }

    /**
     * Handles a failure while writing to the mirror. The failure is logged;
     * if the current thread has been interrupted the exception is rethrown
     * (the interrupt flag is cleared by the check), otherwise the mirror is
     * marked as failed so that later packets are no longer forwarded.
     *
     * @param ioe the exception raised by the mirror write
     * @throws IOException {@code ioe} itself, when the thread was interrupted
     */
    private void handleMirrorOutError(IOException ioe) throws IOException {
        final String poolId = this.block.getBlockPoolId();
        final String message = this.datanode.getDNRegistrationForBP(poolId) + ":Exception writing block " + this.block + " to mirror " + this.mirrorAddr;
        LOG.info(message, ioe);
        if (!Thread.interrupted()) {
            this.mirrorError = true;
            return;
        }
        throw ioe;
    }

    /**
     * Verifies received data against the checksums sent with it, one chunk of
     * at most {@code bytesPerChecksum} bytes at a time, using the client's
     * checksum.
     *
     * <p>On the first mismatch, if a source datanode is known, the block is
     * reported to the namenode as corrupt on that datanode. A failure to send
     * the report is logged (with its cause) and otherwise ignored, so that the
     * checksum error itself is still what the caller sees.
     *
     * @param dataBuf buffer holding the data
     * @param dataOff offset of the first data byte in {@code dataBuf}
     * @param len number of data bytes to verify
     * @param checksumBuf buffer holding the checksums
     * @param checksumOff offset of the first checksum in {@code checksumBuf}
     * @throws IOException if any chunk does not match its checksum
     */
    private void verifyChunks(byte[] dataBuf, int dataOff, int len, byte[] checksumBuf, int checksumOff) throws IOException {
        DatanodeProtocol nn = this.datanode.getBPNamenode(this.block.getBlockPoolId());
        while (len > 0) {
            int chunkLen = Math.min(len, this.bytesPerChecksum);
            this.clientChecksum.update(dataBuf, dataOff, chunkLen);
            if (!this.clientChecksum.compare(checksumBuf, checksumOff)) {
                if (this.srcDataNode != null) {
                    try {
                        LOG.info((Object)("report corrupt block " + this.block + " from datanode " + this.srcDataNode + " to namenode"));
                        LocatedBlock lb = new LocatedBlock(this.block, new DatanodeInfo[]{this.srcDataNode});
                        nn.reportBadBlocks(new LocatedBlock[]{lb});
                    }
                    catch (IOException e) {
                        // Best effort only, but keep the cause in the log so the failed report can be diagnosed.
                        LOG.warn((Object)("Failed to report bad block " + this.block + " from datanode " + this.srcDataNode + " to namenode"), (Throwable)e);
                    }
                }
                throw new IOException("Unexpected checksum mismatch while writing " + this.block + " from " + this.inAddr);
            }
            // Start the next chunk from a clean checksum state.
            this.clientChecksum.reset();
            dataOff += chunkLen;
            checksumOff += this.checksumSize;
            len -= chunkLen;
        }
    }

    /**
     * Recomputes the checksums for the given data with the on-disk checksum
     * and stores them into {@code checksumBuf}, replacing the checksums that
     * were received. Does nothing when {@code len} is zero.
     *
     * @param dataBuf buffer holding the data
     * @param dataOff offset of the first data byte
     * @param len number of data bytes
     * @param checksumBuf buffer receiving the recomputed checksums
     * @param checksumOff offset at which the first checksum is written
     * @throws IOException declared for callers; not raised by the visible code
     */
    private void translateChunks(byte[] dataBuf, int dataOff, int len, byte[] checksumBuf, int checksumOff) throws IOException {
        if (len != 0) {
            // Round up: a trailing partial chunk still gets its own checksum.
            final int chunkCount = (len - 1) / this.bytesPerChecksum + 1;
            final ByteBuffer data = ByteBuffer.wrap(dataBuf, dataOff, len);
            final ByteBuffer sums = ByteBuffer.wrap(checksumBuf, checksumOff, chunkCount * this.checksumSize);
            this.diskChecksum.calculateChunkedSums(data, sums);
        }
    }

    /**
     * Moves the unread bytes of the packet buffer to the start of its backing
     * array so that the space already consumed can be reused, then resets
     * position, limit and {@code bufRead} accordingly.
     *
     * @throws IllegalStateException if {@code bufRead} differs from the
     *         buffer's limit on entry
     */
    private void shiftBufData() {
        if (this.bufRead != this.buf.limit()) {
            throw new IllegalStateException("bufRead should be same as buf.limit()");
        }
        final int consumed = this.buf.position();
        if (consumed <= 0) {
            // Data already starts at index 0; nothing to move.
            return;
        }
        final int unread = this.buf.remaining();
        if (unread > 0) {
            final byte[] backing = this.buf.array();
            System.arraycopy(backing, consumed, backing, 0, unread);
        }
        this.buf.position(0);
        this.bufRead = unread;
        this.buf.limit(this.bufRead);
    }

    /**
     * Reads from the input stream into the packet buffer, appending at the
     * buffer's current limit, and advances the limit and {@code bufRead} by
     * the number of bytes read.
     *
     * @param toRead maximum number of bytes to read; a negative value means
     *        "as much as fits", bounded by {@code maxPacketReadLen} when that
     *        is positive and by the buffer capacity otherwise
     * @return the number of bytes actually read
     * @throws EOFException if the stream reports end of input
     * @throws IOException if the read fails
     */
    private int readToBuf(int toRead) throws IOException {
        if (toRead < 0) {
            final int ceiling = this.maxPacketReadLen > 0 ? this.maxPacketReadLen : this.buf.capacity();
            toRead = ceiling - this.buf.limit();
        }
        final int count = this.in.read(this.buf.array(), this.buf.limit(), toRead);
        if (count < 0) {
            throw new EOFException("while trying to read " + toRead + " bytes");
        }
        this.bufRead = this.buf.limit() + count;
        this.buf.limit(this.bufRead);
        return count;
    }

    /**
     * Ensures that the packet buffer holds exactly one complete packet between
     * its position and its limit.
     *
     * <p>The buffer is allocated on first use, sized from the configured write
     * packet size. Bytes already read beyond the previous packet are kept
     * ({@code bufRead} tracks how much of the buffer holds data). The buffer is
     * compacted, or replaced by a larger one, when the packet does not fit.
     *
     * @throws EOFException (from {@code readToBuf}) if the stream ends before
     *         a full packet has been read
     * @throws IOException if the payload length field is negative or larger
     *         than 0x6400000, or if reading fails
     */
    private void readNextPacket() throws IOException {
        if (this.buf == null) {
            // First call: size the buffer for one header plus a whole number of chunks (at least one).
            int chunkSize = this.bytesPerChecksum + this.checksumSize;
            int chunksPerPacket = (this.datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN + chunkSize - 1) / chunkSize;
            this.buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN + Math.max(chunksPerPacket, 1) * chunkSize);
            this.buf.limit(0);
        }
        // The previous call may have narrowed the limit to the end of its packet;
        // widen it again to expose everything that was read.
        if (this.bufRead > this.buf.limit()) {
            this.buf.limit(this.bufRead);
        }
        // At least 4 bytes are needed to read the payload length.
        while (this.buf.remaining() < 4) {
            if (this.buf.position() > 0) {
                this.shiftBufData();
            }
            this.readToBuf(-1);
        }
        // Peek at the length without consuming it.
        this.buf.mark();
        int payloadLen = this.buf.getInt();
        this.buf.reset();
        // 0x6400000 = 100 * 1024 * 1024; larger (or negative) values are rejected.
        if (payloadLen < 0 || payloadLen > 0x6400000) {
            throw new IOException("Incorrect value for packet payload : " + payloadLen);
        }
        // Total packet size: payload plus header, minus 4.
        // NOTE(review): presumably the 4-byte length field is counted in both terms — confirm against PacketHeader.
        int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - 4;
        if (this.buf.remaining() < pktSize) {
            int spaceLeft;
            int toRead = pktSize - this.buf.remaining();
            // Not enough room at the tail: first try reclaiming the already consumed prefix.
            if (toRead > (spaceLeft = this.buf.capacity() - this.buf.limit()) && this.buf.position() > 0) {
                this.shiftBufData();
                spaceLeft = this.buf.capacity() - this.buf.limit();
            }
            // Still not enough: switch to a larger buffer, carrying over the bytes read so far.
            if (toRead > spaceLeft) {
                byte[] oldBuf = this.buf.array();
                int toCopy = this.buf.limit();
                this.buf = ByteBuffer.allocate(toCopy + toRead);
                System.arraycopy(oldBuf, 0, this.buf.array(), 0, toCopy);
                this.buf.limit(toCopy);
            }
            // Read until the rest of the packet has arrived.
            while (toRead > 0) {
                toRead -= this.readToBuf(toRead);
            }
        }
        // Hide any bytes belonging to the next packet; bufRead still remembers them.
        if (this.buf.remaining() > pktSize) {
            this.buf.limit(this.buf.position() + pktSize);
        }
        if (pktSize > this.maxPacketReadLen) {
            this.maxPacketReadLen = pktSize;
        }
    }

    /**
     * Reads the next packet from the input stream, validates its header and
     * hands it to {@link #receivePacket(long, long, boolean, int, int)}.
     *
     * <p>The header is parsed without consuming it: the buffer position is
     * restored to the start of the packet afterwards so that the whole packet
     * can still be forwarded.
     *
     * @return the value returned by the five-argument {@code receivePacket}:
     *         the packet's data length, or -1 for the last packet in the block
     * @throws IOException if the packet starts beyond the bytes received so
     *         far, if its data length is negative, or if reading or
     *         processing the packet fails
     */
    private int receivePacket() throws IOException {
        this.readNextPacket();
        this.buf.mark();
        PacketHeader header = new PacketHeader();
        header.readFields(this.buf);
        int endOfHeader = this.buf.position();
        // Rewind to the start of the packet; endOfHeader remembers where the payload begins.
        this.buf.reset();
        if (header.getOffsetInBlock() > this.replicaInfo.getNumBytes()) {
            // A packet may overlap data already received but must not leave a gap.
            throw new IOException("Received an out-of-sequence packet for " + this.block + " from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ". Expecting packet starting at " + this.replicaInfo.getNumBytes());
        }
        if (header.getDataLen() < 0) {
            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " at offset " + header.getOffsetInBlock() + ": " + header.getDataLen());
        }
        return this.receivePacket(header.getOffsetInBlock(), header.getSeqno(), header.isLastPacketInBlock(), header.getDataLen(), endOfHeader);
    }

    /**
     * Writes a slice of the packet buffer to the block data stream.
     *
     * @param pktBuf array holding the packet
     * @param startByteToDisk index of the first byte to write
     * @param numBytesToDisk number of bytes to write
     * @throws IOException if the write fails
     */
    private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, int numBytesToDisk) throws IOException {
        final OutputStream dataStream = this.out;
        dataStream.write(pktBuf, startByteToDisk, numBytesToDisk);
    }

    /**
     * Processes one packet that is already in the packet buffer: records the
     * new replica length, queues the packet for acknowledgement, forwards it
     * to the mirror, optionally verifies its checksums, and writes the data
     * and checksums that are not yet on disk.
     *
     * @param offsetInBlock offset in the block at which the packet's data starts
     * @param seqno sequence number of the packet
     * @param lastPacketInBlock whether this is the final packet of the block
     * @param len number of data bytes in the packet
     * @param endOfHeader buffer index at which the packet's header ends
     * @return {@code len}, or -1 when {@code lastPacketInBlock} is true
     * @throws IOException if the packet is malformed, fails checksum
     *         verification, or cannot be written; write failures are also
     *         passed to the datanode's disk check
     */
    private int receivePacket(long offsetInBlock, long seqno, boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Receiving one packet for block " + this.block + " of length " + len + " seqno " + seqno + " offsetInBlock " + offsetInBlock + " lastPacketInBlock " + lastPacketInBlock));
        }
        // From here on offsetInBlock is the END offset of this packet; firstByteInBlock keeps the start.
        long firstByteInBlock = offsetInBlock;
        if (this.replicaInfo.getNumBytes() < (offsetInBlock += (long)len)) {
            this.replicaInfo.setNumBytes(offsetInBlock);
        }
        // Queue the packet for the responder before it is forwarded or written.
        if (this.responder != null) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock);
        }
        // Forward everything between the buffer's position and limit to the mirror, unless it already failed.
        if (this.mirrorOut != null && !this.mirrorError) {
            try {
                this.mirrorOut.write(this.buf.array(), this.buf.position(), this.buf.remaining());
                this.mirrorOut.flush();
            }
            catch (IOException e) {
                this.handleMirrorOutError(e);
            }
        }
        // Skip past the header; what remains is checksums followed by data.
        this.buf.position(endOfHeader);
        if (lastPacketInBlock || len == 0) {
            // Nothing is verified or written for the last packet or an empty packet.
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Receiving an empty packet or the end of the block " + this.block));
            }
        } else {
            // One checksum per (possibly partial) chunk.
            int checksumLen = (len + this.bytesPerChecksum - 1) / this.bytesPerChecksum * this.checksumSize;
            if (this.buf.remaining() != checksumLen + len) {
                throw new IOException("Data remaining in packet does not matchsum of checksumLen and dataLen  size remaining: " + this.buf.remaining() + " data len: " + len + " checksum Len: " + checksumLen);
            }
            int checksumOff = this.buf.position();
            int dataOff = checksumOff + checksumLen;
            byte[] pktBuf = this.buf.array();
            // Mark the packet as consumed; the array is accessed directly from here on.
            this.buf.position(this.buf.limit());
            // Verification happens when there is no mirror, when the sender is a datanode,
            // or when the checksums have to be recomputed for disk anyway.
            if (this.mirrorOut == null || this.isDatanode || this.needsChecksumTranslation) {
                this.verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
                if (this.needsChecksumTranslation) {
                    this.translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
                }
            }
            try {
                long onDiskLen = this.replicaInfo.getBytesOnDisk();
                // Only write when the packet extends beyond what is already on disk.
                if (onDiskLen < offsetInBlock) {
                    byte[] lastChunkChecksum;
                    // The on-disk data ends in a partial chunk: reposition the checksum file first.
                    if (onDiskLen % (long)this.bytesPerChecksum != 0L) {
                        this.adjustCrcFilePosition();
                    }
                    // The packet starts inside a chunk: load the existing partial chunk's CRC into partialCrc.
                    if (firstByteInBlock % (long)this.bytesPerChecksum != 0L) {
                        LOG.info((Object)("Packet starts at " + firstByteInBlock + " for block " + this.block + " which is not a multiple of bytesPerChecksum " + this.bytesPerChecksum));
                        long offsetInChecksum = (long)BlockMetadataHeader.getHeaderSize() + onDiskLen / (long)this.bytesPerChecksum * (long)this.checksumSize;
                        this.computePartialChunkCrc(onDiskLen, offsetInChecksum, this.bytesPerChecksum);
                    }
                    // Skip the part of the packet that is already on disk.
                    int startByteToDisk = dataOff + (int)(onDiskLen - firstByteInBlock);
                    int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
                    this.writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
                    if (this.partialCrc != null) {
                        if (len > this.bytesPerChecksum) {
                            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " " + "A packet can have only one partial chunk." + " len = " + len + " bytesPerChecksum " + this.bytesPerChecksum);
                        }
                        // Extend the existing partial-chunk CRC with the new bytes and write the combined value.
                        this.partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
                        // Note: this local 'buf' shadows the packet buffer field of the same name.
                        byte[] buf = FSOutputSummer.convertToByteStream((Checksum)this.partialCrc, (int)this.checksumSize);
                        lastChunkChecksum = Arrays.copyOfRange(buf, buf.length - this.checksumSize, buf.length);
                        this.checksumOut.write(buf);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Writing out partial crc for data len " + len));
                        }
                        this.partialCrc = null;
                    } else {
                        // Write the packet's checksums as they are; remember the last one for the replica.
                        lastChunkChecksum = Arrays.copyOfRange(pktBuf, checksumOff + checksumLen - this.checksumSize, checksumOff + checksumLen);
                        this.checksumOut.write(pktBuf, checksumOff, checksumLen);
                    }
                    this.flush();
                    this.replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastChunkChecksum);
                    this.datanode.metrics.incrBytesWritten(len);
                    this.dropOsCacheBehindWriter(offsetInBlock);
                }
            }
            catch (IOException iex) {
                this.datanode.checkDiskError(iex);
                throw iex;
            }
        }
        if (this.throttler != null) {
            this.throttler.throttle(len);
        }
        return lastPacketInBlock ? -1 : len;
    }

    /**
     * Once the write position has advanced more than one window (0x800000
     * bytes, i.e. 8 MiB) past the last processed offset, optionally advises
     * the OS about the file range already written and optionally starts
     * syncing the most recent window, then advances the processed offset by
     * one window. Does nothing when no file descriptor is available.
     *
     * <p>This is best effort: any {@code Throwable} is logged as a warning and
     * not propagated.
     *
     * @param offsetInBlock current end offset of the data written to the block
     * @throws IOException declared for callers; failures are logged, not thrown
     */
    private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
        final long window = 0x800000L;
        try {
            if (this.outFd == null || offsetInBlock <= this.lastCacheDropOffset + window) {
                return;
            }
            final boolean pastFirstWindow = this.lastCacheDropOffset - window > 0L;
            if (pastFirstWindow && this.dropCacheBehindWrites) {
                // NOTE(review): advice value 4 is presumably POSIX_FADV_DONTNEED — confirm against NativeIO.
                NativeIO.posixFadviseIfPossible((FileDescriptor) this.outFd, (long) 0L, (long) this.lastCacheDropOffset, (int) 4);
            }
            if (this.syncBehindWrites) {
                // NOTE(review): flag value 2 is presumably SYNC_FILE_RANGE_WRITE — confirm against NativeIO.
                NativeIO.syncFileRangeIfPossible((FileDescriptor) this.outFd, (long) this.lastCacheDropOffset, (long) window, (int) 2);
            }
            this.lastCacheDropOffset += window;
        } catch (Throwable t) {
            LOG.warn("Couldn't drop os cache behind writer for " + this.block, t);
        }
    }

    /**
     * Receives a whole block: reads packets until the last one has been
     * processed, then, for writes from another datanode or transfers, closes
     * the streams and either converts the temporary replica to RBW
     * (stage {@code TRANSFER_RBW}) or finalizes the block.
     *
     * <p>For client writes that are not transfers, a {@code PacketResponder}
     * daemon thread is started to send acknowledgements; this method waits
     * for that thread to finish before returning.
     *
     * @param mirrOut stream to the next node in the pipeline, or null if none
     * @param mirrIn stream carrying acknowledgements from the next node
     * @param replyOut stream on which acknowledgements are sent upstream
     * @param mirrAddr address of the mirror, used in log messages
     * @param throttlerArg optional throttler applied per packet; may be null
     * @param downstreams downstream datanodes, passed to the responder
     * @throws IOException if receiving fails, or if this thread is interrupted
     *         while waiting for the responder thread
     */
    void receiveBlock(DataOutputStream mirrOut, DataInputStream mirrIn, DataOutputStream replyOut, String mirrAddr, DataTransferThrottler throttlerArg, DatanodeInfo[] downstreams) throws IOException {
        boolean responderClosed = false;
        this.mirrorOut = mirrOut;
        this.mirrorAddr = mirrAddr;
        this.throttler = throttlerArg;
        try {
            if (this.isClient && !this.isTransfer) {
                this.responder = new Daemon(this.datanode.threadGroup, (Runnable)new PacketResponder(replyOut, mirrIn, downstreams));
                this.responder.start();
            }
            // receivePacket returns -1 after the last packet in the block.
            while (this.receivePacket() >= 0) {
            }
            if (this.responder != null) {
                // Blocks until the responder has drained its ack queue (see PacketResponder.close).
                ((PacketResponder)this.responder.getRunnable()).close();
                responderClosed = true;
            }
            if (this.isDatanode || this.isTransfer) {
                // Close the streams before the replica changes state.
                this.close();
                this.block.setNumBytes(this.replicaInfo.getNumBytes());
                if (this.stage == BlockConstructionStage.TRANSFER_RBW) {
                    this.datanode.data.convertTemporaryToRbw(this.block);
                } else {
                    this.datanode.data.finalizeBlock(this.block);
                }
                this.datanode.metrics.incrBlocksWritten();
            }
        }
        catch (IOException ioe) {
            LOG.info((Object)("Exception in receiveBlock for " + this.block), (Throwable)ioe);
            throw ioe;
        }
        finally {
            // responderClosed is only set when a responder exists, so this branch runs on any
            // failure and also after a successful receive that had no responder.
            // NOTE(review): in that success case cleanupBlock() is still invoked after
            // finalizeBlock/convertTemporaryToRbw — confirm unfinalizeBlock is harmless there.
            if (!responderClosed) {
                IOUtils.closeStream((Closeable)this);
                if (this.responder != null) {
                    this.responder.interrupt();
                }
                this.cleanupBlock();
            }
            if (this.responder != null) {
                try {
                    this.responder.join();
                }
                catch (InterruptedException e) {
                    // NOTE(review): thrown from finally, so it replaces any exception already
                    // propagating; the interrupt status is not restored.
                    throw new IOException("Interrupted receiveBlock");
                }
                this.responder = null;
            }
        }
    }

    /**
     * Asks the dataset to unfinalize the block, but only when the write came
     * from another datanode; client writes are left untouched.
     *
     * @throws IOException if the dataset call fails
     */
    private void cleanupBlock() throws IOException {
        if (!this.isDatanode) {
            return;
        }
        this.datanode.data.unfinalizeBlock(this.block);
    }

    /**
     * Flushes the data stream and then the checksum stream (skipping any that
     * is null) and asks the dataset to adjust the checksum channel position
     * for this block.
     *
     * @throws IOException if flushing or repositioning fails
     */
    private void adjustCrcFilePosition() throws IOException {
        final OutputStream data = this.out;
        if (data != null) {
            data.flush();
        }
        final DataOutputStream meta = this.checksumOut;
        if (meta != null) {
            meta.flush();
        }
        this.datanode.data.adjustCrcChannelPosition(this.block, this.streams, this.checksumSize);
    }

    /**
     * Interprets a checksum as an unsigned big-endian integer: the first byte
     * is the most significant, the last byte the least significant.
     *
     * @param checksum the checksum bytes
     * @return the bytes combined into a {@code long}; 0 for an empty array
     */
    private static long checksum2long(byte[] checksum) {
        long value = 0L;
        // Walk from the least significant (last) byte, moving 8 bits left each step.
        int shift = 0;
        for (int idx = checksum.length - 1; idx >= 0; idx--) {
            final long unsignedByte = (long) checksum[idx] & 0xFFL;
            value |= unsignedByte << shift;
            shift += 8;
        }
        return value;
    }

    /**
     * Loads the partial chunk at the end of the on-disk data together with
     * its stored checksum, recomputes the checksum into {@code partialCrc},
     * and checks that the two agree. Afterwards {@code partialCrc} holds the
     * running checksum of the partial chunk so that further data can be
     * appended to it.
     *
     * @param blkoff offset in the block file up to which data is on disk
     * @param ckoff offset in the metadata file of the partial chunk's checksum
     * @param bytesPerChecksum chunk size used to locate the partial chunk
     * @throws IOException if reading fails or the recomputed checksum does
     *         not match the stored one
     */
    private void computePartialChunkCrc(long blkoff, long ckoff, int bytesPerChecksum) throws IOException {
        final int partialLen = (int) (blkoff % (long) bytesPerChecksum);
        final int crcWidth = this.diskChecksum.getChecksumSize();
        // Step back to the first byte of the partial chunk.
        blkoff -= (long) partialLen;
        LOG.info("computePartialChunkCrc sizePartialChunk " + partialLen + " block " + this.block + " offset in block " + blkoff + " offset in metafile " + ckoff);

        final byte[] partialData = new byte[partialLen];
        final byte[] storedCrc = new byte[crcWidth];
        FSDatasetInterface.BlockInputStreams inputs = null;
        try {
            inputs = this.datanode.data.getTmpInputStreams(this.block, blkoff, ckoff);
            IOUtils.readFully((InputStream) inputs.dataIn, (byte[]) partialData, (int) 0, (int) partialLen);
            IOUtils.readFully((InputStream) inputs.checksumIn, (byte[]) storedCrc, (int) 0, (int) storedCrc.length);
        } finally {
            // Closed on both the normal and the failure path.
            IOUtils.closeStream((Closeable) inputs);
        }

        this.partialCrc = DataChecksum.newDataChecksum((int) this.diskChecksum.getChecksumType(), (int) this.diskChecksum.getBytesPerChecksum());
        this.partialCrc.update(partialData, 0, partialLen);
        LOG.info("Read in partial CRC chunk from disk for block " + this.block);
        if (this.partialCrc.getValue() == BlockReceiver.checksum2long(storedCrc)) {
            return;
        }
        final String problem = "Partial CRC " + this.partialCrc.getValue() + " does not match value computed the " + " last time file was closed " + BlockReceiver.checksum2long(storedCrc);
        throw new IOException(problem);
    }

    /**
     * Immutable description of a received packet that is waiting to be
     * acknowledged: its sequence number, whether it ends the block, and the
     * block offset that was enqueued with it.
     */
    private static class Packet {
        final long seqno;
        final boolean lastPacketInBlock;
        final long offsetInBlock;

        /**
         * @param seqno sequence number of the packet
         * @param lastPacketInBlock true if this is the final packet of the block
         * @param offsetInBlock block offset associated with the packet
         */
        Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
            this.offsetInBlock = offsetInBlock;
            this.lastPacketInBlock = lastPacketInBlock;
            this.seqno = seqno;
        }

        /** Renders the packet as {@code Packet(seqno=..., lastPacketInBlock=..., offsetInBlock=...)}. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(this.getClass().getSimpleName());
            text.append("(seqno=").append(this.seqno);
            text.append(", lastPacketInBlock=").append(this.lastPacketInBlock);
            text.append(", offsetInBlock=").append(this.offsetInBlock);
            text.append(")");
            return text.toString();
        }
    }

    class PacketResponder
    implements Runnable,
    Closeable {
        private final LinkedList<Packet> ackQueue = new LinkedList();
        private final Thread receiverThread = Thread.currentThread();
        private volatile boolean running = true;
        private final DataInputStream downstreamIn;
        private final DataOutputStream upstreamOut;
        private final PacketResponderType type;
        private final String myString;

        public String toString() {
            return this.myString;
        }

        PacketResponder(DataOutputStream upstreamOut, DataInputStream downstreamIn, DatanodeInfo[] downstreams) {
            this.downstreamIn = downstreamIn;
            this.upstreamOut = upstreamOut;
            this.type = downstreams == null ? PacketResponderType.NON_PIPELINE : (downstreams.length == 0 ? PacketResponderType.LAST_IN_PIPELINE : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE);
            StringBuilder b = new StringBuilder(this.getClass().getSimpleName()).append(": ").append(BlockReceiver.this.block).append(", type=").append((Object)this.type);
            if (this.type != PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
                b.append(", downstreams=").append(downstreams.length).append(":").append(Arrays.asList(downstreams));
            }
            this.myString = b.toString();
        }

        synchronized void enqueue(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
            if (this.running) {
                Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)(this.myString + ": enqueue " + p));
                }
                this.ackQueue.addLast(p);
                this.notifyAll();
            }
        }

        @Override
        public synchronized void close() {
            while (this.running && this.ackQueue.size() != 0 && ((BlockReceiver)BlockReceiver.this).datanode.shouldRun) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    this.running = false;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(this.myString + ": closing"));
            }
            this.running = false;
            this.notifyAll();
        }

        /**
         * Main loop of the packet responder.
         *
         * <p>Each iteration:
         * <ol>
         *   <li>unless this responder is {@code LAST_IN_PIPELINE} or a mirror error
         *       has already been recorded, reads one {@link PipelineAck} from
         *       {@code downstreamIn};</li>
         *   <li>waits until the ack queue holds a packet and, for
         *       {@code HAS_DOWNSTREAM_IN_PIPELINE}, checks that the downstream
         *       seqno matches the packet at the head of the queue;</li>
         *   <li>if that packet is the last one in the block, closes the enclosing
         *       {@code BlockReceiver}, finalizes the block and logs the receipt;</li>
         *   <li>builds the reply ack (own status first, then the downstream
         *       replies, or {@code SUCCESS, ERROR} after a mirror error), updates
         *       the replica's acked byte count, writes the ack to
         *       {@code upstreamOut} and removes the packet from the queue.</li>
         * </ol>
         *
         * <p>The loop ends when {@code running} is cleared, the datanode stops,
         * the last packet of the block has been acknowledged, or the thread is
         * interrupted. Unexpected failures clear {@code running} and interrupt the
         * receiver thread.
         *
         * <p>NOTE: this source was produced by a decompiler (CFR), which flagged
         * this method with "Removed try catching itself - possible behaviour
         * change."
         */
        @Override
        public void run() {
            boolean lastPacketInBlock = false;
            final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
            while (this.running && BlockReceiver.this.datanode.shouldRun && !lastPacketInBlock) {
                boolean isInterrupted = false;
                try {
                    Packet pkt = null;
                    // -2 is used below as the "no seqno known" marker
                    // (presumably PipelineAck's unknown-seqno constant, inlined by the compiler).
                    long expected = -2L;
                    PipelineAck ack = new PipelineAck();
                    long seqno = -2L;
                    try {
                        if (this.type != PacketResponderType.LAST_IN_PIPELINE && !BlockReceiver.this.mirrorError) {
                            // Read the next ack coming from the downstream node.
                            ack.readFields(this.downstreamIn);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(this.myString + " got " + ack);
                            }
                            seqno = ack.getSeqno();
                        }
                        // Only pair the ack with a queued packet when a seqno was
                        // actually read, or when there is no downstream to read from.
                        if (seqno != -2L || this.type == PacketResponderType.LAST_IN_PIPELINE) {
                            synchronized (this) {
                                while (this.running && BlockReceiver.this.datanode.shouldRun && this.ackQueue.size() == 0) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug(this.myString + ": seqno=" + seqno + " waiting for local datanode to finish write.");
                                    }
                                    this.wait();
                                }
                                if (!this.running || !BlockReceiver.this.datanode.shouldRun) {
                                    // Leaves the outer responder loop.
                                    break;
                                }
                                pkt = this.ackQueue.getFirst();
                                expected = pkt.seqno;
                                if (this.type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected) {
                                    throw new IOException(this.myString + "seqno: expected=" + expected + ", received=" + seqno);
                                }
                                lastPacketInBlock = pkt.lastPacketInBlock;
                            }
                        }
                    }
                    catch (InterruptedException ine) {
                        isInterrupted = true;
                    }
                    catch (IOException ioe) {
                        if (Thread.interrupted()) {
                            // The I/O failure was caused by an interrupt, not by the
                            // downstream node: do not record it as a mirror error.
                            isInterrupted = true;
                        } else {
                            // Keep running; the error is reported upstream through
                            // the SUCCESS/ERROR reply built below.
                            BlockReceiver.this.mirrorError = true;
                            LOG.info(this.myString, ioe);
                        }
                    }
                    if (Thread.interrupted() || isInterrupted) {
                        // Interrupted: stop without sending any further ack upstream.
                        LOG.info(this.myString + ": Thread is interrupted.");
                        this.running = false;
                        continue;
                    }
                    if (lastPacketInBlock) {
                        // Close and finalize the block before acknowledging its last packet.
                        BlockReceiver.this.close();
                        long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
                        BlockReceiver.this.block.setNumBytes(BlockReceiver.this.replicaInfo.getNumBytes());
                        BlockReceiver.this.datanode.data.finalizeBlock(BlockReceiver.this.block);
                        BlockReceiver.this.datanode.closeBlock(BlockReceiver.this.block, "");
                        if (ClientTraceLog.isInfoEnabled() && BlockReceiver.this.isClient) {
                            long offset = 0L;
                            DatanodeRegistration dnR = BlockReceiver.this.datanode.getDNRegistrationForBP(BlockReceiver.this.block.getBlockPoolId());
                            ClientTraceLog.info(String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", BlockReceiver.this.inAddr, BlockReceiver.this.myAddr, BlockReceiver.this.block.getNumBytes(), "HDFS_WRITE", BlockReceiver.this.clientname, offset, dnR.getStorageID(), BlockReceiver.this.block, endTime - startTime));
                        } else {
                            LOG.info("Received block " + BlockReceiver.this.block + " of size " + BlockReceiver.this.block.getNumBytes() + " from " + BlockReceiver.this.inAddr);
                        }
                    }
                    // Build this node's ack: own status first, downstream statuses after it.
                    DataTransferProtos.Status[] replies;
                    if (BlockReceiver.this.mirrorError) {
                        replies = new DataTransferProtos.Status[]{DataTransferProtos.Status.SUCCESS, DataTransferProtos.Status.ERROR};
                    } else {
                        int ackLen = this.type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack.getNumOfReplies();
                        replies = new DataTransferProtos.Status[1 + ackLen];
                        replies[0] = DataTransferProtos.Status.SUCCESS;
                        for (int i = 0; i < ackLen; ++i) {
                            replies[i + 1] = ack.getReply(i);
                        }
                    }
                    PipelineAck replyAck = new PipelineAck(expected, replies);
                    // pkt is null when no queued packet was paired with this ack;
                    // guard against dereferencing it in that case.
                    if (replyAck.isSuccess() && pkt != null && pkt.offsetInBlock > BlockReceiver.this.replicaInfo.getBytesAcked()) {
                        BlockReceiver.this.replicaInfo.setBytesAcked(pkt.offsetInBlock);
                    }
                    // Send the ack to the upstream node.
                    replyAck.write(this.upstreamOut);
                    this.upstreamOut.flush();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.myString + ", replyAck=" + replyAck);
                    }
                    if (pkt != null) {
                        // The packet has been acknowledged; drop it from the queue.
                        this.removeAckHead();
                    }
                }
                catch (IOException e) {
                    LOG.warn("IOException in BlockReceiver.run(): ", e);
                    if (this.running) {
                        try {
                            BlockReceiver.this.datanode.checkDiskError(e);
                        }
                        catch (IOException ioe) {
                            LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
                        }
                        LOG.info(this.myString, e);
                        this.running = false;
                        // Only interrupt the receiver when this failure was not
                        // itself the result of an interrupt.
                        if (!Thread.interrupted()) {
                            this.receiverThread.interrupt();
                        }
                    }
                }
                catch (Throwable e) {
                    if (this.running) {
                        LOG.info(this.myString, e);
                        this.running = false;
                        this.receiverThread.interrupt();
                    }
                }
            }
            LOG.info(this.myString + " terminating");
        }

        /**
         * Drops the packet at the front of the ack queue and wakes every thread
         * waiting on this responder's monitor. Runs while holding the monitor of
         * this responder.
         */
        private void removeAckHead() {
            synchronized (this) {
                this.ackQueue.removeFirst();
                this.notifyAll();
            }
        }
    }

    private static enum PacketResponderType {
        NON_PIPELINE,
        LAST_IN_PIPELINE,
        HAS_DOWNSTREAM_IN_PIPELINE;

    }
}

