/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.AccessTokenHandler;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
import org.apache.hadoop.hdfs.server.datanode.BlockSender;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;

/**
 * Handles one data-transfer connection on a socket.
 *
 * <p>{@link #run()} reads a single operation from the socket, dispatches it
 * through the inherited {@code processOp} to one of the {@code op*} handlers
 * in this class, and then closes the connection.
 */
class DataXceiver
extends DataTransferProtocol.Receiver
implements Runnable,
FSConstants {
    /** Same logger instance as {@link DataNode#LOG}. */
    public static final Log LOG = DataNode.LOG;
    /** Same logger instance as {@link DataNode#ClientTraceLog}; consulted when building the read trace format. */
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    /** Socket this xceiver serves; closed at the end of {@link #run()}. */
    private final Socket s;
    /** True when the peer's address equals this socket's local address. */
    private final boolean isLocal;
    /** String form of the socket's remote address, captured at construction. */
    private final String remoteAddress;
    /** String form of the socket's local address, captured at construction. */
    private final String localAddress;
    /** Owning datanode; source of configuration, metrics and the block dataset. */
    private final DataNode datanode;
    /** Server that tracks this socket in its {@code childSockets} map. */
    private final DataXceiverServer dataXceiverServer;
    /** Value of {@code DataNode.now()} taken just before the operation is processed. */
    private long opStartTime;

    public DataXceiver(Socket s, DataNode datanode, DataXceiverServer dataXceiverServer) {
        this.s = s;
        this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
        this.datanode = datanode;
        this.dataXceiverServer = dataXceiverServer;
        dataXceiverServer.childSockets.put(s, s);
        this.remoteAddress = s.getRemoteSocketAddress().toString();
        this.localAddress = s.getLocalSocketAddress().toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Number of active connections is: " + datanode.getXceiverCount()));
        }
    }

    /**
     * @return the datanode this xceiver was constructed with
     */
    DataNode getDataNode() {
        final DataNode owner = this.datanode;
        return owner;
    }

    /**
     * Reads one operation from the socket and processes it.
     *
     * <p>Any failure is logged rather than propagated. Whatever the outcome,
     * the input stream and socket are closed and the socket is removed from
     * the server's {@code childSockets} map exactly once.
     */
    @Override
    public void run() {
        DataInputStream in = null;
        try {
            in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(this.s), SMALL_BUFFER_SIZE));
            DataTransferProtocol.Op op = this.readOp(in);
            // Refuse the request when this datanode already has too many active xceivers.
            int curXceiverCount = this.datanode.getXceiverCount();
            if (curXceiverCount > this.dataXceiverServer.maxXceiverCount) {
                throw new IOException("xceiverCount " + curXceiverCount + " exceeds the limit of concurrent xcievers " + this.dataXceiverServer.maxXceiverCount);
            }
            this.opStartTime = DataNode.now();
            this.processOp(op, in);
        }
        catch (Throwable t) {
            LOG.error(this.datanode.dnRegistration + ":DataXceiver", t);
        }
        finally {
            // Single cleanup path: runs once on success, on a logged failure,
            // and also if the logging call itself throws.
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.datanode.dnRegistration + ":Number of active connections is: " + this.datanode.getXceiverCount());
            }
            IOUtils.closeStream(in);
            IOUtils.closeSocket(this.s);
            this.dataXceiverServer.childSockets.remove(this.s);
        }
    }

    /*
     * Loose catch block
     */
    @Override
    protected void opReadBlock(DataInputStream in, long blockId, long blockGs, long startOffset, long length, String clientName, BlockAccessToken accessToken) throws IOException {
        block16: {
            Block block = new Block(blockId, 0L, blockGs);
            OutputStream baseStream = NetUtils.getOutputStream((Socket)this.s, (long)this.datanode.socketWriteTimeout);
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
            if (this.datanode.isAccessTokenEnabled && !this.datanode.accessTokenHandler.checkAccess(accessToken, null, blockId, AccessTokenHandler.AccessMode.READ)) {
                try {
                    DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(out);
                    out.flush();
                    throw new IOException("Access token verification failed, for client " + this.remoteAddress + " for OP_READ_BLOCK for block " + block);
                }
                catch (Throwable throwable) {
                    IOUtils.closeStream((Closeable)out);
                    throw throwable;
                }
            }
            BlockSender blockSender = null;
            String clientTraceFmt = clientName.length() > 0 && ClientTraceLog.isInfoEnabled() ? String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", this.localAddress, this.remoteAddress, "%d", "HDFS_READ", clientName, "%d", this.datanode.dnRegistration.getStorageID(), block, "%d") : this.datanode.dnRegistration + " Served block " + block + " to " + this.s.getInetAddress();
            try {
                try {
                    blockSender = new BlockSender(block, startOffset, length, true, true, false, this.datanode, clientTraceFmt);
                }
                catch (IOException e) {
                    DataTransferProtocol.Status.ERROR.write(out);
                    throw e;
                }
                DataTransferProtocol.Status.SUCCESS.write(out);
                long read = blockSender.sendBlock(out, baseStream, null);
                if (blockSender.isBlockReadFully()) {
                    try {
                        if (DataTransferProtocol.Status.read(in) == DataTransferProtocol.Status.CHECKSUM_OK && this.datanode.blockScanner != null) {
                            this.datanode.blockScanner.verifiedByClient(block);
                        }
                    }
                    catch (IOException ignored) {
                        // empty catch block
                    }
                }
                this.datanode.myMetrics.bytesRead.inc((long)((int)read));
                this.datanode.myMetrics.blocksRead.inc();
                IOUtils.closeStream((Closeable)out);
            }
            catch (SocketException ignored) {
                this.datanode.myMetrics.blocksRead.inc();
                break block16;
            }
            catch (IOException ioe) {
                LOG.warn((Object)(this.datanode.dnRegistration + ":Got exception while serving " + block + " to " + this.s.getInetAddress() + ":\n" + StringUtils.stringifyException((Throwable)ioe)));
                throw ioe;
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                IOUtils.closeStream((Closeable)out);
                IOUtils.closeStream(blockSender);
            }
            IOUtils.closeStream((Closeable)blockSender);
        }
        this.updateDuration(this.datanode.myMetrics.readBlockOp);
        this.updateCounter(this.datanode.myMetrics.readsFromLocalClient, this.datanode.myMetrics.readsFromRemoteClient);
    }

    /**
     * Receives a block from the peer and, if there are downstream targets,
     * forwards it to the first of them (the "mirror").
     *
     * <p>Outline:
     * <ol>
     *   <li>Check the access token; on failure reply {@code ERROR_ACCESS_TOKEN}
     *       (only when {@code client} is non-empty) and throw.</li>
     *   <li>Create a {@link BlockReceiver}, unless a client is doing
     *       {@code PIPELINE_CLOSE_RECOVERY}, in which case the dataset's
     *       {@code recoverClose} is called instead.</li>
     *   <li>If {@code targets} is non-empty, connect to {@code targets[0]},
     *       forward the write request and, for client writes, read its ack.</li>
     *   <li>For client writes, forward the mirror status and first bad link upstream.</li>
     *   <li>Receive the block data, then close the block where applicable.</li>
     * </ol>
     *
     * <p>All mirror resources, the reply stream and the block receiver are
     * released in a {@code finally} block, so they are closed on unchecked
     * exceptions as well as on {@link IOException}.
     *
     * @param in input side of the connection carrying the block data
     * @param blockId id of the block being written
     * @param blockGs generation stamp of the block
     * @param pipelineSize forwarded unchanged to the mirror
     * @param stage block construction stage
     * @param newGs new generation stamp, applied on close recovery
     * @param minBytesRcvd minimum bytes received; used as the length on close recovery
     * @param maxBytesRcvd passed to the {@link BlockReceiver} and the mirror
     * @param client client name; an empty string takes the non-client branches
     * @param srcDataNode passed to the {@link BlockReceiver} and the mirror
     * @param targets downstream datanodes; {@code targets[0]} is the mirror
     * @param accessToken token checked for WRITE access when tokens are enabled
     * @throws IOException on access-token failure, on a mirror failure during a
     *         client write, or on any error while receiving the block
     */
    @Override
    protected void opWriteBlock(DataInputStream in, long blockId, long blockGs, int pipelineSize, DataTransferProtocol.BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets, BlockAccessToken accessToken) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("writeBlock receive buf size " + this.s.getReceiveBufferSize() + " tcp no delay " + this.s.getTcpNoDelay());
        }
        Block block = new Block(blockId, this.dataXceiverServer.estimateBlockSize, blockGs);
        LOG.info("Receiving block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress);
        DataOutputStream replyOut = new DataOutputStream(NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout));
        if (this.datanode.isAccessTokenEnabled && !this.datanode.accessTokenHandler.checkAccess(accessToken, null, block.getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
            try {
                if (client.length() != 0) {
                    DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(replyOut);
                    Text.writeString(replyOut, this.datanode.dnRegistration.getName());
                    replyOut.flush();
                }
                throw new IOException("Access token verification failed, for client " + this.remoteAddress + " for OP_WRITE_BLOCK for block " + block);
            }
            finally {
                IOUtils.closeStream(replyOut);
            }
        }
        DataOutputStream mirrorOut = null;
        DataInputStream mirrorIn = null;
        Socket mirrorSock = null;
        BlockReceiver blockReceiver = null;
        String mirrorNode = null;
        String firstBadLink = "";
        DataTransferProtocol.Status mirrorInStatus = DataTransferProtocol.Status.SUCCESS;
        try {
            if (client.length() == 0 || stage != DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                blockReceiver = new BlockReceiver(block, in, this.s.getRemoteSocketAddress().toString(), this.s.getLocalSocketAddress().toString(), stage, newGs, minBytesRcvd, maxBytesRcvd, client, srcDataNode, this.datanode);
            } else {
                this.datanode.data.recoverClose(block, newGs, minBytesRcvd);
            }
            if (targets.length > 0) {
                mirrorNode = targets[0].getName();
                InetSocketAddress mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
                mirrorSock = this.datanode.newSocket();
                try {
                    // Both timeouts grow by 5000 for every remaining downstream target.
                    int timeoutValue = this.datanode.socketTimeout + 5000 * targets.length;
                    int writeTimeout = this.datanode.socketWriteTimeout + 5000 * targets.length;
                    NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
                    mirrorSock.setSoTimeout(timeoutValue);
                    mirrorSock.setSendBufferSize(131072);
                    mirrorOut = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(mirrorSock, writeTimeout), SMALL_BUFFER_SIZE));
                    mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
                    DataTransferProtocol.Sender.opWriteBlock(mirrorOut, blockId, blockGs, pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, srcDataNode, targets, accessToken);
                    if (blockReceiver != null) {
                        blockReceiver.writeChecksumHeader(mirrorOut);
                    }
                    mirrorOut.flush();
                    // Only client writes wait for the mirror's connect ack.
                    if (client.length() != 0) {
                        mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
                        firstBadLink = Text.readString(mirrorIn);
                        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.Status.SUCCESS) {
                            LOG.info("Datanode " + targets.length + " got response for connect ack " + " from downstream datanode with firstbadlink as " + firstBadLink);
                        }
                    }
                }
                catch (IOException e) {
                    if (client.length() != 0) {
                        // Tell the upstream peer which node failed.
                        DataTransferProtocol.Status.ERROR.write(replyOut);
                        Text.writeString(replyOut, mirrorNode);
                        replyOut.flush();
                    }
                    IOUtils.closeStream(mirrorOut);
                    mirrorOut = null;
                    IOUtils.closeStream(mirrorIn);
                    mirrorIn = null;
                    IOUtils.closeSocket(mirrorSock);
                    mirrorSock = null;
                    if (client.length() > 0) {
                        throw e;
                    }
                    // Non-client writes carry on without the mirror.
                    LOG.info(this.datanode.dnRegistration + ":Exception transfering block " + block + " to mirror " + mirrorNode + ". continuing without the mirror.\n" + StringUtils.stringifyException(e));
                }
            }
            if (client.length() != 0) {
                if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.Status.SUCCESS) {
                    LOG.info("Datanode " + targets.length + " forwarding connect ack to upstream firstbadlink is " + firstBadLink);
                }
                mirrorInStatus.write(replyOut);
                Text.writeString(replyOut, firstBadLink);
                replyOut.flush();
            }
            if (blockReceiver != null) {
                // mirrorSock is reset to null when the mirror connection failed.
                String mirrorAddr = mirrorSock == null ? null : mirrorNode;
                blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets.length);
            }
            if (client.length() != 0 && stage == DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                block.setGenerationStamp(newGs);
                block.setNumBytes(minBytesRcvd);
            }
            if (client.length() == 0 || stage == DataTransferProtocol.BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.datanode.closeBlock(block, "");
                LOG.info("Received block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress + " of size " + block.getNumBytes());
            }
        }
        catch (IOException ioe) {
            LOG.info("writeBlock " + block + " received exception " + ioe);
            throw ioe;
        }
        finally {
            // Runs on every exit path, including unchecked exceptions.
            IOUtils.closeStream(mirrorOut);
            IOUtils.closeStream(mirrorIn);
            IOUtils.closeStream(replyOut);
            IOUtils.closeSocket(mirrorSock);
            IOUtils.closeStream(blockReceiver);
        }
        this.updateDuration(this.datanode.myMetrics.writeBlockOp);
        this.updateCounter(this.datanode.myMetrics.writesFromLocalClient, this.datanode.myMetrics.writesFromRemoteClient);
    }

    /**
     * Replies with checksum information for a block: bytes per CRC, number of
     * CRCs in the block's metadata, and an MD5 digest of the checksum data
     * that follows the metadata header.
     *
     * @param in unused by this handler
     * @param blockId id of the block
     * @param blockGs generation stamp of the block
     * @param accessToken token checked for READ access when tokens are enabled
     * @throws IOException on access-token failure or any I/O error
     */
    @Override
    protected void opBlockChecksum(DataInputStream in, long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
        final Block blk = new Block(blockId, 0L, blockGs);
        final DataOutputStream reply = new DataOutputStream(NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout));

        final boolean tokenRejected = this.datanode.isAccessTokenEnabled
                && !this.datanode.accessTokenHandler.checkAccess(accessToken, null, blk.getBlockId(), AccessTokenHandler.AccessMode.READ);
        if (tokenRejected) {
            try {
                DataTransferProtocol.Status.ERROR_ACCESS_TOKEN.write(reply);
                reply.flush();
                throw new IOException("Access token verification failed, for client " + this.remoteAddress + " for OP_BLOCK_CHECKSUM for block " + blk);
            }
            finally {
                IOUtils.closeStream(reply);
            }
        }

        final FSDatasetInterface.MetaDataInputStream metaStream = this.datanode.data.getMetaDataInputStream(blk);
        final DataInputStream crcStream = new DataInputStream(new BufferedInputStream(metaStream, BUFFER_SIZE));
        try {
            final DataChecksum checksum = BlockMetadataHeader.readHeader(crcStream).getChecksum();
            final int bytesPerCRC = checksum.getBytesPerChecksum();
            // Everything after the header is checksum data, one fixed-size entry per CRC.
            final long checksumBytes = metaStream.getLength() - (long)BlockMetadataHeader.getHeaderSize();
            final long crcPerBlock = checksumBytes / (long)checksum.getChecksumSize();
            final MD5Hash md5 = MD5Hash.digest((InputStream)crcStream);
            if (LOG.isDebugEnabled()) {
                LOG.debug("block=" + blk + ", bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
            }

            DataTransferProtocol.Status.SUCCESS.write(reply);
            reply.writeInt(bytesPerCRC);
            reply.writeLong(crcPerBlock);
            md5.write((DataOutput)reply);
            reply.flush();
        }
        finally {
            IOUtils.closeStream(reply);
            IOUtils.closeStream(crcStream);
            IOUtils.closeStream(metaStream);
        }

        this.updateDuration(this.datanode.myMetrics.blockChecksumOp);
    }

    /**
     * Sends a whole block to the peer under the balancing throttler.
     *
     * <p>Replies {@code ERROR_ACCESS_TOKEN} and returns if token checking
     * fails, or {@code ERROR} and returns if the throttler cannot be acquired.
     * Otherwise writes {@code SUCCESS}, streams the block, releases the
     * throttler and, only when the copy completed, writes one trailing
     * {@code 'd'} char to the peer.
     *
     * <p>The throttler is released and the streams are closed in a
     * {@code finally} block, so an unchecked exception cannot leak a throttler
     * slot.
     *
     * @param in unused by this handler
     * @param blockId id of the block to copy
     * @param blockGs generation stamp of the block
     * @param accessToken token checked for COPY access when tokens are enabled
     * @throws IOException if creating the sender or sending the block fails
     */
    @Override
    protected void opCopyBlock(DataInputStream in, long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
        Block block = new Block(blockId, 0L, blockGs);
        if (this.datanode.isAccessTokenEnabled && !this.datanode.accessTokenHandler.checkAccess(accessToken, null, blockId, AccessTokenHandler.AccessMode.COPY)) {
            LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_COPY_BLOCK for block " + block);
            this.sendResponse(this.s, DataTransferProtocol.Status.ERROR_ACCESS_TOKEN, this.datanode.socketWriteTimeout);
            return;
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            LOG.info("Not able to copy block " + blockId + " to " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.");
            this.sendResponse(this.s, DataTransferProtocol.Status.ERROR, this.datanode.socketWriteTimeout);
            return;
        }
        BlockSender blockSender = null;
        DataOutputStream reply = null;
        // Set only after the whole copy has gone through, so any exception leaves it false.
        boolean copied = false;
        try {
            blockSender = new BlockSender(block, 0L, -1L, false, false, false, this.datanode);
            OutputStream baseStream = NetUtils.getOutputStream(this.s, this.datanode.socketWriteTimeout);
            reply = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
            DataTransferProtocol.Status.SUCCESS.write(reply);
            long read = blockSender.sendBlock(reply, baseStream, this.dataXceiverServer.balanceThrottler);
            // Pass the full long count; narrowing to int first would wrap for transfers of 2 GiB or more.
            this.datanode.myMetrics.bytesRead.inc(read);
            this.datanode.myMetrics.blocksRead.inc();
            LOG.info("Copied block " + block + " to " + this.s.getRemoteSocketAddress());
            copied = true;
        }
        finally {
            this.dataXceiverServer.balanceThrottler.release();
            if (copied) {
                // Trailing char sent after the throttler has been released.
                try {
                    reply.writeChar('d');
                }
                catch (IOException ignored) {
                    // Best effort: the block itself has already been sent.
                }
            }
            IOUtils.closeStream(reply);
            IOUtils.closeStream(blockSender);
        }
        this.updateDuration(this.datanode.myMetrics.copyBlockOp);
    }

    /**
     * Fetches a block from a proxy datanode and stores it locally, under the
     * balancing throttler.
     *
     * <p>Replies {@code ERROR_ACCESS_TOKEN} and returns if token checking
     * fails, or {@code ERROR} and returns if the throttler cannot be acquired.
     * Otherwise it connects to {@code proxySource}, sends it a copy-block
     * request, receives the block, and notifies the namenode. A final status
     * is always sent to the requesting peer: {@code SUCCESS} only if every
     * step completed, {@code ERROR} otherwise.
     *
     * <p>Cleanup (throttler release, response, closing streams and the proxy
     * socket) runs in a {@code finally} block, so it also happens on unchecked
     * exceptions.
     *
     * @param in unused by this handler
     * @param blockId id of the block to receive
     * @param blockGs generation stamp of the block
     * @param sourceID passed to {@code notifyNamenodeReceivedBlock}
     * @param proxySource datanode to copy the block from
     * @param accessToken token checked for REPLACE access when tokens are
     *        enabled, and forwarded to the proxy
     * @throws IOException if the proxy rejects the copy or the transfer fails
     */
    @Override
    protected void opReplaceBlock(DataInputStream in, long blockId, long blockGs, String sourceID, DatanodeInfo proxySource, BlockAccessToken accessToken) throws IOException {
        Block block = new Block(blockId, this.dataXceiverServer.estimateBlockSize, blockGs);
        if (this.datanode.isAccessTokenEnabled && !this.datanode.accessTokenHandler.checkAccess(accessToken, null, blockId, AccessTokenHandler.AccessMode.REPLACE)) {
            LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
            this.sendResponse(this.s, DataTransferProtocol.Status.ERROR_ACCESS_TOKEN, this.datanode.socketWriteTimeout);
            return;
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            LOG.warn("Not able to receive block " + blockId + " from " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.");
            this.sendResponse(this.s, DataTransferProtocol.Status.ERROR, this.datanode.socketWriteTimeout);
            return;
        }
        Socket proxySock = null;
        DataOutputStream proxyOut = null;
        // Stays ERROR unless the try block runs to completion.
        DataTransferProtocol.Status opStatus = DataTransferProtocol.Status.ERROR;
        BlockReceiver blockReceiver = null;
        DataInputStream proxyReply = null;
        try {
            InetSocketAddress proxyAddr = NetUtils.createSocketAddr(proxySource.getName());
            proxySock = this.datanode.newSocket();
            NetUtils.connect(proxySock, proxyAddr, this.datanode.socketTimeout);
            proxySock.setSoTimeout(this.datanode.socketTimeout);
            OutputStream baseStream = NetUtils.getOutputStream(proxySock, this.datanode.socketWriteTimeout);
            proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
            DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(), block.getGenerationStamp(), accessToken);
            proxyReply = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(proxySock), BUFFER_SIZE));
            DataTransferProtocol.Status status = DataTransferProtocol.Status.read(proxyReply);
            if (status != DataTransferProtocol.Status.SUCCESS) {
                if (status == DataTransferProtocol.Status.ERROR_ACCESS_TOKEN) {
                    throw new IOException("Copy block " + block + " from " + proxySock.getRemoteSocketAddress() + " failed due to access token error");
                }
                throw new IOException("Copy block " + block + " from " + proxySock.getRemoteSocketAddress() + " failed");
            }
            blockReceiver = new BlockReceiver(block, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode);
            blockReceiver.receiveBlock(null, null, null, null, this.dataXceiverServer.balanceThrottler, -1);
            this.datanode.notifyNamenodeReceivedBlock(block, sourceID);
            LOG.info("Moved block " + block + " from " + this.s.getRemoteSocketAddress());
            opStatus = DataTransferProtocol.Status.SUCCESS;
        }
        finally {
            if (opStatus == DataTransferProtocol.Status.SUCCESS) {
                // Read the trailing char the proxy writes once its copy has completed.
                try {
                    proxyReply.readChar();
                }
                catch (IOException ignored) {
                    // Best effort: the block has already been received.
                }
            }
            this.dataXceiverServer.balanceThrottler.release();
            try {
                this.sendResponse(this.s, opStatus, this.datanode.socketWriteTimeout);
            }
            catch (IOException ioe) {
                LOG.warn("Error writing reply back to " + this.s.getRemoteSocketAddress());
            }
            IOUtils.closeStream(proxyOut);
            IOUtils.closeStream(blockReceiver);
            IOUtils.closeStream(proxyReply);
            // Covers a failure between creating the socket and wrapping its streams.
            IOUtils.closeSocket(proxySock);
        }
        this.updateDuration(this.datanode.myMetrics.replaceBlockOp);
    }

    /**
     * Adds the time elapsed since {@code opStartTime} to the given rate metric.
     *
     * @param mtvr metric to update
     */
    private void updateDuration(MetricsTimeVaryingRate mtvr) {
        final long elapsed = DataNode.now() - this.opStartTime;
        mtvr.inc(elapsed);
    }

    /**
     * Increments one of two counters depending on whether the peer is local.
     *
     * @param localCounter incremented when {@code isLocal} is true
     * @param remoteCounter incremented otherwise
     */
    private void updateCounter(MetricsTimeVaryingInt localCounter, MetricsTimeVaryingInt remoteCounter) {
        MetricsTimeVaryingInt chosen = remoteCounter;
        if (this.isLocal) {
            chosen = localCounter;
        }
        chosen.inc();
    }

    /**
     * Writes a single status to the socket's output stream, flushes it, and
     * closes that stream whether or not the write succeeded.
     *
     * @param s socket to reply on
     * @param opStatus status to write
     * @param timeout write timeout handed to {@code NetUtils.getOutputStream}
     * @throws IOException if obtaining the stream, writing or flushing fails
     */
    private void sendResponse(Socket s, DataTransferProtocol.Status opStatus, long timeout) throws IOException {
        final DataOutputStream statusOut = new DataOutputStream(NetUtils.getOutputStream(s, timeout));
        try {
            opStatus.write(statusOut);
            statusOut.flush();
        }
        finally {
            IOUtils.closeStream(statusOut);
        }
    }
}

