/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.connector.common;

import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads several {@link ArrowReader}s in the background and hands their record batches to a
 * consumer one at a time through {@link #next()}.
 *
 * <p>A daemon coordinator thread, started by the constructor, submits one "load next batch" task
 * per reader to the supplied {@link ExecutorService}. Each task unloads the batch it read and puts
 * it on a bounded queue; {@link #next()} takes items off that queue and loads them into the
 * supplied {@link VectorLoader}. A semaphore with one permit per reader bounds how many tasks and
 * queued batches can be outstanding at once.
 *
 * <p>{@link #close()} shuts down the supplied executor and closes the supplied readers.
 */
public class ParallelArrowReader implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ParallelArrowReader.class);

    /** Queue marker signalling that the coordinator loop ended without an error. */
    private static final Object DONE_SENTINEL = new Object();

    /** Upper bound for each shutdown wait in {@link #close()}; the log messages mention it. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    /** Holds {@link ArrowRecordBatch} results, {@link Throwable} failures or {@link #DONE_SENTINEL}. */
    private final BlockingQueue<Object> queue;

    /** One permit per reader; acquired before a task is submitted, released by the consumer. */
    private final Semaphore queueSemaphore;

    private final List<ArrowReader> readers;
    private final ExecutorService executor;
    private final VectorLoader loader;
    private final BigQueryStorageReadRowsTracer rootTracer;

    /** Per-reader tracers forked from {@link #rootTracer}, indexed like {@link #readers}. */
    private final BigQueryStorageReadRowsTracer[] tracers;

    /** Number of readers not yet exhausted; forced to zero on failure or close. */
    private final AtomicInteger readersReady;

    private Thread readerThread;

    /**
     * Creates the reader and immediately starts the background coordinator thread.
     *
     * @param readers readers to drain; they are closed by {@link #close()}
     * @param executor runs the per-reader load tasks; it is shut down by {@link #close()}
     * @param loader receives every batch returned through {@link #next()}
     * @param tracer root tracer; one child tracer is forked from it per reader
     */
    public ParallelArrowReader(List<ArrowReader> readers, ExecutorService executor, VectorLoader loader, BigQueryStorageReadRowsTracer tracer) {
        this.readers = readers;
        // Two slots beyond the per-reader permits leave room for a failure and the sentinel.
        this.queue = new ArrayBlockingQueue<>(readers.size() + 2);
        this.executor = executor;
        this.loader = loader;
        this.rootTracer = tracer;
        this.queueSemaphore = new Semaphore(readers.size());
        this.readersReady = new AtomicInteger(readers.size());
        this.tracers = new BigQueryStorageReadRowsTracer[readers.size()];
        for (int x = 0; x < readers.size(); ++x) {
            this.tracers[x] = this.rootTracer.forkWithPrefix("reader-thread-" + x);
        }
        this.start();
    }

    /**
     * Blocks until the next queued item is available and, if it is a batch, loads it into the
     * {@link VectorLoader}.
     *
     * @return {@code true} if a batch was loaded; {@code false} if the end marker was received or
     *     the calling thread was interrupted while waiting (the interrupt flag is restored)
     * @throws IOException if a background task queued a failure; a non-{@code IOException}
     *     failure is wrapped as the cause
     */
    public boolean next() throws IOException {
        this.rootTracer.nextBatchNeeded();
        this.rootTracer.readRowsResponseRequested();
        final Object nextObject;
        try {
            nextObject = this.queue.take();
        } catch (InterruptedException e) {
            log.info("Interrupted when waiting for next batch.");
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            return false;
        }
        // A queue slot was freed, so the coordinator may schedule another read.
        this.queueSemaphore.release();
        if (nextObject == DONE_SENTINEL) {
            return false;
        }
        if (nextObject instanceof IOException) {
            throw (IOException) nextObject;
        }
        if (nextObject instanceof Throwable) {
            throw new IOException((Throwable) nextObject);
        }
        Preconditions.checkState(nextObject instanceof ArrowRecordBatch, "Expected future object");
        ArrowRecordBatch resolvedBatch = (ArrowRecordBatch) nextObject;
        this.rootTracer.readRowsResponseObtained(0L);
        this.rootTracer.rowsParseStarted();
        try {
            this.loader.load(resolvedBatch);
            this.rootTracer.rowsParseFinished(resolvedBatch.getLength());
        } finally {
            // Close the batch even when loading fails so it is not leaked.
            resolvedBatch.close();
        }
        return true;
    }

    /** Starts the daemon coordinator thread and then signals stream start on the root tracer. */
    private void start() {
        this.readerThread = new Thread(this::consumeReaders);
        this.readerThread.setDaemon(true);
        this.readerThread.start();
        this.rootTracer.startStream();
    }

    /**
     * Coordinator loop: cycles over the readers that still have data and submits one load task
     * per reader and pass, acquiring a semaphore permit before each submission. Ends by queueing
     * {@link #DONE_SENTINEL}, or the {@link Throwable} that aborted the loop.
     */
    private void consumeReaders() {
        try {
            final int readerCount = this.readers.size();
            // hasData[i] flips to false once reader i is exhausted or has failed.
            final AtomicBoolean[] hasData = new AtomicBoolean[readerCount];
            final long[] lastBytesRead = new long[readerCount];
            final VectorUnloader[] unloaders = new VectorUnloader[readerCount];
            final VectorSchemaRoot[] roots = new VectorSchemaRoot[readerCount];
            for (int x = 0; x < readerCount; ++x) {
                hasData[x] = new AtomicBoolean(true);
                roots[x] = this.readers.get(x).getVectorSchemaRoot();
                unloaders[x] = new VectorUnloader(roots[x], true, false);
                this.tracers[x].startStream();
            }
            while (this.readersReady.get() > 0) {
                for (int readerIdx = 0; readerIdx < readerCount; ++readerIdx) {
                    if (!hasData[readerIdx].get()) {
                        continue;
                    }
                    final ArrowReader reader = this.readers.get(readerIdx);
                    final int idx = readerIdx;
                    // Blocks until a permit is free, which throttles read-ahead.
                    this.queueSemaphore.acquire();
                    this.executor.submit(() -> this.readBatch(idx, reader, hasData[idx], lastBytesRead, roots[idx], unloaders[idx]));
                }
            }
        } catch (Throwable e) {
            log.info("Read ahead caught exceptions", e);
            Preconditions.checkState(this.queue.offer(e), "Expected available capacity");
            return;
        }
        Preconditions.checkState(this.queue.offer(DONE_SENTINEL), "Expected available capacity");
    }

    /**
     * Loads the next batch from one reader and queues it for the consumer. Runs on the executor
     * while holding the monitor of the reader's {@link VectorSchemaRoot}, so tasks for the same
     * reader never overlap.
     *
     * @param idx index of the reader in {@link #readers} and {@link #tracers}
     * @param reader the reader to advance
     * @param hasData flag for this reader; cleared here when it is exhausted or fails
     * @param lastBytesRead per-reader byte counters used to report incremental bytes to the tracer
     * @param root the reader's schema root, used as the lock
     * @param unloader produces the {@link ArrowRecordBatch} from the reader's schema root
     */
    private void readBatch(int idx, ArrowReader reader, AtomicBoolean hasData, long[] lastBytesRead, VectorSchemaRoot root, VectorUnloader unloader) {
        synchronized (root) {
            if (!hasData.get()) {
                // This task was queued before the reader finished. It puts nothing on the
                // queue, so the permit taken at submission must be returned here; otherwise
                // it is lost and fewer reads can be in flight for the remaining readers.
                this.queueSemaphore.release();
                return;
            }
            final BigQueryStorageReadRowsTracer tracer = this.tracers[idx];
            try {
                tracer.readRowsResponseRequested();
                hasData.set(reader.loadNextBatch());
                if (!hasData.get()) {
                    // No batch will be queued, so the consumer will not release this permit.
                    this.queueSemaphore.release();
                }
                long totalBytesRead = reader.bytesRead();
                tracer.readRowsResponseObtained(totalBytesRead - lastBytesRead[idx]);
                lastBytesRead[idx] = totalBytesRead;
            } catch (Throwable e) {
                log.info("Exception caught while consuming reader.", e);
                hasData.set(false);
                // Stop the coordinator loop; the failure is delivered through the queue.
                this.readersReady.set(0);
                Preconditions.checkState(this.queue.offer(e), "Expected space in queue");
            }
            if (!hasData.get()) {
                this.readersReady.decrementAndGet();
                return;
            }
            int rows = 0;
            try {
                rows = reader.getVectorSchemaRoot().getRowCount();
            } catch (IOException e) {
                // The failure is forwarded to the consumer; the row count reported to the
                // tracer stays 0 and the batch is still unloaded below.
                this.queue.offer(e);
            }
            tracer.rowsParseStarted();
            ArrowRecordBatch batch = unloader.getRecordBatch();
            tracer.rowsParseFinished(rows);
            if (!this.queue.offer(batch)) {
                // Nobody will ever take this batch, so close it before reporting the error.
                batch.close();
                throw new IllegalStateException("Expected space in queue");
            }
        }
    }

    /**
     * Stops the coordinator thread, shuts down the executor, closes any batches still queued,
     * finishes all tracers and closes the delegate readers. Each shutdown wait is bounded to
     * 10 seconds. If the calling thread was interrupted before or during the call, its
     * interrupt flag is set again before returning.
     */
    @Override
    public void close() {
        // Clear a pending interrupt so the bounded waits below still run; it is restored at the end.
        boolean interrupted = Thread.interrupted();
        try {
            this.rootTracer.finished();
            if (this.readerThread != null) {
                this.readersReady.set(0);
                this.readerThread.interrupt();
                try {
                    this.readerThread.join(TimeUnit.SECONDS.toMillis(SHUTDOWN_TIMEOUT_SECONDS));
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.info("Interrupted while waiting for reader thread to finish.");
                }
                if (this.readerThread.isAlive()) {
                    log.warn("Reader thread did not shutdown in 10 seconds.");
                } else {
                    log.info("Reader thread stopped.  Queue size: {}", this.queue.size());
                }
            }
            this.executor.shutdownNow();
            try {
                if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("executor did not terminate after 10 seconds");
                }
            } catch (InterruptedException e) {
                interrupted = true;
                log.info("Interrupted when awaiting executor termination");
            }
            // Batches that were never consumed must be closed here.
            for (Object queued : this.queue) {
                if (queued instanceof ArrowRecordBatch) {
                    ((ArrowRecordBatch) queued).close();
                }
            }
            for (BigQueryStorageReadRowsTracer tracer : this.tracers) {
                tracer.finished();
            }
            for (ArrowReader reader : this.readers) {
                try {
                    reader.close(false);
                } catch (Exception e) {
                    log.info("Trouble closing delegate readers", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

