/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.elephantbird.mapreduce.io;

import com.twitter.data.proto.BlockStorage;
import com.twitter.elephantbird.mapreduce.io.BinaryConverter;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.StreamSearcher;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads records out of a stream of serialized blocks.
 *
 * <p>As implemented here, each block is located by searching the stream for
 * {@code Protobufs.KNOWN_GOOD_POSITION_MARKER}, followed by a 4-byte length
 * (least-significant byte first) and that many bytes which are parsed as a
 * {@link BlockStorage.SerializedBlock}. The blobs inside a block are then
 * handed out one at a time, either as raw bytes or converted to {@code M}
 * through the supplied {@link BinaryConverter}.
 *
 * <p>The class keeps mutable per-instance cursor state and contains no
 * synchronization.
 *
 * @param <M> the type produced by the {@link BinaryConverter}
 */
public abstract class BinaryBlockReader<M> {
    private static final Logger LOG = LoggerFactory.getLogger(BinaryBlockReader.class);

    private InputStream in_;
    private final StreamSearcher searcher_;
    private final BinaryConverter<M> protoConverter_;
    // Block whose blobs are currently being handed out; null until the first block is parsed.
    private BlockStorage.SerializedBlock curBlock_;
    // Number of blobs of curBlock_ that have not been returned yet.
    private int numLeftToReadThisBlock_ = 0;
    // Cleared by markNoMoreNewBlocks(); once false, no further block is parsed.
    private boolean readNewBlocks_ = true;
    // When true, zero-length blobs are skipped by readNextProtoBytes().
    private final boolean skipEmptyRecords;

    /**
     * Creates a reader that skips zero-length records.
     *
     * @param in the stream to read blocks from
     * @param protoConverter converter used to turn a blob into an {@code M}
     */
    protected BinaryBlockReader(InputStream in, BinaryConverter<M> protoConverter) {
        this(in, protoConverter, true);
    }

    /**
     * Creates a reader.
     *
     * @param in the stream to read blocks from
     * @param protoConverter converter used to turn a blob into an {@code M}
     * @param skipEmptyRecords whether zero-length blobs are skipped instead of returned
     */
    protected BinaryBlockReader(InputStream in, BinaryConverter<M> protoConverter, boolean skipEmptyRecords) {
        this.in_ = in;
        this.protoConverter_ = protoConverter;
        this.searcher_ = new StreamSearcher(Protobufs.KNOWN_GOOD_POSITION_MARKER);
        this.skipEmptyRecords = skipEmptyRecords;
    }

    /**
     * Closes the underlying stream, if one is set.
     *
     * @throws IOException if closing the stream fails
     */
    public void close() throws IOException {
        if (in_ != null) {
            in_.close();
        }
    }

    /**
     * Replaces the stream that subsequent reads use. The previous stream is not
     * closed and the current block/cursor state is left untouched.
     *
     * @param in the new input stream
     */
    public void setInputStream(InputStream in) {
        this.in_ = in;
    }

    /**
     * Reads the next record and converts it with the configured converter.
     *
     * @return the converted record, or {@code null} when no further record is available
     * @throws IOException if reading from the stream fails
     */
    public M readNext() throws IOException {
        byte[] blob = readNextProtoBytes();
        return blob == null ? null : protoConverter_.fromBytes(blob);
    }

    /**
     * Reads the next record, converts it and stores it into {@code writable}.
     *
     * @param writable destination for the converted record; left unmodified when
     *     no record is available
     * @return {@code true} if a record was stored, {@code false} otherwise
     * @throws IOException if reading from the stream fails
     */
    public boolean readNext(BinaryWritable<M> writable) throws IOException {
        byte[] blob = readNextProtoBytes();
        if (blob == null) {
            return false;
        }
        writable.set(protoConverter_.fromBytes(blob));
        return true;
    }

