/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.api.reactive.client.impl.listener;

import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.nio.ByteBuffer;
import java.util.Collections;
import org.infinispan.api.client.listener.ClientKeyValueStoreListener;
import org.infinispan.api.reactive.EntryStatus;
import org.infinispan.api.reactive.KeyValueEntry;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent;
import org.infinispan.commons.configuration.ClassAllowList;
import org.infinispan.commons.io.UnsignedNumeric;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * Reactive Streams {@link Publisher} that forwards Hot Rod client-listener events
 * from a {@link RemoteCache} to a subscriber as {@link KeyValueEntry} items.
 *
 * <p>Each call to {@link #subscribe(Subscriber)} creates a dedicated
 * {@link UnicastProcessor} and a dedicated {@link RemoteClientListener} that is
 * registered on the cache. The listener is removed from the cache again when the
 * stream is cancelled by the subscriber or terminates with an error.
 *
 * <p>The raw {@code Publisher}/{@code Subscriber} types are kept so the public
 * signature stays exactly as callers already see it.
 *
 * @param <K> key type of the emitted entries
 * @param <V> value type of the emitted entries
 */
public class ClientListenerImpl<K, V>
implements Publisher {
    private final RemoteCache<Object, Object> cache;
    private final ClientKeyValueStoreListener listener;
    // Allow-list built from the single pattern ".*"; presumably this permits every
    // class during unmarshalling -- NOTE(review): confirm this is intended.
    private final ClassAllowList allowList = new ClassAllowList(Collections.singletonList(".*"));

    /**
     * @param cache    remote cache the client listener is registered on
     * @param listener configuration telling which event kinds (created / updated /
     *                 deleted) are forwarded to subscribers
     */
    public ClientListenerImpl(RemoteCache cache, ClientKeyValueStoreListener listener) {
        this.cache = cache;
        this.listener = listener;
    }

    /**
     * Subscribes {@code subscriber} to the cache events and registers a new remote
     * listener on the cache.
     *
     * <p>RxJava operators do not modify the source they are called on; they return a
     * new {@code Flowable}. The clean-up callbacks therefore have to be part of the
     * chain that is actually subscribed, otherwise they never run and the remote
     * listener stays registered after the subscriber has gone away.
     *
     * @param subscriber the subscriber that receives {@link KeyValueEntry} items
     */
    public void subscribe(Subscriber subscriber) {
        UnicastProcessor processor = UnicastProcessor.create();
        RemoteClientListener remoteListener = new RemoteClientListener(processor);
        processor
            .doOnError(e -> this.cache.removeClientListener(remoteListener))
            .doOnCancel(() -> this.cache.removeClientListener(remoteListener))
            .subscribe(subscriber);
        // Register only after the subscriber is attached, same order as before.
        this.cache.addClientListener(remoteListener);
    }

    /**
     * Decodes the raw payload of a custom event into a {@link KeyValueEntry}.
     *
     * <p>The payload is read as two consecutive length-prefixed elements: the key,
     * then the value. When the payload ends after the key, the value of the
     * resulting entry is {@code null}.
     *
     * @param event       custom event carrying the raw bytes
     * @param entryStatus status to attach to the resulting entry
     * @return the decoded entry
     */
    protected KeyValueEntry<K, V> readEvent(ClientCacheEntryCustomEvent<byte[]> event, EntryStatus entryStatus) {
        ByteBuffer rawData = ByteBuffer.wrap(event.getEventData());
        byte[] rawKey = this.readElement(rawData);
        byte[] rawValue = this.readElement(rawData);
        Object key = this.cache.getDataFormat().keyToObj(rawKey, this.allowList);
        Object value = rawValue != null
                ? this.cache.getDataFormat().valueToObj(rawValue, this.allowList)
                : null;
        return new KeyValueEntry(key, value, entryStatus);
    }

    /**
     * Reads one length-prefixed element from {@code buffer}.
     *
     * @param buffer buffer positioned at the start of an element
     * @return the element bytes, or {@code null} when the buffer has no bytes left
     */
    private byte[] readElement(ByteBuffer buffer) {
        if (!buffer.hasRemaining()) {
            return null;
        }
        int length = UnsignedNumeric.readUnsignedInt(buffer);
        byte[] element = new byte[length];
        buffer.get(element);
        return element;
    }

    /**
     * Hot Rod client listener that pushes decoded events into the processor of one
     * subscription. Each handler forwards its event only when the corresponding flag
     * of the enclosing instance's {@link ClientKeyValueStoreListener} is set.
     */
    @ClientListener(converterFactoryName="___eager-key-value-version-converter", useRawData=true, includeCurrentState=true)
    class RemoteClientListener {
        private final FlowableProcessor processor;

        /**
         * @param processor processor that receives the decoded entries
         */
        public RemoteClientListener(FlowableProcessor processor) {
            this.processor = processor;
        }

        /** Emits a {@link EntryStatus#CREATED} entry when created events are listened to. */
        @ClientCacheEntryCreated
        public void handleCreated(ClientCacheEntryCustomEvent<byte[]> event) {
            if (ClientListenerImpl.this.listener.isListenCreated()) {
                this.publish(event, EntryStatus.CREATED);
            }
        }

        /** Emits a {@link EntryStatus#UPDATED} entry when updated events are listened to. */
        @ClientCacheEntryModified
        public void handleModified(ClientCacheEntryCustomEvent<byte[]> event) {
            if (ClientListenerImpl.this.listener.isListenUpdated()) {
                this.publish(event, EntryStatus.UPDATED);
            }
        }

        /** Emits a {@link EntryStatus#DELETED} entry when deleted events are listened to. */
        @ClientCacheEntryRemoved
        public void handleRemoved(ClientCacheEntryCustomEvent<byte[]> event) {
            if (ClientListenerImpl.this.listener.isListenDeleted()) {
                this.publish(event, EntryStatus.DELETED);
            }
        }

        /** Decodes {@code event} with the given status and hands it to the processor. */
        private void publish(ClientCacheEntryCustomEvent<byte[]> event, EntryStatus status) {
            KeyValueEntry keyValueEntry = ClientListenerImpl.this.readEvent(event, status);
            this.processor.onNext(keyValueEntry);
        }
    }
}

