/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceLogQueue;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryFilterRetryableException;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStreamTestBase;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

public abstract class TestBasicWALEntryStream
extends WALEntryStreamTestBase {
    /**
     * JUnit parameter for the current run (see {@link #data()}); {@link #setUp()} copies it into
     * the {@code hbase.regionserver.wal.enablecompression} configuration key.
     */
    @Parameterized.Parameter
    public boolean isCompressionEnabled;

    @Parameterized.Parameters(name="{index}: isCompressionEnabled={0}")
    public static Iterable<Object[]> data() {
        return Arrays.asList({false}, {true});
    }

    /**
     * Writes this run's compression flag into the shared configuration and then calls
     * {@code initWAL()}.
     */
    @Before
    public void setUp() throws Exception {
        boolean compress = isCompressionEnabled;
        CONF.setBoolean("hbase.regionserver.wal.enablecompression", compress);
        initWAL();
    }

    /**
     * Asserts that the stream reports {@code HasNext.YES} and then returns its next entry.
     *
     * @param entryStream the stream to read from
     * @return the entry returned by {@code entryStream.next()}
     */
    private WAL.Entry next(WALEntryStream entryStream) {
        WALEntryStream.HasNext state = entryStream.hasNext();
        Assert.assertEquals(WALEntryStream.HasNext.YES, state);
        return entryStream.next();
    }

    /**
     * Reads one appended entry, then reopens the stream twice from the last recorded position:
     * once after another append to the same WAL, and once after appends on both sides of a
     * writer roll. Each time the newly appended entries must be returned and the position must
     * have moved.
     */
    @Test
    public void testAppendsWithRolls() throws Exception {
        long oldPos;
        appendToLogAndSync();
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertEquals(WALEntryStream.HasNext.YES, entryStream.hasNext());
            WAL.Entry entry = entryStream.peek();
            // peek() must hand out the very same object that next() subsequently returns.
            Assert.assertSame(entry, entryStream.next());
            Assert.assertNotNull(entry);
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, entryStream.hasNext());
            Assert.assertNull(entryStream.peek());
            Assert.assertThrows(IllegalStateException.class, () -> entryStream.next());
            oldPos = entryStream.getPosition();
        }

        appendToLogAndSync();
        try (WALEntryStreamTestBase.WALEntryStreamWithRetries entryStream =
                new WALEntryStreamTestBase.WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
                    new MetricsSource("1"), "fake-wal-group-id")) {
            WAL.Entry entry = next(entryStream);
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);
            oldPos = entryStream.getPosition();
        }

        // One entry before the roll and one after it.
        appendToLogAndSync();
        log.rollWriter();
        appendToLogAndSync();
        try (WALEntryStreamTestBase.WALEntryStreamWithRetries entryStream =
                new WALEntryStreamTestBase.WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
                    new MetricsSource("1"), "fake-wal-group-id")) {
            WAL.Entry entry = next(entryStream);
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);

            entry = next(entryStream);
            Assert.assertNotEquals(oldPos, entryStream.getPosition());
            Assert.assertNotNull(entry);

            entryStream.disableRetry();
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, entryStream.hasNext());
        }
    }

    /**
     * Appends rows "1" and "2", starts streaming, then appends "3", rolls the writer and appends
     * "4" while the stream is open. The rows must come back in order, and the queue size is
     * checked to be 2 after reading "2" and 1 after reading "4".
     */
    @Test
    public void testLogRollWhileStreaming() throws Exception {
        appendToLog("1");
        appendToLog("2");
        try (WALEntryStreamTestBase.WALEntryStreamWithRetries stream =
                new WALEntryStreamTestBase.WALEntryStreamWithRetries(logQueue, fs, CONF, 0L, log,
                    new MetricsSource("1"), "fake-wal-group-id")) {
            String firstRow = getRow(next(stream));
            Assert.assertEquals("1", firstRow);

            appendToLog("3");
            log.rollWriter();
            appendToLog("4");

            String secondRow = getRow(next(stream));
            Assert.assertEquals("2", secondRow);
            Assert.assertEquals(2L, getQueue().size());

            String thirdRow = getRow(next(stream));
            Assert.assertEquals("3", thirdRow);
            String fourthRow = getRow(next(stream));
            Assert.assertEquals("4", fourthRow);
            Assert.assertEquals(1L, getQueue().size());

            stream.disableRetry();
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());
        }
    }

    /**
     * Appends rows "2" and "3" after the stream has already consumed the first entry. The first
     * {@code hasNext()} afterwards is expected to report RETRY, and the two new rows are then
     * expected to be readable in order.
     */
    @Test
    public void testNewEntriesWhileStreaming() throws Exception {
        appendToLog("1");
        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            WAL.Entry initial = next(stream);
            Assert.assertNotNull(initial);

            appendToLog("2");
            appendToLog("3");

            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());
            String rowAfterRetry = getRow(next(stream));
            Assert.assertEquals("2", rowAfterRetry);
            String lastRow = getRow(next(stream));
            Assert.assertEquals("3", lastRow);
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());
        }
    }

    /**
     * Reads the first entry, records the stream position, and then opens a second stream at
     * that position. The second stream must return rows "2" and "3" (appended while the first
     * stream was open) and leave the queue at size 1.
     */
    @Test
    public void testResumeStreamingFromPosition() throws Exception {
        long lastPosition = 0L;
        appendToLog("1");
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertNotNull(next(entryStream));
            appendToLog("2");
            appendToLog("3");
            lastPosition = entryStream.getPosition();
        }

        // Resume from where the first stream stopped.
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertEquals("2", getRow(next(entryStream)));
            Assert.assertEquals("3", getRow(next(entryStream)));
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, entryStream.hasNext());
            Assert.assertEquals(1L, getQueue().size());
        }
    }

    /**
     * Appends three entries, reads one, and then reopens the stream at the position reported
     * after that read. The reopened stream must yield exactly the remaining two entries before
     * reporting RETRY.
     */
    @Test
    public void testPosition() throws Exception {
        long lastPosition = 0L;
        appendEntriesToLogAndSync(3);
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertNotNull(next(entryStream));
            lastPosition = entryStream.getPosition();
        }

        // Resume from the recorded position.
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertNotNull(next(entryStream));
            Assert.assertNotNull(next(entryStream));
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, entryStream.hasNext());
        }
    }

    /**
     * With nothing appended by this test, a freshly opened stream is expected to report RETRY.
     */
    @Test
    public void testEmptyStream() throws Exception {
        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            WALEntryStream.HasNext state = stream.hasNext();
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, state);
        }
    }

    /**
     * Round-trips a {@link WALKeyImpl} carrying two extended attributes through its protobuf
     * form and checks that the key, its attribute names and values, and its replication scopes
     * are unchanged afterwards.
     */
    @Test
    public void testWALKeySerialization() throws Exception {
        Map<String, byte[]> attributes = new HashMap<>();
        attributes.put("foo", Bytes.toBytes("foo-value"));
        attributes.put("bar", Bytes.toBytes("bar-value"));
        WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
            EnvironmentEdgeManager.currentTime(), new ArrayList<>(), 0L, 0L, mvcc, scopes, attributes);
        Assert.assertEquals(attributes, key.getExtendedAttributes());

        WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
        WALProtos.WALKey serializedKey = builder.build();

        WALKeyImpl deserializedKey = new WALKeyImpl();
        deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());

        Assert.assertEquals(key, deserializedKey);
        Assert.assertEquals(key.getExtendedAttributes().keySet(),
            deserializedKey.getExtendedAttributes().keySet());
        // byte[] has identity-based equals, so the values are compared element-wise.
        for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
            Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
        }
        Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
    }

    /**
     * Builds a Mockito mock of {@link ReplicationSource} whose source manager, metrics, WAL
     * file length provider, server and recovered flag are stubbed.
     *
     * @param recovered value returned by the mock's {@code isRecovered()}
     * @param conf configuration passed to the {@link ReplicationSourceManager} constructor
     * @return the stubbed mock
     */
    private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) throws IOException {
        MetricsReplicationGlobalSourceSource globalMetrics = createMockGlobalMetrics();
        ReplicationSourceManager manager =
            new ReplicationSourceManager(null, null, conf, null, null, null, null, null, null, globalMetrics);
        Server server = Mockito.mock(Server.class);
        ReplicationSource mocked = Mockito.mock(ReplicationSource.class);

        Mockito.when(mocked.getSourceManager()).thenReturn(manager);
        Mockito.when(mocked.getSourceMetrics()).thenReturn(new MetricsSource("1"));
        Mockito.when(mocked.getWALFileLengthProvider()).thenReturn(log);
        Mockito.when(mocked.getServer()).thenReturn(server);
        Mockito.when(mocked.isRecovered()).thenReturn(recovered);
        return mocked;
    }

    /**
     * Creates a mock {@link MetricsReplicationGlobalSourceSource} whose
     * {@code setWALReaderEditsBufferBytes} / {@code getWALReaderEditsBufferBytes} pair is backed
     * by a local counter, so a value that was set can be read back.
     *
     * @return the stubbed mock
     */
    private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
        MetricsReplicationGlobalSourceSource mocked = Mockito.mock(MetricsReplicationGlobalSourceSource.class);
        final AtomicLong bufferBytes = new AtomicLong(0L);

        // Setter: remember the argument.
        Mockito.doAnswer(invocation -> {
            Long value = invocation.getArgument(0, Long.class);
            bufferBytes.set(value);
            return null;
        }).when(mocked).setWALReaderEditsBufferBytes(Mockito.anyLong());

        // Getter: report whatever was stored last.
        Mockito.when(mocked.getWALReaderEditsBufferBytes()).then(invocation -> bufferBytes.get());
        return mocked;
    }

    /**
     * Creates and starts a {@link ReplicationSourceWALReader} over the shared log queue, using
     * a mocked source with an enabled peer and the pass-through filter from
     * {@code getDummyFilter()}.
     *
     * @param recovered forwarded to {@code mockReplicationSource}
     * @param conf configuration for both the mocked source and the reader
     * @return the reader, already started
     */
    private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) throws IOException {
        ReplicationSource mockedSource = mockReplicationSource(recovered, conf);
        Mockito.when(mockedSource.isPeerEnabled()).thenReturn(true);
        WALEntryFilter passThrough = getDummyFilter();
        ReplicationSourceWALReader walReader =
            new ReplicationSourceWALReader(fs, conf, logQueue, 0L, passThrough, mockedSource, "fake-wal-group-id");
        walReader.start();
        return walReader;
    }

    /**
     * Creates and starts a {@link ReplicationSourceWALReader} over the shared log queue whose
     * filter comes from {@code getIntermittentFailingFilter(numFailures)}; the mocked source is
     * non-recovered with an enabled peer.
     *
     * @param numFailures forwarded to {@code getIntermittentFailingFilter}
     * @param conf configuration for both the mocked source and the reader
     * @return the reader, already started
     */
    private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures, Configuration conf) throws IOException {
        ReplicationSource mockedSource = mockReplicationSource(false, conf);
        Mockito.when(mockedSource.isPeerEnabled()).thenReturn(true);
        WALEntryFilter flakyFilter = getIntermittentFailingFilter(numFailures);
        ReplicationSourceWALReader walReader =
            new ReplicationSourceWALReader(fs, conf, logQueue, 0L, flakyFilter, mockedSource, "fake-wal-group-id");
        walReader.start();
        return walReader;
    }

    /**
     * Determines the stream position after three entries, then checks that a reader's first
     * batch carries those three entries, ends at that position on the same WAL path, and that
     * a row appended later arrives in a second batch.
     */
    @Test
    public void testReplicationSourceWALReader() throws Exception {
        appendEntriesToLogAndSync(3);

        // Walk a plain stream over the three entries to learn the expected end position.
        long expectedPosition;
        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            int consumed = 0;
            while (consumed < 3) {
                Assert.assertNotNull(next(stream));
                consumed++;
            }
            expectedPosition = stream.getPosition();
        }

        Path walPath = getQueue().peek();
        ReplicationSourceWALReader reader = createReader(false, CONF);

        WALEntryBatch firstBatch = reader.take();
        Assert.assertNotNull(firstBatch);
        Assert.assertEquals(3L, firstBatch.getWalEntries().size());
        Assert.assertEquals(expectedPosition, firstBatch.getLastWalPosition());
        Assert.assertEquals(walPath, firstBatch.getLastWalPath());
        Assert.assertEquals(3L, firstBatch.getNbRowKeys());

        appendToLog("foo");
        WALEntryBatch secondBatch = reader.take();
        Assert.assertEquals(1L, secondBatch.getNbEntries());
        Assert.assertEquals("foo", getRow(secondBatch.getWalEntries().get(0)));
    }

    /**
     * Same as the plain reader test, but the reader's filter comes from
     * {@code createReaderWithBadReplicationFilter(5, ...)}. The batch must still contain all
     * three entries, and {@code FailingWALEntryFilter.numFailures()} must equal 5.
     */
    @Test
    public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
        appendEntriesToLogAndSync(3);

        // Walk a plain stream over the three entries to learn the expected end position.
        long expectedPosition;
        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            int consumed = 0;
            while (consumed < 3) {
                Assert.assertNotNull(next(stream));
                consumed++;
            }
            expectedPosition = stream.getPosition();
        }

        Path walPath = getQueue().peek();
        final int expectedFailures = 5;
        ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(expectedFailures, CONF);

        WALEntryBatch batch = reader.take();
        Assert.assertEquals(expectedFailures, FailingWALEntryFilter.numFailures());
        Assert.assertNotNull(batch);
        Assert.assertEquals(3L, batch.getWalEntries().size());
        Assert.assertEquals(expectedPosition, batch.getLastWalPosition());
        Assert.assertEquals(walPath, batch.getLastWalPath());
        Assert.assertEquals(3L, batch.getNbRowKeys());
    }

    /**
     * Writes 10 entries, rolls, writes 5 more and shuts the WAL down, then reads with a
     * recovered-source reader capped at 10 entries per batch. Expected sequence: 10 entries
     * (not end-of-file), an empty end-of-file batch for the first WAL, 5 entries at end-of-file
     * for the second WAL, and finally {@code WALEntryBatch.NO_MORE_DATA}.
     */
    @Test
    public void testReplicationSourceWALReaderRecovered() throws Exception {
        appendEntriesToLogAndSync(10);
        Path firstWal = getQueue().peek();
        log.rollWriter();
        appendEntriesToLogAndSync(5);
        log.shutdown();

        Configuration readerConf = new Configuration(CONF);
        readerConf.setInt("replication.source.nb.capacity", 10);
        ReplicationSourceWALReader reader = createReader(true, readerConf);

        WALEntryBatch fullBatch = reader.take();
        Assert.assertEquals(firstWal, fullBatch.getLastWalPath());
        Assert.assertEquals(10L, fullBatch.getNbEntries());
        Assert.assertFalse(fullBatch.isEndOfFile());

        WALEntryBatch eofMarker = reader.take();
        Assert.assertEquals(firstWal, eofMarker.getLastWalPath());
        Assert.assertEquals(0L, eofMarker.getNbEntries());
        Assert.assertTrue(eofMarker.isEndOfFile());

        Path secondWal = getQueue().peek();
        WALEntryBatch tailBatch = reader.take();
        Assert.assertEquals(secondWal, tailBatch.getLastWalPath());
        Assert.assertEquals(5L, tailBatch.getNbEntries());
        Assert.assertTrue(tailBatch.isEndOfFile());

        Assert.assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
    }

    /**
     * Checks the batches a reader produces across two rolls: the last position reported for the
     * first WAL must not exceed that file's length, and the path, entry count and end-of-file
     * flag of each subsequent batch must match the WAL it came from.
     */
    @Test
    public void testReplicationSourceWALReaderWrongPosition() throws Exception {
        appendEntriesToLogAndSync(1);
        final Path walPath = getQueue().peek();
        log.rollWriter();
        appendEntriesToLogAndSync(20);

        // Wait until the rolled file has a non-zero length and no WAL close is still in flight.
        TEST_UTIL.waitFor(5000L, new Waiter.ExplainingPredicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                boolean hasLength = WALEntryStreamTestBase.fs.getFileStatus(walPath).getLen() > 0L;
                return hasLength
                    && ((AbstractFSWAL<?>) TestBasicWALEntryStream.this.log).getInflightWALCloseCount() == 0;
            }

            @Override
            public String explainFailure() throws Exception {
                return walPath + " has not been closed yet";
            }
        });

        ReplicationSourceWALReader reader = createReader(false, CONF);

        WALEntryBatch firstBatch = reader.take();
        Assert.assertEquals(walPath, firstBatch.getLastWalPath());
        long walLength = fs.getFileStatus(walPath).getLen();
        Assert.assertTrue("Position " + firstBatch.getLastWalPosition() + " is out of range, file length is " + walLength,
            firstBatch.getLastWalPosition() <= walLength);
        Assert.assertEquals(1L, firstBatch.getNbEntries());
        Assert.assertTrue(firstBatch.isEndOfFile());

        Path walPath2 = getQueue().peek();
        WALEntryBatch secondBatch = reader.take();
        Assert.assertEquals(walPath2, secondBatch.getLastWalPath());
        Assert.assertEquals(20L, secondBatch.getNbEntries());
        Assert.assertFalse(secondBatch.isEndOfFile());

        log.rollWriter();
        appendEntriesToLogAndSync(10);
        WALEntryBatch eofMarker = reader.take();
        Assert.assertEquals(walPath2, eofMarker.getLastWalPath());
        Assert.assertEquals(0L, eofMarker.getNbEntries());
        Assert.assertTrue(eofMarker.isEndOfFile());

        Path walPath3 = getQueue().peek();
        WALEntryBatch thirdBatch = reader.take();
        Assert.assertEquals(walPath3, thirdBatch.getLastWalPath());
        Assert.assertEquals(10L, thirdBatch.getNbEntries());
        Assert.assertFalse(thirdBatch.isEndOfFile());
    }

    /**
     * Starts a reader whose mocked source reports the peer as disabled. {@code take()} must not
     * complete while {@code isPeerEnabled()} has been polled at least five times and still
     * returns false; once the flag is flipped, the batch with all three entries must arrive.
     */
    @Test
    public void testReplicationSourceWALReaderDisabled() throws IOException, InterruptedException, ExecutionException {
        long position;
        appendEntriesToLogAndSync(3);
        // Walk a plain stream over the three entries to learn the expected end position.
        try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                new MetricsSource("1"), "fake-wal-group-id")) {
            for (int i = 0; i < 3; ++i) {
                Assert.assertNotNull(next(entryStream));
            }
            position = entryStream.getPosition();
        }
        Path walPath = getQueue().peek();

        ReplicationSource source = mockReplicationSource(false, CONF);
        AtomicInteger invokeCount = new AtomicInteger(0);
        AtomicBoolean enabled = new AtomicBoolean(false);
        // Count every poll of the peer state so the test can tell the reader is actually waiting.
        Mockito.when(source.isPeerEnabled()).then(invocation -> {
            invokeCount.incrementAndGet();
            return enabled.get();
        });

        ReplicationSourceWALReader reader =
            new ReplicationSourceWALReader(fs, CONF, logQueue, 0L, getDummyFilter(), source, "fake-wal-group-id");
        reader.start();
        Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> reader.take());

        TEST_UTIL.waitFor(30000L, () -> invokeCount.get() >= 5);
        Assert.assertFalse(future.isDone());

        enabled.set(true);
        WALEntryBatch entryBatch = future.get();
        Assert.assertNotNull(entryBatch);
        Assert.assertEquals(3L, entryBatch.getWalEntries().size());
        Assert.assertEquals(position, entryBatch.getLastWalPosition());
        Assert.assertEquals(walPath, entryBatch.getLastWalPath());
        Assert.assertEquals(3L, entryBatch.getNbRowKeys());
    }

    /**
     * Returns the row key of the first cell in the entry's edit, decoded as a string.
     *
     * @param entry WAL entry whose edit holds at least one cell
     * @return the first cell's row bytes converted with {@code Bytes.toString}
     */
    private String getRow(WAL.Entry entry) {
        Cell firstCell = entry.getEdit().getCells().get(0);
        byte[] backing = firstCell.getRowArray();
        int offset = firstCell.getRowOffset();
        int length = firstCell.getRowLength();
        return Bytes.toString(backing, offset, length);
    }

    /**
     * Appends a single edit for the given row to the WAL and syncs up to the returned
     * transaction id.
     *
     * @param key row used for the edit (see {@code getWALEdit})
     */
    private void appendToLog(String key) throws IOException {
        WALKeyImpl walKey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
        WALEdit edit = getWALEdit(key);
        long txid = log.appendData(info, walKey, edit);
        log.sync(txid);
    }

    /**
     * Calls {@code appendToLog(1)} {@code count} times and then syncs the WAL once, up to the
     * transaction id returned by the last call (or -1 when {@code count} is not positive).
     * NOTE(review): the int overload of {@code appendToLog} is not defined in this part of the
     * file; presumably it is inherited from the base class. Confirm.
     *
     * @param count number of append calls to make
     */
    private void appendEntriesToLogAndSync(int count) throws IOException {
        long lastTxid = -1L;
        int remaining = count;
        while (remaining > 0) {
            lastTxid = appendToLog(1);
            remaining--;
        }
        log.sync(lastTxid);
    }

    /**
     * Builds a {@link WALEdit} holding one {@link KeyValue} for the given row, using the shared
     * family and qualifier, the current time as timestamp, and the qualifier bytes as value.
     *
     * @param row row key as a string
     * @return a new edit containing that single cell
     */
    private WALEdit getWALEdit(String row) {
        WALEdit result = new WALEdit();
        byte[] rowBytes = Bytes.toBytes(row);
        long timestamp = EnvironmentEdgeManager.currentTime();
        KeyValue cell = new KeyValue(rowBytes, family, qualifier, timestamp, qualifier);
        result.add((Cell)cell);
        return result;
    }

    /**
     * Returns a filter that hands every entry back unchanged.
     */
    private WALEntryFilter getDummyFilter() {
        WALEntryFilter passThrough = new WALEntryFilter() {
            @Override
            public WAL.Entry filter(WAL.Entry candidate) {
                return candidate;
            }
        };
        return passThrough;
    }

    /**
     * Wraps the given count in a new {@code FailingWALEntryFilter}.
     *
     * @param numFailuresInFilter constructor argument for the filter
     * @return the newly created filter
     */
    private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
        WALEntryFilter failingFilter = new FailingWALEntryFilter(numFailuresInFilter);
        return failingFilter;
    }

    /**
     * Feeds the stream a file length that is one byte short of the real size, so only the first
     * of two entries is expected to be readable. After the reported length is raised to the
     * real size, the second entry must become available.
     */
    @Test
    public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
        appendToLog("1");
        appendToLog("2");
        long actualSize = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
        // Report one byte less than what is really there.
        AtomicLong reportedLength = new AtomicLong(actualSize - 1L);
        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L,
                path -> OptionalLong.of(reportedLength.get()), new MetricsSource("1"), "fake-wal-group-id")) {
            Assert.assertNotNull(next(stream));
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());
            // Waiting alone must not change the outcome.
            Thread.sleep(1000L);
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());

            reportedLength.set(actualSize);
            Assert.assertNotNull(next(stream));
            Assert.assertEquals(WALEntryStream.HasNext.RETRY, stream.hasNext());
        }
    }

    /**
     * Enqueues a single zero-length file in a private log queue for a recovered source, runs a
     * reader with {@code replication.source.eof.autorecovery} enabled until its thread ends,
     * and expects the queue to be empty afterwards.
     */
    @Test
    public void testEOFExceptionForRecoveredQueue() throws Exception {
        // Create a zero-length file to stand in for the WAL.
        Path emptyLog = new Path("emptyLog");
        fs.create(emptyLog).close();
        Assert.assertEquals(0L, fs.getFileStatus(emptyLog).getLen());

        Configuration readerConf = new Configuration(CONF);
        readerConf.setInt("replication.source.maxretriesmultiplier", 1);
        readerConf.setBoolean("replication.source.eof.autorecovery", true);
        readerConf.setInt("replication.source.nb.batches", 10);

        ReplicationSource recoveredSource = mockReplicationSource(true, readerConf);
        Mockito.when(recoveredSource.isPeerEnabled()).thenReturn(true);

        MetricsSource queueMetrics = Mockito.mock(MetricsSource.class);
        Mockito.doNothing().when(queueMetrics).incrSizeOfLogQueue();
        Mockito.doNothing().when(queueMetrics).decrSizeOfLogQueue();

        ReplicationSourceLogQueue localLogQueue =
            new ReplicationSourceLogQueue(readerConf, queueMetrics, recoveredSource);
        localLogQueue.enqueueLog(emptyLog, "fake-wal-group-id");

        ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, readerConf, localLogQueue, 0L,
            getDummyFilter(), recoveredSource, "fake-wal-group-id");
        reader.start();
        reader.join();

        Assert.assertEquals(0L, localLogQueue.getQueueSize("fake-wal-group-id"));
    }

    /**
     * Enqueues a zero-length file and a three-entry WAL in a private log queue for a recovered
     * source, runs a reader with {@code replication.source.eof.autorecovery} enabled until its
     * thread ends, and expects the queue to be empty afterwards.
     */
    @Test
    public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
        Configuration conf = new Configuration(CONF);
        MetricsSource metrics = Mockito.mock(MetricsSource.class);
        ReplicationSource source = mockReplicationSource(true, conf);
        ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);

        // The compression flag is part of the file names, so each parameterized run uses its own files.
        Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
        fs.create(emptyLog).close();
        Assert.assertEquals(0L, fs.getFileStatus(emptyLog).getLen());
        localLogQueue.enqueueLog(emptyLog, "fake-wal-group-id");

        Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
        WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
        appendEntries(writer1, 3);
        localLogQueue.enqueueLog(log1, "fake-wal-group-id");

        Mockito.when(source.isPeerEnabled()).thenReturn(true);
        conf.setInt("replication.source.maxretriesmultiplier", 1);
        conf.setBoolean("replication.source.eof.autorecovery", true);
        conf.setInt("replication.source.nb.batches", 10);

        ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0L,
            getDummyFilter(), source, "fake-wal-group-id");
        Assert.assertEquals("Initial log queue size is not correct", 2L,
            localLogQueue.getQueueSize("fake-wal-group-id"));
        reader.start();
        reader.join();

        Assert.assertEquals("Log queue should be empty", 0L, localLogQueue.getQueueSize("fake-wal-group-id"));
    }

    /**
     * Returns the path queue that the shared log queue keeps for the {@code fake-wal-group-id}
     * group.
     */
    private PriorityBlockingQueue<Path> getQueue() {
        PriorityBlockingQueue<Path> groupQueue = logQueue.getQueue("fake-wal-group-id");
        return groupQueue;
    }

    /**
     * Writes {@code numEntries} single-cell entries to the writer, syncing after each one, and
     * then closes the writer. Entry {@code i} uses the bytes of the decimal string of {@code i}
     * as row, family, qualifier, encoded region name and table name.
     *
     * @param writer destination writer; always closed by this method, even when an append or
     *        sync fails
     * @param numEntries number of entries to write
     * @throws IOException if appending, syncing or closing fails
     */
    private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
        try {
            for (int i = 0; i < numEntries; ++i) {
                byte[] b = Bytes.toBytes(Integer.toString(i));
                KeyValue kv = new KeyValue(b, b, b);
                WALEdit edit = new WALEdit();
                edit.add((Cell)kv);
                WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
                writer.append(new WAL.Entry(key, edit));
                writer.sync(false);
            }
        } finally {
            // Release the writer even if a write above failed.
            writer.close();
        }
    }

    /**
     * Tracks the log-queue size metric: 1 at the start, 2 after a roll, and back to 1 once a
     * stream has read the single entry and reported RETRY_IMMEDIATELY and has been closed.
     */
    @Test
    public void testSizeOfLogQueue() throws Exception {
        Assert.assertEquals(1L, logQueue.getMetrics().getSizeOfLogQueue());
        appendToLogAndSync();
        log.rollWriter();

        // Wait until the file at the head of the queue is reported as closed.
        TEST_UTIL.waitFor(30000L, () -> {
            Path head = logQueue.getQueue("fake-wal-group-id").peek();
            String headPath = head.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath();
            return fs.getClient().isFileClosed(headPath);
        });
        Assert.assertEquals(2L, logQueue.getMetrics().getSizeOfLogQueue());

        try (WALEntryStream stream = new WALEntryStream(logQueue, fs, CONF, 0L, log,
                logQueue.getMetrics(), "fake-wal-group-id")) {
            Assert.assertNotNull(next(stream));
            Assert.assertEquals(WALEntryStream.HasNext.RETRY_IMMEDIATELY, stream.hasNext());
        }
        Assert.assertEquals(1L, logQueue.getMetrics().getSizeOfLogQueue());
    }

    /**
     * Reads one entry before and one after a writer roll and checks that the
     * {@code getUncleanlyClosedWALs()} metric is 0 both at the start and at the end.
     */
    @Test
    public void testCleanClosedWALs() throws Exception {
        try (WALEntryStreamTestBase.WALEntryStreamWithRetries stream =
                new WALEntryStreamTestBase.WALEntryStreamWithRetries(logQueue, fs, CONF, 0L, log,
                    logQueue.getMetrics(), "fake-wal-group-id")) {
            Assert.assertEquals(0L, logQueue.getMetrics().getUncleanlyClosedWALs());

            appendToLogAndSync();
            WAL.Entry beforeRoll = next(stream);
            Assert.assertNotNull(beforeRoll);

            log.rollWriter();
            appendToLogAndSync();
            WAL.Entry afterRoll = next(stream);
            Assert.assertNotNull(afterRoll);

            Assert.assertEquals(0L, logQueue.getMetrics().getUncleanlyClosedWALs());
        }
    }

    /**
     * Truncates the archived copy of a rolled WAL to zero bytes and then waits for the queue of
     * the fake WAL group to shrink from 2 back to 1 after a reader has been created with
     * {@code replication.source.eof.autorecovery} enabled.
     *
     * <p>Steps: remember the current WAL file, roll the writer, wait until no WAL close is in
     * flight, locate the archived file, truncate it to length 0, create the reader with a local
     * configuration, and wait (up to 10 s) for the queue size to reach 1.
     */
    @Test
    public void testEOFExceptionInOldWALsDirectory() throws Exception {
        Assert.assertEquals((long)1L, (long)this.logQueue.getQueueSize("fake-wal-group-id"));
        AbstractFSWAL abstractWAL = (AbstractFSWAL)this.log;
        Path emptyLogFile = abstractWAL.getCurrentFileName();
        this.log.rollWriter(true);
        // The roll closes the old file in the background; wait for that to finish.
        Waiter.waitFor((Configuration)CONF, (long)5000L, () -> abstractWAL.getInflightWALCloseCount() == 0);
        Assert.assertEquals((long)2L, (long)this.logQueue.getQueueSize("fake-wal-group-id"));
        Path archivePath = AbstractFSWALProvider.findArchivedLog((Path)emptyLogFile, (Configuration)CONF);
        Assert.assertNotNull((Object)archivePath);
        Assert.assertTrue((boolean)fs.exists(archivePath));
        // Make the archived file completely empty.
        fs.truncate(archivePath, 0L);
        Assert.assertEquals((long)0L, (long)fs.getFileStatus(archivePath).getLen());
        // Work on a copy of the configuration so the overrides do not leak into other tests.
        Configuration localConf = new Configuration(CONF);
        localConf.setInt("replication.source.maxretriesmultiplier", 1);
        localConf.setBoolean("replication.source.eof.autorecovery", true);
        this.createReader(false, localConf);
        Waiter.waitFor((Configuration)localConf, (long)10000L, () -> this.logQueue.getQueueSize("fake-wal-group-id") == 1);
    }

    /**
     * Runs a {@link ReplicationSourceWALReader} with a {@link PartialWALEntryFailingWALEntryFilter}
     * that throws a retryable exception on the last of three entries for a limited number of
     * passes, and verifies the batch that is eventually produced.
     *
     * <p>The expected end position is obtained first by reading the same three entries directly
     * through a {@link WALEntryStream}. The batch must then contain three entries and three row
     * keys, end at that position on the WAL at the head of the queue, and the buffer-usage
     * counters must equal the summed entry sizes. Finally the filter must have thrown exactly the
     * configured number of times and no further batch may be available.
     */
    @Test
    public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
        final String walGroupId = "fake-wal-group-id";
        final int entryCount = 3;
        final int maxThrowExceptionCount = 3;

        this.appendEntriesToLogAndSync(entryCount);

        // Read the entries directly once to learn the position right after the last one.
        long position;
        try (WALEntryStream stream = new WALEntryStream(this.logQueue, (FileSystem)fs, CONF, 0L, (WALFileLengthProvider)this.log, new MetricsSource("1"), walGroupId)) {
            for (int remaining = entryCount; remaining > 0; --remaining) {
                Assert.assertNotNull(this.next(stream));
            }
            position = stream.getPosition();
        }
        Path walPath = this.getQueue().peek();

        ReplicationSource source = this.mockReplicationSource(false, CONF);
        Mockito.when((Object)source.isPeerEnabled()).thenReturn((Object)true);
        PartialWALEntryFailingWALEntryFilter walEntryFilter = new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, entryCount);
        ReplicationSourceWALReader reader = new ReplicationSourceWALReader((FileSystem)fs, CONF, this.logQueue, 0L, (WALEntryFilter)walEntryFilter, source, walGroupId);
        reader.start();

        WALEntryBatch batch = reader.take();
        Assert.assertNotNull(batch);
        Assert.assertEquals(3L, (long)batch.getWalEntries().size());
        long expectedBufferBytes = batch.getWalEntries().stream().mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum();

        Assert.assertEquals(position, (long)batch.getLastWalPosition());
        Assert.assertEquals((Object)walPath, (Object)batch.getLastWalPath());
        Assert.assertEquals(3L, (long)batch.getNbRowKeys());
        Assert.assertEquals(expectedBufferBytes, (long)source.getSourceManager().getTotalBufferUsed());
        Assert.assertEquals(expectedBufferBytes, (long)source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
        Assert.assertEquals((long)maxThrowExceptionCount, (long)walEntryFilter.getThrowExceptionCount());
        // Nothing else was written, so no second batch should show up.
        Assert.assertNull((Object)reader.poll(10L));
    }

    /**
     * Filter that lets the first {@code walEntryCount - 1} entries of a pass through and, on the
     * entry that follows, throws {@link WALEntryFilterRetryableException} until it has thrown
     * {@code maxThrowExceptionCount} times; after that the entry is returned as well.
     *
     * <p>Every time that last position is reached the per-pass counter is reset, so the next call
     * starts a new pass. Entries are never modified or dropped.
     */
    private static class PartialWALEntryFailingWALEntryFilter
    implements WALEntryFilter {
        /** Zero-based index of the entry seen in the current pass; -1 before the first call. */
        private int filteredWALEntryCount = -1;
        /** Number of entries that make up one pass. */
        private final int walEntryCount;
        /** How often the last position of a pass has been reached, minus one; -1 initially. */
        private int throwExceptionCount = -1;
        /** Number of times the filter throws before it lets the last entry through. */
        private final int maxThrowExceptionCount;

        public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
            this.walEntryCount = walEntryCount;
            this.maxThrowExceptionCount = throwExceptionLimit;
        }

        /**
         * Returns {@code entry} unchanged, or throws on the last entry of a pass while the throw
         * budget is not yet used up.
         */
        public WAL.Entry filter(WAL.Entry entry) {
            this.filteredWALEntryCount += 1;
            final int lastIndex = this.walEntryCount - 1;
            if (this.filteredWALEntryCount >= lastIndex) {
                // Last entry of the pass: start counting from scratch on the next call.
                this.filteredWALEntryCount = -1;
                this.throwExceptionCount += 1;
                final boolean budgetUsedUp = this.throwExceptionCount > this.maxThrowExceptionCount - 1;
                if (!budgetUsedUp) {
                    throw new WALEntryFilterRetryableException("failing filter");
                }
            }
            return entry;
        }

        /** Returns the internal throw counter (see {@link #filter}). */
        public int getThrowExceptionCount() {
            return this.throwExceptionCount;
        }
    }

    /**
     * Filter that throws {@link WALEntryFilterRetryableException} for the first
     * {@code numFailuresInFilter} calls and returns the entry unchanged afterwards.
     *
     * <p>The failure counter is {@code static} because it is read through the static accessor
     * {@link #numFailures()}. It is therefore shared by all instances; creating a new filter
     * resets it, so only one instance should be in use at a time.
     */
    public static class FailingWALEntryFilter
    implements WALEntryFilter {
        /** Number of calls that must fail before entries are let through. */
        private final int numFailures;
        /** Failures thrown since the most recent filter was constructed. */
        private static int countFailures = 0;

        /**
         * @param numFailuresInFilter how many calls to {@link #filter} should throw before the
         *     filter starts passing entries; zero or less means it never throws
         */
        public FailingWALEntryFilter(int numFailuresInFilter) {
            this.numFailures = numFailuresInFilter;
            // Start from a clean count: without this, a counter left over from an earlier filter
            // in the same JVM would either suppress all failures or (if already above the new
            // limit) make the filter fail forever.
            countFailures = 0;
        }

        /**
         * Returns {@code entry} once the configured number of failures has been reached;
         * otherwise counts one more failure and throws.
         */
        public WAL.Entry filter(WAL.Entry entry) {
            // ">=" rather than "==" so an overshoot can never lock the filter into failing.
            if (countFailures >= this.numFailures) {
                return entry;
            }
            ++countFailures;
            throw new WALEntryFilterRetryableException("failing filter");
        }

        /** Returns the number of failures thrown since the latest filter was created. */
        public static int numFailures() {
            return countFailures;
        }
    }
}

