/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.core;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.CloseSuppressingInvocationHandler;
import org.springframework.data.redis.core.DefaultReactiveGeoOperations;
import org.springframework.data.redis.core.DefaultReactiveHashOperations;
import org.springframework.data.redis.core.DefaultReactiveHyperLogLogOperations;
import org.springframework.data.redis.core.DefaultReactiveListOperations;
import org.springframework.data.redis.core.DefaultReactiveSetOperations;
import org.springframework.data.redis.core.DefaultReactiveStreamOperations;
import org.springframework.data.redis.core.DefaultReactiveValueOperations;
import org.springframework.data.redis.core.DefaultReactiveZSetOperations;
import org.springframework.data.redis.core.ReactiveGeoOperations;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveHyperLogLogOperations;
import org.springframework.data.redis.core.ReactiveListOperations;
import org.springframework.data.redis.core.ReactiveRedisCallback;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveSetOperations;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.core.ReactiveValueOperations;
import org.springframework.data.redis.core.ReactiveZSetOperations;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisElementWriter;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Reactive implementation of {@link ReactiveRedisOperations}.
 *
 * <p>Each operation obtains a {@link ReactiveRedisConnection} from the configured
 * {@link ReactiveRedisConnectionFactory}, runs a {@link ReactiveRedisCallback} against it and closes the
 * connection once the resulting publisher terminates. Keys and values are converted to and from
 * {@link ByteBuffer} through the configured {@link RedisSerializationContext}.
 *
 * @param <K> the Redis key type this template works with
 * @param <V> the Redis value type this template works with
 */
