/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
extends AbstractHeapPriorityQueueElement
implements InternalPriorityQueue<E> {
    private static final byte[] DUMMY_BYTES = new byte[0];
    @Nonnull
    private final RocksDB db;
    @Nonnull
    private final ReadOptions readOptions;
    @Nonnull
    private final ColumnFamilyHandle columnFamilyHandle;
    @Nonnull
    private final TypeSerializer<E> byteOrderProducingSerializer;
    @Nonnull
    private final RocksDBWriteBatchWrapper batchWrapper;
    @Nonnull
    private final byte[] groupPrefixBytes;
    @Nonnull
    private final DataOutputSerializer outputView;
    @Nonnull
    private final DataInputDeserializer inputView;
    @Nonnull
    private final OrderedByteArraySetCache orderedCache;
    @Nonnull
    private byte[] seekHint;
    @Nullable
    private E peekCache;
    private boolean allElementsInCache;

    RocksDBCachingPriorityQueueSet(@Nonnegative int keyGroupId, @Nonnegative int keyGroupPrefixBytes, @Nonnull RocksDB db, @Nonnull ReadOptions readOptions, @Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull TypeSerializer<E> byteOrderProducingSerializer, @Nonnull DataOutputSerializer outputStream, @Nonnull DataInputDeserializer inputStream, @Nonnull RocksDBWriteBatchWrapper batchWrapper, @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
        this.db = db;
        this.readOptions = readOptions;
        this.columnFamilyHandle = columnFamilyHandle;
        this.byteOrderProducingSerializer = byteOrderProducingSerializer;
        this.batchWrapper = batchWrapper;
        this.outputView = outputStream;
        this.inputView = inputStream;
        this.orderedCache = orderedByteArraySetCache;
        this.allElementsInCache = false;
        this.groupPrefixBytes = this.createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
        this.seekHint = this.groupPrefixBytes;
    }

    @Nullable
    public E peek() {
        this.checkRefillCacheFromStore();
        if (this.peekCache != null) {
            return this.peekCache;
        }
        byte[] firstBytes = this.orderedCache.peekFirst();
        if (firstBytes != null) {
            this.peekCache = this.deserializeElement(firstBytes);
            return this.peekCache;
        }
        return null;
    }

    @Nullable
    public E poll() {
        this.checkRefillCacheFromStore();
        byte[] firstBytes = this.orderedCache.pollFirst();
        if (firstBytes == null) {
            return null;
        }
        this.removeFromRocksDB(firstBytes);
        if (this.orderedCache.isEmpty()) {
            this.seekHint = firstBytes;
        }
        if (this.peekCache != null) {
            E fromCache = this.peekCache;
            this.peekCache = null;
            return fromCache;
        }
        return this.deserializeElement(firstBytes);
    }

    public boolean add(@Nonnull E toAdd) {
        this.checkRefillCacheFromStore();
        byte[] toAddBytes = this.serializeElement(toAdd);
        boolean cacheFull = this.orderedCache.isFull();
        if (!cacheFull && this.allElementsInCache || OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, this.orderedCache.peekLast()) < 0) {
            if (cacheFull) {
                this.orderedCache.pollLast();
                this.allElementsInCache = false;
            }
            if (this.orderedCache.add(toAddBytes)) {
                this.addToRocksDB(toAddBytes);
                if (toAddBytes == this.orderedCache.peekFirst()) {
                    this.peekCache = null;
                    return true;
                }
            }
        } else {
            this.addToRocksDB(toAddBytes);
            this.allElementsInCache = false;
        }
        return false;
    }

    public boolean remove(@Nonnull E toRemove) {
        this.checkRefillCacheFromStore();
        byte[] oldHead = this.orderedCache.peekFirst();
        if (oldHead == null) {
            return false;
        }
        byte[] toRemoveBytes = this.serializeElement(toRemove);
        this.removeFromRocksDB(toRemoveBytes);
        this.orderedCache.remove(toRemoveBytes);
        if (this.orderedCache.isEmpty()) {
            this.seekHint = toRemoveBytes;
            this.peekCache = null;
            return true;
        }
        if (oldHead != this.orderedCache.peekFirst()) {
            this.peekCache = null;
            return true;
        }
        return false;
    }

    public void addAll(@Nullable Collection<? extends E> toAdd) {
        if (toAdd == null) {
            return;
        }
        for (HeapPriorityQueueElement element : toAdd) {
            this.add((E)element);
        }
    }

    public boolean isEmpty() {
        this.checkRefillCacheFromStore();
        return this.orderedCache.isEmpty();
    }

    @Nonnull
    public CloseableIterator<E> iterator() {
        return new DeserializingIteratorWrapper(this.orderedBytesIterator());
    }

    public int size() {
        if (this.allElementsInCache) {
            return this.orderedCache.size();
        }
        int count = 0;
        try (RocksBytesIterator iterator = this.orderedBytesIterator();){
            while (iterator.hasNext()) {
                iterator.next();
                ++count;
            }
        }
        return count;
    }

    @Nonnull
    private RocksBytesIterator orderedBytesIterator() {
        this.flushWriteBatch();
        return new RocksBytesIterator(new RocksIteratorWrapper(this.db.newIterator(this.columnFamilyHandle, this.readOptions)));
    }

    private void flushWriteBatch() {
        try {
            this.batchWrapper.flush();
        }
        catch (RocksDBException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
    }

    private void addToRocksDB(@Nonnull byte[] toAddBytes) {
        try {
            this.batchWrapper.put(this.columnFamilyHandle, toAddBytes, DUMMY_BYTES);
        }
        catch (RocksDBException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
    }

    private void removeFromRocksDB(@Nonnull byte[] toRemoveBytes) {
        try {
            this.batchWrapper.remove(this.columnFamilyHandle, toRemoveBytes);
        }
        catch (RocksDBException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
    }

    private void checkRefillCacheFromStore() {
        if (!this.allElementsInCache && this.orderedCache.isEmpty()) {
            try (RocksBytesIterator iterator = this.orderedBytesIterator();){
                this.orderedCache.bulkLoadFromOrderedIterator((Iterator<byte[]>)((Object)iterator));
                this.allElementsInCache = !iterator.hasNext();
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Exception while refilling store from iterator.", (Throwable)e);
            }
        }
    }

    private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
        for (int i = 0; i < prefixBytes.length; ++i) {
            if (bytes[i] == prefixBytes[i]) continue;
            return false;
        }
        return true;
    }

    @Nonnull
    private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
        this.outputView.clear();
        try {
            CompositeKeySerializationUtils.writeKeyGroup((int)keyGroupId, (int)numPrefixBytes, (DataOutputView)this.outputView);
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Could not write key-group bytes.", (Throwable)e);
        }
        return this.outputView.getCopyOfBuffer();
    }

    @Nonnull
    private byte[] serializeElement(@Nonnull E element) {
        try {
            this.outputView.clear();
            this.outputView.write(this.groupPrefixBytes);
            this.byteOrderProducingSerializer.serialize(element, (DataOutputView)this.outputView);
            return this.outputView.getCopyOfBuffer();
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Error while serializing the element.", (Throwable)e);
        }
    }

    @Nonnull
    private E deserializeElement(@Nonnull byte[] bytes) {
        try {
            int numPrefixBytes = this.groupPrefixBytes.length;
            this.inputView.setBuffer(bytes, numPrefixBytes, bytes.length - numPrefixBytes);
            return (E)((HeapPriorityQueueElement)this.byteOrderProducingSerializer.deserialize((DataInputView)this.inputView));
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Error while deserializing the element.", (Throwable)e);
        }
    }

    public static interface OrderedByteArraySetCache {
        public static final Comparator<byte[]> LEXICOGRAPHIC_BYTE_COMPARATOR = UnsignedBytes.lexicographicalComparator();

        public int size();

        public int maxSize();

        public boolean isEmpty();

        public boolean isFull();

        public boolean add(@Nonnull byte[] var1);

        public boolean remove(@Nonnull byte[] var1);

        @Nullable
        public byte[] peekFirst();

        @Nullable
        public byte[] peekLast();

        @Nullable
        public byte[] pollFirst();

        @Nullable
        public byte[] pollLast();

        public void bulkLoadFromOrderedIterator(@Nonnull Iterator<byte[]> var1);
    }

    private class RocksBytesIterator
    implements CloseableIterator<byte[]> {
        @Nonnull
        private final RocksIteratorWrapper iterator;
        @Nullable
        private byte[] currentElement;

        private RocksBytesIterator(RocksIteratorWrapper iterator) {
            this.iterator = iterator;
            try {
                iterator.seek(Arrays.copyOf(RocksDBCachingPriorityQueueSet.this.seekHint, RocksDBCachingPriorityQueueSet.this.seekHint.length + 1));
                this.currentElement = this.nextElementIfAvailable();
            }
            catch (Exception ex) {
                iterator.close();
                throw new FlinkRuntimeException("Could not initialize ordered iterator.", (Throwable)ex);
            }
        }

        public void close() {
            this.iterator.close();
        }

        public boolean hasNext() {
            return this.currentElement != null;
        }

        public byte[] next() {
            byte[] returnElement = this.currentElement;
            if (returnElement == null) {
                throw new NoSuchElementException("Iterator has no more elements!");
            }
            this.iterator.next();
            this.currentElement = this.nextElementIfAvailable();
            return returnElement;
        }

        private byte[] nextElementIfAvailable() {
            byte[] elementBytes;
            return this.iterator.isValid() && RocksDBCachingPriorityQueueSet.isPrefixWith(elementBytes = this.iterator.key(), RocksDBCachingPriorityQueueSet.this.groupPrefixBytes) ? elementBytes : null;
        }
    }

    private class DeserializingIteratorWrapper
    implements CloseableIterator<E> {
        @Nonnull
        private final CloseableIterator<byte[]> bytesIterator;

        private DeserializingIteratorWrapper(CloseableIterator<byte[]> bytesIterator) {
            this.bytesIterator = bytesIterator;
        }

        public void close() throws Exception {
            this.bytesIterator.close();
        }

        public boolean hasNext() {
            return this.bytesIterator.hasNext();
        }

        public E next() {
            return RocksDBCachingPriorityQueueSet.this.deserializeElement((byte[])this.bytesIterator.next());
        }
    }
}

