/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.TreeOrderedSetCache;
import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;

public class RocksDBPriorityQueueSetFactory
implements PriorityQueueSetFactory {
    private final int cacheSize;
    @Nonnull
    private final DataOutputSerializer sharedElementOutView;
    @Nonnull
    private final DataInputDeserializer sharedElementInView;
    private final KeyGroupRange keyGroupRange;
    private final int keyGroupPrefixBytes;
    private final int numberOfKeyGroups;
    private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
    private final RocksDB db;
    private final ReadOptions readOptions;
    private final RocksDBWriteBatchWrapper writeBatchWrapper;
    private final RocksDBNativeMetricMonitor nativeMetricMonitor;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final Long writeBufferManagerCapacity;
    private final RocksDBManualCompactionManager manualCompactionManager;

    RocksDBPriorityQueueSetFactory(KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, int numberOfKeyGroups, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, Long writeBufferManagerCapacity, int cacheSize, RocksDBManualCompactionManager manualCompactionManager) {
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.kvStateInformation = kvStateInformation;
        this.db = db;
        this.readOptions = readOptions;
        this.writeBatchWrapper = writeBatchWrapper;
        this.nativeMetricMonitor = nativeMetricMonitor;
        this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
        this.sharedElementOutView = new DataOutputSerializer(128);
        this.sharedElementInView = new DataInputDeserializer();
        this.writeBufferManagerCapacity = writeBufferManagerCapacity;
        Preconditions.checkArgument((cacheSize > 0 ? 1 : 0) != 0);
        this.cacheSize = cacheSize;
        this.manualCompactionManager = manualCompactionManager;
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.create(stateName, byteOrderedElementSerializer, false);
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, final @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        RocksDBKeyedStateBackend.RocksDbKvStateInfo stateCFHandle = this.tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
        final ColumnFamilyHandle columnFamilyHandle = stateCFHandle.columnFamilyHandle;
        return new KeyGroupPartitionedPriorityQueue(KeyExtractorFunction.forKeyedObjects(), PriorityComparator.forPriorityComparableObjects(), new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>(){

            @Nonnull
            public RocksDBCachingPriorityQueueSet<T> create(int keyGroupId, int numKeyGroups, @Nonnull KeyExtractorFunction<T> keyExtractor, @Nonnull PriorityComparator<T> elementPriorityComparator) {
                TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(RocksDBPriorityQueueSetFactory.this.cacheSize);
                return new RocksDBCachingPriorityQueueSet(keyGroupId, RocksDBPriorityQueueSetFactory.this.keyGroupPrefixBytes, RocksDBPriorityQueueSetFactory.this.db, RocksDBPriorityQueueSetFactory.this.readOptions, columnFamilyHandle, byteOrderedElementSerializer, RocksDBPriorityQueueSetFactory.this.sharedElementOutView, RocksDBPriorityQueueSetFactory.this.sharedElementInView, RocksDBPriorityQueueSetFactory.this.writeBatchWrapper, orderedSetCache);
            }
        }, this.keyGroupRange, this.numberOfKeyGroups);
    }

    @Nonnull
    private <T> RocksDBKeyedStateBackend.RocksDbKvStateInfo tryRegisterPriorityQueueMetaInfo(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo = this.kvStateInformation.get(stateName);
        if (stateInfo == null) {
            RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
            metaInfo = allowFutureMetadataUpdates ? metaInfo.withSerializerUpgradesAllowed() : metaInfo;
            stateInfo = RocksDBOperationUtils.createStateInfo((RegisteredStateMetaInfoBase)metaInfo, this.db, this.columnFamilyOptionsFactory, null, this.writeBufferManagerCapacity, ICloseableRegistry.NO_OP);
            RocksDBOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor, stateName, stateInfo);
        } else {
            RegisteredPriorityQueueStateBackendMetaInfo castedMetaInfo = (RegisteredPriorityQueueStateBackendMetaInfo)stateInfo.metaInfo;
            TypeSerializer previousElementSerializer = castedMetaInfo.getPreviousElementSerializer();
            if (previousElementSerializer != byteOrderedElementSerializer) {
                TypeSerializerSchemaCompatibility compatibilityResult = castedMetaInfo.updateElementSerializer(byteOrderedElementSerializer);
                if (compatibilityResult.isIncompatible()) {
                    throw new FlinkRuntimeException((Throwable)new StateMigrationException("The new priority queue serializer must not be incompatible."));
                }
                RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
                metaInfo = allowFutureMetadataUpdates ? metaInfo.withSerializerUpgradesAllowed() : metaInfo;
                stateInfo = new RocksDBKeyedStateBackend.RocksDbKvStateInfo(stateInfo.columnFamilyHandle, (RegisteredStateMetaInfoBase)metaInfo);
                this.kvStateInformation.put(stateName, stateInfo);
            }
        }
        this.manualCompactionManager.register(stateInfo);
        return stateInfo;
    }

    @VisibleForTesting
    public int getCacheSize() {
        return this.cacheSize;
    }
}

