/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.cassandra.core.cql.session;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;

/**
 * {@link ReactiveSession} implementation that adapts a DataStax driver {@link Session} to
 * Project Reactor types.
 *
 * <p>Statements are executed and prepared through {@link Session#executeAsync(Statement)} and
 * {@link Session#prepareAsync(RegularStatement)}; the returned {@link ListenableFuture}s are
 * bridged to {@link Mono}s. Nothing is sent to the session until the returned {@link Mono} is
 * subscribed to.
 */
public class DefaultBridgedReactiveSession implements ReactiveSession {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Session session;

    /**
     * Creates a new reactive bridge over the given driver session.
     *
     * @param session the driver session to delegate to; must not be {@code null}
     * @throws IllegalArgumentException if {@code session} is {@code null}
     */
    public DefaultBridgedReactiveSession(Session session) {
        Assert.notNull(session, "Session must not be null");
        this.session = session;
    }

    /**
     * Creates a new reactive bridge over the given driver session.
     *
     * @param session the driver session to delegate to; must not be {@code null}
     * @param scheduler ignored; this constructor simply delegates to
     *        {@link #DefaultBridgedReactiveSession(Session)}
     * @deprecated the scheduler is not used; call
     *             {@link #DefaultBridgedReactiveSession(Session)} instead
     */
    @Deprecated
    public DefaultBridgedReactiveSession(Session session, Scheduler scheduler) {
        this(session);
    }

    /**
     * @return whether the underlying driver session reports itself as closed
     */
    @Override
    public boolean isClosed() {
        return this.session.isClosed();
    }

    /**
     * @return the {@link Cluster} of the underlying driver session
     */
    @Override
    public Cluster getCluster() {
        return this.session.getCluster();
    }

    /**
     * Executes the given CQL query as a {@link SimpleStatement}.
     *
     * @param query2 the CQL query; must contain text
     * @return a {@link Mono} emitting the result set wrapper, or an error if execution fails
     * @throws IllegalArgumentException if {@code query2} is {@code null} or blank
     */
    @Override
    public Mono<ReactiveResultSet> execute(String query2) {
        Assert.hasText(query2, "Query must not be empty");
        return this.execute(new SimpleStatement(query2));
    }

    /**
     * Executes the given CQL query as a {@link SimpleStatement} with positional values.
     *
     * @param query2 the CQL query; must contain text
     * @param values the values passed to the {@link SimpleStatement}
     * @return a {@link Mono} emitting the result set wrapper, or an error if execution fails
     * @throws IllegalArgumentException if {@code query2} is {@code null} or blank
     */
    @Override
    public Mono<ReactiveResultSet> execute(String query2, Object... values) {
        Assert.hasText(query2, "Query must not be empty");
        return this.execute(new SimpleStatement(query2, values));
    }

    /**
     * Executes the given CQL query as a {@link SimpleStatement} with a map of values.
     *
     * @param query2 the CQL query; must contain text
     * @param values the values passed to the {@link SimpleStatement}
     * @return a {@link Mono} emitting the result set wrapper, or an error if execution fails
     * @throws IllegalArgumentException if {@code query2} is {@code null} or blank
     */
    @Override
    public Mono<ReactiveResultSet> execute(String query2, Map<String, Object> values) {
        Assert.hasText(query2, "Query must not be empty");
        return this.execute(new SimpleStatement(query2, values));
    }

