/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spark.bigquery.v2.context;

import com.google.cloud.bigquery.connector.common.ArrowUtil;
import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import com.google.cloud.bigquery.connector.common.IteratorMultiplexer;
import com.google.cloud.bigquery.connector.common.NonInterruptibleBlockingBytesChannel;
import com.google.cloud.bigquery.connector.common.ParallelArrowReader;
import com.google.cloud.bigquery.connector.common.ReadRowsHelper;
import com.google.cloud.bigquery.connector.common.ReadRowsResponseInputStreamEnumeration;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.spark.bigquery.ArrowSchemaConverter;
import com.google.cloud.spark.bigquery.v2.context.InputPartitionReaderContext;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class ArrowColumnBatchPartitionReaderContext
implements InputPartitionReaderContext<ColumnarBatch> {
    private static final long maxAllocation = 524288000L;
    private final ReadRowsHelper readRowsHelper;
    private final ArrowReaderAdapter reader;
    private final BufferAllocator allocator;
    private final List<String> namesInOrder;
    private ColumnarBatch currentBatch;
    private final BigQueryStorageReadRowsTracer tracer;
    private boolean closed = false;
    private final Map<String, StructField> userProvidedFieldMap;
    private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();

    ArrowColumnBatchPartitionReaderContext(Iterator<ReadRowsResponse> readRowsResponses, ByteString schema, ReadRowsHelper readRowsHelper, List<String> namesInOrder, BigQueryStorageReadRowsTracer tracer, Optional<StructType> userProvidedSchema, int numBackgroundThreads) {
        this.allocator = ArrowUtil.newRootAllocator((long)524288000L);
        this.readRowsHelper = readRowsHelper;
        this.namesInOrder = namesInOrder;
        this.tracer = tracer;
        this.closeables.add(null);
        List userProvidedFieldList = Arrays.stream(userProvidedSchema.orElse(new StructType()).fields()).collect(Collectors.toList());
        this.userProvidedFieldMap = userProvidedFieldList.stream().collect(Collectors.toMap(StructField::name, field -> field));
        if (numBackgroundThreads == 1) {
            InputStream fullStream = this.makeSingleInputStream(readRowsResponses, schema, tracer);
            this.reader = new ParallelReaderAdapter(this.allocator, (List<ArrowReader>)ImmutableList.of((Object)this.newArrowStreamReader(fullStream)), (ExecutorService)MoreExecutors.newDirectExecutorService(), tracer.forkWithPrefix("BackgroundReader"), null);
        } else if (numBackgroundThreads > 1) {
            ThreadPoolExecutor backgroundParsingService = new ThreadPoolExecutor(1, numBackgroundThreads - 1, 2L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
            IteratorMultiplexer multiplexer = new IteratorMultiplexer(readRowsResponses, numBackgroundThreads);
            ArrayList<ArrowReader> readers = new ArrayList<ArrowReader>();
            for (int x = 0; x < numBackgroundThreads; ++x) {
                BigQueryStorageReadRowsTracer multiplexedTracer = tracer.forkWithPrefix("multiplexed-" + x);
                SequenceInputStream responseStream = new SequenceInputStream((Enumeration<? extends InputStream>)new ReadRowsResponseInputStreamEnumeration(multiplexer.getSplit(x), multiplexedTracer));
                SequenceInputStream schemaAndBatches = new SequenceInputStream(schema.newInput(), responseStream);
                this.closeables.add(() -> ((BigQueryStorageReadRowsTracer)multiplexedTracer).finished());
                readers.add((ArrowReader)this.newArrowStreamReader(schemaAndBatches));
            }
            this.reader = new ParallelReaderAdapter(this.allocator, readers, backgroundParsingService, tracer.forkWithPrefix("MultithreadReader"), (AutoCloseable)multiplexer);
        } else {
            InputStream fullStream = this.makeSingleInputStream(readRowsResponses, schema, tracer);
            this.reader = new SimpleAdapter((ArrowReader)this.newArrowStreamReader(fullStream));
        }
    }

    private InputStream makeSingleInputStream(Iterator<ReadRowsResponse> readRowsResponses, ByteString schema, BigQueryStorageReadRowsTracer tracer) {
        SequenceInputStream batchStream = new SequenceInputStream((Enumeration<? extends InputStream>)new ReadRowsResponseInputStreamEnumeration(readRowsResponses, tracer));
        return new SequenceInputStream(schema.newInput(), batchStream);
    }

    @Override
    public boolean next() throws IOException {
        this.tracer.nextBatchNeeded();
        if (this.closed) {
            return false;
        }
        this.tracer.rowsParseStarted();
        boolean bl = this.closed = !this.reader.loadNextBatch();
        if (this.closed) {
            return false;
        }
        VectorSchemaRoot root = this.reader.root();
        if (this.currentBatch == null) {
            ColumnVector[] columns = (ColumnVector[])this.namesInOrder.stream().map(arg_0 -> ((VectorSchemaRoot)root).getVector(arg_0)).map(vector -> ArrowSchemaConverter.newArrowSchemaConverter((ValueVector)vector, (StructField)this.userProvidedFieldMap.get(vector.getName()))).toArray(ColumnVector[]::new);
            this.currentBatch = new ColumnarBatch(columns);
        }
        this.currentBatch.setNumRows(root.getRowCount());
        this.tracer.rowsParseFinished((long)this.currentBatch.numRows());
        return true;
    }

    @Override
    public ColumnarBatch get() {
        return this.currentBatch;
    }

    @Override
    public Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer() {
        return Optional.of(this.tracer);
    }

    @Override
    public void close() throws IOException {
        this.closed = true;
        try {
            this.tracer.finished();
            this.closeables.set(0, this.reader);
            this.closeables.add((AutoCloseable)this.allocator);
            AutoCloseables.close(this.closeables);
        }
        catch (Exception e) {
            throw new IOException("Failure closing arrow components. stream: " + this.readRowsHelper, e);
        }
        finally {
            try {
                this.readRowsHelper.close();
            }
            catch (Exception e) {
                throw new IOException("Failure closing stream: " + this.readRowsHelper, e);
            }
        }
    }

    private ArrowStreamReader newArrowStreamReader(InputStream fullStream) {
        BufferAllocator childAllocator = this.allocator.newChildAllocator("readerAllocator" + (this.closeables.size() - 1), 0L, 524288000L);
        this.closeables.add((AutoCloseable)childAllocator);
        return new ArrowStreamReader((ReadableByteChannel)new NonInterruptibleBlockingBytesChannel(fullStream), childAllocator, (CompressionCodec.Factory)CommonsCompressionFactory.INSTANCE);
    }

    static class ParallelReaderAdapter
    implements ArrowReaderAdapter {
        private final ParallelArrowReader reader;
        private final VectorLoader loader;
        private final VectorSchemaRoot root;
        private final List<AutoCloseable> closeables = new ArrayList<AutoCloseable>();
        private IOException initialException;

        ParallelReaderAdapter(BufferAllocator allocator, List<ArrowReader> readers, ExecutorService executor, BigQueryStorageReadRowsTracer tracer, AutoCloseable closeable) {
            Schema schema = null;
            this.closeables.add(closeable);
            try {
                schema = readers.get(0).getVectorSchemaRoot().getSchema();
            }
            catch (IOException e) {
                this.initialException = e;
                this.closeables.addAll(readers);
                this.reader = null;
                this.loader = null;
                this.root = null;
                return;
            }
            BufferAllocator readerAllocator = allocator.newChildAllocator("ParallelReaderAllocator", 0L, 524288000L);
            this.root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)readerAllocator);
            this.closeables.add((AutoCloseable)this.root);
            this.loader = new VectorLoader(this.root);
            this.reader = new ParallelArrowReader(readers, executor, this.loader, tracer);
            this.closeables.add(0, (AutoCloseable)this.reader);
            this.closeables.add((AutoCloseable)readerAllocator);
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            if (this.initialException != null) {
                throw new IOException(this.initialException);
            }
            return this.reader.next();
        }

        @Override
        public VectorSchemaRoot root() throws IOException {
            return this.root;
        }

        @Override
        public void close() throws Exception {
            AutoCloseables.close(this.closeables);
        }
    }

    static class SimpleAdapter
    implements ArrowReaderAdapter {
        private final ArrowReader reader;

        SimpleAdapter(ArrowReader reader) {
            this.reader = reader;
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            return this.reader.loadNextBatch();
        }

        @Override
        public VectorSchemaRoot root() throws IOException {
            return this.reader.getVectorSchemaRoot();
        }

        @Override
        public void close() throws Exception {
            this.reader.close(false);
        }
    }

    static interface ArrowReaderAdapter
    extends AutoCloseable {
        public boolean loadNextBatch() throws IOException;

        public VectorSchemaRoot root() throws IOException;
    }
}

