/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;

class BlockReceiver
implements Closeable,
FSConstants {
    // Loggers are shared with DataNode rather than created for this class.
    public static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    // Block being received; the constructor may update its generation stamp.
    private Block block;
    // Stream the checksum header and all packets are read from.
    private DataInputStream in = null;
    // Checksum descriptor read from `in` in the constructor.
    private DataChecksum checksum;
    // Block data output (streams.dataOut); null until streams are created and after close().
    private OutputStream out = null;
    // Buffered wrapper around streams.checksumOut; null until streams are created and after close().
    private DataOutputStream checksumOut = null;
    // Cached from `checksum` in the constructor.
    private int bytesPerChecksum;
    private int checksumSize;
    // Packet buffer; allocated lazily (and possibly re-allocated larger) by readNextPacket().
    private ByteBuffer buf;
    // Number of bytes actually read into buf; buf.limit() may be trimmed below this
    // to the end of the current packet (see readNextPacket()).
    private int bufRead;
    // Largest packet size seen so far; used by readToBuf() to size open-ended reads.
    private int maxPacketReadLen;
    // Addresses used only in log / exception / trace messages within this file.
    protected final String inAddr;
    protected final String myAddr;
    // Downstream mirror address and stream; both are set by receiveBlock().
    private String mirrorAddr;
    private DataOutputStream mirrorOut;
    // Thread running the PacketResponder; created only when clientName is non-empty.
    private Daemon responder = null;
    // Optional throttler supplied to receiveBlock(); may be null.
    private BlockTransferThrottler throttler;
    // Output streams obtained from replicaInfo.createStreams(); may be null.
    private FSDatasetInterface.BlockWriteStreams streams;
    // An empty client name selects the temporary-replica path (no responder, finalize in receiveBlock()).
    private String clientName;
    // When non-null, checksum mismatches are reported to the namenode against this node.
    DatanodeInfo srcDataNode = null;
    // Non-null only between computePartialChunkCrc() and the write of the recomputed chunk checksum.
    private Checksum partialCrc = null;
    private final DataNode datanode;
    private final ReplicaInPipelineInterface replicaInfo;
    // Set by handleMirrorOutError() and by PacketResponder.run() on an IOException;
    // volatile because it is read and written from both the receiver and responder threads.
    private volatile boolean mirrorError;

    /**
     * Prepares to receive one block: obtains the replica to write into, reads the
     * checksum header from {@code in}, and opens the data / checksum output streams.
     *
     * @param block the block being written; its generation stamp is set to
     *        {@code newGs} for every stage other than PIPELINE_SETUP_CREATE
     * @param in stream the checksum header (and later the packets) are read from
     * @param inAddr address of the sender, used in messages
     * @param myAddr address of this node, used in trace messages
     * @param stage pipeline construction stage; only consulted when
     *        {@code clientName} is non-empty
     * @param newGs generation stamp passed to the recover/append calls
     * @param minBytesRcvd passed through to the recover/append calls
     * @param maxBytesRcvd passed through to {@code recoverRbw}
     * @param clientName name of the writing client; an empty string selects
     *        {@code createTemporary}
     * @param srcDataNode source datanode to blame for checksum errors, or null
     * @param datanode owning datanode
     * @throws IOException if the replica cannot be obtained, the stage is
     *         unsupported, or the streams cannot be set up
     */
    BlockReceiver(Block block, DataInputStream in, String inAddr, String myAddr, DataTransferProtocol.BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String clientName, DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
        try {
            this.block = block;
            this.in = in;
            this.inAddr = inAddr;
            this.myAddr = myAddr;
            this.clientName = clientName;
            this.srcDataNode = srcDataNode;
            this.datanode = datanode;
            // Empty client name: write into a temporary replica, ignoring `stage`.
            if (clientName.length() == 0) {
                this.replicaInfo = datanode.data.createTemporary(block);
            } else {
                switch (stage) {
                    case PIPELINE_SETUP_CREATE: {
                        this.replicaInfo = datanode.data.createRbw(block);
                        break;
                    }
                    case PIPELINE_SETUP_STREAMING_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND: {
                        this.replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
                        // The block is about to change, so drop it from the scanner if one is running.
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block);
                        }
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    case PIPELINE_SETUP_APPEND_RECOVERY: {
                        this.replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
                        if (datanode.blockScanner != null) {
                            datanode.blockScanner.deleteBlock(block);
                        }
                        block.setGenerationStamp(newGs);
                        break;
                    }
                    default: {
                        throw new IOException("Unsupported stage " + (Object)((Object)stage) + " while receiving block " + block + " from " + inAddr);
                    }
                }
            }
            // The checksum descriptor is the first thing read from the input stream.
            this.checksum = DataChecksum.newDataChecksum((DataInputStream)in);
            this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
            this.checksumSize = this.checksum.getChecksumSize();
            boolean isCreate = stage == DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE || clientName.length() == 0;
            this.streams = this.replicaInfo.createStreams(isCreate, this.bytesPerChecksum, this.checksumSize);
            if (this.streams != null) {
                this.out = this.streams.dataOut;
                this.checksumOut = new DataOutputStream(new BufferedOutputStream(this.streams.checksumOut, SMALL_BUFFER_SIZE));
                // Only a freshly created replica gets a new metadata header written.
                if (isCreate) {
                    BlockMetadataHeader.writeHeader(this.checksumOut, this.checksum);
                }
            }
        }
        // These two are rethrown untouched: no stream cleanup and no disk check.
        catch (ReplicaAlreadyExistsException bae) {
            throw bae;
        }
        catch (ReplicaNotFoundException bne) {
            throw bne;
        }
        catch (IOException ioe) {
            IOUtils.closeStream((Closeable)this);
            this.cleanupBlock();
            // `cause` may be null here; it is logged either way.
            IOException cause = FSDataset.getCauseIfDiskError(ioe);
            DataNode.LOG.warn((Object)"IOException in BlockReceiver constructor. Cause is ", (Throwable)cause);
            if (cause != null) {
                // Surface the disk-error cause instead of the wrapper and trigger a disk check.
                ioe = cause;
                datanode.checkDiskError(ioe);
            }
            throw ioe;
        }
    }

    /**
     * @return the datanode this receiver was constructed with
     */
    DataNode getDataNode() {
        return datanode;
    }

    /**
     * Flushes and closes the checksum stream and then the block data stream.
     *
     * <p>Both streams are always attempted, even if the first one fails. Each
     * stream is closed and its field cleared even when {@code flush()} or
     * {@code close()} throws, so a failed flush no longer leaks the underlying
     * stream and a repeated {@code close()} does not retry a broken stream.
     *
     * @throws IOException the failure from closing the data stream if it failed,
     *         otherwise the failure from closing the checksum stream; the
     *         datanode's disk check is invoked with that exception first
     */
    @Override
    public void close() throws IOException {
        IOException ioe = null;
        try {
            if (this.checksumOut != null) {
                this.checksumOut.flush();
                this.checksumOut.close();
                this.checksumOut = null;
            }
        }
        catch (IOException e) {
            ioe = e;
        }
        finally {
            // Still non-null only if flush()/close() threw: close quietly and forget it.
            IOUtils.closeStream((Closeable)this.checksumOut);
            this.checksumOut = null;
        }
        try {
            if (this.out != null) {
                this.out.flush();
                this.out.close();
                this.out = null;
            }
        }
        catch (IOException e) {
            ioe = e;
        }
        finally {
            IOUtils.closeStream((Closeable)this.out);
            this.out = null;
        }
        if (ioe != null) {
            this.datanode.checkDiskError(ioe);
            throw ioe;
        }
    }

    /**
     * Flushes whichever output streams are currently open: the checksum stream
     * first, then the block data stream.
     *
     * @throws IOException if either flush fails
     */
    void flush() throws IOException {
        final DataOutputStream crcStream = checksumOut;
        if (crcStream != null) {
            crcStream.flush();
        }
        final OutputStream dataStream = out;
        if (dataStream != null) {
            dataStream.flush();
        }
    }

    /**
     * Handles a failure while writing to the mirror: logs it, then either
     * rethrows (when the current thread was interrupted) or records the mirror
     * as failed so no further mirror writes are attempted.
     *
     * <p>Note that {@link Thread#interrupted()} clears the interrupt flag.
     *
     * @param ioe the exception raised by the mirror write
     * @throws IOException {@code ioe} itself, if the thread had been interrupted
     */
    private void handleMirrorOutError(IOException ioe) throws IOException {
        String report = datanode.dnRegistration + ":Exception writing block " + block + " to mirror " + mirrorAddr + "\n" + StringUtils.stringifyException(ioe);
        LOG.info(report);
        if (!Thread.interrupted()) {
            mirrorError = true;
            return;
        }
        throw ioe;
    }

    /**
     * Verifies the checksum of every chunk in a packet's data.
     *
     * <p>Chunks are {@code bytesPerChecksum} bytes long (the last one may be
     * shorter); their checksums are laid out consecutively in
     * {@code checksumBuf}, {@code checksumSize} bytes each. On a mismatch the
     * block is reported to the namenode as corrupt on {@code srcDataNode} (when
     * one is known) and an exception is thrown.
     *
     * @param dataBuf buffer holding the chunk data
     * @param dataOff offset of the first data byte in {@code dataBuf}
     * @param len number of data bytes to verify
     * @param checksumBuf buffer holding the expected checksums
     * @param checksumOff offset of the first checksum in {@code checksumBuf}
     * @throws IOException if any chunk's checksum does not match
     */
    private void verifyChunks(byte[] dataBuf, int dataOff, int len, byte[] checksumBuf, int checksumOff) throws IOException {
        while (len > 0) {
            int chunkLen = Math.min(len, this.bytesPerChecksum);
            this.checksum.update(dataBuf, dataOff, chunkLen);
            if (!this.checksum.compare(checksumBuf, checksumOff)) {
                if (this.srcDataNode != null) {
                    try {
                        LOG.info((Object)("report corrupt block " + this.block + " from datanode " + this.srcDataNode + " to namenode"));
                        LocatedBlock lb = new LocatedBlock(this.block, new DatanodeInfo[]{this.srcDataNode});
                        this.datanode.namenode.reportBadBlocks(new LocatedBlock[]{lb});
                    }
                    catch (IOException e) {
                        // Reporting is best-effort, but keep the cause in the log so the failure can be diagnosed.
                        LOG.warn((Object)("Failed to report bad block " + this.block + " from datanode " + this.srcDataNode + " to namenode"), (Throwable)e);
                    }
                }
                throw new IOException("Unexpected checksum mismatch while writing " + this.block + " from " + this.inAddr);
            }
            // Start the next chunk with a clean checksum state.
            this.checksum.reset();
            dataOff += chunkLen;
            checksumOff += this.checksumSize;
            len -= chunkLen;
        }
    }

    /**
     * Moves the unread bytes of {@code buf} to the start of its backing array so
     * that the space before the current position can be reused.
     *
     * @throws IllegalStateException if {@code bufRead} and {@code buf.limit()}
     *         disagree on entry
     */
    private void shiftBufData() {
        if (bufRead != buf.limit()) {
            throw new IllegalStateException("bufRead should be same as buf.limit()");
        }
        final int readPos = buf.position();
        if (readPos <= 0) {
            // Nothing consumed yet, so there is nothing to reclaim.
            return;
        }
        final int unread = buf.remaining();
        if (unread > 0) {
            final byte[] backing = buf.array();
            System.arraycopy(backing, readPos, backing, 0, unread);
        }
        buf.position(0);
        bufRead = unread;
        buf.limit(unread);
    }

    /**
     * Reads bytes from {@code in} into {@code buf}, appending at the current
     * limit, and advances both {@code bufRead} and the limit by the amount read.
     *
     * @param toRead maximum number of bytes to read; a negative value means
     *        "up to {@code maxPacketReadLen} (or the buffer capacity when no
     *        packet has been seen yet) minus the current limit"
     * @return the number of bytes actually read
     * @throws EOFException if the stream is at end of file
     * @throws IOException if the read fails
     */
    private int readToBuf(int toRead) throws IOException {
        if (toRead < 0) {
            final int ceiling = maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity();
            toRead = ceiling - buf.limit();
        }
        final int writeAt = buf.limit();
        final int nRead = in.read(buf.array(), writeAt, toRead);
        if (nRead < 0) {
            throw new EOFException("while trying to read " + toRead + " bytes");
        }
        bufRead = writeAt + nRead;
        buf.limit(bufRead);
        return nRead;
    }

    /**
     * Ensures that one complete packet is available in {@code buf}, starting at
     * the buffer's current position, and trims the buffer's limit to the end of
     * that packet. Bytes already read beyond the packet remain in the backing
     * array and are tracked by {@code bufRead}.
     *
     * <p>The constant 21 equals the number of bytes {@code receivePacket()}
     * reads before the data-length int (int + long + long + byte = 4+8+8+1);
     * 25 adds that 4-byte data-length int.
     *
     * @throws IOException if the leading length field is out of range or the
     *         stream fails / ends while reading
     */
    private void readNextPacket() throws IOException {
        if (this.buf == null) {
            // First call: size the buffer from the datanode's configured packet size,
            // with room for at least one chunk (data + checksum) plus 25 bytes.
            int chunkSize = this.bytesPerChecksum + this.checksumSize;
            int chunksPerPacket = (this.datanode.writePacketSize - 21 - 4 + chunkSize - 1) / chunkSize;
            this.buf = ByteBuffer.allocate(25 + Math.max(chunksPerPacket, 1) * chunkSize);
            this.buf.limit(0);
        }
        // Undo the limit trimming done at the end of the previous call so that
        // bytes read past the previous packet become visible again.
        if (this.bufRead > this.buf.limit()) {
            this.buf.limit(this.bufRead);
        }
        // Need at least the 4-byte length field before anything can be decided.
        while (this.buf.remaining() < 4) {
            if (this.buf.position() > 0) {
                this.shiftBufData();
            }
            this.readToBuf(-1);
        }
        // Peek at the length field without consuming it.
        this.buf.mark();
        int payloadLen = this.buf.getInt();
        this.buf.reset();
        // Sanity bound: 0x6400000 == 104857600 bytes (100 MiB).
        if (payloadLen < 0 || payloadLen > 0x6400000) {
            throw new IOException("Incorrect value for packet payload : " + payloadLen);
        }
        int pktSize = payloadLen + 21;
        if (this.buf.remaining() < pktSize) {
            int spaceLeft;
            int toRead = pktSize - this.buf.remaining();
            // First try to make room by compacting already-consumed bytes away.
            if (toRead > (spaceLeft = this.buf.capacity() - this.buf.limit()) && this.buf.position() > 0) {
                this.shiftBufData();
                spaceLeft = this.buf.capacity() - this.buf.limit();
            }
            // Still not enough: grow to exactly the bytes already held plus those still needed.
            if (toRead > spaceLeft) {
                byte[] oldBuf = this.buf.array();
                int toCopy = this.buf.limit();
                this.buf = ByteBuffer.allocate(toCopy + toRead);
                System.arraycopy(oldBuf, 0, this.buf.array(), 0, toCopy);
                this.buf.limit(toCopy);
            }
            // readToBuf may return fewer bytes than requested, so loop until complete.
            while (toRead > 0) {
                toRead -= this.readToBuf(toRead);
            }
        }
        // Hide any bytes belonging to the following packet; bufRead still records them.
        if (this.buf.remaining() > pktSize) {
            this.buf.limit(this.buf.position() + pktSize);
        }
        if (pktSize > this.maxPacketReadLen) {
            this.maxPacketReadLen = pktSize;
        }
    }

    /**
     * Reads the next packet into the buffer, decodes its header fields and hands
     * them to {@link #receivePacket(long, long, boolean, int, int)}.
     *
     * <p>Header fields are read in this order: an int (skipped here), the offset
     * in the block, the sequence number, the last-packet flag and the data
     * length. The buffer is rewound to the start of the packet before delegating.
     *
     * @return whatever the five-argument overload returns
     * @throws IOException if the packet starts beyond the bytes received so far,
     *         carries a negative data length, or cannot be read / processed
     */
    private int receivePacket() throws IOException {
        readNextPacket();

        buf.mark();
        // Skip the leading int; its value was already validated by readNextPacket().
        buf.getInt();
        final long startOffset = buf.getLong();
        if (startOffset > replicaInfo.getNumBytes()) {
            throw new IOException("Received an out-of-sequence packet for " + block + "from " + inAddr + " at offset " + startOffset + ". Expecting packet starting at " + replicaInfo.getNumBytes());
        }
        final long sequenceNumber = buf.getLong();
        final boolean isLastPacket = buf.get() != 0;
        final int dataLen = buf.getInt();
        if (dataLen < 0) {
            throw new IOException("Got wrong length during writeBlock(" + block + ") from " + inAddr + " at offset " + startOffset + ": " + dataLen);
        }
        final int headerEnd = buf.position();
        buf.reset();

        return receivePacket(startOffset, sequenceNumber, isLastPacket, dataLen, headerEnd);
    }

    /**
     * Writes a slice of the packet buffer to the block data stream.
     *
     * @param pktBuf array holding the packet
     * @param startByteToDisk index of the first byte to write
     * @param numBytesToDisk number of bytes to write
     * @throws IOException if the write fails
     */
    private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, int numBytesToDisk) throws IOException {
        final OutputStream dataStream = out;
        dataStream.write(pktBuf, startByteToDisk, numBytesToDisk);
    }

    /**
     * Processes one packet that is fully present in {@code buf}: records it for
     * acknowledgement, forwards it to the mirror, optionally verifies its
     * checksums, and writes any not-yet-persisted data and checksums to disk.
     *
     * @param offsetInBlock offset in the block of the packet's first data byte
     * @param seqno packet sequence number
     * @param lastPacketInBlock whether this packet is flagged as the last one
     * @param len number of data bytes in the packet
     * @param endOfHeader buffer position just past the packet header
     * @return -1 if this was the last packet, otherwise {@code len}
     * @throws IOException on malformed packet contents, checksum mismatch or
     *         disk write failure
     */
    private int receivePacket(long offsetInBlock, long seqno, boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Receiving one packet for block " + this.block + " of length " + len + " seqno " + seqno + " offsetInBlock " + offsetInBlock + " lastPacketInBlock " + lastPacketInBlock));
        }
        // From here on offsetInBlock is the END offset of this packet;
        // firstByteInBlock keeps the start offset.
        long firstByteInBlock = offsetInBlock;
        if (this.replicaInfo.getNumBytes() < (offsetInBlock += (long)len)) {
            this.replicaInfo.setNumBytes(offsetInBlock);
        }
        // Queue the packet for acknowledgement before doing any I/O for it.
        if (this.responder != null) {
            ((PacketResponder)this.responder.getRunnable()).enqueue(seqno, lastPacketInBlock, offsetInBlock);
        }
        // Forward the whole packet (the buffer is positioned at the packet start,
        // so the header is included) unless the mirror has already failed.
        if (this.mirrorOut != null && !this.mirrorError) {
            try {
                this.mirrorOut.write(this.buf.array(), this.buf.position(), this.buf.remaining());
                this.mirrorOut.flush();
            }
            catch (IOException e) {
                this.handleMirrorOutError(e);
            }
        }
        this.buf.position(endOfHeader);
        if (lastPacketInBlock || len == 0) {
            LOG.debug((Object)("Receiving an empty packet or the end of the block " + this.block));
        } else {
            // One checksum per (possibly partial) chunk of data.
            int checksumLen = (len + this.bytesPerChecksum - 1) / this.bytesPerChecksum * this.checksumSize;
            if (this.buf.remaining() != checksumLen + len) {
                throw new IOException("Data remaining in packet does not matchsum of checksumLen and dataLen  size remaining: " + this.buf.remaining() + " data len: " + len + " checksum Len: " + checksumLen);
            }
            // Within the packet the checksums come first, followed by the data.
            int checksumOff = this.buf.position();
            int dataOff = checksumOff + checksumLen;
            byte[] pktBuf = this.buf.array();
            // Mark the packet as consumed; the rest works on the backing array directly.
            this.buf.position(this.buf.limit());
            // Checksums are verified only when there is no mirror, or when clientName is empty.
            if (this.mirrorOut == null || this.clientName.length() == 0) {
                this.verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
            }
            try {
                long onDiskLen = this.replicaInfo.getBytesOnDisk();
                // Only write if this packet extends beyond what is already on disk.
                if (onDiskLen < offsetInBlock) {
                    byte[] lastChunkChecksum;
                    // The on-disk data ends mid-chunk: reposition the checksum file first.
                    if (onDiskLen % (long)this.bytesPerChecksum != 0L) {
                        this.adjustCrcFilePosition();
                    }
                    // Packet starts mid-chunk: load the CRC of the partial chunk already on disk
                    // so it can be extended with the new bytes below.
                    if (firstByteInBlock % (long)this.bytesPerChecksum != 0L) {
                        LOG.info((Object)("Packet starts at " + firstByteInBlock + " for block " + this.block + " which is not a multiple of bytesPerChecksum " + this.bytesPerChecksum));
                        long offsetInChecksum = (long)BlockMetadataHeader.getHeaderSize() + onDiskLen / (long)this.bytesPerChecksum * (long)this.checksumSize;
                        this.computePartialChunkCrc(onDiskLen, offsetInChecksum, this.bytesPerChecksum);
                    }
                    // Skip any leading bytes of the packet that are already on disk.
                    int startByteToDisk = dataOff + (int)(onDiskLen - firstByteInBlock);
                    int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
                    this.writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
                    if (this.partialCrc != null) {
                        if (len > this.bytesPerChecksum) {
                            throw new IOException("Got wrong length during writeBlock(" + this.block + ") from " + this.inAddr + " " + "A packet can have only one partial chunk." + " len = " + len + " bytesPerChecksum " + this.bytesPerChecksum);
                        }
                        // Extend the partial chunk's CRC with the new bytes and write the
                        // recomputed checksum instead of the one carried in the packet.
                        this.partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
                        byte[] buf = FSOutputSummer.convertToByteStream((Checksum)this.partialCrc, (int)this.checksumSize);
                        lastChunkChecksum = Arrays.copyOfRange(buf, buf.length - this.checksumSize, buf.length);
                        this.checksumOut.write(buf);
                        LOG.debug((Object)("Writing out partial crc for data len " + len));
                        this.partialCrc = null;
                    } else {
                        // Chunk-aligned: the packet's checksums can be written as received.
                        lastChunkChecksum = Arrays.copyOfRange(pktBuf, checksumOff + checksumLen - this.checksumSize, checksumOff + checksumLen);
                        this.checksumOut.write(pktBuf, checksumOff, checksumLen);
                    }
                    this.flush();
                    this.replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastChunkChecksum);
                    this.datanode.myMetrics.bytesWritten.inc((long)len);
                }
            }
            catch (IOException iex) {
                this.datanode.checkDiskError(iex);
                throw iex;
            }
        }
        if (this.throttler != null) {
            this.throttler.throttle(len);
        }
        return lastPacketInBlock ? -1 : len;
    }

    /**
     * Writes this receiver's checksum header to the given stream.
     *
     * @param mirrorOut stream to write the header to (the parameter, not the
     *        field of the same name)
     * @throws IOException if the write fails
     */
    void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
        final DataChecksum header = checksum;
        header.writeHeader(mirrorOut);
    }

    /**
     * Receives packets until the last packet of the block has been processed.
     *
     * <p>When {@code clientName} is non-empty a {@link PacketResponder} thread
     * is started to send acknowledgements; that thread also closes and
     * finalizes the block once the last packet has been acknowledged. When
     * {@code clientName} is empty the block is closed and finalized here.
     *
     * @param mirrOut stream to the next node, or null if there is none
     * @param mirrIn stream the responder reads downstream acks from
     * @param replyOut stream the responder writes acks to
     * @param mirrAddr address of the mirror, used in log messages
     * @param throttlerArg optional throttler applied after each packet; may be null
     * @param numTargets number of downstream targets, passed to the responder
     * @throws IOException if receiving fails, or if this thread is interrupted
     *         while waiting for the responder to finish
     */
    void receiveBlock(DataOutputStream mirrOut, DataInputStream mirrIn, DataOutputStream replyOut, String mirrAddr, BlockTransferThrottler throttlerArg, int numTargets) throws IOException {
        boolean responderClosed = false;
        this.mirrorOut = mirrOut;
        this.mirrorAddr = mirrAddr;
        this.throttler = throttlerArg;
        try {
            if (this.clientName.length() > 0) {
                this.responder = new Daemon(this.datanode.threadGroup, (Runnable)new PacketResponder(this, this.block, mirrIn, replyOut, numTargets, Thread.currentThread()));
                this.responder.start();
            }
            // receivePacket() returns -1 once the last packet has been handled.
            while (this.receivePacket() >= 0) {
            }
            if (this.responder != null) {
                // Blocks until the responder's ack queue has drained (or it stopped running).
                ((PacketResponder)this.responder.getRunnable()).close();
                responderClosed = true;
            }
            if (this.clientName.length() == 0) {
                this.close();
                this.block.setNumBytes(this.replicaInfo.getNumBytes());
                this.datanode.data.finalizeBlock(this.block);
                this.datanode.myMetrics.blocksWritten.inc();
            }
        }
        catch (IOException ioe) {
            LOG.info((Object)("Exception in receiveBlock for block " + this.block + " " + ioe));
            throw ioe;
        }
        finally {
            // responderClosed stays false on any failure and also on the success path
            // without a responder; in both cases streams are closed quietly here.
            if (!responderClosed) {
                IOUtils.closeStream((Closeable)this);
                if (this.responder != null) {
                    this.responder.interrupt();
                }
                this.cleanupBlock();
            }
            if (this.responder != null) {
                try {
                    this.responder.join();
                }
                catch (InterruptedException e) {
                    // NOTE(review): thrown from finally, so it replaces any exception already
                    // propagating; the interrupt status is not restored and `e` is not chained.
                    throw new IOException("Interrupted receiveBlock");
                }
                this.responder = null;
            }
        }
    }

    /**
     * Asks the dataset to un-finalize this block, but only when the client name
     * is empty; otherwise does nothing.
     *
     * @throws IOException if the dataset call fails
     */
    private void cleanupBlock() throws IOException {
        final boolean hasClientName = clientName.length() != 0;
        if (hasClientName) {
            return;
        }
        datanode.data.unfinalizeBlock(block);
    }

    /**
     * Flushes pending data (block data first, then checksums) and then asks the
     * dataset to adjust the checksum channel position for this block.
     *
     * @throws IOException if a flush or the dataset call fails
     */
    private void adjustCrcFilePosition() throws IOException {
        final OutputStream dataStream = out;
        if (dataStream != null) {
            dataStream.flush();
        }
        final DataOutputStream crcStream = checksumOut;
        if (crcStream != null) {
            crcStream.flush();
        }
        datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
    }

    /**
     * Reads the trailing partial chunk of the on-disk block data together with
     * its stored checksum, recomputes the CRC over that partial chunk into
     * {@code partialCrc}, and checks it against the stored value.
     *
     * @param blkoff current length of the block data; rounded down here to the
     *        start of the partial chunk
     * @param ckoff offset of the partial chunk's checksum in the metadata file
     * @param bytesPerChecksum chunk size used to locate the partial chunk
     * @throws IOException if reading fails or the recomputed CRC differs from
     *         the stored one ({@code partialCrc} stays set in the latter case)
     */
    private void computePartialChunkCrc(long blkoff, long ckoff, int bytesPerChecksum) throws IOException {
        final int partialLen = (int)(blkoff % (long)bytesPerChecksum);
        final int crcLen = this.checksum.getChecksumSize();
        // Rewind to the first byte of the partial chunk.
        blkoff -= (long)partialLen;
        LOG.info("computePartialChunkCrc sizePartialChunk " + partialLen + " block " + this.block + " offset in block " + blkoff + " offset in metafile " + ckoff);

        final byte[] partialData = new byte[partialLen];
        final byte[] storedCrc = new byte[crcLen];
        FSDatasetInterface.BlockInputStreams instr = null;
        try {
            instr = this.datanode.data.getTmpInputStreams(this.block, blkoff, ckoff);
            IOUtils.readFully((InputStream)instr.dataIn, partialData, 0, partialLen);
            IOUtils.readFully((InputStream)instr.checksumIn, storedCrc, 0, storedCrc.length);
        } finally {
            // Closed on both the success and the failure path.
            IOUtils.closeStream(instr);
        }

        this.partialCrc = new PureJavaCrc32();
        this.partialCrc.update(partialData, 0, partialLen);
        LOG.info("Read in partial CRC chunk from disk for block " + this.block);

        final long recomputed = this.partialCrc.getValue();
        final long expected = FSInputChecker.checksum2long(storedCrc);
        if (recomputed != expected) {
            String msg = "Partial CRC " + recomputed + " does not match value computed the " + " last time file was closed " + expected;
            throw new IOException(msg);
        }
    }

    /**
     * Immutable record of one received packet that is awaiting acknowledgement.
     */
    private static class Packet {
        /** Sequence number of the packet. */
        final long seqno;
        /** Whether the packet was flagged as the last one in its block. */
        final boolean lastPacketInBlock;
        /** Block offset just past the packet's last data byte, as passed by the enqueuer. */
        final long lastByteInBlock;

        Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
            this.lastByteInBlock = lastByteInPacket;
            this.lastPacketInBlock = lastPacketInBlock;
            this.seqno = seqno;
        }
    }

    class PacketResponder
    implements Runnable,
    FSConstants {
        // Packets received locally and not yet acknowledged, oldest first.
        // Accessed only from synchronized methods / blocks on this responder.
        private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
        // Cleared to stop the responder loop; volatile because run() reads it outside the monitor.
        private volatile boolean running = true;
        private Block block;
        // Stream downstream acks are read from.
        DataInputStream mirrorIn;
        // Stream acks are written to.
        DataOutputStream replyOut;
        // Number of downstream targets; 0 means no downstream ack is read.
        private int numTargets;
        private BlockReceiver receiver;
        // Thread running the receiver; interrupted when the responder fails.
        private Thread receiverThread;

        /**
         * @return a short description containing the target count and the block
         */
        @Override
        public String toString() {
            return "PacketResponder " + this.numTargets + " for Block " + this.block;
        }

        /**
         * Creates a responder for one block transfer.
         *
         * @param receiver the receiver whose streams are closed on the last packet
         * @param b the block being received
         * @param in stream downstream acks are read from
         * @param out stream acks are written to
         * @param numTargets number of downstream targets
         * @param receiverThread thread to interrupt if the responder fails
         */
        PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, DataOutputStream out, int numTargets, Thread receiverThread) {
            this.receiver = receiver;
            this.receiverThread = receiverThread;
            this.block = b;
            this.numTargets = numTargets;
            this.mirrorIn = in;
            this.replyOut = out;
        }

        /**
         * Queues a locally received packet for acknowledgement and wakes any
         * thread waiting on this responder. Does nothing once the responder has
         * stopped running.
         *
         * @param seqno sequence number of the packet
         * @param lastPacketInBlock whether the packet is the last one of the block
         * @param lastByteInPacket block offset just past the packet's last byte
         */
        synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
            if (!this.running) {
                return;
            }
            // Called once per packet: skip building the message unless debug is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("PacketResponder " + this.numTargets + " adding seqno " + seqno + " to ack queue."));
            }
            this.ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
            this.notifyAll();
        }

        /**
         * Waits until every queued packet has been acknowledged (or the
         * responder / datanode stops running), then marks the responder as
         * stopped and wakes all waiters.
         *
         * <p>If the calling thread is interrupted while waiting, the wait is
         * abandoned and the thread's interrupt status is restored so the caller
         * can observe that shutdown was cut short.
         */
        synchronized void close() {
            while (this.running && this.ackQueue.size() != 0 && ((BlockReceiver)BlockReceiver.this).datanode.shouldRun) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    // Clearing `running` ends the loop; re-assert the interrupt instead of swallowing it.
                    this.running = false;
                    Thread.currentThread().interrupt();
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("PacketResponder " + this.numTargets + " for block " + this.block + " Closing down."));
            }
            this.running = false;
            this.notifyAll();
        }

        /**
         * Responder loop: for each packet, optionally reads the downstream ack,
         * waits for the matching locally received packet, finalizes the block on
         * the last packet, and writes a combined ack to {@code replyOut}.
         *
         * <p>The loop ends when {@code running} is cleared, the datanode stops,
         * or the last packet has been processed. On failure the receiver thread
         * is interrupted.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible
         * behaviour change." The labeled block below is decompiler output.
         */
        @Override
        public void run() {
            long startTime;
            boolean lastPacketInBlock = false;
            long l = startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
            while (this.running && ((BlockReceiver)BlockReceiver.this).datanode.shouldRun && !lastPacketInBlock) {
                boolean isInterrupted = false;
                try {
                    DataTransferProtocol.PipelineAck ack;
                    long expected;
                    Packet pkt;
                    block27: {
                        pkt = null;
                        // -2 is the initial value of both seqno and expected; seqno keeps it
                        // when no downstream ack was read in this iteration.
                        expected = -2L;
                        ack = new DataTransferProtocol.PipelineAck();
                        long seqno = -2L;
                        try {
                            // Read the downstream ack only if there are targets and the mirror is healthy.
                            if (this.numTargets != 0 && !BlockReceiver.this.mirrorError) {
                                ack.readFields(this.mirrorIn);
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug((Object)("PacketResponder " + this.numTargets + " got " + ack));
                                }
                                seqno = ack.getSeqno();
                            }
                            // There are targets but seqno is still -2: skip the queue wait entirely.
                            if (seqno == -2L && this.numTargets != 0) break block27;
                            PacketResponder packetResponder = this;
                            synchronized (packetResponder) {
                                // Wait until the receiver has enqueued a packet.
                                while (this.running && ((BlockReceiver)BlockReceiver.this).datanode.shouldRun && this.ackQueue.size() == 0) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug((Object)("PacketResponder " + this.numTargets + " seqno = " + seqno + " for block " + this.block + " waiting for local datanode to finish write."));
                                    }
                                    this.wait();
                                }
                                // This break leaves the outer responder loop.
                                if (!this.running || !((BlockReceiver)BlockReceiver.this).datanode.shouldRun) {
                                    break;
                                }
                                pkt = this.ackQueue.getFirst();
                                expected = pkt.seqno;
                                // The downstream ack must be for the oldest unacknowledged packet.
                                if (this.numTargets > 0 && seqno != expected) {
                                    throw new IOException("PacketResponder " + this.numTargets + " for block " + this.block + " expected seqno:" + expected + " received:" + seqno);
                                }
                                lastPacketInBlock = pkt.lastPacketInBlock;
                            }
                        }
                        catch (InterruptedException ine) {
                            isInterrupted = true;
                        }
                        catch (IOException ioe) {
                            // Thread.interrupted() clears the flag, so remember it locally.
                            if (Thread.interrupted()) {
                                isInterrupted = true;
                            }
                            // Any I/O failure here is treated as a mirror failure.
                            BlockReceiver.this.mirrorError = true;
                            LOG.info((Object)("PacketResponder " + this.block + " " + this.numTargets + " Exception " + StringUtils.stringifyException((Throwable)ioe)));
                        }
                    }
                    if (Thread.interrupted() || isInterrupted) {
                        LOG.info((Object)("PacketResponder " + this.block + " " + this.numTargets + " : Thread is interrupted."));
                        this.running = false;
                        continue;
                    }
                    if (lastPacketInBlock) {
                        // Last packet: close the receiver's streams and finalize the block before acking.
                        this.receiver.close();
                        long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0L;
                        this.block.setNumBytes(BlockReceiver.this.replicaInfo.getNumBytes());
                        ((BlockReceiver)BlockReceiver.this).datanode.data.finalizeBlock(this.block);
                        BlockReceiver.this.datanode.closeBlock(this.block, "");
                        if (ClientTraceLog.isInfoEnabled() && this.receiver.clientName.length() > 0) {
                            long offset = 0L;
                            ClientTraceLog.info((Object)String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", this.receiver.inAddr, this.receiver.myAddr, this.block.getNumBytes(), "HDFS_WRITE", this.receiver.clientName, offset, ((BlockReceiver)BlockReceiver.this).datanode.dnRegistration.getStorageID(), this.block, endTime - startTime));
                        } else {
                            LOG.info((Object)("Received block " + this.block + " of size " + this.block.getNumBytes() + " from " + this.receiver.inAddr));
                        }
                    }
                    // Build the reply: this node's status first, followed by the downstream statuses.
                    DataTransferProtocol.Status[] replies = null;
                    if (BlockReceiver.this.mirrorError) {
                        // Mirror failed: report success for this node and an error for the next one.
                        replies = new DataTransferProtocol.Status[]{DataTransferProtocol.Status.SUCCESS, DataTransferProtocol.Status.ERROR};
                    } else {
                        int ackLen = this.numTargets == 0 ? 0 : ack.getNumOfReplies();
                        replies = new DataTransferProtocol.Status[1 + ackLen];
                        replies[0] = DataTransferProtocol.Status.SUCCESS;
                        for (int i = 0; i < ackLen; ++i) {
                            replies[i + 1] = ack.getReply(i);
                        }
                    }
                    DataTransferProtocol.PipelineAck replyAck = new DataTransferProtocol.PipelineAck(expected, replies);
                    replyAck.write(this.replyOut);
                    this.replyOut.flush();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("PacketResponder " + this.numTargets + " for block " + this.block + " responded an ack: " + replyAck));
                    }
                    // pkt is null when the queue wait was skipped above.
                    if (pkt == null) continue;
                    this.removeAckHead();
                    // Advance the acked-bytes mark only for successful acks that move it forward.
                    if (!replyAck.isSuccess() || pkt.lastByteInBlock <= BlockReceiver.this.replicaInfo.getBytesAcked()) continue;
                    BlockReceiver.this.replicaInfo.setBytesAcked(pkt.lastByteInBlock);
                }
                catch (IOException e) {
                    LOG.warn((Object)"IOException in BlockReceiver.run(): ", (Throwable)e);
                    if (!this.running) continue;
                    try {
                        BlockReceiver.this.datanode.checkDiskError(e);
                    }
                    catch (IOException ioe) {
                        LOG.warn((Object)"DataNode.chekDiskError failed in run() with: ", (Throwable)ioe);
                    }
                    LOG.info((Object)("PacketResponder " + this.block + " " + this.numTargets + " Exception " + StringUtils.stringifyException((Throwable)e)));
                    this.running = false;
                    // Interrupt the receiver so it stops too, unless this thread was itself interrupted.
                    if (Thread.interrupted()) continue;
                    this.receiverThread.interrupt();
                }
                catch (Throwable e) {
                    if (!this.running) continue;
                    LOG.info((Object)("PacketResponder " + this.block + " " + this.numTargets + " Exception " + StringUtils.stringifyException((Throwable)e)));
                    this.running = false;
                    this.receiverThread.interrupt();
                }
            }
            LOG.info((Object)("PacketResponder " + this.numTargets + " for block " + this.block + " terminating"));
        }

        /**
         * Drops the oldest queued packet and wakes every thread waiting on this
         * responder (for example a caller blocked in {@code close()}).
         */
        private synchronized void removeAckHead() {
            ackQueue.removeFirst();
            notifyAll();
        }
    }
}

