/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;

@InterfaceAudience.Private
class BPServiceActor
implements Runnable {
    /** Logger shared with the DataNode (this is the same object as {@code DataNode.LOG}). */
    static final Log LOG = DataNode.LOG;
    /** Address of the NameNode this actor talks to; set once in the constructor. */
    final InetSocketAddress nnAddr;
    /** Owning block pool service; source of the DataNode handle, block pool id and registration. */
    BPOfferService bpos;
    /**
     * Reference time (ms) that {@code blockReport()} compares against the block report interval.
     * {@code triggerBlockReportForTests()} zeroes it to force a report.
     */
    long lastBlockReport = 0L;
    /** Reference time (ms) of the last periodic incremental report, compared in {@code offerService()}. */
    long lastDeletedReport = 0L;
    /** When true, the next {@code blockReport()} re-randomizes {@link #lastBlockReport} and clears this flag. */
    boolean resetBlockReportTime = true;
    /** Thread created by {@code start()} to execute {@code run()}; null until then. */
    Thread bpThread;
    /** NameNode RPC proxy; assigned by {@code connectToNNAndHandshake()} or injected via {@code setNameNode}. */
    DatanodeProtocolClientSideTranslatorPB bpNamenode;
    /** Time (ms) of the most recent heartbeat attempt; recorded before the heartbeat is sent. */
    private long lastHeartbeat = 0L;
    /** Set to true by {@code run()} once the initial handshake has completed. */
    private volatile boolean initialized = false;
    /**
     * Blocks awaiting an incremental report, keyed by block id. The map object also serves as the
     * monitor for every wait/notify in this class.
     */
    private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR = Maps.newHashMap();
    /**
     * Count that makes {@code offerService()} send an incremental report without waiting.
     * Every write in this class happens while holding the {@link #pendingIncrementalBR} monitor.
     */
    private volatile int pendingReceivedRequests = 0;
    /** Cleared by {@code stop()}, {@code cleanUp()} and the fatal-RemoteException path of {@code offerService()}. */
    private volatile boolean shouldServiceRun = true;
    /** DataNode obtained from {@link #bpos} at construction. */
    private final DataNode dn;
    /** Configuration snapshot obtained from {@link #dn} at construction. */
    private final DNConf dnConf;
    /** Registration created and then replaced by the NameNode's answer in {@code register()}. */
    private DatanodeRegistration bpRegistration;

    BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
        this.bpos = bpos;
        this.dn = bpos.getDataNode();
        this.nnAddr = nnAddr;
        this.dnConf = this.dn.getDnConf();
    }

    /**
     * @return {@code true} once {@code run()} has finished the initial NameNode handshake
     */
    boolean isInitialized() {
        return initialized;
    }

    /**
     * Reports whether this actor is still supposed to run and its service thread is alive.
     *
     * @return {@code false} if the actor has been stopped, was never started
     *         (no thread exists yet), or its thread has terminated
     */
    boolean isAlive() {
        // Snapshot the field: it is null until start() runs, and dereferencing it
        // unconditionally would throw a NullPointerException for a not-yet-started actor.
        final Thread thread = this.bpThread;
        return this.shouldServiceRun && thread != null && thread.isAlive();
    }

    /**
     * @return the owning block pool service's description followed by
     *         {@code " service to "} and the NameNode address
     */
    @Override
    public String toString() {
        return this.bpos.toString() + " service to " + this.nnAddr;
    }

    /**
     * @return the NameNode address this actor was constructed with
     */
    InetSocketAddress getNNSocketAddress() {
        return nnAddr;
    }

    /**
     * Test hook: replaces the NameNode RPC proxy used by this actor.
     *
     * @param dnProtocol proxy to use for all subsequent NameNode calls
     */
    @VisibleForTesting
    void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
        bpNamenode = dnProtocol;
    }

    /**
     * Test hook.
     *
     * @return the NameNode RPC proxy currently held by this actor (may be null before connecting)
     */
    @VisibleForTesting
    DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
        return bpNamenode;
    }

    /**
     * Asks the NameNode for its namespace info, retrying every five seconds for as
     * long as the service should run, then validates the reported versions.
     *
     * @return the namespace info returned by the NameNode
     * @throws IOException if the service stopped before any response was received,
     *         or if {@code checkNNVersion} rejects the NameNode's version
     */
    @VisibleForTesting
    NamespaceInfo retrieveNamespaceInfo() throws IOException {
        NamespaceInfo nsInfo = null;
        while (this.shouldRun()) {
            try {
                nsInfo = this.bpNamenode.versionRequest();
                // Guarded like the debug statement in sendHeartBeat() so the message
                // is only built when it will actually be logged.
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)(this + " received versionRequest response: " + nsInfo));
                }
                break;
            }
            catch (IOException e) {
                // Also covers SocketTimeoutException (an IOException subtype), which
                // was previously handled by a separate but identical catch arm.
                LOG.warn((Object)("Problem connecting to server: " + this.nnAddr));
            }
            this.sleepAndLogInterrupts(5000, "requesting version info from NN");
        }
        if (nsInfo == null) {
            throw new IOException("DN shut down before block pool connected");
        }
        this.checkNNVersion(nsInfo);
        return nsInfo;
    }

    /**
     * Validates the NameNode's software and layout versions against this DataNode.
     * <p>
     * A software version below the configured minimum, or a layout version different
     * from {@code HdfsConstants.LAYOUT_VERSION}, is rejected. A software version that
     * merely differs from the DataNode's own is logged at INFO and accepted.
     *
     * @param nsInfo namespace info reported by the NameNode
     * @throws IncorrectVersionException if either check fails
     */
    private void checkNNVersion(NamespaceInfo nsInfo) throws IncorrectVersionException {
        final String nnVersion = nsInfo.getSoftwareVersion();
        final String minimumNameNodeVersion = this.dnConf.getMinimumNameNodeVersion();
        if (VersionUtil.compareVersions(nnVersion, minimumNameNodeVersion) < 0) {
            final IncorrectVersionException tooOld =
                    new IncorrectVersionException(minimumNameNodeVersion, nnVersion, "NameNode", "DataNode");
            LOG.warn(tooOld.getMessage());
            throw tooOld;
        }

        final String dnVersion = VersionInfo.getVersion();
        final boolean sameSoftware = nnVersion.equals(dnVersion);
        if (!sameSoftware) {
            LOG.info("Reported NameNode version '" + nnVersion + "' does not match " + "DataNode version '" + dnVersion + "' but is within acceptable " + "limits. Note: This is normal during a rolling upgrade.");
        }

        if (HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()) {
            return;
        }
        LOG.warn("DataNode and NameNode layout versions must be the same. Expected: " + HdfsConstants.LAYOUT_VERSION + " actual " + nsInfo.getLayoutVersion());
        throw new IncorrectVersionException(nsInfo.getLayoutVersion(), "namenode");
    }

    /**
     * Performs the start-up sequence against the NameNode: obtain a proxy, fetch and
     * hand the namespace info to the owning block pool service, then register.
     *
     * @throws IOException if any step of the handshake fails
     */
    private void connectToNNAndHandshake() throws IOException {
        bpNamenode = dn.connectToNN(nnAddr);
        final NamespaceInfo namespace = retrieveNamespaceInfo();
        bpos.verifyAndSetNamespaceInfo(namespace);
        register();
    }

    /**
     * Back-dates {@code lastBlockReport} so that the next full block report becomes due
     * either after a random fraction of {@code delay}, or at the next heartbeat.
     *
     * @param delay upper bound (exclusive, in ms) of the random wait before the report is
     *              due; a value of zero or less makes the report due relative to the
     *              last heartbeat
     */
    void scheduleBlockReport(long delay) {
        if (delay > 0L) {
            // Clamp before narrowing to int: a plain (int) cast of a delay above
            // Integer.MAX_VALUE can yield zero or a negative bound, which makes
            // Random.nextInt throw IllegalArgumentException.
            final int bound = (int)Math.min(delay, (long)Integer.MAX_VALUE);
            final long jitter = DFSUtil.getRandom().nextInt(bound);
            this.lastBlockReport = System.currentTimeMillis() - (this.dnConf.blockReportInterval - jitter);
        } else {
            this.lastBlockReport = this.lastHeartbeat - this.dnConf.blockReportInterval;
        }
        // The next blockReport() will pick a fresh random phase for the following reports.
        this.resetBlockReportTime = true;
    }

    /**
     * Tells the NameNode that this DataNode's replica of {@code block} is bad.
     * A failure of the RPC is logged and otherwise ignored.
     *
     * @param block the block to report
     */
    void reportBadBlocks(ExtendedBlock block) {
        final DatanodeInfo self = new DatanodeInfo(this.bpRegistration);
        final LocatedBlock located = new LocatedBlock(block, new DatanodeInfo[]{self});
        final LocatedBlock[] payload = {located};
        try {
            this.bpNamenode.reportBadBlocks(payload);
        }
        catch (IOException rpcFailure) {
            LOG.warn("Failed to report bad block " + block + " to namenode : " + " Exception", rpcFailure);
        }
    }

    /**
     * Sends every queued received/deleted block notification to the NameNode.
     * <p>
     * The queue is drained under its monitor and the RPC is made outside of it. If the
     * RPC fails, drained entries are put back unless an entry for the same block id was
     * queued in the meantime. After an attempt, {@code pendingReceivedRequests} is reset
     * to the queue size. Nothing is sent (and the counter is untouched) when the queue
     * was empty.
     *
     * @throws IOException if the NameNode call fails
     */
    private void reportReceivedDeletedBlocks() throws IOException {
        final ReceivedDeletedBlockInfo[] drained;
        synchronized (this.pendingIncrementalBR) {
            final int queued = this.pendingIncrementalBR.size();
            drained = queued > 0
                    ? this.pendingIncrementalBR.values().toArray(new ReceivedDeletedBlockInfo[queued])
                    : null;
            this.pendingIncrementalBR.clear();
        }
        if (drained == null) {
            return;
        }

        final StorageReceivedDeletedBlocks perStorage =
                new StorageReceivedDeletedBlocks(this.bpRegistration.getStorageID(), drained);
        final StorageReceivedDeletedBlocks[] report = {perStorage};
        boolean delivered = false;
        try {
            this.bpNamenode.blockReceivedAndDeleted(this.bpRegistration, this.bpos.getBlockPoolId(), report);
            delivered = true;
        }
        finally {
            synchronized (this.pendingIncrementalBR) {
                if (!delivered) {
                    // Re-queue what could not be sent, but never overwrite a newer entry.
                    for (ReceivedDeletedBlockInfo info : drained) {
                        final long blockId = info.getBlock().getBlockId();
                        if (!this.pendingIncrementalBR.containsKey(blockId)) {
                            this.pendingIncrementalBR.put(blockId, info);
                        }
                    }
                }
                this.pendingReceivedRequests = this.pendingIncrementalBR.size();
            }
        }
    }

    /**
     * Queues a block notification, bumps the pending-request counter and wakes any
     * thread waiting on the queue so the report goes out without delay.
     *
     * @param bInfo notification to queue; replaces any queued entry for the same block id
     */
    void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
        synchronized (this.pendingIncrementalBR) {
            final long blockId = bInfo.getBlock().getBlockId();
            this.pendingIncrementalBR.put(blockId, bInfo);
            this.pendingReceivedRequests++;
            this.pendingIncrementalBR.notifyAll();
        }
    }

    /**
     * Queues a block notification without waking the service thread or touching the
     * pending-request counter; it is sent with the next incremental report.
     *
     * @param bInfo notification to queue; replaces any queued entry for the same block id
     */
    void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
        synchronized (this.pendingIncrementalBR) {
            final long blockId = bInfo.getBlock().getBlockId();
            this.pendingIncrementalBR.put(blockId, bInfo);
        }
    }

    /**
     * Test hook: forces a full block report and waits until it has been sent.
     * <p>
     * Zeroes {@code lastBlockReport} and {@code lastHeartbeat}, wakes the service thread,
     * then polls every 100 ms until {@code lastBlockReport} is non-zero again. Returns
     * early, with the calling thread's interrupt status set, if interrupted while waiting.
     *
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    @VisibleForTesting
    void triggerBlockReportForTests() throws IOException {
        synchronized (this.pendingIncrementalBR) {
            this.lastBlockReport = 0L;
            this.lastHeartbeat = 0L;
            this.pendingIncrementalBR.notifyAll();
            while (this.lastBlockReport == 0L) {
                try {
                    this.pendingIncrementalBR.wait(100L);
                }
                catch (InterruptedException e) {
                    // Restore the flag so the caller can see it was interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Test hook: forces a heartbeat and waits until the service thread has started it.
     * <p>
     * Zeroes {@code lastHeartbeat}, wakes the service thread, then polls every 100 ms
     * until {@code lastHeartbeat} is non-zero again. Returns early, with the calling
     * thread's interrupt status set, if interrupted while waiting.
     *
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        synchronized (this.pendingIncrementalBR) {
            this.lastHeartbeat = 0L;
            this.pendingIncrementalBR.notifyAll();
            while (this.lastHeartbeat == 0L) {
                try {
                    this.pendingIncrementalBR.wait(100L);
                }
                catch (InterruptedException e) {
                    // Restore the flag so the caller can see it was interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Test hook: forces an incremental (received/deleted) report and waits for it.
     * <p>
     * Zeroes {@code lastDeletedReport}, wakes the service thread, then polls every 100 ms
     * until {@code lastDeletedReport} is non-zero again. Returns early, with the calling
     * thread's interrupt status set, if interrupted while waiting.
     *
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    @VisibleForTesting
    void triggerDeletionReportForTests() throws IOException {
        synchronized (this.pendingIncrementalBR) {
            this.lastDeletedReport = 0L;
            this.pendingIncrementalBR.notifyAll();
            while (this.lastDeletedReport == 0L) {
                try {
                    this.pendingIncrementalBR.wait(100L);
                }
                catch (InterruptedException e) {
                    // Restore the flag so the caller can see it was interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Sends a full block report to the NameNode if one is due.
     * <p>
     * A report is due when more than the block report interval has passed since
     * {@code lastBlockReport}. Queued incremental notifications are flushed first. After
     * sending, {@code lastBlockReport} is either re-randomized within one interval (when
     * {@code resetBlockReportTime} is set) or advanced by a whole number of intervals.
     *
     * @return the command returned by the NameNode, or {@code null} if no report was due
     * @throws IOException if the incremental flush or the block report RPC fails
     */
    DatanodeCommand blockReport() throws IOException {
        final long startTime = Util.now();
        final long interval = this.dnConf.blockReportInterval;
        if (startTime - this.lastBlockReport <= interval) {
            return null;
        }

        // Flush incremental notifications first so the NameNode sees them before the full report.
        this.reportReceivedDeletedBlocks();

        final long brCreateStartTime = Util.now();
        final BlockListAsLongs bReport = this.dn.getFSDataset().getBlockReport(this.bpos.getBlockPoolId());
        final long brSendStartTime = Util.now();
        final StorageBlockReport[] report = new StorageBlockReport[]{new StorageBlockReport(new DatanodeStorage(this.bpRegistration.getStorageID()), bReport.getBlockListAsLongs())};
        final DatanodeCommand cmd = this.bpNamenode.blockReport(this.bpRegistration, this.bpos.getBlockPoolId(), report);

        final long brSendCost = Util.now() - brSendStartTime;
        final long brCreateCost = brSendStartTime - brCreateStartTime;
        this.dn.getMetrics().addBlockReport(brSendCost);
        LOG.info((Object)("BlockReport of " + bReport.getNumberOfBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"));

        if (this.resetBlockReportTime) {
            // Clamp before narrowing to int: a plain (int) cast of an interval above
            // Integer.MAX_VALUE can yield zero or a negative bound, which makes
            // Random.nextInt throw IllegalArgumentException.
            final int bound = (int)Math.min(interval, (long)Integer.MAX_VALUE);
            this.lastBlockReport = startTime - (long)DFSUtil.getRandom().nextInt(bound);
            this.resetBlockReportTime = false;
        } else {
            // Advance by whole intervals so the report keeps its phase.
            this.lastBlockReport += (Util.now() - this.lastBlockReport) / interval * interval;
        }
        LOG.info((Object)("sent block report, processed command:" + cmd));
        return cmd;
    }

    /**
     * Sends one heartbeat carrying the dataset's capacity, usage, remaining space and
     * block pool usage, plus the DataNode's transfer, xceiver and failed-volume counts.
     *
     * @return the NameNode's heartbeat response
     * @throws IOException if the RPC fails
     */
    HeartbeatResponse sendHeartBeat() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending heartbeat from service actor: " + this);
        }
        final StorageReport usage = new StorageReport(
                this.bpRegistration.getStorageID(),
                false,
                this.dn.getFSDataset().getCapacity(),
                this.dn.getFSDataset().getDfsUsed(),
                this.dn.getFSDataset().getRemaining(),
                this.dn.getFSDataset().getBlockPoolUsed(this.bpos.getBlockPoolId()));
        final StorageReport[] report = {usage};
        return this.bpNamenode.sendHeartbeat(
                this.bpRegistration,
                report,
                this.dn.getXmitsInProgress(),
                this.dn.getXceiverCount(),
                this.dn.getFSDataset().getNumFailedVolumes());
    }

    /**
     * Starts the service thread as a daemon. Does nothing if a previously started
     * thread is still alive.
     */
    void start() {
        final boolean alreadyRunning = this.bpThread != null && this.bpThread.isAlive();
        if (alreadyRunning) {
            return;
        }
        final Thread worker = new Thread((Runnable)this, this.formatThreadName());
        this.bpThread = worker;
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Builds the service thread's name from the configured storage directories and the
     * NameNode address.
     *
     * @return the thread name
     */
    private String formatThreadName() {
        final URI[] storageDirs = DataNode.getStorageDirs(this.dn.getConf()).toArray(new URI[0]);
        final String dirList = StringUtils.uriToString(storageDirs);
        return "DataNode: [" + dirList + "] " + " heartbeating to " + this.nnAddr;
    }

    /**
     * Asks the actor to stop: clears the run flag and interrupts the service thread,
     * if one has been created.
     */
    void stop() {
        this.shouldServiceRun = false;
        if (this.bpThread == null) {
            return;
        }
        this.bpThread.interrupt();
    }

    /**
     * Waits for the service thread to terminate. Returns immediately if the thread was
     * never created. If the calling thread is interrupted while waiting, the wait ends
     * and the caller's interrupt status is left set.
     */
    void join() {
        final Thread worker = this.bpThread;
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        }
        catch (InterruptedException ie) {
            // Do not swallow the interrupt: restore the flag so callers can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Final teardown for this actor: clears the run flag, closes the NameNode proxy via
     * {@code IOUtils.cleanup}, and tells the owning block pool service that this actor
     * has shut down.
     */
    private synchronized void cleanUp() {
        this.shouldServiceRun = false;
        final Closeable[] toClose = {this.bpNamenode};
        IOUtils.cleanup((Log)LOG, toClose);
        this.bpos.shutdownActor(this);
    }

    /**
     * Main service loop. While the service should run, each iteration:
     * sends a heartbeat if the heartbeat interval has elapsed and processes the returned
     * commands; sends an incremental block report if requests are pending or the delete
     * report interval has elapsed; sends a full block report if due and processes its
     * command; then waits on {@code pendingIncrementalBR} until the next heartbeat is due
     * or another thread notifies.
     * <p>
     * A RemoteException wrapping UnregisteredNodeException, DisallowedDatanodeException or
     * IncorrectVersionException clears the run flag and returns. Any other RemoteException
     * is logged and followed by a sleep of at most one second. Other IOExceptions are
     * logged and the loop continues.
     *
     * @throws Exception any non-IOException failure raised inside the loop
     */
    private void offerService() throws Exception {
        LOG.info((Object)("For namenode " + this.nnAddr + " using DELETEREPORT_INTERVAL of " + this.dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + this.dnConf.blockReportInterval + "msec" + " Initial delay: " + this.dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + this.dnConf.heartBeatInterval));
        while (this.shouldRun()) {
            try {
                long startTime = Util.now();
                if (startTime - this.lastHeartbeat > this.dnConf.heartBeatInterval) {
                    // Recorded before sending, so a failed heartbeat is not retried
                    // until a full interval has passed.
                    this.lastHeartbeat = startTime;
                    if (!this.dn.areHeartbeatsDisabledForTests()) {
                        HeartbeatResponse resp = this.sendHeartBeat();
                        // Only evaluated when the JVM runs with assertions enabled.
                        assert (resp != null);
                        this.dn.getMetrics().addHeartbeat(Util.now() - startTime);
                        this.bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState());
                        long startProcessCommands = Util.now();
                        // A false result skips the rest of this iteration (reports and the
                        // wait below) and goes straight back to the loop condition.
                        if (!this.processCommand(resp.getCommands())) continue;
                        long endProcessCommands = Util.now();
                        if (endProcessCommands - startProcessCommands > 2000L) {
                            LOG.info((Object)("Took " + (endProcessCommands - startProcessCommands) + "ms to process " + resp.getCommands().length + " commands from NN"));
                        }
                    }
                }
                if (this.pendingReceivedRequests > 0 || startTime - this.lastDeletedReport > this.dnConf.deleteReportInterval) {
                    this.reportReceivedDeletedBlocks();
                    this.lastDeletedReport = startTime;
                }
                // cmd is null when no full block report was due.
                DatanodeCommand cmd = this.blockReport();
                this.processCommand(new DatanodeCommand[]{cmd});
                // Runs on every iteration. NOTE(review): presumably a no-op once the pool
                // is known to the scanner; confirm against the scanner implementation.
                if (this.dn.blockScanner != null) {
                    this.dn.blockScanner.addBlockPool(this.bpos.getBlockPoolId());
                }
                long waitTime = this.dnConf.heartBeatInterval - (System.currentTimeMillis() - this.lastHeartbeat);
                Map<Long, ReceivedDeletedBlockInfo> map = this.pendingIncrementalBR;
                synchronized (map) {
                    // Skip the wait if a notification arrived while the reports were being sent.
                    if (waitTime > 0L && this.pendingReceivedRequests == 0) {
                        try {
                            this.pendingIncrementalBR.wait(waitTime);
                        }
                        catch (InterruptedException ie) {
                            // The interrupt is only logged; the loop condition decides
                            // whether the service keeps running.
                            LOG.warn((Object)("BPOfferService for " + this + " interrupted"));
                        }
                    }
                }
            }
            catch (RemoteException re) {
                String reClass = re.getClassName();
                // These three exception classes stop the actor instead of retrying.
                if (UnregisteredNodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) {
                    LOG.warn((Object)(this + " is shutting down"), (Throwable)re);
                    this.shouldServiceRun = false;
                    return;
                }
                LOG.warn((Object)"RemoteException in offerService", (Throwable)re);
                try {
                    // Back off briefly, but never longer than one heartbeat interval.
                    long sleepTime = Math.min(1000L, this.dnConf.heartBeatInterval);
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            catch (IOException e) {
                LOG.warn((Object)"IOException in offerService", (Throwable)e);
            }
        }
    }

    /**
     * Registers with the NameNode and, on success, notifies the owning block pool
     * service and schedules the initial block report.
     * <p>
     * The registration RPC is retried once per second on {@link SocketTimeoutException}
     * for as long as the service should run. Any other {@link IOException} propagates.
     *
     * @throws IOException if the registration RPC fails with a non-timeout error, or if
     *         the service was stopped before the NameNode accepted the registration
     */
    void register() throws IOException {
        this.bpRegistration = this.bpos.createRegistration();
        LOG.info((Object)(this + " beginning handshake with NN"));
        boolean registered = false;
        while (this.shouldRun()) {
            try {
                // Keep the NameNode's answer: it is the registration to use from now on.
                this.bpRegistration = this.bpNamenode.registerDatanode(this.bpRegistration);
                registered = true;
                break;
            }
            catch (SocketTimeoutException e) {
                LOG.info((Object)("Problem connecting to server: " + this.nnAddr));
                this.sleepAndLogInterrupts(1000, "connecting to server");
            }
        }
        if (!registered) {
            // The loop ended because the service is stopping, not because the NameNode
            // accepted us. Do not report success or schedule block reports.
            throw new IOException("DN shut down before block pool registered");
        }
        LOG.info((Object)("Block pool " + this + " successfully registered with NN"));
        this.bpos.registrationSucceeded(this, this.bpRegistration);
        this.scheduleBlockReport(this.dnConf.initialBlockReportDelay);
    }

    /**
     * Sleeps for the given time; an interrupt ends the sleep early and is only logged.
     *
     * @param millis      how long to sleep, in milliseconds
     * @param stateString description of the current activity, used in the log message
     */
    private void sleepAndLogInterrupts(int millis, String stateString) {
        try {
            Thread.sleep((long)millis);
        }
        catch (InterruptedException interrupted) {
            final String notice = "BPOfferService " + this + " interrupted while " + stateString;
            LOG.info(notice);
        }
    }

    /**
     * Thread body: performs the NameNode handshake, then offers service until the actor
     * or the DataNode is asked to stop.
     * <p>
     * A failed handshake ends the thread. Exceptions escaping {@code offerService()} are
     * logged and followed by a five second pause before retrying. Whatever the exit
     * path, the {@code finally} block logs the end of service and runs {@code cleanUp()}
     * exactly once.
     */
    @Override
    public void run() {
        LOG.info((Object)(this + " starting to offer service"));
        try {
            try {
                this.connectToNNAndHandshake();
            }
            catch (IOException ioe) {
                LOG.fatal((Object)("Initialization failed for block pool " + this), (Throwable)ioe);
                // No explicit cleanup here: the finally block below handles it. Doing it
                // here as well would log twice and call bpos.shutdownActor() twice.
                return;
            }
            this.initialized = true;
            while (this.shouldRun()) {
                try {
                    this.bpos.startDistributedUpgradeIfNeeded();
                    this.offerService();
                }
                catch (Exception ex) {
                    LOG.error((Object)("Exception in BPOfferService for " + this), (Throwable)ex);
                    this.sleepAndLogInterrupts(5000, "offering service");
                }
            }
        }
        catch (Throwable ex) {
            LOG.warn((Object)("Unexpected exception in block pool " + this), ex);
        }
        finally {
            LOG.warn((Object)("Ending block pool service for: " + this));
            this.cleanUp();
        }
    }

    /**
     * @return {@code true} only while neither this actor nor the DataNode has been
     *         asked to stop
     */
    private boolean shouldRun() {
        if (!this.shouldServiceRun) {
            return false;
        }
        return this.dn.shouldRun();
    }

    /**
     * Hands each command to the owning block pool service, in order.
     * <p>
     * Processing stops at the first command the service answers {@code false} for. An
     * {@link IOException} from one command is logged and the next command is tried.
     *
     * @param cmds commands to process; may be {@code null}
     * @return {@code false} if the service answered {@code false} for a command,
     *         {@code true} otherwise (including for a {@code null} array)
     */
    boolean processCommand(DatanodeCommand[] cmds) {
        if (cmds == null) {
            return true;
        }
        for (int i = 0; i < cmds.length; ++i) {
            try {
                final boolean keepGoing = this.bpos.processCommandFromActor(cmds[i], this);
                if (!keepGoing) {
                    return false;
                }
            }
            catch (IOException failure) {
                LOG.warn("Error processing datanode Command", failure);
            }
        }
        return true;
    }

    /**
     * Sends an error report to the NameNode on a best-effort basis; a failure of the
     * RPC is logged and not propagated.
     *
     * @param errCode error code to report
     * @param errMsg  error description to report
     */
    void trySendErrorReport(int errCode, String errMsg) {
        try {
            this.bpNamenode.errorReport(this.bpRegistration, errCode, errMsg);
        }
        catch (IOException rpcFailure) {
            final String notice = "Error reporting an error to NameNode " + this.nnAddr;
            LOG.warn(notice, rpcFailure);
        }
    }

    /**
     * Tells the NameNode that the replica of {@code block} held by another DataNode is bad.
     *
     * @param dnInfo the DataNode holding the bad replica
     * @param block  the affected block
     * @throws IOException if the RPC fails
     */
    void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) throws IOException {
        final DatanodeInfo[] holders = {dnInfo};
        final LocatedBlock[] payload = {new LocatedBlock(block, holders)};
        this.bpNamenode.reportBadBlocks(payload);
    }

    /**
     * Re-fetches the namespace info and registers again. Does nothing when the service
     * is no longer supposed to run.
     *
     * @throws IOException if fetching the namespace info or registering fails
     */
    void reRegister() throws IOException {
        if (!this.shouldRun()) {
            return;
        }
        this.retrieveNamespaceInfo();
        this.register();
    }
}

