/*
 * Decompiled with CFR 0.152.
 */
package za.co.absa.cobrix.spark.cobol.source.index;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import za.co.absa.cobrix.cobol.internal.Logging;
import za.co.absa.cobrix.cobol.reader.common.Constants$;
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry;
import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser$;
import za.co.absa.cobrix.spark.cobol.reader.Reader;
import za.co.absa.cobrix.spark.cobol.reader.VarLenReader;
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration;
import za.co.absa.cobrix.spark.cobol.source.index.LocationBalancer$;
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters;
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer;
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder;
import za.co.absa.cobrix.spark.cobol.utils.HDFSUtils$;
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils$;

/**
 * Builds sparse indexes ({@link SparseIndexEntry}) over input files as Spark RDDs.
 *
 * <p>This is the compiled/decompiled form of a Scala singleton object: {@code MODULE$} holds the
 * only instance, which is created by the static initializer and assigned in the private
 * constructor.
 */
public final class IndexBuilder$
implements Logging {
    /** The singleton instance, assigned by the private constructor during static initialization. */
    public static IndexBuilder$ MODULE$;
    /** Backing field of the {@link Logging} mixin; read and written through the accessors below. */
    private transient Logger za$co$absa$cobrix$cobol$internal$Logging$$log_;

    static {
        new IndexBuilder$();
    }

    /** Delegates to the {@link Logging} trait implementation. */
    public String logName() {
        return Logging.logName$((Logging)this);
    }

    /** Delegates to the {@link Logging} trait implementation. */
    public Logger logger() {
        return Logging.logger$((Logging)this);
    }

    /** Getter for the {@link Logging} mixin's logger field. */
    public Logger za$co$absa$cobrix$cobol$internal$Logging$$log_() {
        return this.za$co$absa$cobrix$cobol$internal$Logging$$log_;
    }

    /** Setter for the {@link Logging} mixin's logger field. */
    public void za$co$absa$cobrix$cobol$internal$Logging$$log__$eq(Logger x$1) {
        this.za$co$absa$cobrix$cobol$internal$Logging$$log_ = x$1;
    }

    /**
     * Builds a sparse index over the given files.
     *
     * <p>The file system is resolved from the first file in {@code filesList}. It is used to decide
     * whether random access is treated as available and whether data locality can be exploited.
     *
     * @param filesList      input files with their order; must be non-empty (its head is read)
     * @param cobolReader    reader that generates the index; only a {@link VarLenReader} is indexed
     * @param sqlContext     Spark SQL context supplying the SparkContext and Hadoop configuration
     * @param localityParams whether to improve locality and whether to optimize the allocation
     * @return the repartitioned index RDD, or {@code null} when {@code cobolReader} is not a
     *         {@link VarLenReader}
     */
    public RDD<SparseIndexEntry> buildIndex(FileWithOrder[] filesList, Reader cobolReader, SQLContext sqlContext, LocalityParameters localityParams) {
        FileSystem fs = new Path(((FileWithOrder)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])filesList)).head()).filePath()).getFileSystem(sqlContext.sparkSession().sparkContext().hadoopConfiguration());
        // Evaluated for every reader type because it may log a warning about the file system.
        boolean isIndexSupported = this.isFileRandomAccessSupported(fs);
        if (!(cobolReader instanceof VarLenReader)) {
            // Readers other than VarLenReader are not indexed by this builder.
            return null;
        }
        VarLenReader varLenReader = (VarLenReader)cobolReader;
        if (!isIndexSupported) {
            // Without random access each file becomes a single, whole-file index entry.
            return this.buildIndexForFullFiles(filesList, varLenReader, sqlContext);
        }
        boolean useFullLocality = varLenReader.isIndexGenerationNeeded()
                && localityParams.improveLocality()
                && this.isDataLocalitySupported(fs);
        return useFullLocality
                ? this.buildIndexForVarLenReaderWithFullLocality(filesList, varLenReader, sqlContext, localityParams.optimizeAllocation())
                : this.buildIndexForVarLenReader(filesList, varLenReader, sqlContext);
    }

    /**
     * Builds the index and attaches preferred (block-local) hosts to every index entry.
     *
     * <p>Files are distributed with their own block locations as preferences. Each file is
     * indexed on an executor, and every entry is paired with the hosts that store its byte range.
     * The pairs are then collected to the driver and turned into an RDD with per-element location
     * preferences.
     *
     * @param optimizeAllocation when true, locations are rebalanced among active executors
     */
    private RDD<SparseIndexEntry> buildIndexForVarLenReaderWithFullLocality(FileWithOrder[] filesList, VarLenReader reader, SQLContext sqlContext, boolean optimizeAllocation) {
        Configuration conf = sqlContext.sparkContext().hadoopConfiguration();
        RDD<FileWithOrder> filesRDD = this.toRDDWithLocality(filesList, conf, sqlContext);
        // Hadoop Configuration is wrapped so it can be shipped to executors inside the closure.
        SerializableConfiguration sconf = new SerializableConfiguration(conf);
        RDD indexes = filesRDD.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> partition.flatMap((Function1 & Serializable & scala.Serializable)row -> {
            ArrayBuffer<SparseIndexEntry> index = MODULE$.generateIndexEntry((FileWithOrder)row, sconf.value(), reader);
            String filePath = row.filePath();
            FileSystem fileSystem = new Path(filePath).getFileSystem(sconf.value());
            return (ArrayBuffer)index.map((Function1 & Serializable & scala.Serializable)entry -> {
                // Negative start offsets are clamped to the beginning of the file.
                long offset = entry.offsetFrom() >= 0L ? entry.offsetFrom() : 0L;
                long length = MODULE$.getBlockLengthByIndexEntry((SparseIndexEntry)entry);
                return new Tuple2(entry, HDFSUtils$.MODULE$.getBlocksLocations(new Path(filePath), offset, length, fileSystem));
            }, ArrayBuffer$.MODULE$.canBuildFrom());
        }), filesRDD.mapPartitions$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
        this.logger().info("Going to collect located indexes into driver.");
        Seq<Tuple2<SparseIndexEntry, Seq<String>>> collected = (Seq<Tuple2<SparseIndexEntry, Seq<String>>>)Predef$.MODULE$.wrapRefArray((Object[])indexes.collect());
        Seq<Tuple2<SparseIndexEntry, Seq<String>>> offsetsLocations = optimizeAllocation
                ? this.optimizeDistribution(collected, sqlContext.sparkContext())
                : collected;
        this.logger().info(new StringBuilder(34).append("Creating RDD for ").append(offsetsLocations.length()).append(" located indexes.").toString());
        if (this.logger().isDebugEnabled()) {
            this.logger().debug("Preferred locations per index entry");
            offsetsLocations.foreach((Function1 & Serializable & scala.Serializable)allocation -> {
                IndexBuilder$.$anonfun$buildIndexForVarLenReaderWithFullLocality$4(allocation);
                return BoxedUnit.UNIT;
            });
        }
        // makeRDD over (element, hosts) pairs records each element's preferred locations.
        RDD indexRDD = sqlContext.sparkContext().makeRDD(offsetsLocations, ClassTag$.MODULE$.apply(SparseIndexEntry.class));
        return this.repartitionIndexes((RDD<SparseIndexEntry>)indexRDD);
    }

    /**
     * Builds the index with one Spark partition per input file and no locality preferences.
     *
     * @return the cached, repartitioned index RDD
     */
    public RDD<SparseIndexEntry> buildIndexForVarLenReader(FileWithOrder[] filesList, VarLenReader reader, SQLContext sqlContext) {
        RDD filesRDD = sqlContext.sparkContext().parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])filesList), filesList.length, ClassTag$.MODULE$.apply(FileWithOrder.class));
        Configuration conf = sqlContext.sparkContext().hadoopConfiguration();
        SerializableConfiguration sconf = new SerializableConfiguration(conf);
        RDD indexRDD = filesRDD.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> partition.flatMap((Function1 & Serializable & scala.Serializable)row -> MODULE$.generateIndexEntry((FileWithOrder)row, sconf.value(), reader)), filesRDD.mapPartitions$default$2(), ClassTag$.MODULE$.apply(SparseIndexEntry.class)).cache();
        return this.repartitionIndexes((RDD<SparseIndexEntry>)indexRDD);
    }

    /**
     * Builds a trivial index with a single entry per file, covering the whole file.
     *
     * <p>Each entry is {@code SparseIndexEntry(0, -1, fileOrder, 0)}, i.e. it starts at offset 0
     * and uses -1 as its end offset. The reader itself is not consulted.
     */
    public RDD<SparseIndexEntry> buildIndexForFullFiles(FileWithOrder[] filesList, VarLenReader reader, SQLContext sqlContext) {
        RDD filesRDD = sqlContext.sparkContext().parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])filesList), filesList.length, ClassTag$.MODULE$.apply(FileWithOrder.class));
        RDD indexRDD = filesRDD.mapPartitions((Function1 & Serializable & scala.Serializable)partition -> partition.flatMap((Function1 & Serializable & scala.Serializable)row -> {
            int fileId = row.order();
            SparseIndexEntry element = new SparseIndexEntry(0L, -1L, fileId, 0L);
            return (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new SparseIndexEntry[]{element}));
        }), filesRDD.mapPartitions$default$2(), ClassTag$.MODULE$.apply(SparseIndexEntry.class)).cache();
        return this.repartitionIndexes((RDD<SparseIndexEntry>)indexRDD);
    }

    /**
     * Generates the sparse index of a single file with the given reader.
     *
     * <p>The file is opened through a {@link FileStreamer} created with {@code 0L, 0L}. These are
     * presumably the start offset and a size limit (0 = no limit?); confirm against FileStreamer.
     */
    private ArrayBuffer<SparseIndexEntry> generateIndexEntry(FileWithOrder fileWithOrder, Configuration config, VarLenReader reader) {
        String filePath = fileWithOrder.filePath();
        Path path = new Path(filePath);
        int fileOrder = fileWithOrder.order();
        FileSystem fileSystem = path.getFileSystem(config);
        this.logger().info(new StringBuilder(38).append("Going to generate index for the file: ").append(filePath).toString());
        ArrayBuffer<SparseIndexEntry> index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0L, 0L), fileOrder, reader.isRdwBigEndian());
        return index;
    }

    /**
     * Returns how many bytes, counted from the entry's start offset, to inspect when looking up
     * block locations for {@code entry}.
     *
     * <p>The start offset is {@code offsetFrom} clamped at 0, the same offset that is passed to
     * {@code getBlocksLocations}. {@code offsetTo} is an absolute end offset, so the length is the
     * difference between the two, not {@code offsetTo} itself. Using {@code offsetTo} directly would
     * overstate the range for every entry that does not start at 0. An entry whose
     * {@code offsetTo} is not past its start (e.g. -1, "until end of file") is open-ended. Its
     * length is {@code Long.MAX_VALUE - start}, so {@code start + length} cannot overflow a long.
     *
     * <p>Ranges of 10 MB or more are shortened by 1 MB. This is presumably so that a small overhang
     * past an HDFS block boundary does not add hosts to the preference list (intent inferred;
     * confirm).
     */
    private long getBlockLengthByIndexEntry(SparseIndexEntry entry) {
        long rangeStart = Math.max(entry.offsetFrom(), 0L);
        long rangeLength = entry.offsetTo() > rangeStart
                ? entry.offsetTo() - rangeStart
                : Long.MAX_VALUE - rangeStart;
        long megabyte = (long)Constants$.MODULE$.megabyte();
        return rangeLength < 10L * megabyte ? rangeLength : rangeLength - megabyte;
    }

    /**
     * Rebalances index-entry locations among the currently active executors via
     * {@code LocationBalancer}.
     */
    private Seq<Tuple2<SparseIndexEntry, Seq<String>>> optimizeDistribution(Seq<Tuple2<SparseIndexEntry, Seq<String>>> allocation, SparkContext sc) {
        Seq<String> availableExecutors = SparkUtils$.MODULE$.currentActiveExecutors(sc);
        this.logger().info(new StringBuilder(62).append("Trying to balance ").append(allocation.size()).append(" partitions among all available executors (").append(availableExecutors).append(")").toString());
        return LocationBalancer$.MODULE$.balance(allocation, availableExecutors);
    }

    /**
     * Creates an RDD of the input files in which each file prefers the hosts that store its blocks.
     *
     * <p>Block locations are computed on the driver. Each file's file system is resolved from that
     * file's own path. {@code FileSystem.get(conf)} would return the default file system, which
     * rejects paths on any other file system or namespace.
     */
    private RDD<FileWithOrder> toRDDWithLocality(FileWithOrder[] filesList, Configuration conf, SQLContext sqlContext) {
        Seq filesWithPreferredLocations = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])filesList)).map((Function1 & Serializable & scala.Serializable)file -> {
            Path path = new Path(file.filePath());
            return new Tuple2(file, HDFSUtils$.MODULE$.getBlocksLocations(path, path.getFileSystem(conf)));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toSeq();
        if (this.logger().isDebugEnabled()) {
            filesWithPreferredLocations.foreach((Function1 & Serializable & scala.Serializable)a -> {
                IndexBuilder$.$anonfun$toRDDWithLocality$2(a);
                return BoxedUnit.UNIT;
            });
        }
        return sqlContext.sparkContext().makeRDD(filesWithPreferredLocations, ClassTag$.MODULE$.apply(FileWithOrder.class));
    }

    /**
     * Repartitions the index so that each entry gets its own partition, capped at
     * {@code Constants.maxNumPartitions}. The result is cached.
     *
     * <p>The partition count is never below 1. Spark rejects a non-positive count, so an empty
     * index would otherwise fail here.
     */
    private RDD<SparseIndexEntry> repartitionIndexes(RDD<SparseIndexEntry> indexRDD) {
        long indexCount = indexRDD.count();
        int numPartitions = (int)Math.max(1L, Math.min(indexCount, (long)Constants$.MODULE$.maxNumPartitions()));
        this.logger().warn(new StringBuilder(47).append("Index elements count: ").append(indexCount).append(", number of partitions = ").append(numPartitions).toString());
        Ordering ordering = indexRDD.repartition$default$2(numPartitions);
        return indexRDD.repartition(numPartitions, ordering).cache();
    }

    /**
     * Checks whether {@code fs} is one of the file systems known to support random access.
     *
     * <p>For any other file system a warning is logged that explains how to disable indexes.
     * This method always returns {@code true}: indexing is still attempted on unknown file systems.
     */
    public boolean isFileRandomAccessSupported(FileSystem fs) {
        boolean isKnownRandomAccessFs = fs instanceof DistributedFileSystem
                || fs instanceof RawLocalFileSystem
                || fs instanceof FilterFileSystem
                || fs instanceof LocalFileSystem
                || fs instanceof ChecksumFileSystem;
        if (!isKnownRandomAccessFs) {
            String q = "\"";
            this.logger().warn(new StringBuilder(139).append("Filesystem '").append(fs.getScheme()).append("://' might not support random file access. Please, disable indexes if the job fails. ").append(" You can do this using '.option(").append(q).append(CobolParametersParser$.MODULE$.PARAM_ENABLE_INDEXES()).append(q).append(", false)' ").toString());
        }
        return true;
    }

    /** Data locality is only exploited for HDFS ({@link DistributedFileSystem}). */
    public boolean isDataLocalitySupported(FileSystem fs) {
        return fs instanceof DistributedFileSystem;
    }

    /** Synthetic lambda body: logs one (entry, locations) allocation at debug level. */
    public static final /* synthetic */ void $anonfun$buildIndexForVarLenReaderWithFullLocality$4(Tuple2 allocation) {
        MODULE$.logger().debug(allocation.toString());
    }

    /** Synthetic lambda body: logs one (file, locations) pair at debug level. */
    public static final /* synthetic */ void $anonfun$toRDDWithLocality$2(Tuple2 a) {
        MODULE$.logger().debug(a.toString());
    }

    /** Registers the singleton and initializes the {@link Logging} mixin. */
    private IndexBuilder$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
    }
}

