/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

@InterfaceAudience.Private
class DFSOutputStream
extends FSOutputSummer
implements Syncable {
    private final DFSClient dfsClient;
    private static final int MAX_PACKETS = 80;
    private Socket s;
    private volatile boolean closed = false;
    private String src;
    private final long blockSize;
    private final DataChecksum checksum;
    private final LinkedList<Packet> dataQueue = new LinkedList();
    private final LinkedList<Packet> ackQueue = new LinkedList();
    private Packet currentPacket = null;
    private DataStreamer streamer;
    private long currentSeqno = 0L;
    private long lastQueuedSeqno = -1L;
    private long lastAckedSeqno = -1L;
    private long bytesCurBlock = 0L;
    private int packetSize = 0;
    private int chunksPerPacket = 0;
    private volatile IOException lastException = null;
    private long artificialSlowdown = 0L;
    private long lastFlushOffset = 0L;
    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
    private volatile boolean appendChunk = false;
    private long initialFileSize = 0L;
    private Progressable progress;
    private short blockReplication;

    static Socket createSocketForPipeline(DatanodeInfo first, int length, DFSClient client) throws IOException {
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug((Object)("Connecting to datanode " + first.getName()));
        }
        InetSocketAddress isa = NetUtils.createSocketAddr((String)first.getName());
        Socket sock = client.socketFactory.createSocket();
        int timeout = client.getDatanodeReadTimeout(length);
        NetUtils.connect((Socket)sock, (SocketAddress)isa, (int)timeout);
        sock.setSoTimeout(timeout);
        sock.setSendBufferSize(131072);
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug((Object)("Send buf size " + sock.getSendBufferSize()));
        }
        return sock;
    }

    private void isClosed() throws IOException {
        if (this.closed) {
            IOException e = this.lastException;
            throw e != null ? e : new IOException("DFSOutputStream is closed");
        }
    }

    synchronized DatanodeInfo[] getPipeline() {
        if (this.streamer == null) {
            return null;
        }
        DatanodeInfo[] currentNodes = this.streamer.getNodes();
        if (currentNodes == null) {
            return null;
        }
        DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
        for (int i = 0; i < currentNodes.length; ++i) {
            value[i] = currentNodes[i];
        }
        return value;
    }

    private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress, DataChecksum checksum, short replication) throws IOException {
        super((Checksum)checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
        int bytesPerChecksum = checksum.getBytesPerChecksum();
        this.dfsClient = dfsClient;
        this.src = src;
        this.blockSize = blockSize;
        this.blockReplication = replication;
        this.progress = progress;
        if (progress != null && DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug((Object)("Set non-null progress callback on DFSOutputStream " + src));
        }
        if (bytesPerChecksum < 1 || blockSize % (long)bytesPerChecksum != 0L) {
            throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum + ") and blockSize(" + blockSize + ") do not match. " + "blockSize should be a " + "multiple of io.bytes.per.checksum");
        }
        this.checksum = checksum;
    }

    DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum) throws IOException {
        this(dfsClient, src, blockSize, progress, checksum, replication);
        this.computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum());
        try {
            dfsClient.namenode.create(src, masked, dfsClient.clientName, (EnumSetWritable<CreateFlag>)new EnumSetWritable(flag), createParent, replication, blockSize);
        }
        catch (RemoteException re) {
            throw re.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, SafeModeException.class, UnresolvedPathException.class});
        }
        this.streamer = new DataStreamer();
        this.streamer.start();
    }

    DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum) throws IOException {
        this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
        this.initialFileSize = stat.getLen();
        if (lastBlock != null) {
            this.bytesCurBlock = lastBlock.getBlockSize();
            this.streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
        } else {
            this.computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum());
            this.streamer = new DataStreamer();
        }
        this.streamer.start();
    }

    private void computePacketChunkSize(int psize, int csize) {
        int chunkSize = csize + this.checksum.getChecksumSize();
        int n = PacketHeader.PKT_HEADER_LEN;
        this.chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1);
        this.packetSize = n + chunkSize * this.chunksPerPacket;
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug((Object)("computePacketChunkSize: src=" + this.src + ", chunkSize=" + chunkSize + ", chunksPerPacket=" + this.chunksPerPacket + ", packetSize=" + this.packetSize));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueCurrentPacket() {
        LinkedList<Packet> linkedList = this.dataQueue;
        synchronized (linkedList) {
            if (this.currentPacket == null) {
                return;
            }
            this.dataQueue.addLast(this.currentPacket);
            this.lastQueuedSeqno = this.currentPacket.seqno;
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug((Object)("Queued packet " + this.currentPacket.seqno));
            }
            this.currentPacket = null;
            this.dataQueue.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitAndQueueCurrentPacket() throws IOException {
        LinkedList<Packet> linkedList = this.dataQueue;
        synchronized (linkedList) {
            while (!this.closed && this.dataQueue.size() + this.ackQueue.size() > 80) {
                try {
                    this.dataQueue.wait();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            this.isClosed();
            this.queueCurrentPacket();
        }
    }

    protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException {
        this.dfsClient.checkOpen();
        this.isClosed();
        int cklen = checksum.length;
        int bytesPerChecksum = this.checksum.getBytesPerChecksum();
        if (len > bytesPerChecksum) {
            throw new IOException("writeChunk() buffer size is " + len + " is larger than supported  bytesPerChecksum " + bytesPerChecksum);
        }
        if (checksum.length != this.checksum.getChecksumSize()) {
            throw new IOException("writeChunk() checksum size is supposed to be " + this.checksum.getChecksumSize() + " but found to be " + checksum.length);
        }
        if (this.currentPacket == null) {
            this.currentPacket = new Packet(this.packetSize, this.chunksPerPacket, this.bytesCurBlock);
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug((Object)("DFSClient writeChunk allocating new packet seqno=" + this.currentPacket.seqno + ", src=" + this.src + ", packetSize=" + this.packetSize + ", chunksPerPacket=" + this.chunksPerPacket + ", bytesCurBlock=" + this.bytesCurBlock));
            }
        }
        this.currentPacket.writeChecksum(checksum, 0, cklen);
        this.currentPacket.writeData(b, offset, len);
        ++this.currentPacket.numChunks;
        this.bytesCurBlock += (long)len;
        if (this.currentPacket.numChunks == this.currentPacket.maxChunks || this.bytesCurBlock == this.blockSize) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug((Object)("DFSClient writeChunk packet full seqno=" + this.currentPacket.seqno + ", src=" + this.src + ", bytesCurBlock=" + this.bytesCurBlock + ", blockSize=" + this.blockSize + ", appendChunk=" + this.appendChunk));
            }
            this.waitAndQueueCurrentPacket();
            if (this.appendChunk && this.bytesCurBlock % (long)bytesPerChecksum == 0L) {
                this.appendChunk = false;
                this.resetChecksumChunk(bytesPerChecksum);
            }
            if (!this.appendChunk) {
                int psize = Math.min((int)(this.blockSize - this.bytesCurBlock), this.dfsClient.getConf().writePacketSize);
                this.computePacketChunkSize(psize, bytesPerChecksum);
            }
            if (this.bytesCurBlock == this.blockSize) {
                this.currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, this.bytesCurBlock);
                this.currentPacket.lastPacketInBlock = true;
                this.waitAndQueueCurrentPacket();
                this.bytesCurBlock = 0L;
                this.lastFlushOffset = 0L;
            }
        }
    }

    @Deprecated
    public synchronized void sync() throws IOException {
        this.hflush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void hflush() throws IOException {
        this.dfsClient.checkOpen();
        this.isClosed();
        try {
            long toWaitFor;
            DFSOutputStream dFSOutputStream = this;
            synchronized (dFSOutputStream) {
                long saveOffset = this.bytesCurBlock;
                Packet oldCurrentPacket = this.currentPacket;
                this.flushBuffer(true);
                if (DFSClient.LOG.isDebugEnabled()) {
                    DFSClient.LOG.debug((Object)("DFSClient flush() : saveOffset " + saveOffset + " bytesCurBlock " + this.bytesCurBlock + " lastFlushOffset " + this.lastFlushOffset));
                }
                if (this.lastFlushOffset != this.bytesCurBlock) {
                    assert (this.bytesCurBlock > this.lastFlushOffset);
                    this.lastFlushOffset = this.bytesCurBlock;
                    this.waitAndQueueCurrentPacket();
                } else {
                    assert (oldCurrentPacket == null) : "Empty flush should not occur with a currentPacket";
                    this.currentPacket = null;
                }
                this.bytesCurBlock = saveOffset;
                toWaitFor = this.lastQueuedSeqno;
            }
            this.waitForAckedSeqno(toWaitFor);
            if (this.persistBlocks.getAndSet(false)) {
                try {
                    this.dfsClient.namenode.fsync(this.src, this.dfsClient.clientName);
                }
                catch (IOException ioe) {
                    DFSClient.LOG.warn((Object)("Unable to persist blocks in hflush for " + this.src), (Throwable)ioe);
                    this.isClosed();
                    throw ioe;
                }
            }
            dFSOutputStream = this;
            synchronized (dFSOutputStream) {
                if (this.streamer != null) {
                    this.streamer.setHflush();
                }
            }
        }
        catch (InterruptedIOException interrupt) {
            throw interrupt;
        }
        catch (IOException e) {
            DFSClient.LOG.warn((Object)"Error while syncing", (Throwable)e);
            DFSOutputStream dFSOutputStream = this;
            synchronized (dFSOutputStream) {
                if (!this.closed) {
                    this.lastException = new IOException("IOException flush:" + e);
                    this.closeThreads(true);
                }
            }
            throw e;
        }
    }

    public synchronized void hsync() throws IOException {
        this.hflush();
    }

    synchronized int getNumCurrentReplicas() throws IOException {
        this.dfsClient.checkOpen();
        this.isClosed();
        if (this.streamer == null) {
            return this.blockReplication;
        }
        DatanodeInfo[] currentNodes = this.streamer.getNodes();
        if (currentNodes == null) {
            return this.blockReplication;
        }
        return currentNodes.length;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushInternal() throws IOException {
        long toWaitFor;
        DFSOutputStream dFSOutputStream = this;
        synchronized (dFSOutputStream) {
            this.dfsClient.checkOpen();
            this.isClosed();
            this.queueCurrentPacket();
            toWaitFor = this.lastQueuedSeqno;
        }
        this.waitForAckedSeqno(toWaitFor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForAckedSeqno(long seqno) throws IOException {
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug((Object)("Waiting for ack for: " + seqno));
        }
        LinkedList<Packet> linkedList = this.dataQueue;
        synchronized (linkedList) {
            while (!this.closed) {
                this.isClosed();
                if (this.lastAckedSeqno >= seqno) break;
                try {
                    this.dataQueue.wait(1000L);
                }
                catch (InterruptedException ie) {
                    throw new InterruptedIOException("Interrupted while waiting for data to be acknowledged by pipeline");
                }
            }
        }
        this.isClosed();
    }

    synchronized void abort() throws IOException {
        if (this.closed) {
            return;
        }
        this.streamer.setLastException(new IOException("Lease timeout of " + this.dfsClient.hdfsTimeout / 1000 + " seconds expired."));
        this.closeThreads(true);
    }

    private void closeThreads(boolean force) throws IOException {
        try {
            this.streamer.close(force);
            this.streamer.join();
            if (this.s != null) {
                this.s.close();
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Failed to shutdown streamer");
        }
        finally {
            this.streamer = null;
            this.s = null;
            this.closed = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void close() throws IOException {
        if (this.closed) {
            IOException e = this.lastException;
            if (e == null) {
                return;
            }
            throw e;
        }
        try {
            this.flushBuffer();
            if (this.currentPacket != null) {
                this.waitAndQueueCurrentPacket();
            }
            if (this.bytesCurBlock != 0L) {
                this.currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, this.bytesCurBlock);
                this.currentPacket.lastPacketInBlock = true;
            }
            this.flushInternal();
            ExtendedBlock lastBlock = this.streamer.getBlock();
            this.closeThreads(false);
            this.completeFile(lastBlock);
            this.dfsClient.leaserenewer.closeFile(this.src, this.dfsClient);
        }
        finally {
            this.closed = true;
        }
    }

    private void completeFile(ExtendedBlock last) throws IOException {
        long localstart = System.currentTimeMillis();
        boolean fileComplete = false;
        while (!fileComplete) {
            fileComplete = this.dfsClient.namenode.complete(this.src, this.dfsClient.clientName, last);
            if (fileComplete) continue;
            if (!this.dfsClient.clientRunning || this.dfsClient.hdfsTimeout > 0 && localstart + (long)this.dfsClient.hdfsTimeout < System.currentTimeMillis()) {
                String msg = "Unable to close file because dfsclient  was unable to contact the HDFS servers. clientRunning " + this.dfsClient.clientRunning + " hdfsTimeout " + this.dfsClient.hdfsTimeout;
                DFSClient.LOG.info((Object)msg);
                throw new IOException(msg);
            }
            try {
                Thread.sleep(400L);
                if (System.currentTimeMillis() - localstart <= 5000L) continue;
                DFSClient.LOG.info((Object)("Could not complete file " + this.src + " retrying..."));
            }
            catch (InterruptedException ie) {}
        }
    }

    void setArtificialSlowdown(long period) {
        this.artificialSlowdown = period;
    }

    synchronized void setChunksPerPacket(int value) {
        this.chunksPerPacket = Math.min(this.chunksPerPacket, value);
        this.packetSize = PacketHeader.PKT_HEADER_LEN + (this.checksum.getBytesPerChecksum() + this.checksum.getChecksumSize()) * this.chunksPerPacket;
    }

    synchronized void setTestFilename(String newname) {
        this.src = newname;
    }

    long getInitialLen() {
        return this.initialFileSize;
    }

    synchronized Token<BlockTokenIdentifier> getBlockToken() {
        return this.streamer.getBlockToken();
    }

    class DataStreamer
    extends Daemon {
        private volatile boolean streamerClosed = false;
        private ExtendedBlock block;
        private Token<BlockTokenIdentifier> accessToken;
        private DataOutputStream blockStream;
        private DataInputStream blockReplyStream;
        private ResponseProcessor response = null;
        private volatile DatanodeInfo[] nodes = null;
        private ArrayList<DatanodeInfo> excludedNodes = new ArrayList();
        volatile boolean hasError = false;
        volatile int errorIndex = -1;
        private BlockConstructionStage stage;
        private long bytesSent = 0L;
        private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
        private boolean isHflushed = false;
        private final boolean isAppend;

        private DataStreamer() {
            this.isAppend = false;
            this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
        }

        private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, int bytesPerChecksum) throws IOException {
            this.isAppend = true;
            this.stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
            this.block = lastBlock.getBlock();
            this.bytesSent = this.block.getNumBytes();
            this.accessToken = lastBlock.getBlockToken();
            long usedInLastBlock = stat.getLen() % DFSOutputStream.this.blockSize;
            int freeInLastBlock = (int)(DFSOutputStream.this.blockSize - usedInLastBlock);
            int usedInCksum = (int)(stat.getLen() % (long)bytesPerChecksum);
            int freeInCksum = bytesPerChecksum - usedInCksum;
            if ((long)freeInLastBlock == DFSOutputStream.this.blockSize) {
                throw new IOException("The last block for file " + DFSOutputStream.this.src + " is full.");
            }
            if (usedInCksum > 0 && freeInCksum > 0) {
                DFSOutputStream.this.computePacketChunkSize(0, freeInCksum);
                DFSOutputStream.this.resetChecksumChunk(freeInCksum);
                DFSOutputStream.this.appendChunk = true;
            } else {
                DFSOutputStream.this.computePacketChunkSize(Math.min(((DFSOutputStream)DFSOutputStream.this).dfsClient.getConf().writePacketSize, freeInLastBlock), bytesPerChecksum);
            }
            this.nodes = lastBlock.getLocations();
            this.errorIndex = -1;
            if (this.nodes.length < 1) {
                throw new IOException("Unable to retrieve blocks locations  for last block " + this.block + "of file " + DFSOutputStream.this.src);
            }
        }

        private void initDataStreaming() {
            this.setName("DataStreamer for file " + DFSOutputStream.this.src + " block " + this.block);
            this.response = new ResponseProcessor(this.nodes);
            this.response.start();
            this.stage = BlockConstructionStage.DATA_STREAMING;
        }

        private void endBlock() {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug((Object)("Closing old block " + this.block));
            }
            this.setName("DataStreamer for file " + DFSOutputStream.this.src);
            this.closeResponder();
            this.closeStream();
            this.nodes = null;
            this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            long lastPacket = System.currentTimeMillis();
            while (!this.streamerClosed && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                if (this.hasError && this.response != null) {
                    try {
                        this.response.close();
                        this.response.join();
                        this.response = null;
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                }
                Packet one = null;
                try {
                    long tmpBytesSent;
                    boolean doSleep = false;
                    if (this.hasError && this.errorIndex >= 0) {
                        doSleep = this.processDatanodeError();
                    }
                    LinkedList linkedList = DFSOutputStream.this.dataQueue;
                    synchronized (linkedList) {
                        long now = System.currentTimeMillis();
                        while (!this.streamerClosed && !this.hasError && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning && DFSOutputStream.this.dataQueue.size() == 0 && (this.stage != BlockConstructionStage.DATA_STREAMING || this.stage == BlockConstructionStage.DATA_STREAMING && now - lastPacket < (long)(((DFSOutputStream)DFSOutputStream.this).dfsClient.getConf().socketTimeout / 2)) || doSleep) {
                            long timeout = (long)(((DFSOutputStream)DFSOutputStream.this).dfsClient.getConf().socketTimeout / 2) - (now - lastPacket);
                            timeout = timeout <= 0L ? 1000L : timeout;
                            timeout = this.stage == BlockConstructionStage.DATA_STREAMING ? timeout : 1000L;
                            try {
                                DFSOutputStream.this.dataQueue.wait(timeout);
                            }
                            catch (InterruptedException e) {
                                // empty catch block
                            }
                            doSleep = false;
                            now = System.currentTimeMillis();
                        }
                        if (this.streamerClosed || this.hasError || !((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                            continue;
                        }
                        one = DFSOutputStream.this.dataQueue.isEmpty() ? new Packet() : (Packet)DFSOutputStream.this.dataQueue.getFirst();
                    }
                    assert (one != null);
                    if (this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                        if (DFSClient.LOG.isDebugEnabled()) {
                            DFSClient.LOG.debug((Object)"Allocating new block");
                        }
                        this.nodes = this.nextBlockOutputStream(DFSOutputStream.this.src);
                        this.initDataStreaming();
                    } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                        if (DFSClient.LOG.isDebugEnabled()) {
                            DFSClient.LOG.debug((Object)("Append to block " + this.block));
                        }
                        this.setupPipelineForAppendOrRecovery();
                        this.initDataStreaming();
                    }
                    long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
                    if (lastByteOffsetInBlock > DFSOutputStream.this.blockSize) {
                        throw new IOException("BlockSize " + DFSOutputStream.this.blockSize + " is smaller than data size. " + " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + DFSOutputStream.this.src);
                    }
                    if (one.lastPacketInBlock) {
                        LinkedList linkedList2 = DFSOutputStream.this.dataQueue;
                        synchronized (linkedList2) {
                            while (!this.streamerClosed && !this.hasError && DFSOutputStream.this.ackQueue.size() != 0 && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                                try {
                                    DFSOutputStream.this.dataQueue.wait(1000L);
                                }
                                catch (InterruptedException e) {}
                            }
                        }
                        if (this.streamerClosed || this.hasError || !((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) continue;
                        this.stage = BlockConstructionStage.PIPELINE_CLOSE;
                    }
                    ByteBuffer buf = one.getBuffer();
                    LinkedList e = DFSOutputStream.this.dataQueue;
                    synchronized (e) {
                        if (!one.isHeartbeatPacket()) {
                            DFSOutputStream.this.dataQueue.removeFirst();
                            DFSOutputStream.this.ackQueue.addLast(one);
                            DFSOutputStream.this.dataQueue.notifyAll();
                        }
                    }
                    if (DFSClient.LOG.isDebugEnabled()) {
                        DFSClient.LOG.debug((Object)("DataStreamer block " + this.block + " sending packet " + one));
                    }
                    this.blockStream.write(buf.array(), buf.position(), buf.remaining());
                    this.blockStream.flush();
                    lastPacket = System.currentTimeMillis();
                    if (one.isHeartbeatPacket()) {
                        // empty if block
                    }
                    if (this.bytesSent < (tmpBytesSent = one.getLastByteOffsetBlock())) {
                        this.bytesSent = tmpBytesSent;
                    }
                    if (this.streamerClosed || this.hasError || !((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) continue;
                    if (one.lastPacketInBlock) {
                        LinkedList linkedList3 = DFSOutputStream.this.dataQueue;
                        synchronized (linkedList3) {
                            while (!this.streamerClosed && !this.hasError && DFSOutputStream.this.ackQueue.size() != 0 && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                                DFSOutputStream.this.dataQueue.wait(1000L);
                            }
                        }
                        if (this.streamerClosed || this.hasError || !((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) continue;
                        this.endBlock();
                    }
                    if (DFSOutputStream.this.progress != null) {
                        DFSOutputStream.this.progress.progress();
                    }
                    if (DFSOutputStream.this.artificialSlowdown == 0L || !((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) continue;
                    Thread.sleep(DFSOutputStream.this.artificialSlowdown);
                }
                catch (Throwable e) {
                    DFSClient.LOG.warn((Object)"DataStreamer Exception", e);
                    if (e instanceof IOException) {
                        this.setLastException((IOException)e);
                    }
                    this.hasError = true;
                    if (this.errorIndex != -1) continue;
                    this.streamerClosed = true;
                }
            }
            this.closeInternal();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeInternal() {
            this.closeResponder();
            this.closeStream();
            this.streamerClosed = true;
            DFSOutputStream.this.closed = true;
            LinkedList linkedList = DFSOutputStream.this.dataQueue;
            synchronized (linkedList) {
                DFSOutputStream.this.dataQueue.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void close(boolean force) {
            this.streamerClosed = true;
            LinkedList linkedList = DFSOutputStream.this.dataQueue;
            synchronized (linkedList) {
                DFSOutputStream.this.dataQueue.notifyAll();
            }
            if (force) {
                this.interrupt();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeResponder() {
            if (this.response != null) {
                try {
                    this.response.close();
                    this.response.join();
                }
                catch (InterruptedException interruptedException) {
                }
                finally {
                    this.response = null;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeStream() {
            if (this.blockStream != null) {
                try {
                    this.blockStream.close();
                }
                catch (IOException e) {
                    this.setLastException(e);
                }
                finally {
                    this.blockStream = null;
                }
            }
            if (this.blockReplyStream != null) {
                try {
                    this.blockReplyStream.close();
                }
                catch (IOException e) {
                    this.setLastException(e);
                }
                finally {
                    this.blockReplyStream = null;
                }
            }
            if (null != DFSOutputStream.this.s) {
                try {
                    DFSOutputStream.this.s.close();
                }
                catch (IOException e) {
                    this.setLastException(e);
                }
                finally {
                    DFSOutputStream.this.s = null;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean processDatanodeError() throws IOException {
            if (this.response != null) {
                DFSClient.LOG.info((Object)("Error Recovery for block " + this.block + " waiting for responder to exit. "));
                return true;
            }
            this.closeStream();
            LinkedList linkedList = DFSOutputStream.this.dataQueue;
            synchronized (linkedList) {
                DFSOutputStream.this.dataQueue.addAll(0, DFSOutputStream.this.ackQueue);
                DFSOutputStream.this.ackQueue.clear();
            }
            boolean doSleep = this.setupPipelineForAppendOrRecovery();
            if (!this.streamerClosed && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                if (this.stage == BlockConstructionStage.PIPELINE_CLOSE) {
                    LinkedList linkedList2 = DFSOutputStream.this.dataQueue;
                    synchronized (linkedList2) {
                        assert (DFSOutputStream.this.dataQueue.size() == 1);
                        Packet endOfBlockPacket = (Packet)DFSOutputStream.this.dataQueue.remove();
                        assert (endOfBlockPacket.lastPacketInBlock);
                        assert (DFSOutputStream.this.lastAckedSeqno == endOfBlockPacket.seqno - 1L);
                        DFSOutputStream.this.lastAckedSeqno = endOfBlockPacket.seqno;
                        DFSOutputStream.this.dataQueue.notifyAll();
                    }
                    this.endBlock();
                } else {
                    this.initDataStreaming();
                }
            }
            return doSleep;
        }

        private void setHflush() {
            this.isHflushed = true;
        }

        private int findNewDatanode(DatanodeInfo[] original) throws IOException {
            if (this.nodes.length != original.length + 1) {
                throw new IOException("Failed to add a datanode: nodes.length != original.length + 1, nodes=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(original));
            }
            for (int i = 0; i < this.nodes.length; ++i) {
                int j;
                for (j = 0; j < original.length && !this.nodes[i].equals(original[j]); ++j) {
                }
                if (j != original.length) continue;
                return i;
            }
            throw new IOException("Failed: new datanode not found: nodes=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(original));
        }

        private void addDatanode2ExistingPipeline() throws IOException {
            if (DataTransferProtocol.LOG.isDebugEnabled()) {
                DataTransferProtocol.LOG.debug((Object)("lastAckedSeqno = " + DFSOutputStream.this.lastAckedSeqno));
            }
            if (!this.isAppend && DFSOutputStream.this.lastAckedSeqno < 0L && this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                return;
            }
            if (this.stage == BlockConstructionStage.PIPELINE_CLOSE || this.stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                return;
            }
            DatanodeInfo[] original = this.nodes;
            LocatedBlock lb = ((DFSOutputStream)DFSOutputStream.this).dfsClient.namenode.getAdditionalDatanode(DFSOutputStream.this.src, this.block, this.nodes, this.failed.toArray(new DatanodeInfo[this.failed.size()]), 1, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName);
            this.nodes = lb.getLocations();
            int d = this.findNewDatanode(original);
            DatanodeInfo src = d == 0 ? this.nodes[1] : this.nodes[d - 1];
            DatanodeInfo[] targets = new DatanodeInfo[]{this.nodes[d]};
            this.transfer(src, targets, lb.getBlockToken());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void transfer(DatanodeInfo src, DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken) throws IOException {
            Socket sock = null;
            DataOutputStream out = null;
            DataInputStream in = null;
            try {
                sock = DFSOutputStream.createSocketForPipeline(src, 2, DFSOutputStream.this.dfsClient);
                long writeTimeout = DFSOutputStream.this.dfsClient.getDatanodeWriteTimeout(2);
                out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream((Socket)sock, (long)writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE));
                new Sender(out).transferBlock(this.block, blockToken, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName, targets);
                in = new DataInputStream(NetUtils.getInputStream((Socket)sock));
                DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
                if (DataTransferProtos.Status.SUCCESS != response.getStatus()) {
                    throw new IOException("Failed to add a datanode");
                }
            }
            catch (Throwable throwable) {
                IOUtils.closeStream(in);
                IOUtils.closeStream(out);
                IOUtils.closeSocket((Socket)sock);
                throw throwable;
            }
            IOUtils.closeStream((Closeable)in);
            IOUtils.closeStream((Closeable)out);
            IOUtils.closeSocket((Socket)sock);
        }

        private boolean setupPipelineForAppendOrRecovery() throws IOException {
            if (this.nodes == null || this.nodes.length == 0) {
                String msg = "Could not get block locations. Source file \"" + DFSOutputStream.this.src + "\" - Aborting...";
                DFSClient.LOG.warn((Object)msg);
                this.setLastException(new IOException(msg));
                this.streamerClosed = true;
                return false;
            }
            boolean success = false;
            long newGS = 0L;
            while (!success && !this.streamerClosed && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning) {
                boolean isRecovery = this.hasError;
                if (this.errorIndex >= 0) {
                    StringBuilder pipelineMsg = new StringBuilder();
                    for (int j = 0; j < this.nodes.length; ++j) {
                        pipelineMsg.append(this.nodes[j].getName());
                        if (j >= this.nodes.length - 1) continue;
                        pipelineMsg.append(", ");
                    }
                    if (this.nodes.length <= 1) {
                        DFSOutputStream.this.lastException = new IOException("All datanodes " + pipelineMsg + " are bad. Aborting...");
                        this.streamerClosed = true;
                        return false;
                    }
                    DFSClient.LOG.warn((Object)("Error Recovery for block " + this.block + " in pipeline " + pipelineMsg + ": bad datanode " + this.nodes[this.errorIndex].getName()));
                    this.failed.add(this.nodes[this.errorIndex]);
                    DatanodeInfo[] newnodes = new DatanodeInfo[this.nodes.length - 1];
                    System.arraycopy(this.nodes, 0, newnodes, 0, this.errorIndex);
                    System.arraycopy(this.nodes, this.errorIndex + 1, newnodes, this.errorIndex, newnodes.length - this.errorIndex);
                    this.nodes = newnodes;
                    this.hasError = false;
                    DFSOutputStream.this.lastException = null;
                    this.errorIndex = -1;
                }
                if (((DFSOutputStream)DFSOutputStream.this).dfsClient.dtpReplaceDatanodeOnFailure.satisfy(DFSOutputStream.this.blockReplication, this.nodes, this.isAppend, this.isHflushed)) {
                    this.addDatanode2ExistingPipeline();
                }
                LocatedBlock lb = ((DFSOutputStream)DFSOutputStream.this).dfsClient.namenode.updateBlockForPipeline(this.block, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName);
                newGS = lb.getBlock().getGenerationStamp();
                this.accessToken = lb.getBlockToken();
                success = this.createBlockOutputStream(this.nodes, newGS, isRecovery);
            }
            if (success) {
                ExtendedBlock newBlock = new ExtendedBlock(this.block.getBlockPoolId(), this.block.getBlockId(), this.block.getNumBytes(), newGS);
                ((DFSOutputStream)DFSOutputStream.this).dfsClient.namenode.updatePipeline(((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName, this.block, newBlock, this.nodes);
                this.block = newBlock;
            }
            return false;
        }

        private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
            LocatedBlock lb = null;
            DatanodeInfo[] nodes = null;
            int count = ((DFSOutputStream)DFSOutputStream.this).dfsClient.getConf().nBlockWriteRetry;
            boolean success = false;
            do {
                this.hasError = false;
                DFSOutputStream.this.lastException = null;
                this.errorIndex = -1;
                success = false;
                long startTime = System.currentTimeMillis();
                DatanodeInfo[] w = this.excludedNodes.toArray(new DatanodeInfo[this.excludedNodes.size()]);
                lb = this.locateFollowingBlock(startTime, (DatanodeInfo[])(w.length > 0 ? w : null));
                this.block = lb.getBlock();
                this.block.setNumBytes(0L);
                this.accessToken = lb.getBlockToken();
                nodes = lb.getLocations();
                success = this.createBlockOutputStream(nodes, 0L, false);
                if (success) continue;
                DFSClient.LOG.info((Object)("Abandoning block " + this.block));
                ((DFSOutputStream)DFSOutputStream.this).dfsClient.namenode.abandonBlock(this.block, DFSOutputStream.this.src, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName);
                this.block = null;
                DFSClient.LOG.info((Object)("Excluding datanode " + nodes[this.errorIndex]));
                this.excludedNodes.add(nodes[this.errorIndex]);
            } while (!success && --count >= 0);
            if (!success) {
                throw new IOException("Unable to create new block.");
            }
            return nodes;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, boolean recoveryFlag) {
            DataTransferProtos.Status pipelineStatus = DataTransferProtos.Status.SUCCESS;
            String firstBadLink = "";
            if (DFSClient.LOG.isDebugEnabled()) {
                for (int i = 0; i < nodes.length; ++i) {
                    DFSClient.LOG.debug((Object)("pipeline = " + nodes[i].getName()));
                }
            }
            DFSOutputStream.this.persistBlocks.set(true);
            boolean result = false;
            DataOutputStream out = null;
            try {
                assert (null == DFSOutputStream.this.s) : "Previous socket unclosed";
                DFSOutputStream.this.s = DFSOutputStream.createSocketForPipeline(nodes[0], nodes.length, DFSOutputStream.this.dfsClient);
                long writeTimeout = DFSOutputStream.this.dfsClient.getDatanodeWriteTimeout(nodes.length);
                out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream((Socket)DFSOutputStream.this.s, (long)writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE));
                assert (null == this.blockReplyStream) : "Previous blockReplyStream unclosed";
                this.blockReplyStream = new DataInputStream(NetUtils.getInputStream((Socket)DFSOutputStream.this.s));
                new Sender(out).writeBlock(this.block, this.accessToken, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName, nodes, null, recoveryFlag ? this.stage.getRecoveryStage() : this.stage, nodes.length, this.block.getNumBytes(), this.bytesSent, newGS, DFSOutputStream.this.checksum);
                DataTransferProtos.BlockOpResponseProto resp = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(this.blockReplyStream));
                pipelineStatus = resp.getStatus();
                firstBadLink = resp.getFirstBadLink();
                if (pipelineStatus != DataTransferProtos.Status.SUCCESS) {
                    if (pipelineStatus != DataTransferProtos.Status.ERROR_ACCESS_TOKEN) throw new IOException("Bad connect ack with firstBadLink as " + firstBadLink);
                    throw new InvalidBlockTokenException("Got access token error for connect ack with firstBadLink as " + firstBadLink);
                }
                assert (null == this.blockStream) : "Previous blockStream unclosed";
                this.blockStream = out;
                result = true;
                if (result) return result;
            }
            catch (IOException ie) {
                try {
                    DFSClient.LOG.info((Object)("Exception in createBlockOutputStream " + ie));
                    if (firstBadLink.length() != 0) {
                        for (int i = 0; i < nodes.length; ++i) {
                            if (!nodes[i].getName().equals(firstBadLink)) continue;
                            this.errorIndex = i;
                            break;
                        }
                    } else {
                        this.errorIndex = 0;
                    }
                    this.hasError = true;
                    this.setLastException(ie);
                    result = false;
                    if (result) return result;
                }
                catch (Throwable throwable) {
                    if (result) throw throwable;
                    IOUtils.closeSocket((Socket)DFSOutputStream.this.s);
                    DFSOutputStream.this.s = null;
                    IOUtils.closeStream(out);
                    out = null;
                    IOUtils.closeStream((Closeable)this.blockReplyStream);
                    this.blockReplyStream = null;
                    throw throwable;
                }
                IOUtils.closeSocket((Socket)DFSOutputStream.this.s);
                DFSOutputStream.this.s = null;
                IOUtils.closeStream((Closeable)out);
                out = null;
                IOUtils.closeStream((Closeable)this.blockReplyStream);
                this.blockReplyStream = null;
                return result;
            }
            IOUtils.closeSocket((Socket)DFSOutputStream.this.s);
            DFSOutputStream.this.s = null;
            IOUtils.closeStream((Closeable)out);
            out = null;
            IOUtils.closeStream((Closeable)this.blockReplyStream);
            this.blockReplyStream = null;
            return result;
        }

        private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException, UnresolvedLinkException {
            int retries = ((DFSOutputStream)DFSOutputStream.this).dfsClient.getConf().nBlockWriteLocateFollowingRetry;
            long sleeptime = 400L;
            long localstart = System.currentTimeMillis();
            while (true) {
                try {
                    return ((DFSOutputStream)DFSOutputStream.this).dfsClient.namenode.addBlock(DFSOutputStream.this.src, ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientName, this.block, excludedNodes);
                }
                catch (RemoteException e) {
                    IOException ue = e.unwrapRemoteException(new Class[]{FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class});
                    if (ue != e) {
                        throw ue;
                    }
                    if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
                        if (retries == 0) {
                            throw e;
                        }
                        --retries;
                        DFSClient.LOG.info((Object)"Exception while adding a block", (Throwable)e);
                        if (System.currentTimeMillis() - localstart > 5000L) {
                            DFSClient.LOG.info((Object)("Waiting for replication for " + (System.currentTimeMillis() - localstart) / 1000L + " seconds"));
                        }
                        try {
                            DFSClient.LOG.warn((Object)("NotReplicatedYetException sleeping " + DFSOutputStream.this.src + " retries left " + retries));
                            Thread.sleep(sleeptime);
                            sleeptime *= 2L;
                        }
                        catch (InterruptedException ie) {}
                        continue;
                    }
                    throw e;
                }
                break;
            }
        }

        ExtendedBlock getBlock() {
            return this.block;
        }

        DatanodeInfo[] getNodes() {
            return this.nodes;
        }

        Token<BlockTokenIdentifier> getBlockToken() {
            return this.accessToken;
        }

        private void setLastException(IOException e) {
            if (DFSOutputStream.this.lastException == null) {
                DFSOutputStream.this.lastException = e;
            }
        }

        private class ResponseProcessor
        extends Daemon {
            private volatile boolean responderClosed = false;
            private DatanodeInfo[] targets = null;
            private boolean isLastPacketInBlock = false;

            ResponseProcessor(DatanodeInfo[] targets) {
                this.targets = targets;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                this.setName("ResponseProcessor for block " + DataStreamer.this.block);
                PipelineAck ack = new PipelineAck();
                while (!this.responderClosed && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning && !this.isLastPacketInBlock) {
                    try {
                        ack.readFields(DataStreamer.this.blockReplyStream);
                        if (DFSClient.LOG.isDebugEnabled()) {
                            DFSClient.LOG.debug((Object)("DFSClient " + ack));
                        }
                        long seqno = ack.getSeqno();
                        for (int i = ack.getNumOfReplies() - 1; i >= 0 && ((DFSOutputStream)DFSOutputStream.this).dfsClient.clientRunning; --i) {
                            DataTransferProtos.Status reply = ack.getReply(i);
                            if (reply == DataTransferProtos.Status.SUCCESS) continue;
                            DataStreamer.this.errorIndex = i;
                            throw new IOException("Bad response " + (Object)((Object)reply) + " for block " + DataStreamer.this.block + " from datanode " + this.targets[i].getName());
                        }
                        assert (seqno != -2L) : "Ack for unkown seqno should be a failed ack: " + ack;
                        if (seqno == -1L) continue;
                        Packet one = null;
                        LinkedList linkedList = DFSOutputStream.this.dataQueue;
                        synchronized (linkedList) {
                            one = (Packet)DFSOutputStream.this.ackQueue.getFirst();
                        }
                        if (one.seqno != seqno) {
                            throw new IOException("Responseprocessor: Expecting seqno  for block " + DataStreamer.this.block + one.seqno + " but received " + seqno);
                        }
                        this.isLastPacketInBlock = one.lastPacketInBlock;
                        DataStreamer.this.block.setNumBytes(one.getLastByteOffsetBlock());
                        linkedList = DFSOutputStream.this.dataQueue;
                        synchronized (linkedList) {
                            DFSOutputStream.this.lastAckedSeqno = seqno;
                            DFSOutputStream.this.ackQueue.removeFirst();
                            DFSOutputStream.this.dataQueue.notifyAll();
                        }
                    }
                    catch (Exception e) {
                        if (this.responderClosed) continue;
                        if (e instanceof IOException) {
                            DataStreamer.this.setLastException((IOException)e);
                        }
                        DataStreamer.this.hasError = true;
                        DataStreamer.this.errorIndex = DataStreamer.this.errorIndex == -1 ? 0 : DataStreamer.this.errorIndex;
                        LinkedList linkedList = DFSOutputStream.this.dataQueue;
                        synchronized (linkedList) {
                            DFSOutputStream.this.dataQueue.notifyAll();
                        }
                        DFSClient.LOG.warn((Object)("DFSOutputStream ResponseProcessor exception  for block " + DataStreamer.this.block), (Throwable)e);
                        this.responderClosed = true;
                    }
                }
            }

            void close() {
                this.responderClosed = true;
                this.interrupt();
            }
        }
    }

    private class Packet {
        long seqno;
        long offsetInBlock;
        boolean lastPacketInBlock = false;
        int numChunks = 0;
        int maxChunks;
        ByteBuffer buffer;
        byte[] buf;
        int checksumStart;
        int dataStart;
        int dataPos;
        int checksumPos;
        private static final long HEART_BEAT_SEQNO = -1L;

        Packet() {
            this.offsetInBlock = 0L;
            this.seqno = -1L;
            this.buffer = null;
            int packetSize = PacketHeader.PKT_HEADER_LEN + 4;
            this.buf = new byte[packetSize];
            this.checksumPos = this.checksumStart = (this.dataStart = packetSize);
            this.dataPos = this.dataStart;
            this.maxChunks = 0;
        }

        Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
            this.offsetInBlock = offsetInBlock;
            this.seqno = DFSOutputStream.this.currentSeqno;
            DFSOutputStream.this.currentSeqno++;
            this.buffer = null;
            this.buf = new byte[pktSize];
            this.checksumPos = this.checksumStart = PacketHeader.PKT_HEADER_LEN;
            this.dataPos = this.dataStart = this.checksumStart + chunksPerPkt * DFSOutputStream.this.checksum.getChecksumSize();
            this.maxChunks = chunksPerPkt;
        }

        void writeData(byte[] inarray, int off, int len) {
            if (this.dataPos + len > this.buf.length) {
                throw new BufferOverflowException();
            }
            System.arraycopy(inarray, off, this.buf, this.dataPos, len);
            this.dataPos += len;
        }

        void writeChecksum(byte[] inarray, int off, int len) {
            if (this.checksumPos + len > this.dataStart) {
                throw new BufferOverflowException();
            }
            System.arraycopy(inarray, off, this.buf, this.checksumPos, len);
            this.checksumPos += len;
        }

        ByteBuffer getBuffer() {
            if (this.buffer != null) {
                return this.buffer;
            }
            int dataLen = this.dataPos - this.dataStart;
            int checksumLen = this.checksumPos - this.checksumStart;
            if (this.checksumPos != this.dataStart) {
                System.arraycopy(this.buf, this.checksumStart, this.buf, this.dataStart - checksumLen, checksumLen);
            }
            int pktLen = 4 + dataLen + checksumLen;
            this.buffer = ByteBuffer.wrap(this.buf, this.dataStart - this.checksumPos, PacketHeader.PKT_HEADER_LEN + pktLen - 4);
            this.buf = null;
            this.buffer.mark();
            PacketHeader header = new PacketHeader(pktLen, this.offsetInBlock, this.seqno, this.lastPacketInBlock, dataLen);
            header.putInBuffer(this.buffer);
            this.buffer.reset();
            return this.buffer;
        }

        long getLastByteOffsetBlock() {
            return this.offsetInBlock + (long)this.dataPos - (long)this.dataStart;
        }

        private boolean isHeartbeatPacket() {
            return this.seqno == -1L;
        }

        public String toString() {
            return "packet seqno:" + this.seqno + " offsetInBlock:" + this.offsetInBlock + " lastPacketInBlock:" + this.lastPacketInBlock + " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
        }
    }
}