public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V> {
    private final ReactiveRedisConnectionFactory connectionFactory;
    private final RedisSerializationContext<K, V> serializationContext;
    private final boolean exposeConnection;
    private final ReactiveScriptExecutor<K> reactiveScriptExecutor;

    /**
     * Creates a template that hands a proxied connection (not the native one) to callbacks.
     *
     * @param connectionFactory    source of reactive connections; must not be {@code null}
     * @param serializationContext serialization settings for keys and values; must not be {@code null}
     */
    public ReactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory, RedisSerializationContext<K, V> serializationContext) {
        this(connectionFactory, serializationContext, false);
    }

    /**
     * Creates a template.
     *
     * @param connectionFactory    source of reactive connections; must not be {@code null}
     * @param serializationContext serialization settings for keys and values; must not be {@code null}
     * @param exposeConnection     {@code true} to pass the connection itself to callbacks, {@code false} to pass
     *                             the proxy created by {@link #createRedisConnectionProxy(ReactiveRedisConnection)}
     * @throws IllegalArgumentException if a required argument is {@code null}
     */
    public ReactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory, RedisSerializationContext<K, V> serializationContext, boolean exposeConnection) {
        Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
        Assert.notNull(serializationContext, "SerializationContext must not be null!");
        this.connectionFactory = connectionFactory;
        this.serializationContext = serializationContext;
        this.exposeConnection = exposeConnection;
        this.reactiveScriptExecutor = new DefaultReactiveScriptExecutor<K>(connectionFactory, serializationContext);
    }

    /**
     * @return the connection factory this template obtains its connections from
     */
    public ReactiveRedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Runs the callback using the template-wide {@code exposeConnection} setting.
     *
     * @param action the callback to run; must not be {@code null}
     * @return the callback's result as a {@link Flux}
     */
    @Override
    public <T> Flux<T> execute(ReactiveRedisCallback<T> action) {
        return this.execute(action, this.exposeConnection);
    }

    /**
     * Eagerly obtains a connection, invokes the callback and returns its result. The connection is closed when
     * the returned {@link Flux} terminates, or immediately if setting up the call throws.
     *
     * @param action           the callback to run; must not be {@code null}
     * @param exposeConnection whether the callback receives the connection itself instead of a proxy
     * @return the callback's result as a {@link Flux}
     * @throws IllegalArgumentException if {@code action} is {@code null}
     */
    public <T> Flux<T> execute(ReactiveRedisCallback<T> action, boolean exposeConnection) {
        // doInConnection performs the null check and the close-on-failure handling.
        return Flux.from(this.doInConnection(action, exposeConnection));
    }

    /**
     * Creates a {@link Flux} that obtains a connection and invokes the callback on each subscription.
     *
     * @param callback the callback to run; must not be {@code null}
     * @return a deferred {@link Flux} emitting the callback's result
     * @throws IllegalArgumentException if {@code callback} is {@code null}
     */
    public <T> Flux<T> createFlux(ReactiveRedisCallback<T> callback) {
        Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
        return Flux.defer(() -> this.doInConnection(callback, this.exposeConnection));
    }

    /**
     * Creates a {@link Mono} that obtains a connection and invokes the callback on each subscription.
     *
     * @param callback the callback to run; must not be {@code null}
     * @return a deferred {@link Mono} built from the callback's result
     * @throws IllegalArgumentException if {@code callback} is {@code null}
     */
    public <T> Mono<T> createMono(ReactiveRedisCallback<T> callback) {
        Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
        return Mono.defer(() -> Mono.from(this.doInConnection(callback, this.exposeConnection)));
    }

    /**
     * Obtains a connection, runs the callback against it and arranges for the connection to be closed.
     *
     * @param action           the callback to run; must not be {@code null}
     * @param exposeConnection whether the callback receives the connection itself instead of a proxy
     * @return the post-processed callback result; the connection is closed when it terminates
     */
    private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeConnection) {
        Assert.notNull(action, "Callback object must not be null");
        ReactiveRedisConnection conn = this.getConnectionFactory().getReactiveConnection();
        try {
            ReactiveRedisConnection connToUse = this.preProcessConnection(conn, false);
            ReactiveRedisConnection connToExpose = exposeConnection ? connToUse : this.createRedisConnectionProxy(connToUse);
            Publisher<T> result = action.doInRedis(connToExpose);
            return Flux.from(this.postProcessResult(result, connToUse, false)).doFinally(signalType -> conn.close());
        }
        catch (RuntimeException e) {
            // No publisher was produced, so doFinally will never run: release the connection here.
            conn.close();
            throw e;
        }
    }

    /**
     * Publishes {@code message} to the channel {@code destination}.
     *
     * @param destination the channel name; must contain text
     * @param message     the message to publish; must not be {@code null}
     * @return the result reported by the publish command
     * @throws IllegalArgumentException if {@code destination} is empty or {@code message} is {@code null}
     */
    @Override
    public Mono<Long> convertAndSend(String destination, V message) {
        Assert.hasText(destination, "Destination channel must not be empty!");
        Assert.notNull(message, "Message must not be null!");
        return this.createMono(connection -> connection.pubSubCommands().publish(
                this.getSerializationContext().getStringSerializationPair().write(destination),
                this.getSerializationContext().getValueSerializationPair().write(message)));
    }

    /**
     * Subscribes to the given topics using a dedicated {@link ReactiveRedisMessageListenerContainer} that is
     * destroyed once the returned {@link Flux} terminates.
     *
     * @param topics the topics to listen to
     * @return the stream of received messages
     */
    @Override
    public Flux<? extends ReactiveSubscription.Message<String, V>> listenTo(Topic ... topics) {
        ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(this.getConnectionFactory());
        return container
                .receive(Arrays.asList(topics), this.getSerializationContext().getStringSerializationPair(), this.getSerializationContext().getValueSerializationPair())
                // destroyLater() is lazy: without subscribe() the container would never be destroyed.
                .doFinally(signalType -> container.destroyLater().subscribeOn(Schedulers.elastic()).subscribe());
    }

    /**
     * Checks whether {@code key} exists.
     *
     * @param key the key; must not be {@code null}
     * @return the result of the exists command
     */
    @Override
    public Mono<Boolean> hasKey(K key) {
        Assert.notNull(key, "Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().exists(this.rawKey(key)));
    }

    /**
     * Determines the {@link DataType} stored at {@code key}.
     *
     * @param key the key; must not be {@code null}
     * @return the result of the type command
     */
    @Override
    public Mono<DataType> type(K key) {
        Assert.notNull(key, "Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().type(this.rawKey(key)));
    }

    /**
     * Finds all keys matching {@code pattern}. The pattern is serialized with the key serializer.
     *
     * @param pattern the pattern; must not be {@code null}
     * @return the matching keys, deserialized
     */
    @Override
    public Flux<K> keys(K pattern) {
        Assert.notNull(pattern, "Pattern must not be null!");
        return this.createFlux(connection -> connection.keyCommands().keys(this.rawKey(pattern)))
                .flatMap(Flux::fromIterable)
                .map(this::readKey);
    }

    /**
     * Iterates over keys using the scan command.
     *
     * @param options the scan options; must not be {@code null}
     * @return the scanned keys, deserialized
     */
    @Override
    public Flux<K> scan(ScanOptions options) {
        Assert.notNull(options, "ScanOptions must not be null!");
        return this.createFlux(connection -> connection.keyCommands().scan(options)).map(this::readKey);
    }

    /**
     * @return the deserialized result of the random-key command
     */
    @Override
    public Mono<K> randomKey() {
        return this.createMono(connection -> connection.keyCommands().randomKey()).map(this::readKey);
    }

    /**
     * Renames {@code oldKey} to {@code newKey}.
     *
     * @param oldKey the current key; must not be {@code null}
     * @param newKey the new key; must not be {@code null}
     * @return the result of the rename command
     */
    @Override
    public Mono<Boolean> rename(K oldKey, K newKey) {
        Assert.notNull(oldKey, "Old key must not be null!");
        Assert.notNull(newKey, "New Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().rename(this.rawKey(oldKey), this.rawKey(newKey)));
    }

    /**
     * Renames {@code oldKey} to {@code newKey} using the RENAMENX variant of the command.
     *
     * @param oldKey the current key; must not be {@code null}
     * @param newKey the new key; must not be {@code null}
     * @return the result of the renameNX command
     */
    @Override
    public Mono<Boolean> renameIfAbsent(K oldKey, K newKey) {
        Assert.notNull(oldKey, "Old key must not be null!");
        Assert.notNull(newKey, "New Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().renameNX(this.rawKey(oldKey), this.rawKey(newKey)));
    }

    /**
     * Deletes the given keys. A single key uses {@code del}; several keys are sent in one {@code mDel} call.
     *
     * @param keys the keys; must not be {@code null}, empty or contain {@code null} elements
     * @return the count reported by the delete command
     */
    @Override
    @SafeVarargs
    public final Mono<Long> delete(K ... keys) {
        Assert.notNull(keys, "Keys must not be null!");
        Assert.notEmpty(keys, "Keys must not be empty!");
        Assert.noNullElements(keys, "Keys must not contain null elements!");
        if (keys.length == 1) {
            return this.createMono(connection -> connection.keyCommands().del(this.rawKey(keys[0])));
        }
        Mono<List<ByteBuffer>> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList();
        return this.createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mDel(rawKeys)));
    }

    /**
     * Deletes the keys emitted by {@code keys}, sending them in batches of up to 128 and summing the
     * per-batch results.
     *
     * @param keys the publisher of keys; must not be {@code null}
     * @return the sum of the counts reported for each batch
     */
    @Override
    public Mono<Long> delete(Publisher<K> keys) {
        Assert.notNull(keys, "Keys must not be null!");
        return this.createFlux(connection -> connection.keyCommands()
                        .mDel(Flux.from(keys).map(this::rawKey).buffer(128))
                        .map(ReactiveRedisConnection.CommandResponse::getOutput))
                .collect(Collectors.summingLong(value -> value));
    }

    /**
     * Unlinks the given keys. A single key uses {@code unlink}; several keys are sent in one {@code mUnlink}
     * call.
     *
     * @param keys the keys; must not be {@code null}, empty or contain {@code null} elements
     * @return the count reported by the unlink command
     */
    @Override
    @SafeVarargs
    public final Mono<Long> unlink(K ... keys) {
        Assert.notNull(keys, "Keys must not be null!");
        Assert.notEmpty(keys, "Keys must not be empty!");
        Assert.noNullElements(keys, "Keys must not contain null elements!");
        if (keys.length == 1) {
            return this.createMono(connection -> connection.keyCommands().unlink(this.rawKey(keys[0])));
        }
        Mono<List<ByteBuffer>> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList();
        return this.createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mUnlink(rawKeys)));
    }

    /**
     * Unlinks the keys emitted by {@code keys}, sending them in batches of up to 128 and summing the
     * per-batch results.
     *
     * @param keys the publisher of keys; must not be {@code null}
     * @return the sum of the counts reported for each batch
     */
    @Override
    public Mono<Long> unlink(Publisher<K> keys) {
        Assert.notNull(keys, "Keys must not be null!");
        return this.createFlux(connection -> connection.keyCommands()
                        .mUnlink(Flux.from(keys).map(this::rawKey).buffer(128))
                        .map(ReactiveRedisConnection.CommandResponse::getOutput))
                .collect(Collectors.summingLong(value -> value));
    }

    /**
     * Sets a time-to-live on {@code key}. Uses {@code expire} when the timeout has no sub-second part and
     * {@code pExpire} otherwise.
     *
     * @param key     the key; must not be {@code null}
     * @param timeout the time-to-live; must not be {@code null}
     * @return the result of the expire command
     */
    @Override
    public Mono<Boolean> expire(K key, Duration timeout) {
        Assert.notNull(key, "Key must not be null!");
        Assert.notNull(timeout, "Timeout must not be null!");
        if (timeout.getNano() == 0) {
            return this.createMono(connection -> connection.keyCommands().expire(this.rawKey(key), timeout));
        }
        return this.createMono(connection -> connection.keyCommands().pExpire(this.rawKey(key), timeout));
    }

    /**
     * Sets an expiration instant on {@code key}. Uses {@code expireAt} when the instant has no sub-second
     * part and {@code pExpireAt} otherwise.
     *
     * @param key      the key; must not be {@code null}
     * @param expireAt the expiration instant; must not be {@code null}
     * @return the result of the expire-at command
     */
    @Override
    public Mono<Boolean> expireAt(K key, Instant expireAt) {
        Assert.notNull(key, "Key must not be null!");
        Assert.notNull(expireAt, "Expire at must not be null!");
        if (expireAt.getNano() == 0) {
            return this.createMono(connection -> connection.keyCommands().expireAt(this.rawKey(key), expireAt));
        }
        return this.createMono(connection -> connection.keyCommands().pExpireAt(this.rawKey(key), expireAt));
    }

    /**
     * Removes the expiration from {@code key}.
     *
     * @param key the key; must not be {@code null}
     * @return the result of the persist command
     */
    @Override
    public Mono<Boolean> persist(K key) {
        Assert.notNull(key, "Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().persist(this.rawKey(key)));
    }

    /**
     * Reads the remaining time-to-live of {@code key} via {@code pTtl}.
     *
     * @param key the key; must not be {@code null}
     * @return {@link Duration#ZERO} when the command reports {@code -1}, an empty {@link Mono} when it
     *         reports {@code -2}, otherwise the reported value as milliseconds
     */
    @Override
    public Mono<Duration> getExpire(K key) {
        Assert.notNull(key, "Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().pTtl(this.rawKey(key)).flatMap(expiry -> {
            // Redis PTTL convention: -1 = key has no expiry, -2 = key does not exist.
            if (expiry == -1L) {
                return Mono.just(Duration.ZERO);
            }
            if (expiry == -2L) {
                return Mono.empty();
            }
            return Mono.just(Duration.ofMillis(expiry));
        }));
    }

    /**
     * Moves {@code key} to the database with index {@code dbIndex}.
     *
     * @param key     the key; must not be {@code null}
     * @param dbIndex the target database index
     * @return the result of the move command
     */
    @Override
    public Mono<Boolean> move(K key, int dbIndex) {
        Assert.notNull(key, "Key must not be null!");
        return this.createMono(connection -> connection.keyCommands().move(this.rawKey(key), dbIndex));
    }

    /**
     * Executes {@code script} by delegating to the template's {@link ReactiveScriptExecutor}.
     */
    @Override
    public <T> Flux<T> execute(RedisScript<T> script, List<K> keys, List<?> args) {
        return this.reactiveScriptExecutor.execute(script, keys, args);
    }

    /**
     * Executes {@code script} with explicit argument writer and result reader by delegating to the template's
     * {@link ReactiveScriptExecutor}.
     */
    @Override
    public <T> Flux<T> execute(RedisScript<T> script, List<K> keys, List<?> args, RedisElementWriter<?> argsWriter, RedisElementReader<T> resultReader) {
        return this.reactiveScriptExecutor.execute(script, keys, args, argsWriter, resultReader);
    }

    /**
     * Extension hook invoked with the freshly obtained connection before the callback runs. The default
     * implementation returns the connection unchanged.
     *
     * @param connection         the obtained connection
     * @param existingConnection this template always passes {@code false}
     * @return the connection the callback should operate on
     */
    protected ReactiveRedisConnection preProcessConnection(ReactiveRedisConnection connection, boolean existingConnection) {
        return connection;
    }

    /**
     * Extension hook invoked with the callback's result. The default implementation returns it unchanged.
     *
     * @param result             the publisher returned by the callback
     * @param connection         the connection the callback operated on
     * @param existingConnection this template always passes {@code false}
     * @return the publisher to hand back to the caller
     */
    protected <T> Publisher<T> postProcessResult(Publisher<T> result, ReactiveRedisConnection connection, boolean existingConnection) {
        return result;
    }

    /**
     * Wraps the connection in a JDK dynamic proxy implementing all of its interfaces and backed by a
     * {@link CloseSuppressingInvocationHandler}.
     *
     * @param reactiveRedisConnection the connection to wrap
     * @return the proxy handed to callbacks when the connection is not exposed
     */
    protected ReactiveRedisConnection createRedisConnectionProxy(ReactiveRedisConnection reactiveRedisConnection) {
        Class<?>[] ifcs = ClassUtils.getAllInterfacesForClass(reactiveRedisConnection.getClass(), this.getClass().getClassLoader());
        InvocationHandler handler = new CloseSuppressingInvocationHandler(reactiveRedisConnection);
        return (ReactiveRedisConnection)Proxy.newProxyInstance(reactiveRedisConnection.getClass().getClassLoader(), ifcs, handler);
    }

    /** @return geo operations using this template's serialization context */
    @Override
    public ReactiveGeoOperations<K, V> opsForGeo() {
        return this.opsForGeo(this.serializationContext);
    }

    /** @return geo operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveGeoOperations<K1, V1> opsForGeo(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveGeoOperations<K1, V1>(this, serializationContext);
    }

    /** @return hash operations using this template's serialization context */
    @Override
    public <HK, HV> ReactiveHashOperations<K, HK, HV> opsForHash() {
        return this.opsForHash(this.serializationContext);
    }

    /** @return hash operations using the given serialization context */
    @Override
    public <K1, HK, HV> ReactiveHashOperations<K1, HK, HV> opsForHash(RedisSerializationContext<K1, ?> serializationContext) {
        return new DefaultReactiveHashOperations<K1, HK, HV>(this, serializationContext);
    }

    /** @return HyperLogLog operations using this template's serialization context */
    @Override
    public ReactiveHyperLogLogOperations<K, V> opsForHyperLogLog() {
        return this.opsForHyperLogLog(this.serializationContext);
    }

    /** @return HyperLogLog operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveHyperLogLogOperations<K1, V1> opsForHyperLogLog(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveHyperLogLogOperations<K1, V1>(this, serializationContext);
    }

    /** @return list operations using this template's serialization context */
    @Override
    public ReactiveListOperations<K, V> opsForList() {
        return this.opsForList(this.serializationContext);
    }

    /** @return list operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveListOperations<K1, V1> opsForList(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveListOperations<K1, V1>(this, serializationContext);
    }

    /** @return set operations using this template's serialization context */
    @Override
    public ReactiveSetOperations<K, V> opsForSet() {
        return this.opsForSet(this.serializationContext);
    }

    /** @return set operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveSetOperations<K1, V1> opsForSet(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveSetOperations<K1, V1>(this, serializationContext);
    }

    /** @return stream operations using this template's serialization context and an {@link ObjectHashMapper} */
    @Override
    public <HK, HV> ReactiveStreamOperations<K, HK, HV> opsForStream() {
        return this.opsForStream(this.serializationContext);
    }

    /** @return stream operations using this template's serialization context and the given hash mapper */
    @Override
    public <HK, HV> ReactiveStreamOperations<K, HK, HV> opsForStream(HashMapper<? super K, ? super HK, ? super HV> hashMapper) {
        return this.opsForStream(this.serializationContext, hashMapper);
    }

    /** @return stream operations using the given serialization context and an {@link ObjectHashMapper} */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <HK, HV> ReactiveStreamOperations<K, HK, HV> opsForStream(RedisSerializationContext<K, ?> serializationContext) {
        // Raw cast: ObjectHashMapper's fixed type arguments do not line up with the caller-chosen HK/HV.
        return this.opsForStream(serializationContext, (HashMapper)new ObjectHashMapper());
    }

    /**
     * Creates stream operations for the given serialization context and hash mapper.
     *
     * @param serializationContext the serialization context to use
     * @param hashMapper           the hash mapper to use; may be {@code null}
     * @return the stream operations
     */
    protected <HK, HV> ReactiveStreamOperations<K, HK, HV> opsForStream(RedisSerializationContext<K, ?> serializationContext, @Nullable HashMapper<? super K, ? super HK, ? super HV> hashMapper) {
        return new DefaultReactiveStreamOperations<K, HK, HV>(this, serializationContext, hashMapper);
    }

    /** @return value operations using this template's serialization context */
    @Override
    public ReactiveValueOperations<K, V> opsForValue() {
        return this.opsForValue(this.serializationContext);
    }

    /** @return value operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveValueOperations<K1, V1> opsForValue(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveValueOperations<K1, V1>(this, serializationContext);
    }

    /** @return sorted-set operations using this template's serialization context */
    @Override
    public ReactiveZSetOperations<K, V> opsForZSet() {
        return this.opsForZSet(this.serializationContext);
    }

    /** @return sorted-set operations using the given serialization context */
    @Override
    public <K1, V1> ReactiveZSetOperations<K1, V1> opsForZSet(RedisSerializationContext<K1, V1> serializationContext) {
        return new DefaultReactiveZSetOperations<K1, V1>(this, serializationContext);
    }

    /** @return the serialization context this template was created with */
    @Override
    public RedisSerializationContext<K, V> getSerializationContext() {
        return this.serializationContext;
    }

    /** Serializes {@code key} with the key serialization pair's writer. */
    private ByteBuffer rawKey(K key) {
        return this.getSerializationContext().getKeySerializationPair().getWriter().write(key);
    }

    /** Deserializes {@code buffer} with the key serialization pair's reader. */
    private K readKey(ByteBuffer buffer) {
        return this.getSerializationContext().getKeySerializationPair().getReader().read(buffer);
    }
}

