/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={MediumTests.class})
public class TestAtomicOperation {
    // Logger shared by the test methods and the worker threads they spawn.
    static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
    // JUnit rule exposing the name of the running test method; setup() derives the table name from it.
    @Rule
    public TestName name = new TestName();
    // Region under test. Assigned by initHRegion(...) and closed (then nulled) by teardown().
    HRegion region = null;
    // Testing utility used to create local regions and to obtain the Configuration.
    private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
    // Table name for the current test; reassigned in setup() before every test method.
    static byte[] tableName;
    // Qualifiers, values and row keys shared by all tests; assigned in the static initializer.
    static final byte[] qual1;
    static final byte[] qual2;
    static final byte[] qual3;
    static final byte[] value1;
    static final byte[] value2;
    static final byte[] row;
    static final byte[] row2;
    // Counted down by MockHRegion.getRowLockInternal and awaited by WrappedRowLock.release.
    private static CountDownLatch latch;
    // Current step of the put / checkAndPut choreography; volatile because several threads read and write it.
    private static volatile TestStep testStep;
    // Column family name "f1", the family created by testPutAndCheckAndPutInParallel.
    private final String family = "f1";

    /**
     * Runs before every test: stores the bytes of the current test method's
     * name in the shared {@code tableName} field.
     */
    @Before
    public void setup() {
        String methodName = this.name.getMethodName();
        tableName = Bytes.toBytes(methodName);
    }

    /**
     * Runs after every test: closes the region under test, if one was
     * opened, and clears the field.
     *
     * @throws IOException if closing the region fails (the field is then left set)
     */
    @After
    public void teardown() throws IOException {
        HRegion opened = this.region;
        if (opened == null) {
            // Nothing was opened by this test.
            return;
        }
        opened.close();
        this.region = null;
    }

    /**
     * Appends two values to two columns of one row, then appends them again in
     * swapped order and checks that each column holds the concatenation of
     * both appends. The first append disables result return and must yield
     * {@code null}.
     */
    @Test
    public void testAppend() throws IOException {
        byte[][] families = new byte[][]{HBaseTestingUtility.fam1};
        this.initHRegion(tableName, this.name.getMethodName(), families);

        final String head = "Ultimate Answer to the Ultimate Question of Life, The Universe, and Everything";
        final String tail = " is... 42.";

        // First append: results suppressed, so the call must return null.
        Append silentAppend = new Append(row);
        silentAppend.setReturnResults(false);
        silentAppend.add(HBaseTestingUtility.fam1, qual1, Bytes.toBytes(head));
        silentAppend.add(HBaseTestingUtility.fam1, qual2, Bytes.toBytes(tail));
        Result firstOutcome = this.region.append(silentAppend, 0L, 0L);
        Assert.assertNull(firstOutcome);

        // Second append: same columns, values swapped.
        Append swappedAppend = new Append(row);
        swappedAppend.add(HBaseTestingUtility.fam1, qual1, Bytes.toBytes(tail));
        swappedAppend.add(HBaseTestingUtility.fam1, qual2, Bytes.toBytes(head));
        Result combined = this.region.append(swappedAppend, 0L, 0L);

        byte[] expectedQual1 = Bytes.toBytes(head + tail);
        byte[] actualQual1 = combined.getValue(HBaseTestingUtility.fam1, qual1);
        Assert.assertEquals(0L, (long) Bytes.compareTo(expectedQual1, actualQual1));

        byte[] expectedQual2 = Bytes.toBytes(tail + head);
        byte[] actualQual2 = combined.getValue(HBaseTestingUtility.fam1, qual2);
        Assert.assertEquals(0L, (long) Bytes.compareTo(expectedQual2, actualQual2));
    }

