/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlocksMap;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReplicationBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestPendingReplication {
    static final int TIMEOUT = 3;
    private static final int DFS_REPLICATION_INTERVAL = 1;
    private static final int DATANODE_COUNT = 5;

    private BlockInfo genBlockInfo(long id, long length, long gs) {
        return new BlockInfoContiguous(new Block(id, length, gs), 5);
    }

    @Test
    public void testPendingReplication() {
        BlockInfo block;
        int i;
        PendingReplicationBlocks pendingReplications = new PendingReplicationBlocks(3000L);
        pendingReplications.start();
        DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
        for (int i2 = 0; i2 < storages.length; ++i2) {
            BlockInfo block2 = this.genBlockInfo(i2, i2, 0L);
            DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i2];
            System.arraycopy(storages, 0, targets, 0, i2);
            pendingReplications.increment(block2, DatanodeStorageInfo.toDatanodeDescriptors((DatanodeStorageInfo[])targets));
        }
        Assert.assertEquals((String)"Size of pendingReplications ", (long)10L, (long)pendingReplications.size());
        BlockInfo blk = this.genBlockInfo(8L, 8L, 0L);
        pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor());
        Assert.assertEquals((String)"pendingReplications.getNumReplicas ", (long)7L, (long)pendingReplications.getNumReplicas(blk));
        pendingReplications.increment(blk, new DatanodeDescriptor[]{storages[0].getDatanodeDescriptor()});
        Assert.assertEquals((String)"pendingReplications.getNumReplicas ", (long)7L, (long)pendingReplications.getNumReplicas(blk));
        for (i = 0; i < 7; ++i) {
            pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor());
        }
        Assert.assertTrue((pendingReplications.size() == 9 ? 1 : 0) != 0);
        pendingReplications.increment(blk, DatanodeStorageInfo.toDatanodeDescriptors((DatanodeStorageInfo[])DFSTestUtil.createDatanodeStorageInfos(8)));
        Assert.assertTrue((pendingReplications.size() == 10 ? 1 : 0) != 0);
        for (i = 0; i < 10; ++i) {
            block = this.genBlockInfo(i, i, 0L);
            int numReplicas = pendingReplications.getNumReplicas(block);
            Assert.assertTrue((numReplicas == i ? 1 : 0) != 0);
        }
        Assert.assertNull((Object)pendingReplications.getTimedOutBlocks());
        Assert.assertEquals((long)0L, (long)pendingReplications.getNumTimedOuts());
        try {
            Thread.sleep(1000L);
        }
        catch (Exception i3) {
            // empty catch block
        }
        for (int i4 = 10; i4 < 15; ++i4) {
            block = this.genBlockInfo(i4, i4, 0L);
            pendingReplications.increment(block, DatanodeStorageInfo.toDatanodeDescriptors((DatanodeStorageInfo[])DFSTestUtil.createDatanodeStorageInfos(i4)));
        }
        Assert.assertEquals((long)15L, (long)pendingReplications.size());
        Assert.assertEquals((long)0L, (long)pendingReplications.getNumTimedOuts());
        int loop = 0;
        while (pendingReplications.size() > 0) {
            try {
                Thread.sleep(1000L);
            }
            catch (Exception block3) {
                // empty catch block
            }
            ++loop;
        }
        System.out.println("Had to wait for " + loop + " seconds for the lot to timeout");
        Assert.assertEquals((String)"Size of pendingReplications ", (long)0L, (long)pendingReplications.size());
        Assert.assertEquals((long)15L, (long)pendingReplications.getNumTimedOuts());
        BlockInfo[] timedOut = pendingReplications.getTimedOutBlocks();
        Assert.assertNotNull((Object)timedOut);
        Assert.assertEquals((long)15L, (long)timedOut.length);
        Assert.assertEquals((long)15L, (long)pendingReplications.getNumTimedOuts());
        for (BlockInfo block4 : timedOut) {
            Assert.assertTrue((block4.getBlockId() < 15L ? 1 : 0) != 0);
        }
        pendingReplications.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProcessPendingReplications() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.namenode.replication.pending.timeout-sec", 3L);
        MiniDFSCluster cluster = null;
        try {
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(5).build();
            cluster.waitActive();
            FSNamesystem fsn = cluster.getNamesystem();
            BlockManager blkManager = fsn.getBlockManager();
            PendingReplicationBlocks pendingReplications = blkManager.pendingReplications;
            UnderReplicatedBlocks neededReplications = blkManager.neededReplications;
            BlocksMap blocksMap = blkManager.blocksMap;
            Block block = new Block(1L, 1L, 0L);
            BlockInfoContiguous blockInfo = new BlockInfoContiguous(block, 3);
            pendingReplications.increment((BlockInfo)blockInfo, DatanodeStorageInfo.toDatanodeDescriptors((DatanodeStorageInfo[])DFSTestUtil.createDatanodeStorageInfos(1)));
            BlockCollection bc = (BlockCollection)Mockito.mock(BlockCollection.class);
            blockInfo.setGenerationStamp(1L);
            blocksMap.addBlockCollection((BlockInfo)blockInfo, bc);
            Assert.assertEquals((String)"Size of pendingReplications ", (long)1L, (long)pendingReplications.size());
            block = new Block(2L, 2L, 0L);
            blockInfo = new BlockInfoContiguous(block, 3);
            pendingReplications.increment((BlockInfo)blockInfo, DatanodeStorageInfo.toDatanodeDescriptors((DatanodeStorageInfo[])DFSTestUtil.createDatanodeStorageInfos(1)));
            Assert.assertEquals((String)"Size of pendingReplications ", (long)2L, (long)pendingReplications.size());
            while (pendingReplications.size() > 0) {
                try {
                    Thread.sleep(100L);
                }
                catch (Exception exception) {}
            }
            while (neededReplications.size() == 0) {
                try {
                    Thread.sleep(100L);
                }
                catch (Exception exception) {}
            }
            for (Block b : neededReplications) {
                Assert.assertEquals((String)"Generation stamp is 1 ", (long)1L, (long)b.getGenerationStamp());
            }
            Assert.assertEquals((String)"size of neededReplications is 1 ", (long)1L, (long)neededReplications.size());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockReceived() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong("dfs.blocksize", 1024L);
        MiniDFSCluster cluster = null;
        try {
            StorageReceivedDeletedBlocks[] report;
            DatanodeRegistration dnR;
            int i;
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(5).build();
            cluster.waitActive();
            DistributedFileSystem hdfs = cluster.getFileSystem();
            FSNamesystem fsn = cluster.getNamesystem();
            BlockManager blkManager = fsn.getBlockManager();
            String file = "/tmp.txt";
            Path filePath = new Path("/tmp.txt");
            short replFactor = 1;
            DFSTestUtil.createFile((FileSystem)hdfs, filePath, 1024L, replFactor, 0L);
            ArrayList<DataNode> datanodes = cluster.getDataNodes();
            for (int i2 = 0; i2 < 5; ++i2) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(datanodes.get(i2), true);
            }
            hdfs.setReplication(filePath, (short)5);
            BlockManagerTestUtil.computeAllPendingWork(blkManager);
            Assert.assertEquals((long)1L, (long)blkManager.pendingReplications.size());
            INodeFile fileNode = fsn.getFSDirectory().getINode4Write("/tmp.txt").asFile();
            BlockInfo[] blocks = fileNode.getBlocks();
            Assert.assertEquals((long)4L, (long)blkManager.pendingReplications.getNumReplicas(blocks[0]));
            LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks("/tmp.txt", 0L).get(0);
            DatanodeInfo existingDn = locatedBlock.getLocations()[0];
            int reportDnNum = 0;
            String poolId = cluster.getNamesystem().getBlockPoolId();
            for (i = 0; i < 5 && reportDnNum < 2; ++i) {
                if (datanodes.get(i).getDatanodeId().equals((Object)existingDn)) continue;
                dnR = datanodes.get(i).getDNRegistrationForBP(poolId);
                report = new StorageReceivedDeletedBlocks[]{new StorageReceivedDeletedBlocks(new DatanodeStorage("Fake-storage-ID-Ignored"), new ReceivedDeletedBlockInfo[]{new ReceivedDeletedBlockInfo((Block)blocks[0], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")})};
                cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
                ++reportDnNum;
            }
            cluster.getNamesystem().getBlockManager().flushBlockOps();
            Assert.assertEquals((long)2L, (long)blkManager.pendingReplications.getNumReplicas(blocks[0]));
            for (i = 0; i < 5 && reportDnNum < 2; ++i) {
                if (datanodes.get(i).getDatanodeId().equals((Object)existingDn)) continue;
                dnR = datanodes.get(i).getDNRegistrationForBP(poolId);
                report = new StorageReceivedDeletedBlocks[]{new StorageReceivedDeletedBlocks(new DatanodeStorage("Fake-storage-ID-Ignored"), new ReceivedDeletedBlockInfo[]{new ReceivedDeletedBlockInfo((Block)blocks[0], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")})};
                cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
                ++reportDnNum;
            }
            cluster.getNamesystem().getBlockManager().flushBlockOps();
            Assert.assertEquals((long)2L, (long)blkManager.pendingReplications.getNumReplicas(blocks[0]));
            for (i = 0; i < 5; ++i) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(datanodes.get(i), false);
                DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
            }
            Thread.sleep(5000L);
            Assert.assertEquals((long)0L, (long)blkManager.pendingReplications.size());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAndInvalidate() throws Exception {
        HdfsConfiguration CONF = new HdfsConfiguration();
        CONF.setLong("dfs.blocksize", 1024L);
        CONF.setLong("dfs.heartbeat.interval", 1L);
        CONF.setInt("dfs.namenode.replication.interval", 1);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)CONF).numDataNodes(5).build();
        cluster.waitActive();
        FSNamesystem namesystem = cluster.getNamesystem();
        BlockManager bm = namesystem.getBlockManager();
        DistributedFileSystem fs = cluster.getFileSystem();
        try {
            Path filePath = new Path("/tmp.txt");
            DFSTestUtil.createFile((FileSystem)fs, filePath, 1024L, (short)3, 0L);
            for (DataNode dn : cluster.getDataNodes()) {
                DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
            }
            LocatedBlock block = NameNodeAdapter.getBlockLocations(cluster.getNameNode(), filePath.toString(), 0L, 1L).get(0);
            cluster.getNamesystem().writeLock();
            try {
                bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0], "STORAGE_ID", "TEST");
                bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1], "STORAGE_ID", "TEST");
            }
            finally {
                cluster.getNamesystem().writeUnlock();
            }
            BlockManagerTestUtil.computeAllPendingWork(bm);
            BlockManagerTestUtil.updateState(bm);
            Assert.assertEquals((long)bm.getPendingReplicationBlocksCount(), (long)1L);
            BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock());
            Assert.assertEquals((long)bm.pendingReplications.getNumReplicas(storedBlock), (long)2L);
            fs.delete(filePath, true);
            int retries = 10;
            long pendingNum = bm.getPendingReplicationBlocksCount();
            while (pendingNum != 0L && retries-- > 0) {
                Thread.sleep(1000L);
                BlockManagerTestUtil.updateState(bm);
                pendingNum = bm.getPendingReplicationBlocksCount();
            }
            Assert.assertEquals((long)pendingNum, (long)0L);
        }
        finally {
            cluster.shutdown();
        }
    }
}