    /**
     * Executes the given statement asynchronously once the returned {@link Mono} is subscribed.
     *
     * @param statement the statement to execute; must not be {@code null}
     * @return a {@link Mono} emitting the result set wrapper, or an error if execution fails
     * @throws IllegalArgumentException if {@code statement} is {@code null}
     */
    @Override
    public Mono<ReactiveResultSet> execute(Statement statement) {
        Assert.notNull(statement, "Statement must not be null");
        return Mono.create(sink -> {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Executing Statement [{}]", statement);
                }
                ListenableFuture<ResultSet> future = this.session.executeAsync(statement);
                ListenableFuture<ReactiveResultSet> resultSetFuture =
                        Futures.transform(future, DefaultReactiveResultSet::new);
                adaptFuture(resultSetFuture, sink);
            }
            catch (Exception cause) {
                // Failures raised while submitting the statement are reported through the Mono.
                sink.error(cause);
            }
        });
    }

    /**
     * Prepares the given CQL query as a {@link SimpleStatement}.
     *
     * @param query2 the CQL query; must contain text
     * @return a {@link Mono} emitting the prepared statement, or an error if preparation fails
     * @throws IllegalArgumentException if {@code query2} is {@code null} or blank
     */
    @Override
    public Mono<PreparedStatement> prepare(String query2) {
        Assert.hasText(query2, "Query must not be empty");
        return this.prepare(new SimpleStatement(query2));
    }

    /**
     * Prepares the given statement asynchronously once the returned {@link Mono} is subscribed.
     *
     * @param statement the statement to prepare; must not be {@code null}
     * @return a {@link Mono} emitting the prepared statement, or an error if preparation fails
     * @throws IllegalArgumentException if {@code statement} is {@code null}
     */
    @Override
    public Mono<PreparedStatement> prepare(RegularStatement statement) {
        Assert.notNull(statement, "Statement must not be null");
        return Mono.create(sink -> {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Preparing Statement [{}]", statement);
                }
                ListenableFuture<PreparedStatement> preparedFuture = this.session.prepareAsync(statement);
                adaptFuture(preparedFuture, sink);
            }
            catch (Exception cause) {
                // Failures raised while submitting the statement are reported through the Mono.
                sink.error(cause);
            }
        });
    }

    /**
     * Closes the underlying driver session.
     */
    @Override
    public void close() {
        this.session.close();
    }

    /**
     * Completes {@code sink} with the outcome of {@code future}.
     *
     * <p>The listener is registered with an inline executor ({@code Runnable::run}), so it runs
     * on whichever thread invokes the future's listeners.
     *
     * @param future the future to observe
     * @param sink the sink to complete with the future's value or failure
     */
    private static <T> void adaptFuture(ListenableFuture<T> future, MonoSink<T> sink) {
        future.addListener(() -> {
            if (future.isDone()) {
                try {
                    sink.success(future.get());
                }
                catch (ExecutionException cause) {
                    sink.error(unwrap(cause));
                }
                catch (InterruptedException cause) {
                    // get() consumed the interrupt flag; restore it for the owning thread.
                    Thread.currentThread().interrupt();
                    sink.error(cause);
                }
                catch (Exception cause) {
                    sink.error(cause);
                }
            }
        }, Runnable::run);
    }

    /**
     * Returns the cause of the given {@link ExecutionException}, or the exception itself when it
     * carries no cause, so that a {@code null} error is never signalled downstream.
     */
    private static Throwable unwrap(ExecutionException exception) {
        Throwable cause = exception.getCause();
        return cause != null ? cause : exception;
    }

    /**
     * {@link ReactiveResultSet} backed by a driver {@link ResultSet}. Rows are emitted page by
     * page; further pages are requested through {@link ResultSet#fetchMoreResults()}.
     */
    static class DefaultReactiveResultSet implements ReactiveResultSet {
        private final ResultSet resultSet;

        DefaultReactiveResultSet(ResultSet resultSet) {
            this.resultSet = resultSet;
        }

        /**
         * @return a {@link Flux} of the rows of the wrapped result set, fetching further pages
         *         as the already-available rows are consumed
         */
        @Override
        public Flux<Row> rows() {
            return this.getRows(Mono.just(this.resultSet));
        }

        /**
         * Emits the rows currently available in the result set produced by {@code nextResults}
         * and, unless the result set is fully fetched, continues with the rows of the next page.
         *
         * @param nextResults emits the result set whose available rows should be emitted
         * @return the rows of that result set followed by the rows of all subsequent pages
         */
        Flux<Row> getRows(Mono<ResultSet> nextResults) {
            return nextResults.flatMapMany(it -> {
                Flux<Row> rows = toRows(it);
                if (it.isFullyFetched()) {
                    return rows;
                }
                // The processor receives the result set once the next page has arrived; the
                // fetch itself is only triggered after the current rows have completed.
                MonoProcessor<ResultSet> processor = MonoProcessor.create();
                return rows
                        .doOnComplete(() -> fetchMore(it.fetchMoreResults(), processor))
                        .concatWith(this.getRows(processor));
            });
        }

        /**
         * Emits exactly the rows that can be read from {@code resultSet} without fetching.
         *
         * @param resultSet the result set to read from
         * @return the rows available without fetching; empty if there are none
         */
        static Flux<Row> toRows(ResultSet resultSet) {
            int available = resultSet.getAvailableWithoutFetching();
            if (available <= 0) {
                // Touching the iterator here would make the driver fetch the next page
                // synchronously; emit nothing and let the caller decide whether to fetch more.
                return Flux.empty();
            }
            return Flux.fromIterable(resultSet).take(available);
        }

        /**
         * Signals the outcome of {@code future} to {@code sink}.
         *
         * <p>The listener is registered with an inline executor ({@code Runnable::run}), so it
         * runs on whichever thread invokes the future's listeners.
         *
         * @param future the pending fetch of the next page
         * @param sink the processor to complete with the fetched result set or the failure
         */
        static void fetchMore(ListenableFuture<ResultSet> future, MonoProcessor<ResultSet> sink) {
            try {
                future.addListener(() -> {
                    try {
                        sink.onNext(future.get());
                        sink.onComplete();
                    }
                    catch (ExecutionException cause) {
                        sink.onError(unwrap(cause));
                    }
                    catch (InterruptedException cause) {
                        // get() consumed the interrupt flag; restore it for the owning thread.
                        Thread.currentThread().interrupt();
                        sink.onError(cause);
                    }
                    catch (Exception cause) {
                        sink.onError(cause);
                    }
                }, Runnable::run);
            }
            catch (Exception cause) {
                sink.onError(cause);
            }
        }

        /**
         * @return the column definitions of the wrapped result set
         */
        @Override
        public ColumnDefinitions getColumnDefinitions() {
            return this.resultSet.getColumnDefinitions();
        }

        /**
         * @return the {@code wasApplied()} flag of the wrapped result set
         */
        @Override
        public boolean wasApplied() {
            return this.resultSet.wasApplied();
        }

        /**
         * @return the execution info reported by the wrapped result set
         */
        @Override
        public ExecutionInfo getExecutionInfo() {
            return this.resultSet.getExecutionInfo();
        }

        /**
         * @return all execution infos reported by the wrapped result set
         */
        @Override
        public List<ExecutionInfo> getAllExecutionInfo() {
            return this.resultSet.getAllExecutionInfo();
        }
    }
}

