/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import com.google.common.base.Supplier;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.slf4j.Logger;

public class TestSpaceReservation {
    static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
    private static final int DU_REFRESH_INTERVAL_MSEC = 500;
    private static final int STORAGES_PER_DATANODE = 1;
    private static final int BLOCK_SIZE = 0x100000;
    private static final int SMALL_BLOCK_SIZE = 1024;
    protected MiniDFSCluster cluster;
    private Configuration conf;
    private DistributedFileSystem fs = null;
    private DFSClient client = null;
    FsVolumeReference singletonVolumeRef = null;
    FsVolumeImpl singletonVolume = null;
    private DataNodeFaultInjector old = null;
    private static Random rand = new Random();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void before() {
        this.conf = new HdfsConfiguration();
    }

    private void initConfig(int blockSize) {
        this.conf.setInt("fs.du.interval", 500);
        this.conf.setLong("dfs.blocksize", (long)blockSize);
        this.conf.setInt("dfs.datanode.scan.period.hours", -1);
    }

    private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
        this.initConfig(blockSize);
        this.cluster = new MiniDFSCluster.Builder(this.conf).storagesPerDatanode(1).numDataNodes(numDatanodes).build();
        this.fs = this.cluster.getFileSystem();
        this.client = this.fs.getClient();
        this.cluster.waitActive();
        if (perVolumeCapacity >= 0L) {
            try (FsDatasetSpi.FsVolumeReferences volumes = this.cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences();){
                this.singletonVolumeRef = volumes.get(0).obtainReference();
            }
            this.singletonVolume = (FsVolumeImpl)this.singletonVolumeRef.getVolume();
            this.singletonVolume.setCapacityForTesting(perVolumeCapacity);
        }
    }

    @After
    public void shutdownCluster() throws IOException {
        if (this.singletonVolumeRef != null) {
            this.singletonVolumeRef.close();
            this.singletonVolumeRef = null;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        if (this.fs != null) {
            this.fs.close();
            this.fs = null;
        }
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
        if (this.old != null) {
            DataNodeFaultInjector.set((DataNodeFaultInjector)this.old);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createFileAndTestSpaceReservation(String fileNamePrefix, int fileBlockSize) throws IOException, InterruptedException {
        long configuredCapacity = fileBlockSize * 2 - 1;
        this.startCluster(0x100000, 1, configuredCapacity);
        Path path = new Path("/" + fileNamePrefix + ".dat");
        try (FSDataOutputStream out = null;){
            out = this.fs.create(path, false, 4096, (short)1, (long)fileBlockSize);
            byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
            out.write(buffer);
            out.hsync();
            int bytesWritten = buffer.length;
            Assert.assertThat((Object)this.singletonVolume.getReservedForReplicas(), (Matcher)Is.is((Object)((long)fileBlockSize - (long)bytesWritten)));
            out.close();
            out = null;
            Assert.assertThat((Object)this.singletonVolume.getReservedForReplicas(), (Matcher)Is.is((Object)0L));
            out = this.fs.append(path);
            out.write(buffer);
            out.hsync();
            Assert.assertThat((Object)this.singletonVolume.getReservedForReplicas(), (Matcher)Is.is((Object)((long)fileBlockSize - (long)(bytesWritten += buffer.length))));
            out.write(buffer);
            out.hsync();
            Assert.assertThat((Object)this.singletonVolume.getReservedForReplicas(), (Matcher)Is.is((Object)((long)fileBlockSize - (long)(bytesWritten += buffer.length))));
        }
    }

    @Test(timeout=300000L)
    public void testWithDefaultBlockSize() throws IOException, InterruptedException {
        this.createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), 0x100000);
    }

    @Test(timeout=300000L)
    public void testWithNonDefaultBlockSize() throws IOException, InterruptedException {
        this.createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), 0x200000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=300000L)
    public void testWithLimitedSpace() throws IOException {
        this.startCluster(0x100000, 1, 0x1FFFFFL);
        String methodName = GenericTestUtils.getMethodName();
        Path file1 = new Path("/" + methodName + ".01.dat");
        Path file2 = new Path("/" + methodName + ".02.dat");
        FSDataOutputStream os2 = null;
        try (FSDataOutputStream os1 = null;){
            os1 = this.fs.create(file1);
            os2 = this.fs.create(file2);
            byte[] data = new byte[1];
            os1.write(data);
            os1.hsync();
            this.thrown.expect(RemoteException.class);
            os2.write(data);
            os2.hsync();
        }
    }

    @Test(timeout=300000L)
    public void testSpaceReleasedOnUnexpectedEof() throws IOException, InterruptedException, TimeoutException {
        int replication = 3;
        this.startCluster(0x100000, 3, -1L);
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        FSDataOutputStream os = this.fs.create(file, (short)3);
        os.write(new byte[1]);
        os.hsync();
        DFSTestUtil.abortStream((DFSOutputStream)os.getWrappedStream());
        for (DataNode dn : this.cluster.getDataNodes()) {
            FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences();
            Throwable throwable = null;
            try {
                final FsVolumeImpl volume = (FsVolumeImpl)volumes.get(0);
                GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                    public Boolean get() {
                        return volume.getReservedForReplicas() == 0L;
                    }
                }, (int)500, (int)Integer.MAX_VALUE);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (volumes == null) continue;
                if (throwable != null) {
                    try {
                        volumes.close();
                    }
                    catch (Throwable x2) {
                        throwable.addSuppressed(x2);
                    }
                    continue;
                }
                volumes.close();
            }
        }
    }

    @Test(timeout=30000L)
    public void testRBWFileCreationError() throws Exception {
        FSDataOutputStream os2;
        boolean replication = true;
        this.startCluster(0x100000, 1, -1L);
        FsVolumeImpl fsVolumeImpl = (FsVolumeImpl)this.cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences().get(0);
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        BlockPoolSlice blockPoolSlice = (BlockPoolSlice)Mockito.mock(BlockPoolSlice.class);
        Mockito.when((Object)blockPoolSlice.createRbwFile((Block)Mockito.any())).thenThrow(new Throwable[]{new IOException("Synthetic IO Exception Throgh MOCK")});
        Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
        field.setAccessible(true);
        Map bpSlices = (Map)field.get(fsVolumeImpl);
        bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
        try {
            os2 = this.fs.create(file, (short)1);
            os2.write(new byte[1]);
            os2.hsync();
            os2.close();
            Assert.fail((String)"Expecting IOException file creation failure");
        }
        catch (IOException os2) {
            // empty catch block
        }
        Assert.assertTrue((String)("Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas()), (fsVolumeImpl.getReservedForReplicas() == 0L ? 1 : 0) != 0);
        fsVolumeImpl.reserveSpaceForReplica(1000L);
        try {
            os2 = this.fs.create(new Path("/" + methodName + ".02.dat"), (short)1);
            os2.write(new byte[1]);
            os2.hsync();
            os2.close();
            Assert.fail((String)"Expecting IOException file creation failure");
        }
        catch (IOException iOException) {
            // empty catch block
        }
        Assert.assertTrue((fsVolumeImpl.getReservedForReplicas() == 1000L ? 1 : 0) != 0);
    }

    @Test(timeout=30000L)
    public void testReservedSpaceInJMXBean() throws Exception {
        boolean replication = true;
        this.startCluster(0x100000, 1, -1L);
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        try (FSDataOutputStream os = this.fs.create(file, (short)1);){
            os.write(new byte[1]);
            os.hsync();
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName mxbeanName = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
            String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
            Assert.assertTrue((boolean)volumeInfo.contains("reservedSpaceForReplicas"));
        }
    }

    @Test(timeout=300000L)
    public void testTmpSpaceReserve() throws Exception {
        int replication = 2;
        this.startCluster(0x100000, 2, -1L);
        int byteCount1 = 100;
        int byteCount2 = 200;
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        try (FSDataOutputStream os = this.fs.create(file, (short)1);){
            os.write(new byte[100]);
            os.hsync();
        }
        BlockLocation[] blockLocations = this.fs.getFileBlockLocations(file, 0L, 10L);
        Object firstReplicaNode = blockLocations[0].getNames()[0];
        int newReplicaDNIndex = 0;
        if (((String)firstReplicaNode).equals(this.cluster.getDataNodes().get(0).getDisplayName())) {
            newReplicaDNIndex = 1;
        }
        FsVolumeImpl fsVolumeImpl = (FsVolumeImpl)this.cluster.getDataNodes().get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
        this.performReReplication(file, true);
        Assert.assertEquals((String)"Wrong reserve space for Tmp ", (long)100L, (long)fsVolumeImpl.getRecentReserved());
        Assert.assertEquals((String)"Reserved Tmp space is not released", (long)0L, (long)fsVolumeImpl.getReservedForReplicas());
        file = new Path("/" + methodName + ".01.dat");
        os = this.fs.create(file, (short)1);
        firstReplicaNode = null;
        try {
            os.write(new byte[200]);
            os.hsync();
        }
        catch (Throwable x2) {
            firstReplicaNode = x2;
            throw x2;
        }
        finally {
            if (os != null) {
                if (firstReplicaNode != null) {
                    try {
                        os.close();
                    }
                    catch (Throwable x2) {
                        ((Throwable)firstReplicaNode).addSuppressed(x2);
                    }
                } else {
                    os.close();
                }
            }
        }
        blockLocations = this.fs.getFileBlockLocations(file, 0L, 10L);
        firstReplicaNode = blockLocations[0].getNames()[0];
        newReplicaDNIndex = 0;
        if (((String)firstReplicaNode).equals(this.cluster.getDataNodes().get(0).getDisplayName())) {
            newReplicaDNIndex = 1;
        }
        BlockPoolSlice blockPoolSlice = (BlockPoolSlice)Mockito.mock(BlockPoolSlice.class);
        Mockito.when((Object)blockPoolSlice.createTmpFile((Block)Mockito.any())).thenThrow(new Throwable[]{new IOException("Synthetic IO Exception Throgh MOCK")});
        FsVolumeImpl fsVolumeImpl2 = (FsVolumeImpl)this.cluster.getDataNodes().get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
        fsVolumeImpl2.reserveSpaceForReplica(1000L);
        Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
        field.setAccessible(true);
        Map bpSlices = (Map)field.get(fsVolumeImpl2);
        bpSlices.put(fsVolumeImpl2.getBlockPoolList()[0], blockPoolSlice);
        this.performReReplication(file, false);
        Assert.assertEquals((String)"Wrong reserve space for Tmp ", (long)200L, (long)fsVolumeImpl2.getRecentReserved());
        Assert.assertEquals((String)"Tmp space is not released OR released twice", (long)1000L, (long)fsVolumeImpl2.getReservedForReplicas());
    }

    private void performReReplication(Path filePath, boolean waitForSuccess) throws Exception {
        this.fs.setReplication(filePath, (short)2);
        Thread.sleep(4000L);
        BlockLocation[] blockLocations = this.fs.getFileBlockLocations(filePath, 0L, 10L);
        if (waitForSuccess) {
            while (blockLocations[0].getNames().length < 2) {
                Thread.sleep(2000L);
                blockLocations = this.fs.getFileBlockLocations(filePath, 0L, 10L);
            }
        }
    }

    @Test(timeout=600000L)
    public void stressTest() throws IOException, InterruptedException {
        int numWriters = 5;
        this.startCluster(1024, 1, 51200L);
        Writer[] writers = new Writer[5];
        for (int i = 0; i < 5; ++i) {
            writers[i] = new Writer(this.client, 1024);
            writers[i].start();
        }
        Thread.sleep(60000L);
        for (Writer w : writers) {
            w.stopWriter();
        }
        int filesCreated = 0;
        int numFailures = 0;
        for (Writer w : writers) {
            w.join();
            filesCreated += w.getFilesCreated();
            numFailures += w.getNumFailures();
        }
        LOG.info((Object)("Stress test created " + filesCreated + " files and hit " + numFailures + " failures"));
        Assert.assertThat((Object)this.singletonVolume.getReservedForReplicas(), (Matcher)Is.is((Object)0L));
    }

    @Test(timeout=30000L)
    public void testReservedSpaceForAppend() throws Exception {
        int replication = 3;
        this.startCluster(0x100000, 3, -1L);
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        FSDataOutputStream os = this.fs.create(file, (short)3);
        os.write(new byte[1024]);
        os.close();
        Path file2 = new Path("/" + methodName + ".02.dat");
        FSDataOutputStream os2 = this.fs.create(file2, (short)3);
        os2.write(new byte[1]);
        os2.hflush();
        int expectedFile2Reserved = 1048575;
        this.checkReservedSpace(expectedFile2Reserved);
        os = this.fs.append(file);
        os.write(new byte[1]);
        os.hflush();
        int expectedFile1Reserved = 1047551;
        this.checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
        os.close();
        this.checkReservedSpace(expectedFile2Reserved);
        os = this.fs.append(file);
        os.write(new byte[1]);
        os.hflush();
        this.checkReservedSpace(expectedFile2Reserved + --expectedFile1Reserved);
        DFSTestUtil.abortStream((DFSOutputStream)os.getWrappedStream());
        this.checkReservedSpace(expectedFile2Reserved);
    }

    @Test(timeout=30000L)
    public void testReservedSpaceForPipelineRecovery() throws Exception {
        int replication = 3;
        this.startCluster(0x100000, 3, -1L);
        String methodName = GenericTestUtils.getMethodName();
        Path file = new Path("/" + methodName + ".01.dat");
        this.old = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set((DataNodeFaultInjector)new DataNodeFaultInjector(){
            private int tries = 0;

            public void failMirrorConnection() throws IOException {
                if (this.tries++ == 0) {
                    throw new IOException("Failing Mirror for space reservation");
                }
            }
        });
        FSDataOutputStream os = this.fs.create(file, (short)3);
        os.write(new byte[1]);
        os.close();
        this.cluster.triggerBlockReports();
        for (final DataNode dn : this.cluster.getDataNodes()) {
            FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences();
            Throwable throwable = null;
            try {
                final FsVolumeImpl volume = (FsVolumeImpl)volumes.get(0);
                GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                    public Boolean get() {
                        LOG.info((Object)("dn " + dn.getDisplayName() + " space : " + volume.getReservedForReplicas()));
                        return volume.getReservedForReplicas() == 0L;
                    }
                }, (int)100, (int)Integer.MAX_VALUE);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (volumes == null) continue;
                if (throwable != null) {
                    try {
                        volumes.close();
                    }
                    catch (Throwable x2) {
                        throwable.addSuppressed(x2);
                    }
                    continue;
                }
                volumes.close();
            }
        }
    }

    private void checkReservedSpace(final long expectedReserved) throws TimeoutException, InterruptedException, IOException {
        for (final DataNode dn : this.cluster.getDataNodes()) {
            FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences();
            Throwable throwable = null;
            try {
                final FsVolumeImpl volume = (FsVolumeImpl)volumes.get(0);
                GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                    public Boolean get() {
                        LOG.info((Object)("dn " + dn.getDisplayName() + " space : " + volume.getReservedForReplicas() + ", Expected ReservedSpace :" + expectedReserved));
                        return volume.getReservedForReplicas() == expectedReserved;
                    }
                }, (int)100, (int)3000);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (volumes == null) continue;
                if (throwable != null) {
                    try {
                        volumes.close();
                    }
                    catch (Throwable x2) {
                        throwable.addSuppressed(x2);
                    }
                    continue;
                }
                volumes.close();
            }
        }
    }

    @Test(timeout=60000L)
    public void testReservedSpaceForLeaseRecovery() throws Exception {
        int replication = 3;
        this.conf.setInt("ipc.client.connect.max.retries", 2);
        this.conf.setInt("ipc.client.connect.retry.interval", 1000);
        this.startCluster(0x100000, 3, -1L);
        String methodName = GenericTestUtils.getMethodName();
        final Path file = new Path("/" + methodName + ".01.dat");
        FSDataOutputStream os = this.fs.create(file, (short)3);
        os.write(new byte[8192]);
        os.hflush();
        os.close();
        HdfsBlockLocation blockLocation = (HdfsBlockLocation)this.fs.getClient().getBlockLocations(file.toString(), 0L, 0x100000L)[0];
        LocatedBlock lastBlock = blockLocation.getLocatedBlock();
        this.cluster.stopDataNode(lastBlock.getLocations()[2].getName());
        try {
            os = this.fs.append(file);
            DFSTestUtil.setPipeline((DFSOutputStream)os.getWrappedStream(), lastBlock);
            os.writeBytes("hi");
            os.hsync();
        }
        catch (IOException e) {
            LOG.info((Object)"", (Throwable)e);
        }
        DFSTestUtil.abortStream((DFSOutputStream)os.getWrappedStream());
        this.cluster.setDataNodeDead((DatanodeID)lastBlock.getLocations()[2]);
        this.fs.recoverLease(file);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                try {
                    return TestSpaceReservation.this.fs.isFileClosed(file);
                }
                catch (IOException e) {
                    return false;
                }
            }
        }, (int)500, (int)30000);
        this.checkReservedSpace(0L);
    }

    static {
        GenericTestUtils.setLogLevel((Logger)FsDatasetImpl.LOG, (Level)Level.ALL);
        GenericTestUtils.setLogLevel((Logger)DataNode.LOG, (Level)Level.ALL);
    }

    private static class Writer
    extends Daemon {
        private volatile boolean keepRunning;
        private final DFSClient localClient;
        private int filesCreated = 0;
        private int numFailures = 0;
        byte[] data;

        Writer(DFSClient client, int blockSize) throws IOException {
            this.localClient = client;
            this.keepRunning = true;
            this.filesCreated = 0;
            this.numFailures = 0;
            this.data = new byte[blockSize * 2];
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        public void run() {
            while (this.keepRunning) {
                OutputStream os = null;
                try {
                    String filename = "/file-" + rand.nextLong();
                    os = this.localClient.create(filename, false);
                    os.write(this.data, 0, rand.nextInt(this.data.length));
                    IOUtils.closeQuietly((OutputStream)os);
                    os = null;
                    this.localClient.delete(filename, false);
                    Thread.sleep(50L);
                    ++this.filesCreated;
                    if (os == null) continue;
                }
                catch (IOException ioe) {
                    ++this.numFailures;
                    continue;
                }
                catch (InterruptedException ie) {
                    if (os != null) {
                        IOUtils.closeQuietly(os);
                    }
                    return;
                    {
                        catch (Throwable throwable) {
                            throw throwable;
                        }
                    }
                }
                finally {
                    if (os == null) continue;
                    IOUtils.closeQuietly(os);
                    continue;
                }
                IOUtils.closeQuietly((OutputStream)os);
            }
        }

        public void stopWriter() {
            this.keepRunning = false;
        }

        public int getFilesCreated() {
            return this.filesCreated;
        }

        public int getNumFailures() {
            return this.numFailures;
        }
    }
}