    /**
     * Returns the raw bytes of the next record.
     *
     * <p>Zero-length records are skipped when this reader was created with
     * {@code skipEmptyRecords == true}.
     *
     * @return the record bytes, or {@code null} when no further record is available
     * @throws IOException if reading from the stream fails
     */
    public byte[] readNextProtoBytes() throws IOException {
        while (true) {
            if (!setupNewBlockIfNeeded()) {
                return null;
            }
            // Blobs are consumed front to back: index = total - remaining.
            int blobIndex = curBlock_.getProtoBlobsCount() - numLeftToReadThisBlock_;
            numLeftToReadThisBlock_--;
            byte[] blob = curBlock_.getProtoBlobs(blobIndex).toByteArray();
            if (blob.length != 0 || !skipEmptyRecords) {
                return blob;
            }
        }
    }

    /**
     * Reads the raw bytes of the next record into {@code writable}.
     *
     * @param writable destination for the record bytes; left unmodified when no
     *     record is available
     * @return {@code true} if a record was stored, {@code false} otherwise
     * @throws IOException if reading from the stream fails
     */
    public boolean readNextProtoBytes(BytesWritable writable) throws IOException {
        byte[] blob = readNextProtoBytes();
        if (blob == null) {
            return false;
        }
        writable.set(blob, 0, blob.length);
        return true;
    }

    /**
     * Tells the reader not to parse any further blocks. Records remaining in the
     * block that is already parsed are still returned.
     */
    public void markNoMoreNewBlocks() {
        readNewBlocks_ = false;
    }

    /**
     * Runs the marker search over the current stream.
     *
     * @return the result of {@code StreamSearcher.search}; callers in this class
     *     treat {@code false} as "no further sync point" (presumably end of
     *     stream; confirm against {@code StreamSearcher})
     * @throws IOException if reading from the stream fails
     */
    public boolean skipToNextSyncPoint() throws IOException {
        return searcher_.search(in_);
    }

    /**
     * Advances to the next sync point and parses the block that follows it.
     *
     * <p>On success the number of unread records is reset to the blob count of
     * the parsed block.
     *
     * @return the parsed block, or {@code null} if no sync point was found or the
     *     block length could not be read (a negative length is treated the same way)
     * @throws IOException if reading from the stream fails, including when fewer
     *     bytes than the declared block length are available
     */
    public BlockStorage.SerializedBlock parseNextBlock() throws IOException {
        LOG.debug("BlockReader: none left to read, skipping to sync point");
        if (!skipToNextSyncPoint()) {
            LOG.debug("BlockReader: SYNC point eof");
            return null;
        }
        int blockSize = readInt();
        LOG.debug("BlockReader: found sync point, next block has size {}", blockSize);
        if (blockSize < 0) {
            LOG.debug("ProtobufReader: reading size after sync point eof");
            return null;
        }
        byte[] byteArray = new byte[blockSize];
        IOUtils.readFully(in_, byteArray, 0, blockSize);
        BlockStorage.SerializedBlock block = BlockStorage.SerializedBlock.parseFrom(byteArray);
        numLeftToReadThisBlock_ = block.getProtoBlobsCount();
        LOG.debug("ProtobufReader: number in next block is {}", numLeftToReadThisBlock_);
        return block;
    }

    /**
     * Ensures the current block has at least one unread record, parsing further
     * blocks if necessary.
     *
     * <p>A parsed block that contains no blobs is skipped. Without this, the
     * caller would index into an empty block and drive the remaining-record
     * counter negative, after which every subsequent read would fail as well.
     *
     * @return {@code true} if a record is available in the current block;
     *     {@code false} if new blocks are disallowed or no further block exists
     * @throws IOException if reading from the stream fails
     */
    private boolean setupNewBlockIfNeeded() throws IOException {
        while (numLeftToReadThisBlock_ == 0) {
            if (!readNewBlocks_) {
                return false;
            }
            curBlock_ = parseNextBlock();
            if (curBlock_ == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads a 4-byte integer, least-significant byte first.
     *
     * @return the decoded value, or {@code -1} if the stream ends before all four
     *     bytes were read. A value whose top bit is set also decodes as negative,
     *     which the caller treats as "no block".
     * @throws IOException if reading from the stream fails
     */
    private int readInt() throws IOException {
        int value = 0;
        for (int shift = 0; shift < 32; shift += 8) {
            int b = in_.read();
            if (b == -1) {
                // Truncated length prefix: report it explicitly rather than
                // folding the -1 sentinel into the decoded value.
                return -1;
            }
            value |= b << shift;
        }
        return value;
    }
}