    /**
     * Starts 100 {@link Incrementer} threads that each apply 1000 increments
     * to the same row, then verifies the three counters hold the expected
     * totals (1x, 2x and 3x the sum of all per-thread amounts).
     *
     * @throws IOException if the region cannot be created or read
     */
    @Test
    public void testIncrementMultiThreads() throws IOException {
        LOG.info("Starting test testIncrementMultiThreads");
        this.initHRegion(tableName, this.name.getMethodName(), new int[]{1, 3},
            new byte[][]{HBaseTestingUtility.fam1, HBaseTestingUtility.fam2});

        final int numThreads = 100;
        final int incrementsPerThread = 1000;
        Incrementer[] all = new Incrementer[numThreads];
        // Accumulate as long: assertICV compares long values.
        long expectedTotal = 0L;
        for (int t = 0; t < numThreads; ++t) {
            // Thread t increments by t on every operation.
            all[t] = new Incrementer(this.region, t, t, incrementsPerThread);
            expectedTotal += (long) t * incrementsPerThread;
        }
        for (Incrementer worker : all) {
            worker.start();
        }

        // Wait for every worker to finish even if this thread is interrupted;
        // giving up on a join would let the assertions below race with
        // increments that are still in flight.
        boolean interrupted = false;
        for (Incrementer worker : all) {
            while (true) {
                try {
                    worker.join();
                    break;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Preserve the interrupt status for whoever runs this test.
            Thread.currentThread().interrupt();
        }

        this.assertICV(row, HBaseTestingUtility.fam1, qual1, expectedTotal);
        this.assertICV(row, HBaseTestingUtility.fam1, qual2, expectedTotal * 2);
        this.assertICV(row, HBaseTestingUtility.fam2, qual3, expectedTotal * 3);
        LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
    }

    /**
     * Reads a single column of a row from the region under test and asserts
     * that exactly one cell is returned and that its value, decoded as a
     * long, equals {@code amount}.
     *
     * @param row       row key to read
     * @param familiy   column family of the counter
     * @param qualifier column qualifier of the counter
     * @param amount    expected counter value
     * @throws IOException if the read fails
     */
    private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount) throws IOException {
        Get lookup = new Get(row);
        lookup.addColumn(familiy, qualifier);
        Result fetched = this.region.get(lookup);
        Assert.assertEquals(1L, (long) fetched.size());

        Cell onlyCell = fetched.rawCells()[0];
        byte[] stored = CellUtil.cloneValue(onlyCell);
        Assert.assertEquals(amount, Bytes.toLong(stored));
    }

    /**
     * Creates the region under test with the given column families, passing
     * no per-family max-versions array to the four-argument overload.
     *
     * @param tableName     name of the table backing the region
     * @param callingMethod name of the calling test method (forwarded as-is)
     * @param families      column families to create
     * @throws IOException if the region cannot be created
     */
    private void initHRegion(byte[] tableName, String callingMethod, byte[] ... families) throws IOException {
        final int[] noVersionOverrides = null;
        this.initHRegion(tableName, callingMethod, noVersionOverrides, families);
    }

    /**
     * Creates the region under test and stores it in {@code this.region}.
     * Each family gets the max-versions value at the same index of
     * {@code maxVersions}, or 1 when {@code maxVersions} is {@code null}.
     *
     * @param tableName     name of the table backing the region
     * @param callingMethod name of the calling test method (not used here)
     * @param maxVersions   per-family max versions, parallel to {@code families}; may be {@code null}
     * @param families      column families to create
     * @throws IOException if the region cannot be created
     */
    private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions, byte[] ... families) throws IOException {
        TableName table = TableName.valueOf(tableName);
        HTableDescriptor descriptor = new HTableDescriptor(table);
        for (int idx = 0; idx < families.length; ++idx) {
            int versions = 1;
            if (maxVersions != null) {
                versions = maxVersions[idx];
            }
            HColumnDescriptor column = new HColumnDescriptor(families[idx]);
            column.setMaxVersions(versions);
            descriptor.addFamily(column);
        }
        HRegionInfo regionInfo = new HRegionInfo(descriptor.getTableName(), null, null, false);
        this.region = this.TEST_UTIL.createLocalHRegion(regionInfo, descriptor);
    }

