/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.snowflake.ingest.internal.com.google.common.annotations.VisibleForTesting;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelRuntimeState;
import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
import net.snowflake.ingest.streaming.internal.ColumnMetadata;
import net.snowflake.ingest.streaming.internal.EpInfo;
import net.snowflake.ingest.streaming.internal.FileColumnProperties;
import net.snowflake.ingest.streaming.internal.IngestionStrategy;
import net.snowflake.ingest.streaming.internal.LiteralQuoteUtils;
import net.snowflake.ingest.streaming.internal.ParquetRowBuffer;
import net.snowflake.ingest.streaming.internal.RowBuffer;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;

abstract class AbstractRowBuffer<T>
implements RowBuffer<T> {
    // Class-wide logger.
    private static final Logging logger = new Logging(AbstractRowBuffer.class);
    // Sentinel meaning "no valid server-side ordinal".
    // NOTE(review): the nested enums' no-arg constructors pass the literal -1, presumably the
    // compiler-inlined form of this constant - confirm against the original source.
    private static final int INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL = -1;
    // Recommended upper bound for one insertRows batch: 0x1000000 = 16,777,216 bytes (16 MiB).
    private static final int INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES = 0x1000000;
    // Per-column statistics for the rows currently buffered. verifyInputColumns looks entries up
    // by the names stored in nonNullableFieldNames; flush() hands a copy to the ChannelData.
    @VisibleForTesting
    Map<String, RowBufferStats> statsMap;
    // Per-column statistics for rows staged by addTempRow; every value is reset at the end of
    // each insertRows call.
    @VisibleForTesting
    Map<String, RowBufferStats> tempStatsMap;
    // Held for the whole of insertRows and while flush() snapshots and resets the buffer.
    private final Lock flushLock;
    // Number of rows currently buffered. Volatile because flush() reads it once before taking
    // flushLock as a cheap "anything to do?" check.
    @VisibleForTesting
    volatile int bufferedRowCount;
    // Running total of the sizes returned by addRow/addTempRow for buffered rows; see getSize().
    private volatile float bufferSize;
    // Names of columns that every inserted row must provide (filled via addNonNullableFieldName).
    private final Set<String> nonNullableFieldNames;
    // Channel name used in log messages.
    final String channelFullyQualifiedName;
    // Receives the total size of the rows added by each insertRows call that commits rows.
    private final Consumer<Float> rowSizeMetric;
    // Channel-level state: insert statistics, offset token and row sequencer.
    final ChannelRuntimeState channelState;
    // Selects the ingestion strategy used by insertRows (see createIngestionStrategy).
    final OpenChannelRequest.OnErrorOption onErrorOption;
    // Stored by the constructor; not read anywhere in this file (presumably used by subclasses).
    final ZoneId defaultTimezone;
    // Client-side limits; this class only reads getMaxChunkSizeInBytes() from it.
    final ClientBufferParameters clientBufferParameters;

    /**
     * Sets up the state shared by all row buffer implementations: the supplied collaborators are
     * stored as-is and the buffer starts empty (zero rows, zero size, empty stats maps, no
     * non-nullable field names).
     *
     * @param onErrorOption decides which ingestion strategy {@code insertRows} uses
     * @param defaultTimezone stored in {@link #defaultTimezone}
     * @param fullyQualifiedChannelName channel name used in log messages
     * @param rowSizeMetric callback that receives the size of rows added by {@code insertRows}
     * @param channelRuntimeState channel-level state (insert stats, offset token, row sequencer)
     * @param clientBufferParameters client-side buffer limits
     */
    AbstractRowBuffer(OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, String fullyQualifiedChannelName, Consumer<Float> rowSizeMetric, ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters) {
        // Collaborators and configuration handed in by the caller.
        this.clientBufferParameters = clientBufferParameters;
        this.channelFullyQualifiedName = fullyQualifiedChannelName;
        this.channelState = channelRuntimeState;
        this.rowSizeMetric = rowSizeMetric;
        this.defaultTimezone = defaultTimezone;
        this.onErrorOption = onErrorOption;

        // Fresh, empty mutable state owned by this buffer.
        this.flushLock = new ReentrantLock();
        this.nonNullableFieldNames = new HashSet<>();
        this.statsMap = new HashMap<>();
        this.tempStatsMap = new HashMap<>();
        this.bufferSize = 0.0f;
        this.bufferedRowCount = 0;
    }

    /**
     * Registers a column that every inserted row must supply; {@code verifyInputColumns} rejects
     * rows that omit any registered name.
     *
     * @param nonNullableFieldName column name to register
     */
    void addNonNullableFieldName(String nonNullableFieldName) {
        nonNullableFieldNames.add(nonNullableFieldName);
    }

    /**
     * Rejects columns that declare a collation.
     *
     * @param column column metadata to check
     * @throws SFException with {@code ErrorCode.UNSUPPORTED_DATA_TYPE} when the column's collation
     *     is non-null
     */
    void validateColumnCollation(ColumnMetadata column) {
        if (column.getCollation() == null) {
            // Nothing to complain about: the column has no collation.
            return;
        }
        String message = String.format("Column %s with collation %s detected. Ingestion into collated columns is not supported", column.getName(), column.getCollation());
        throw new SFException(ErrorCode.UNSUPPORTED_DATA_TYPE, message);
    }

    /**
     * Returns the current value of the buffer size counter, i.e. the accumulated sizes reported
     * for the rows buffered since the last reset.
     */
    @Override
    public float getSize() {
        return bufferSize;
    }

    /**
     * Checks the column names of one input row against the buffer's schema.
     *
     * <p>Two checks run in order: first, every (unquoted) input column must be known to
     * {@link #hasColumn(String)}; second, every registered non-nullable column must be present in
     * the row. On failure the offending names are also recorded on {@code error} when it is
     * non-null.
     *
     * <p>NOTE(review): {@code Collectors.toMap} is used without a merge function, so a row whose
     * keys unquote to the same name makes the collector throw {@link IllegalStateException}
     * rather than {@link SFException} - confirm this is intended.
     *
     * @param row input row, keyed by column name as written by the caller
     * @param error error holder to fill in on failure; may be {@code null}
     * @param rowIndex index of the row, used only in the error message
     * @return the unquoted column names of the row
     * @throws SFException with {@code ErrorCode.INVALID_FORMAT_ROW} when the row has extra columns
     *     or lacks a non-nullable column
     */
    Set<String> verifyInputColumns(Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
        // Unquoted name -> the name exactly as the caller supplied it.
        Map<String, String> originalNameByUnquoted = row.keySet().stream().collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, original -> original));

        // Check 1: the row must not mention columns the buffer does not have.
        ArrayList<String> unknownColumns = new ArrayList<>();
        for (Map.Entry<String, String> nameEntry : originalNameByUnquoted.entrySet()) {
            if (!this.hasColumn(nameEntry.getKey())) {
                unknownColumns.add(nameEntry.getValue());
            }
        }
        if (!unknownColumns.isEmpty()) {
            if (error != null) {
                error.setExtraColNames(unknownColumns);
            }
            throw new SFException(ErrorCode.INVALID_FORMAT_ROW, "Extra columns: " + unknownColumns, String.format("Columns not present in the table shouldn't be specified, rowIndex:%d", rowIndex));
        }

        // Check 2: every non-nullable column must be present; report display names.
        ArrayList<String> absentRequiredColumns = new ArrayList<>();
        for (String requiredName : this.nonNullableFieldNames) {
            if (!originalNameByUnquoted.containsKey(requiredName)) {
                absentRequiredColumns.add(this.statsMap.get(requiredName).getColumnDisplayName());
            }
        }
        if (!absentRequiredColumns.isEmpty()) {
            if (error != null) {
                error.setMissingNotNullColNames(absentRequiredColumns);
            }
            throw new SFException(ErrorCode.INVALID_FORMAT_ROW, "Missing columns: " + absentRequiredColumns, String.format("Values for all non-nullable columns must be specified, rowIndex:%d", rowIndex));
        }

        return originalNameByUnquoted.keySet();
    }

    /**
     * Inserts a batch of rows into the buffer using the strategy selected by
     * {@link #onErrorOption}.
     *
     * <p>The whole operation runs under {@code flushLock}. Whatever the outcome, the temporary
     * per-column stats are reset and the temporary rows are cleared before the lock is released.
     * The lock release sits in its own {@code finally} so that a failure in that cleanup can
     * never leave the lock held.
     *
     * @param rows rows to insert, each keyed by column name
     * @param offsetToken offset token passed to the ingestion strategy together with the batch
     * @return the validation response produced by the ingestion strategy
     * @throws SFException with {@code ErrorCode.INTERNAL_ERROR} when the buffer has no columns;
     *     the strategy may throw further exceptions
     */
    @Override
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> rows, String offsetToken) {
        if (!this.hasColumns()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Empty column fields");
        }
        this.flushLock.lock();
        try {
            try {
                this.channelState.updateInsertStats(System.currentTimeMillis(), this.bufferedRowCount);
                IngestionStrategy<T> ingestionStrategy = this.createIngestionStrategy(this.onErrorOption);
                return ingestionStrategy.insertRows(this, rows, offsetToken);
            } finally {
                // Temp state is per-call scratch space: always wipe it, success or failure.
                this.tempStatsMap.values().forEach(RowBufferStats::reset);
                this.clearTempRows();
            }
        } finally {
            // Must run even if the cleanup above throws, otherwise the buffer stays locked.
            this.flushLock.unlock();
        }
    }

    /**
     * Takes everything currently buffered out of the buffer and packages it as a
     * {@link ChannelData}.
     *
     * <p>The row count is checked once without the lock as a cheap early-out and again after
     * acquiring {@code flushLock}. Under the lock the method snapshots the data, captures row
     * count, buffer size, the incremented row sequencer, offset token, a copy of the column stats
     * and the first/last insert times, and then resets the buffer. The {@code ChannelData} itself
     * is assembled after the lock has been released.
     *
     * @param filePath passed through to {@link #getSnapshot(String)}
     * @return the flushed data, or {@code null} when there were no buffered rows or the snapshot
     *     was empty
     */
    @Override
    public ChannelData<T> flush(String filePath) {
        logger.logDebug("Start get data for channel={}", this.channelFullyQualifiedName);
        if (this.bufferedRowCount <= 0) {
            return null;
        }

        Optional<T> oldData = Optional.empty();
        int oldRowCount = 0;
        float oldBufferSize = 0.0f;
        long oldRowSequencer = 0L;
        String oldOffsetToken = null;
        HashMap<String, RowBufferStats> oldColumnEps = null;
        Pair<Long, Long> oldMinMaxInsertTimeInMs = null;

        logger.logDebug("Buffer flush about to take lock on channel={}", this.channelFullyQualifiedName);
        this.flushLock.lock();
        try {
            // Re-check under the lock: another flush may have drained the buffer meanwhile.
            if (this.bufferedRowCount > 0) {
                oldData = this.getSnapshot(filePath);
                oldRowCount = this.bufferedRowCount;
                oldBufferSize = this.bufferSize;
                oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
                oldOffsetToken = this.channelState.getOffsetToken();
                // Copy the map so the reset below does not affect the captured stats entries.
                oldColumnEps = new HashMap<String, RowBufferStats>(this.statsMap);
                oldMinMaxInsertTimeInMs = new Pair<Long, Long>(this.channelState.getFirstInsertInMs(), this.channelState.getLastInsertInMs());
                this.reset();
            }
        } finally {
            this.flushLock.unlock();
        }
        logger.logDebug("Buffer flush released lock on channel={}, rowCount={}, bufferSize={}", this.channelFullyQualifiedName, oldRowCount, Float.valueOf(oldBufferSize));

        if (!oldData.isPresent()) {
            return null;
        }
        ChannelData<T> data = new ChannelData<>();
        data.setVectors(oldData.get());
        data.setRowCount(oldRowCount);
        data.setBufferSize(oldBufferSize);
        data.setRowSequencer(oldRowSequencer);
        data.setOffsetToken(oldOffsetToken);
        data.setColumnEps(oldColumnEps);
        data.setMinMaxInsertTimeInMs(oldMinMaxInsertTimeInMs);
        data.setFlusherFactory(this::createFlusher);
        return data;
    }

    /**
     * Returns whether the buffer has a column with the given name. {@code verifyInputColumns}
     * calls this with the unquoted form of each input column name.
     */
    abstract boolean hasColumn(String var1);

    /**
     * Adds one row directly to the buffer. As called by {@code ContinueIngestionStrategy} the
     * arguments are: the row, the current buffered row count, the stats map to update, the column
     * names returned by {@code verifyInputColumns}, and the row's index within the batch. The
     * returned value is accumulated by the caller as the row's size in bytes.
     */
    abstract float addRow(Map<String, Object> var1, int var2, Map<String, RowBufferStats> var3, Set<String> var4, long var5);

    /**
     * Adds one row to the temporary staging area. As called by the skip-batch and abort
     * strategies the arguments are: the row, the number of temp rows staged so far, the temp
     * stats map to update, the column names returned by {@code verifyInputColumns}, and the row's
     * index within the batch. The returned value is accumulated by the caller as the row's size
     * in bytes.
     */
    abstract float addTempRow(Map<String, Object> var1, int var2, Map<String, RowBufferStats> var3, Set<String> var4, long var5);

    /**
     * Moves the staged temporary rows into the actual buffer; callers pass the number of temp
     * rows they staged.
     */
    abstract void moveTempRowsToActualBuffer(int var1);

    /** Discards any staged temporary rows; invoked at the end of every {@code insertRows} call. */
    abstract void clearTempRows();

    /**
     * Returns whether the buffer has any columns; {@code insertRows} fails with an internal error
     * when this is {@code false}.
     */
    abstract boolean hasColumns();

    /**
     * Returns the buffer to its empty state: the row count and buffer size go back to zero and
     * each column's stats object is replaced by the result of its {@code forkEmpty()}.
     */
    void reset() {
        this.bufferedRowCount = 0;
        this.bufferSize = 0.0f;
        // Keep the same columns, but swap every stats object for its empty fork.
        this.statsMap.replaceAll((columnName, columnStats) -> columnStats.forkEmpty());
    }

    /**
     * Produces a snapshot of the buffered data. {@code flush} calls this under {@code flushLock}
     * with its {@code filePath} argument and treats an empty result as "nothing to flush".
     */
    abstract Optional<T> getSnapshot(String var1);

    /**
     * Test hook. NOTE(review): presumably returns the value stored for the given column name at
     * the given row index - confirm against the implementations.
     */
    @VisibleForTesting
    abstract Object getVectorValueAt(String var1, int var2);

    /** Test hook. NOTE(review): presumably the number of currently staged temp rows - confirm. */
    @VisibleForTesting
    abstract int getTempRowCount();

    /** Implementation-specific cleanup; invoked by {@code close}. */
    abstract void closeInternal();

    /**
     * Closes the buffer by delegating to {@link #closeInternal()} while holding this object's
     * monitor.
     *
     * @param name not used by this implementation
     */
    @Override
    public synchronized void close(String name) {
        closeInternal();
    }

    /**
     * Builds an {@link EpInfo} from per-column statistics.
     *
     * <p>Each stats object is converted to a {@link FileColumnProperties} and stored under the
     * column's display name (the keys of {@code colStats} are not used). The result is checked
     * with {@code verifyEpInfo()} before it is returned.
     *
     * @param rowCount row count passed to the {@code EpInfo} constructor
     * @param colStats statistics per column
     * @return the populated and verified {@code EpInfo}
     */
    static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> colStats) {
        EpInfo result = new EpInfo(rowCount, new HashMap<String, FileColumnProperties>());
        for (RowBufferStats columnStats : colStats.values()) {
            FileColumnProperties properties = new FileColumnProperties(columnStats);
            String displayName = columnStats.getColumnDisplayName();
            result.getColumnEps().put(displayName, properties);
        }
        result.verifyEpInfo();
        return result;
    }

    /**
     * Creates the row buffer implementation that matches the given BDEC version.
     *
     * <p>Only {@code THREE} is supported, which yields a {@link ParquetRowBuffer}; every other
     * version is rejected.
     *
     * @param <T> data type the caller expects the buffer to produce
     * @param onErrorOption forwarded to the buffer constructor
     * @param defaultTimezone forwarded to the buffer constructor
     * @param bdecVersion BDEC format version that selects the implementation
     * @param fullyQualifiedChannelName forwarded to the buffer constructor
     * @param rowSizeMetric forwarded to the buffer constructor
     * @param channelRuntimeState forwarded to the buffer constructor
     * @param clientBufferParameters forwarded to the buffer constructor
     * @return the new row buffer
     * @throws SFException with {@code ErrorCode.INTERNAL_ERROR} for an unsupported version
     */
    @SuppressWarnings("unchecked")
    static <T> AbstractRowBuffer<T> createRowBuffer(OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, Constants.BdecVersion bdecVersion, String fullyQualifiedChannelName, Consumer<Float> rowSizeMetric, ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters) {
        switch (bdecVersion) {
            case THREE:
                // Unchecked by design: the caller picks T, the implementation fixes its own data
                // type. NOTE(review): ParquetRowBuffer's type argument is not visible here -
                // confirm it matches what callers bind T to.
                return (AbstractRowBuffer<T>) new ParquetRowBuffer(onErrorOption, defaultTimezone, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState, clientBufferParameters);
            default:
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Unsupported BDEC format version: " + bdecVersion);
        }
    }

    /**
     * Enforces the hard batch size limit.
     *
     * @param batchSizeInBytes accumulated size of the batch so far
     * @throws SFException with {@code ErrorCode.MAX_BATCH_SIZE_EXCEEDED} when the size is greater
     *     than the configured maximum chunk size; the configured maximum and the recommended
     *     batch size are passed as message arguments
     */
    private void checkBatchSizeEnforcedMaximum(float batchSizeInBytes) {
        boolean exceedsLimit = batchSizeInBytes > (float)this.clientBufferParameters.getMaxChunkSizeInBytes();
        if (!exceedsLimit) {
            return;
        }
        throw new SFException(ErrorCode.MAX_BATCH_SIZE_EXCEEDED, this.clientBufferParameters.getMaxChunkSizeInBytes(), INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES);
    }

    /**
     * Logs a warning when a batch is larger than the recommended maximum (16 MiB). The batch is
     * not rejected.
     *
     * @param batchSizeInBytes total size of the batch
     */
    private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) {
        boolean overRecommended = batchSizeInBytes > INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES;
        if (!overRecommended) {
            return;
        }
        logger.logWarn("The batch of rows passed to 'insertRows' is over the recommended max batch size. Given {} bytes, recommended max batch is {} bytes. For optimal performance and memory utilization, we recommend splitting large batches into multiple smaller ones and call insertRows for each smaller batch separately.", Float.valueOf(batchSizeInBytes), INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES);
    }

    /**
     * Creates the ingestion strategy that implements the given on-error behaviour.
     *
     * @param onErrorOption {@code CONTINUE}, {@code ABORT} or {@code SKIP_BATCH}
     * @return a new strategy instance bound to this buffer
     * @throws IllegalArgumentException for any other option
     */
    IngestionStrategy<T> createIngestionStrategy(OpenChannelRequest.OnErrorOption onErrorOption) {
        switch (onErrorOption) {
            case CONTINUE:
                return new ContinueIngestionStrategy<T>();
            case ABORT:
                return new AbortIngestionStrategy<T>();
            case SKIP_BATCH:
                return new SkipBatchIngestionStrategy<T>();
            default:
                throw new IllegalArgumentException("Unknown on error option: " + onErrorOption);
        }
    }

    /**
     * Ingestion strategy for {@code SKIP_BATCH}: rows are validated and staged as temp rows, and
     * the batch is committed to the buffer only if no row produced an error. Otherwise nothing is
     * committed and the per-row errors are returned.
     */
    public class SkipBatchIngestionStrategy<T>
    implements IngestionStrategy<T> {
        /**
         * Stages every row, collecting per-row errors, then commits the whole batch only when the
         * response has no errors.
         *
         * @param rowBuffer buffer whose counters, stats and channel state are updated on commit
         * @param rows rows to insert
         * @param offsetToken offset token stored on the channel state on commit
         * @return response holding one error per rejected row (empty when the batch was committed)
         * @throws SFException when the staged size exceeds the enforced maximum or the row count
         *     would reach {@code Integer.MAX_VALUE}
         */
        @Override
        public InsertValidationResponse insertRows(AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
            InsertValidationResponse response = new InsertValidationResponse();
            float tempRowsSizeInBytes = 0.0f;
            int tempRowCount = 0;
            int rowIndex = 0;
            for (Map<String, Object> row : rows) {
                InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex);
                try {
                    Set<String> inputColumnNames = AbstractRowBuffer.this.verifyInputColumns(row, error, rowIndex);
                    tempRowsSizeInBytes += AbstractRowBuffer.this.addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex);
                    ++tempRowCount;
                } catch (SFException e) {
                    error.setException(e);
                    response.addError(error);
                } catch (Throwable e) {
                    logger.logWarn("Unexpected error happens during insertRows: {}", e);
                    error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
                    response.addError(error);
                }
                // Both limit checks are outside the try block, so they abort the call instead of
                // being recorded as a row error.
                AbstractRowBuffer.this.checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
                ++rowIndex;
                if ((long)rowBuffer.bufferedRowCount + (long)rowIndex >= Integer.MAX_VALUE) {
                    throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
                }
            }
            if (response.hasErrors()) {
                // At least one bad row: skip the whole batch.
                return response;
            }
            AbstractRowBuffer.this.checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
            AbstractRowBuffer.this.moveTempRowsToActualBuffer(tempRowCount);
            rowBuffer.bufferedRowCount += tempRowCount;
            // Fold the temp stats into the buffer stats in place (no put() inside forEach()).
            rowBuffer.statsMap.replaceAll((colName, stats) -> RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)));
            rowBuffer.channelState.setOffsetToken(offsetToken);
            rowBuffer.bufferSize += tempRowsSizeInBytes;
            rowBuffer.rowSizeMetric.accept(Float.valueOf(tempRowsSizeInBytes));
            return response;
        }
    }

    /**
     * Ingestion strategy for {@code ABORT}: rows are validated and staged as temp rows; the first
     * failure propagates as an exception, so the batch is committed only when every row was
     * staged successfully.
     */
    public class AbortIngestionStrategy<T>
    implements IngestionStrategy<T> {
        /**
         * Stages every row and then commits the batch. No per-row errors are collected: any
         * exception thrown while validating or staging a row leaves this method immediately.
         *
         * @param rowBuffer buffer whose counters, stats and channel state are updated on commit
         * @param rows rows to insert
         * @param offsetToken offset token stored on the channel state on commit
         * @return an empty response
         * @throws SFException on an invalid row, when the staged size exceeds the enforced
         *     maximum, or when the row count would reach {@code Integer.MAX_VALUE}
         */
        @Override
        public InsertValidationResponse insertRows(AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
            InsertValidationResponse response = new InsertValidationResponse();
            float tempRowsSizeInBytes = 0.0f;
            int tempRowCount = 0;
            for (Map<String, Object> row : rows) {
                // The temp row count doubles as the row index because every row so far succeeded.
                Set<String> inputColumnNames = AbstractRowBuffer.this.verifyInputColumns(row, null, tempRowCount);
                tempRowsSizeInBytes += AbstractRowBuffer.this.addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount);
                AbstractRowBuffer.this.checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
                ++tempRowCount;
                if ((long)rowBuffer.bufferedRowCount + (long)tempRowCount >= Integer.MAX_VALUE) {
                    throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
                }
            }
            AbstractRowBuffer.this.checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
            AbstractRowBuffer.this.moveTempRowsToActualBuffer(tempRowCount);
            rowBuffer.bufferedRowCount += tempRowCount;
            // Fold the temp stats into the buffer stats in place (no put() inside forEach()).
            rowBuffer.statsMap.replaceAll((colName, stats) -> RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)));
            rowBuffer.channelState.setOffsetToken(offsetToken);
            rowBuffer.bufferSize += tempRowsSizeInBytes;
            rowBuffer.rowSizeMetric.accept(Float.valueOf(tempRowsSizeInBytes));
            return response;
        }
    }

    /**
     * Ingestion strategy for {@code CONTINUE}: each valid row is added straight to the buffer and
     * each invalid row is reported in the response, without affecting the other rows.
     */
    public class ContinueIngestionStrategy<T>
    implements IngestionStrategy<T> {
        /**
         * Adds the rows one by one, recording an error for every row that fails. The offset
         * token, buffer size and row-size metric are updated regardless of whether any row
         * failed.
         *
         * @param rowBuffer buffer whose counters, stats and channel state are updated
         * @param rows rows to insert
         * @param offsetToken offset token stored on the channel state
         * @return response holding one error per rejected row
         */
        @Override
        public InsertValidationResponse insertRows(AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
            InsertValidationResponse response = new InsertValidationResponse();
            float rowsSizeInBytes = 0.0f;
            int rowIndex = 0;
            for (Map<String, Object> row : rows) {
                InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex);
                try {
                    // Thrown inside the try block, so a full buffer is reported as a row error.
                    if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) {
                        throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
                    }
                    Set<String> inputColumnNames = AbstractRowBuffer.this.verifyInputColumns(row, error, rowIndex);
                    rowsSizeInBytes += AbstractRowBuffer.this.addRow(row, rowBuffer.bufferedRowCount, rowBuffer.statsMap, inputColumnNames, rowIndex);
                    ++rowBuffer.bufferedRowCount;
                } catch (SFException e) {
                    error.setException(e);
                    response.addError(error);
                } catch (Throwable e) {
                    logger.logWarn("Unexpected error happens during insertRows: {}", e);
                    error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
                    response.addError(error);
                }
                ++rowIndex;
            }
            AbstractRowBuffer.this.checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
            rowBuffer.channelState.setOffsetToken(offsetToken);
            rowBuffer.bufferSize += rowsSizeInBytes;
            rowBuffer.rowSizeMetric.accept(Float.valueOf(rowsSizeInBytes));
            return response;
        }
    }

    /**
     * Physical column types, each carrying an explicit integer code.
     *
     * <p>The code returned by {@link #getOrdinal()} is the value given in the constant's
     * declaration and is unrelated to the declaration-order index returned by
     * {@link Enum#ordinal()}. Constants declared without a code report {@code -1}.
     */
    enum ColumnPhysicalType {
        ROWINDEX(9),
        DOUBLE(7),
        SB1(1),
        SB2(2),
        SB4(3),
        SB8(4),
        SB16(5),
        LOB(8),
        BINARY,
        ROW(10);

        // Explicit code of this type; not the same thing as Enum.ordinal().
        private final int ordinal;

        /** Creates a constant without a valid code ({@code -1}). */
        ColumnPhysicalType() {
            // Same value as AbstractRowBuffer.INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL.
            this(-1);
        }

        /**
         * Creates a constant with the given explicit code.
         *
         * @param ordinal code reported by {@link #getOrdinal()}
         */
        ColumnPhysicalType(int ordinal) {
            this.ordinal = ordinal;
        }

        /** Returns the explicit code of this type, or {@code -1} when none was declared. */
        public int getOrdinal() {
            return this.ordinal;
        }
    }

    /**
     * Logical column types, each carrying an explicit integer code and an "object" flag.
     *
     * <p>The code returned by {@link #getOrdinal()} is the value given in the constant's
     * declaration and is unrelated to the declaration-order index returned by
     * {@link Enum#ordinal()}. Constants declared without a code report {@code -1}; constants
     * declared without a flag report {@code false} from {@link #isObject()}.
     */
    enum ColumnLogicalType {
        ANY,
        BOOLEAN(1),
        ROWINDEX,
        NULL(15),
        REAL(8),
        FIXED(2),
        TEXT(9),
        CHAR,
        BINARY(10),
        DATE(7),
        TIME(6),
        TIMESTAMP_LTZ(3),
        TIMESTAMP_NTZ(4),
        TIMESTAMP_TZ(5),
        INTERVAL,
        RAW,
        ARRAY(13, true),
        OBJECT(12, true),
        VARIANT(11, true),
        ROW,
        SEQUENCE,
        FUNCTION,
        USER_DEFINED_TYPE;

        // Explicit code of this type; not the same thing as Enum.ordinal().
        private final int ordinal;
        // True only for the constants declared with the flag (ARRAY, OBJECT, VARIANT).
        private final boolean object;

        /** Creates a constant without a valid code ({@code -1}) and with the flag unset. */
        ColumnLogicalType() {
            // Same value as AbstractRowBuffer.INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL.
            this(-1);
        }

        /**
         * Creates a constant with the given code and the flag unset.
         *
         * @param ordinal code reported by {@link #getOrdinal()}
         */
        ColumnLogicalType(int ordinal) {
            this(ordinal, false);
        }

        /**
         * Creates a constant with the given code and flag.
         *
         * @param ordinal code reported by {@link #getOrdinal()}
         * @param object value reported by {@link #isObject()}
         */
        ColumnLogicalType(int ordinal, boolean object) {
            this.ordinal = ordinal;
            this.object = object;
        }

        /** Returns the explicit code of this type, or {@code -1} when none was declared. */
        public int getOrdinal() {
            return this.ordinal;
        }

        /** Returns the flag given in the constant's declaration ({@code false} by default). */
        public boolean isObject() {
            return this.object;
        }
    }
}