    /**
     * Starts 100 threads that each append a one-byte value to three columns
     * of the same row 100 times. After every append the worker re-reads the
     * row and checks that all three columns have the same length; once all
     * workers are done, each column must be exactly
     * {@code numThreads * opsPerThread} bytes long.
     *
     * @throws IOException if the region cannot be created or read
     */
    @Test
    public void testAppendMultiThreads() throws IOException {
        LOG.info("Starting test testAppendMultiThreads");
        this.initHRegion(tableName, this.name.getMethodName(), new int[]{1, 3},
            new byte[][]{HBaseTestingUtility.fam1, HBaseTestingUtility.fam2});

        final int numThreads = 100;
        final int opsPerThread = 100;
        AtomicOperation[] all = new AtomicOperation[numThreads];
        final byte[] val = new byte[]{1};
        AtomicInteger failures = new AtomicInteger(0);
        for (int t = 0; t < numThreads; ++t) {
            all[t] = new AtomicOperation(this.region, opsPerThread, null, failures){

                @Override
                public void run() {
                    for (int op = 0; op < this.numOps; ++op) {
                        try {
                            Append a = new Append(row);
                            a.add(HBaseTestingUtility.fam1, qual1, val);
                            a.add(HBaseTestingUtility.fam1, qual2, val);
                            a.add(HBaseTestingUtility.fam2, qual3, val);
                            a.setDurability(Durability.ASYNC_WAL);
                            this.region.append(a, 0L, 0L);

                            // A reader must never observe a partially applied append.
                            Get g = new Get(row);
                            Result result = this.region.get(g);
                            int qual1Length = result.getValue(HBaseTestingUtility.fam1, qual1).length;
                            Assert.assertEquals(qual1Length, result.getValue(HBaseTestingUtility.fam1, qual2).length);
                            Assert.assertEquals(qual1Length, result.getValue(HBaseTestingUtility.fam2, qual3).length);
                        }
                        catch (IOException e) {
                            LOG.error("Append worker failed", e);
                            this.failures.incrementAndGet();
                            Assert.fail();
                        }
                        catch (AssertionError e) {
                            // Record the failure so the main thread reports it, then end this worker.
                            this.failures.incrementAndGet();
                            throw e;
                        }
                    }
                }
            };
        }
        for (AtomicOperation worker : all) {
            worker.start();
        }

        // Wait for every worker to finish even if this thread is interrupted;
        // abandoning a join would let the final checks race with running appends.
        boolean interrupted = false;
        for (AtomicOperation worker : all) {
            while (true) {
                try {
                    worker.join();
                    break;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Preserve the interrupt status for whoever runs this test.
            Thread.currentThread().interrupt();
        }

        Assert.assertEquals(0L, (long) failures.get());

        // Every operation appended exactly one byte to each column.
        final int expectedLength = numThreads * opsPerThread;
        Get g = new Get(row);
        Result result = this.region.get(g);
        Assert.assertEquals(expectedLength, result.getValue(HBaseTestingUtility.fam1, qual1).length);
        Assert.assertEquals(expectedLength, result.getValue(HBaseTestingUtility.fam1, qual2).length);
        Assert.assertEquals(expectedLength, result.getValue(HBaseTestingUtility.fam2, qual3).length);
    }

    /**
     * Starts 10 threads that each apply 500 {@link RowMutations} to the same
     * row. Each mutation alternates between "put qual1 + delete qual2" and
     * "delete qual1 + put qual2", so a reader must always see exactly one
     * cell. Workers also flush every 10th operation and compact every 100th.
     *
     * @throws IOException if the region cannot be created
     */
    @Test
    public void testRowMutationMultiThreads() throws IOException {
        LOG.info("Starting test testRowMutationMultiThreads");
        this.initHRegion(tableName, this.name.getMethodName(), new byte[][]{HBaseTestingUtility.fam1});

        final int numThreads = 10;
        final int opsPerThread = 500;
        AtomicOperation[] all = new AtomicOperation[numThreads];
        AtomicLong timeStamps = new AtomicLong(0L);
        AtomicInteger failures = new AtomicInteger(0);
        for (int t = 0; t < numThreads; ++t) {
            all[t] = new AtomicOperation(this.region, opsPerThread, timeStamps, failures){

                @Override
                public void run() {
                    boolean putQual1 = true;
                    for (int n = 0; n < this.numOps; ++n) {
                        try {
                            if (n % 10 == 0) {
                                // Only one worker at a time flushes/compacts the shared region.
                                synchronized (this.region) {
                                    LOG.debug("flushing");
                                    this.region.flush(true);
                                    if (n % 100 == 0) {
                                        this.region.compact(false);
                                    }
                                }
                            }
                            long ts = this.timeStamps.incrementAndGet();
                            RowMutations rm = new RowMutations(row);
                            if (putQual1) {
                                Put p = new Put(row, ts);
                                p.add(HBaseTestingUtility.fam1, qual1, value1);
                                p.setDurability(Durability.ASYNC_WAL);
                                rm.add(p);
                                Delete d = new Delete(row);
                                d.deleteColumns(HBaseTestingUtility.fam1, qual2, ts);
                                d.setDurability(Durability.ASYNC_WAL);
                                rm.add(d);
                            } else {
                                Delete d = new Delete(row);
                                d.deleteColumns(HBaseTestingUtility.fam1, qual1, ts);
                                d.setDurability(Durability.ASYNC_WAL);
                                rm.add(d);
                                Put p = new Put(row, ts);
                                p.add(HBaseTestingUtility.fam1, qual2, value2);
                                p.setDurability(Durability.ASYNC_WAL);
                                rm.add(p);
                            }
                            this.region.mutateRow(rm);
                            putQual1 = !putQual1;

                            // Exactly one of the two columns must be visible at any time.
                            Get g = new Get(row);
                            Result r = this.region.get(g);
                            if (r.size() != 1) {
                                LOG.debug(r);
                                this.failures.incrementAndGet();
                                Assert.fail();
                            }
                        }
                        catch (IOException e) {
                            LOG.error("Row mutation worker failed", e);
                            this.failures.incrementAndGet();
                            Assert.fail();
                        }
                    }
                }
            };
        }
        for (AtomicOperation worker : all) {
            worker.start();
        }

        // Wait for every worker to finish even if this thread is interrupted;
        // abandoning a join would let the failure count be read too early.
        boolean interrupted = false;
        for (AtomicOperation worker : all) {
            while (true) {
                try {
                    worker.join();
                    break;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Preserve the interrupt status for whoever runs this test.
            Thread.currentThread().interrupt();
        }

        Assert.assertEquals(0L, (long) failures.get());
    }

    /**
     * Starts 10 threads that each apply 500 two-row mutations under row locks
     * on both rows. Each mutation alternates between "put row2 + delete row"
     * and "delete row2 + put row", so a scan starting at {@code row} must
     * always return exactly one cell. Workers also flush every 10th operation
     * and compact every 100th.
     *
     * @throws IOException if the region cannot be created
     */
    @Test
    public void testMultiRowMutationMultiThreads() throws IOException {
        LOG.info("Starting test testMultiRowMutationMultiThreads");
        this.initHRegion(tableName, this.name.getMethodName(), new byte[][]{HBaseTestingUtility.fam1});

        final int numThreads = 10;
        final int opsPerThread = 500;
        AtomicOperation[] all = new AtomicOperation[numThreads];
        AtomicLong timeStamps = new AtomicLong(0L);
        AtomicInteger failures = new AtomicInteger(0);
        final List<byte[]> rowsToLock = Arrays.asList(row, row2);
        for (int t = 0; t < numThreads; ++t) {
            all[t] = new AtomicOperation(this.region, opsPerThread, timeStamps, failures){

                @Override
                public void run() {
                    boolean putRow2 = true;
                    for (int n = 0; n < this.numOps; ++n) {
                        try {
                            if (n % 10 == 0) {
                                // Only one worker at a time flushes/compacts the shared region.
                                synchronized (this.region) {
                                    LOG.debug("flushing");
                                    this.region.flush(true);
                                    if (n % 100 == 0) {
                                        this.region.compact(false);
                                    }
                                }
                            }
                            long ts = this.timeStamps.incrementAndGet();
                            List<Mutation> mrm = new ArrayList<Mutation>();
                            if (putRow2) {
                                Put p = new Put(row2, ts);
                                p.add(HBaseTestingUtility.fam1, qual1, value1);
                                p.setDurability(Durability.ASYNC_WAL);
                                mrm.add(p);
                                Delete d = new Delete(row);
                                d.deleteColumns(HBaseTestingUtility.fam1, qual1, ts);
                                d.setDurability(Durability.ASYNC_WAL);
                                mrm.add(d);
                            } else {
                                Delete d = new Delete(row2);
                                d.deleteColumns(HBaseTestingUtility.fam1, qual1, ts);
                                d.setDurability(Durability.ASYNC_WAL);
                                mrm.add(d);
                                Put p = new Put(row, ts);
                                p.setDurability(Durability.ASYNC_WAL);
                                p.add(HBaseTestingUtility.fam1, qual1, value2);
                                mrm.add(p);
                            }
                            this.region.mutateRowsWithLocks(mrm, rowsToLock, 0L, 0L);
                            putRow2 = !putRow2;

                            // Exactly one of the two rows must hold a cell at any time.
                            Scan s = new Scan(row);
                            RegionScanner rs = this.region.getScanner(s);
                            List<Cell> cells = new ArrayList<Cell>();
                            try {
                                while (rs.next(cells)) {
                                    // Drain the scanner; cells accumulate in the list.
                                }
                            }
                            finally {
                                // Release the scanner even when next() throws.
                                rs.close();
                            }
                            if (cells.size() != 1) {
                                LOG.debug(cells);
                                this.failures.incrementAndGet();
                                Assert.fail();
                            }
                        }
                        catch (IOException e) {
                            LOG.error("Multi-row mutation worker failed", e);
                            this.failures.incrementAndGet();
                            Assert.fail();
                        }
                    }
                }
            };
        }
        for (AtomicOperation worker : all) {
            worker.start();
        }

        // Wait for every worker to finish even if this thread is interrupted;
        // abandoning a join would let the failure count be read too early.
        boolean interrupted = false;
        for (AtomicOperation worker : all) {
            while (true) {
                try {
                    worker.join();
                    break;
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Preserve the interrupt status for whoever runs this test.
            Thread.currentThread().interrupt();
        }

        Assert.assertEquals(0L, (long) failures.get());
    }

    /**
     * Runs a put of "50" and a checkAndMutate ("10" -> "11") against the same
     * cell in parallel on a {@link MockHRegion}, whose row-lock wrapper
     * orders the two operations via {@code testStep} and {@code latch}. Once
     * the checkAndMutate has completed, every scanned cell must hold "50".
     *
     * @throws Exception if region setup, the worker threads or the scan fail
     */
    @Test
    public void testPutAndCheckAndPutInParallel() throws Exception {
        final String tableName = "testPutAndCheckAndPut";
        Configuration conf = this.TEST_UTIL.getConfiguration();
        conf.setClass("hbase.hregion.impl", MockHRegion.class, HeapSize.class);
        MockHRegion mockRegion = (MockHRegion)this.TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), null, null, tableName, conf, false, Durability.SYNC_WAL, null, new byte[][]{Bytes.toBytes(this.family)});
        // Hand the region to the shared field so teardown() closes it.
        this.region = mockRegion;

        // Seed the cell with "10", the value the checkAndMutate compares against.
        Put put = new Put(Bytes.toBytes("r1"));
        put.add(Bytes.toBytes(this.family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
        Put[] puts = new Put[]{put};
        mockRegion.batchMutate((Mutation[])puts, 0L, 0L);

        MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
        ctx.addThread(new PutThread(ctx, mockRegion));
        ctx.addThread(new CheckAndPutThread(ctx, mockRegion));
        ctx.startThreads();
        // Poll until CheckAndPutThread reports completion.
        while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
            Thread.sleep(100L);
        }
        ctx.stop();

        Scan s = new Scan();
        RegionScanner scanner = mockRegion.getScanner(s);
        List<Cell> results = new ArrayList<Cell>();
        try {
            ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
            scanner.next(results, scannerContext);
        }
        finally {
            // Release the scanner even when next() throws.
            scanner.close();
        }
        // Guard against a vacuous pass: the seeded row must be visible.
        Assert.assertFalse("scan returned no cells", results.isEmpty());
        for (Cell keyValue : results) {
            Assert.assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
        }
    }

    // Assigns the shared qualifier/value/row constants and the initial state of
    // the put / checkAndPut choreography.
    static {
        qual1 = Bytes.toBytes((String)"qual1");
        qual2 = Bytes.toBytes((String)"qual2");
        qual3 = Bytes.toBytes((String)"qual3");
        value1 = Bytes.toBytes((String)"value1");
        value2 = Bytes.toBytes((String)"value2");
        row = Bytes.toBytes((String)"rowA");
        row2 = Bytes.toBytes((String)"rowB");
        // Single-count latch: released once getRowLockInternal sees CHECKANDPUT_STARTED.
        latch = new CountDownLatch(1);
        testStep = TestStep.INIT;
    }

    /**
     * HRegion subclass used by testPutAndCheckAndPutInParallel. It wraps every
     * row lock it hands out in a {@link WrappedRowLock}, whose release
     * behaviour depends on the shared {@code testStep} state, and it counts
     * down the shared {@code latch} when a row lock is requested while the
     * state is CHECKANDPUT_STARTED.
     */
    public static class MockHRegion
    extends HRegion {
        /** Passes all arguments straight through to the HRegion constructor. */
        public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf, HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
            super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
        }

        /**
         * Obtains the row lock from the superclass and returns it wrapped in a
         * WrappedRowLock. If the test is in the CHECKANDPUT_STARTED step, the
         * latch is counted down first, which unblocks a thread waiting in
         * WrappedRowLock.release.
         */
        public Region.RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
            if (testStep == TestStep.CHECKANDPUT_STARTED) {
                latch.countDown();
            }
            return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
        }

        /**
         * Row lock that shares the context of the lock it wraps and whose
         * release() is driven by the current test step.
         */
        public class WrappedRowLock
        extends HRegion.RowLockImpl {
            /** Copies the context of the given lock (cast to RowLockImpl) into this wrapper. */
            private WrappedRowLock(Region.RowLock rowLock) {
                this.setContext(((HRegion.RowLockImpl)rowLock).getContext());
            }

            /**
             * Releases the lock according to the current test step:
             * INIT releases immediately; PUT_STARTED moves the step to
             * PUT_COMPLETED, releases, then blocks on the latch and sleeps one
             * more second; CHECKANDPUT_STARTED releases immediately.
             */
            // NOTE(review): for PUT_COMPLETED and CHECKANDPUT_COMPLETED no branch matches,
            // so super.release() is not called — confirm this is intended.
            public void release() {
                if (testStep == TestStep.INIT) {
                    super.release();
                    return;
                }
                if (testStep == TestStep.PUT_STARTED) {
                    try {
                        // Publish PUT_COMPLETED before releasing; CheckAndPutThread polls for this value.
                        testStep = TestStep.PUT_COMPLETED;
                        super.release();
                        // Block until getRowLockInternal counts the latch down (it does so when
                        // a lock is requested during CHECKANDPUT_STARTED), then wait one more second.
                        latch.await();
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt status instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
                    super.release();
                }
            }
        }
    }

    /**
     * Test thread that waits until the concurrent put has reached
     * PUT_COMPLETED, then runs a checkAndMutate on r1/f1:q1 that writes "11"
     * if the current value equals "10", and finally marks the step as
     * CHECKANDPUT_COMPLETED.
     */
    private class CheckAndPutThread
    extends MultithreadedTestUtil.TestThread {
        private final HRegion region;

        /**
         * @param ctx    test context this thread reports to
         * @param region region to run the checkAndMutate against
         */
        CheckAndPutThread(MultithreadedTestUtil.TestContext ctx, HRegion region) {
            super(ctx);
            this.region = region;
        }

        @Override
        public void doWork() throws Exception {
            Put put = new Put(Bytes.toBytes("r1"));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("11"));
            // Poll until WrappedRowLock.release has published PUT_COMPLETED.
            while (testStep != TestStep.PUT_COMPLETED) {
                Thread.sleep(100L);
            }
            testStep = TestStep.CHECKANDPUT_STARTED;
            this.region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), CompareFilter.CompareOp.EQUAL, (ByteArrayComparable)new BinaryComparator(Bytes.toBytes("10")), (Mutation)put, true);
            testStep = TestStep.CHECKANDPUT_COMPLETED;
        }
    }

    /**
     * Test thread that marks the step as PUT_STARTED and then writes the
     * value "50" to r1/f1:q1 through a single-element batch mutation.
     */
    private class PutThread
    extends MultithreadedTestUtil.TestThread {
        private HRegion region;

        /**
         * @param ctx    test context this thread reports to
         * @param region region to write to
         */
        PutThread(MultithreadedTestUtil.TestContext ctx, HRegion region) {
            super(ctx);
            this.region = region;
        }

        @Override
        public void doWork() throws Exception {
            byte[] rowKey = Bytes.toBytes("r1");
            Put update = new Put(rowKey);
            update.add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("50"));
            Put[] batch = new Put[]{update};
            // Publish the step before the mutation so the row-lock wrapper sees it on release.
            testStep = TestStep.PUT_STARTED;
            this.region.batchMutate(batch, 0L, 0L);
        }
    }

    /**
     * Steps of the put / checkAndPut choreography in
     * testPutAndCheckAndPutInParallel, stored in the shared {@code testStep} field.
     */
    private static enum TestStep {
        // Initial state, assigned in the static initializer.
        INIT,
        // Set by PutThread just before it calls batchMutate.
        PUT_STARTED,
        // Set by WrappedRowLock.release just before it releases the put's row lock.
        PUT_COMPLETED,
        // Set by CheckAndPutThread just before it calls checkAndMutate.
        CHECKANDPUT_STARTED,
        // Set by CheckAndPutThread after checkAndMutate returns.
        CHECKANDPUT_COMPLETED;

    }

    /**
     * Base thread for the multi-threaded tests. It only carries shared state
     * (region, operation count, timestamp source, failure counter and a
     * Random); subclasses supply the work by overriding {@code run()}.
     */
    public static class AtomicOperation
    extends Thread {
        protected final HRegion region;
        protected final int numOps;
        protected final AtomicLong timeStamps;
        protected final AtomicInteger failures;
        protected final Random r = new Random();

        /**
         * @param region     region the operations run against
         * @param numOps     number of operations the thread should perform
         * @param timeStamps shared timestamp source (the append test passes {@code null})
         * @param failures   shared counter of failed operations
         */
        public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
            super();
            this.failures = failures;
            this.timeStamps = timeStamps;
            this.numOps = numOps;
            this.region = region;
        }
    }

    /**
     * Daemon thread that repeatedly increments three counters of the shared
     * row by {@code amount}, {@code 2 * amount} and {@code 3 * amount}, and
     * after each increment asserts that the returned values keep the 1:2:3
     * ratio.
     */
    public static class Incrementer
    extends Thread {
        private final HRegion region;
        private final int numIncrements;
        private final int amount;

        /**
         * @param region        region to increment against
         * @param threadNumber  index used in the thread name
         * @param amount        base amount added per increment
         * @param numIncrements number of increments to perform
         */
        public Incrementer(HRegion region, int threadNumber, int amount, int numIncrements) {
            super("Incrementer." + threadNumber);
            this.region = region;
            this.numIncrements = numIncrements;
            this.amount = amount;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            int done = 0;
            while (done < this.numIncrements) {
                try {
                    long single = this.amount;
                    long doubled = this.amount * 2;
                    long tripled = this.amount * 3;
                    Increment step = new Increment(row);
                    step.addColumn(HBaseTestingUtility.fam1, qual1, single);
                    step.addColumn(HBaseTestingUtility.fam1, qual2, doubled);
                    step.addColumn(HBaseTestingUtility.fam2, qual3, tripled);
                    step.setDurability(Durability.ASYNC_WAL);
                    Result outcome = this.region.increment(step);

                    // qual2 must always be exactly twice qual1.
                    long qual1Total = Bytes.toLong(outcome.getValue(HBaseTestingUtility.fam1, qual1));
                    long qual2Total = Bytes.toLong(outcome.getValue(HBaseTestingUtility.fam1, qual2));
                    Assert.assertEquals(qual1Total * 2L, qual2Total);

                    // qual3 (in fam2) must always be exactly three times qual1.
                    long fam1Increment = qual1Total * 3L;
                    long fam2Increment = Bytes.toLong(outcome.getValue(HBaseTestingUtility.fam2, qual3));
                    String message = "fam1=" + fam1Increment + ", fam2=" + fam2Increment;
                    Assert.assertEquals(message, fam1Increment, fam2Increment);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                ++done;
            }
        }
    }
}

