/*
 * Decompiled with CFR 0.152.
 */
package org.mariadb.r2dbc.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.ReferenceCountUtil;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.TransactionDefinition;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.mariadb.r2dbc.ExceptionFactory;
import org.mariadb.r2dbc.HaMode;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.MariadbTransactionDefinition;
import org.mariadb.r2dbc.SslMode;
import org.mariadb.r2dbc.client.Client;
import org.mariadb.r2dbc.client.DecoderState;
import org.mariadb.r2dbc.client.Exchange;
import org.mariadb.r2dbc.client.MariadbFrameDecoder;
import org.mariadb.r2dbc.client.MariadbPacketEncoder;
import org.mariadb.r2dbc.client.RedoContext;
import org.mariadb.r2dbc.client.ServerVersion;
import org.mariadb.r2dbc.client.SimpleContext;
import org.mariadb.r2dbc.message.ClientMessage;
import org.mariadb.r2dbc.message.Context;
import org.mariadb.r2dbc.message.ServerMessage;
import org.mariadb.r2dbc.message.client.ExecutePacket;
import org.mariadb.r2dbc.message.client.PreparePacket;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.message.client.QuitPacket;
import org.mariadb.r2dbc.message.client.SslRequestPacket;
import org.mariadb.r2dbc.message.server.CompletePrepareResult;
import org.mariadb.r2dbc.message.server.ErrorPacket;
import org.mariadb.r2dbc.message.server.InitialHandshakePacket;
import org.mariadb.r2dbc.util.HostAddress;
import org.mariadb.r2dbc.util.PrepareCache;
import org.mariadb.r2dbc.util.ServerPrepareResult;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

public class SimpleClient
implements Client {
    private static final Logger logger = Loggers.getLogger(SimpleClient.class);
    protected final MariadbConnectionConfiguration configuration;
    private final ServerMessageSubscriber messageSubscriber;
    private final Sinks.Many<ClientMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
    private final Queue<Exchange> exchangeQueue = (Queue)Queues.get((int)Queues.SMALL_BUFFER_SIZE).get();
    private final Queue<ServerMessage> receiverQueue = (Queue)Queues.get((int)Queues.SMALL_BUFFER_SIZE).get();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final MariadbFrameDecoder decoder;
    private final MariadbPacketEncoder encoder;
    private final PrepareCache prepareCache;
    private final ByteBufAllocator byteBufAllocator;
    private volatile boolean closeRequested = false;
    protected final ReentrantLock lock;
    protected final Connection connection;
    protected final HostAddress hostAddress;
    protected volatile Context context;

    @Override
    public Context getContext() {
        return this.context;
    }

    protected SimpleClient(Connection connection, MariadbConnectionConfiguration configuration, HostAddress hostAddress, ReentrantLock lock) {
        this.connection = connection;
        this.configuration = configuration;
        this.hostAddress = hostAddress;
        this.lock = lock;
        this.prepareCache = new PrepareCache(this.configuration.useServerPrepStmts() ? this.configuration.getPrepareCacheSize() : 0, this);
        this.decoder = new MariadbFrameDecoder(this.exchangeQueue, this, configuration);
        this.encoder = new MariadbPacketEncoder();
        this.byteBufAllocator = connection.outbound().alloc();
        this.messageSubscriber = new ServerMessageSubscriber(this.lock, this.exchangeQueue, this.receiverQueue);
        connection.addHandlerFirst((ChannelHandler)this.decoder);
        if (configuration.getSslConfig().getSslMode() == SslMode.TUNNEL) {
            try {
                SSLEngine engine;
                SslContext sslContext = configuration.getSslConfig().getSslContext();
                if (hostAddress != null) {
                    engine = sslContext.newEngine(connection.channel().alloc(), hostAddress.getHost(), hostAddress.getPort());
                    SSLParameters sslParameters = engine.getSSLParameters();
                    if (!configuration.getSslConfig().tunnelHostVerificationDisabled()) {
                        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    }
                    engine.setSSLParameters(sslParameters);
                } else {
                    engine = sslContext.newEngine(connection.channel().alloc());
                }
                connection.addHandlerFirst((ChannelHandler)new SslHandler(engine));
            }
            catch (SSLException e) {
                this.handleConnectionError(e);
            }
        }
        if (logger.isTraceEnabled()) {
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), (ChannelHandler)new LoggingHandler(SimpleClient.class, LogLevel.TRACE));
        }
        connection.inbound().receiveObject().cast(ServerMessage.class).onErrorResume(this::receiveResumeError).subscribe((CoreSubscriber)this.messageSubscriber);
        this.requestSink.asFlux().map(this.encoder::encodeFlux).flatMap(b -> connection.outbound().send((Publisher)b), 1).onErrorResume(this::sendResumeError).doAfterTerminate(this::closeChannelIfNeeded).subscribe();
    }

    public static Mono<SimpleClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, HostAddress hostAddress, MariadbConnectionConfiguration configuration, ReentrantLock lock) {
        TcpClient tcpClient = TcpClient.create((ConnectionProvider)connectionProvider).remoteAddress(() -> socketAddress).runOn(configuration.loopResources());
        tcpClient = SimpleClient.setSocketOption(configuration, tcpClient);
        return tcpClient.connect().flatMap(it -> Mono.just((Object)new SimpleClient((Connection)it, configuration, hostAddress, lock)));
    }

    public static TcpClient setSocketOption(MariadbConnectionConfiguration configuration, TcpClient tcpClient) {
        if (configuration.getConnectTimeout() != null) {
            tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)Math.toIntExact(configuration.getConnectTimeout().toMillis()));
        }
        if (configuration.isTcpKeepAlive()) {
            tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, (Object)configuration.isTcpKeepAlive());
        }
        if (configuration.isTcpAbortiveClose()) {
            tcpClient = tcpClient.option(ChannelOption.SO_LINGER, (Object)0);
        }
        return tcpClient;
    }

    @Override
    public void handleConnectionError(Throwable throwable) {
        if (AbortedException.isConnectionReset((Throwable)throwable) && !this.isConnected()) {
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException("Cannot execute command since connection is already closed", "08000", throwable));
        } else {
            R2dbcNonTransientResourceException error = throwable instanceof SSLHandshakeException ? new R2dbcNonTransientResourceException(throwable.getMessage(), "08000", throwable) : new R2dbcNonTransientResourceException("Connection error", "08000", throwable);
            this.messageSubscriber.close((R2dbcException)error);
            this.closeChannelIfNeeded();
        }
    }

    private Mono<Void> sendResumeError(Throwable throwable) {
        this.handleConnectionError(throwable);
        this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        return this.quitOrClose();
    }

    private Mono<ServerMessage> receiveResumeError(Throwable throwable) {
        Mono empty = Mono.empty();
        return this.sendResumeError(throwable).then(empty);
    }

    @Override
    public boolean closeChannelIfNeeded() {
        if (this.isClosed.compareAndSet(false, true)) {
            Channel channel = this.connection.channel();
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException("Connection unexpectedly closed", "08000"));
            if (channel.isOpen()) {
                this.connection.dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public Mono<Void> close() {
        this.closeRequested = true;
        return this.quitOrClose();
    }

    private Mono<Void> quitOrClose() {
        return Mono.defer(() -> {
            this.messageSubscriber.close((R2dbcException)new R2dbcNonTransientResourceException(this.closeRequested ? "Connection has been closed" : "Connection closed", "08000"));
            if (this.isClosed.compareAndSet(false, true)) {
                Channel channel = this.connection.channel();
                if (channel.isOpen()) {
                    this.connection.dispose();
                    return this.connection.onDispose();
                }
                return Flux.just((Object)QuitPacket.INSTANCE).doOnNext(message -> this.connection.channel().writeAndFlush(message.encode(this.context, this.context.getByteBufAllocator()))).then().doOnSuccess(v -> this.connection.dispose()).then(this.connection.onDispose());
            }
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> sendSslRequest(SslRequestPacket sslRequest, MariadbConnectionConfiguration configuration) {
        try {
            SslHandler sslHandler;
            SslContext sslContext = configuration.getSslConfig().getSslContext();
            if (this.hostAddress != null) {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc(), this.hostAddress.getHost(), this.hostAddress.getPort());
                if (configuration.getSslConfig().getSslMode() == SslMode.VERIFY_FULL) {
                    SSLEngine sslEngine = sslHandler.engine();
                    SSLParameters sslParameters = sslEngine.getSSLParameters();
                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    sslEngine.setSSLParameters(sslParameters);
                }
            } else {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc());
            }
            this.requestSink.emitNext((Object)sslRequest, Sinks.EmitFailureHandler.FAIL_FAST);
            this.connection.addHandlerFirst((ChannelHandler)sslHandler);
        }
        catch (Throwable e) {
            this.closeChannelIfNeeded();
            return Mono.error((Throwable)e);
        }
        return Mono.empty();
    }

    private Flux<ServerMessage> execute(Consumer<FluxSink<ServerMessage>> s) {
        return Flux.create(sink -> {
            if (!this.isConnected()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            try {
                this.lock.lock();
                s.accept((FluxSink<ServerMessage>)sink);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public long getThreadId() {
        return this.context.getThreadId();
    }

    @Override
    public Mono<Void> beginTransaction() {
        try {
            this.lock.lock();
            Mono mono = this.execute(sink -> {
                if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
                    Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, DecoderState.QUERY_RESPONSE, "BEGIN");
                    if (this.exchangeQueue.offer(exchange)) {
                        this.requestSink.emitNext((Object)new QueryPacket("BEGIN"), Sinks.EmitFailureHandler.FAIL_FAST);
                        sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    } else {
                        sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                    }
                } else {
                    logger.debug("Skipping start transaction because already in transaction");
                    sink.complete();
                }
            }).handle(ExceptionFactory.withSql("BEGIN")::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Mono<Void> beginTransaction(TransactionDefinition definition) {
        StringBuilder sb = new StringBuilder("START TRANSACTION");
        boolean first = true;
        if (Boolean.TRUE.equals(definition.getAttribute(TransactionDefinition.READ_ONLY))) {
            sb.append(" READ ONLY");
            first = false;
        }
        if (Boolean.TRUE.equals(definition.getAttribute(MariadbTransactionDefinition.WITH_CONSISTENT_SNAPSHOT))) {
            if (!first) {
                sb.append(",");
            }
            sb.append(" WITH CONSISTENT SNAPSHOT");
        }
        String sql = sb.toString();
        try {
            this.lock.lock();
            Mono mono = this.execute(sink -> {
                if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
                    Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, DecoderState.QUERY_RESPONSE, sql);
                    if (this.exchangeQueue.offer(exchange)) {
                        this.requestSink.emitNext((Object)new QueryPacket(sql), Sinks.EmitFailureHandler.FAIL_FAST);
                        sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    } else {
                        sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                    }
                } else {
                    logger.debug("Skipping start transaction because already in transaction");
                    sink.complete();
                }
            }).handle(ExceptionFactory.withSql(sql)::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Mono<Void> commitTransaction() {
        try {
            this.lock.lock();
            Mono mono = this.execute(sink -> this.executeWhenTransaction((FluxSink<ServerMessage>)sink, "COMMIT")).handle(ExceptionFactory.withSql("COMMIT")::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    private void executeWhenTransaction(FluxSink<ServerMessage> sink, String sql) {
        if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) > 0) {
            try {
                this.lock.lock();
                Exchange exchange = new Exchange(sink, DecoderState.QUERY_RESPONSE, sql);
                if (this.exchangeQueue.offer(exchange)) {
                    sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    this.requestSink.emitNext((Object)new QueryPacket(sql), Sinks.EmitFailureHandler.FAIL_FAST);
                }
                sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
            }
            catch (Throwable t) {
                t.printStackTrace();
                throw t;
            }
            finally {
                this.lock.unlock();
            }
        } else {
            logger.debug(String.format("Skipping '%s' because no active transaction", sql));
            sink.complete();
        }
    }

    @Override
    public Mono<Void> rollbackTransaction() {
        try {
            this.lock.lock();
            Mono mono = this.execute(sink -> this.executeWhenTransaction((FluxSink<ServerMessage>)sink, "ROLLBACK")).handle(ExceptionFactory.withSql("ROLLBACK")::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Mono<Void> rollbackTransactionToSavepoint(String name) {
        try {
            this.lock.lock();
            String sql = String.format("ROLLBACK TO SAVEPOINT `%s`", name.replace("`", "``"));
            Mono mono = this.execute(sink -> this.executeWhenTransaction((FluxSink<ServerMessage>)sink, sql)).handle(ExceptionFactory.withSql(sql)::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Mono<Void> setAutoCommit(boolean autoCommit) {
        try {
            this.lock.lock();
            Mono mono = this.execute(sink -> {
                String sql = "SET autocommit=" + (autoCommit ? (char)'1' : (char)'0');
                if (!this.exchangeQueue.isEmpty() || autoCommit != this.isAutoCommit()) {
                    try {
                        Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, DecoderState.QUERY_RESPONSE, sql);
                        if (this.exchangeQueue.offer(exchange)) {
                            sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                            this.requestSink.emitNext((Object)new QueryPacket(sql), Sinks.EmitFailureHandler.FAIL_FAST);
                        }
                        sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                    }
                    catch (Throwable t) {
                        t.printStackTrace();
                        throw t;
                    }
                } else {
                    logger.debug("Skipping autocommit since already in that state");
                    sink.complete();
                }
            }).handle(ExceptionFactory.withSql(null)::handleErrorResponse).then();
            return mono;
        }
        finally {
            this.lock.unlock();
        }
    }

    public Flux<ServerMessage> receive(DecoderState initialState) {
        return Flux.create(sink -> {
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, initialState);
                sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                if (!this.exchangeQueue.offer(exchange)) {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached during handshake"));
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
                throw t;
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public void setContext(InitialHandshakePacket handshake, long clientCapabilities) {
        this.context = !HaMode.NONE.equals((Object)this.configuration.getHaMode()) && this.configuration.isTransactionReplay() ? new RedoContext(handshake.getServerVersion(), handshake.getThreadId(), handshake.getCapabilities(), handshake.getServerStatus(), handshake.isMariaDBServer(), clientCapabilities, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel()) : new SimpleContext(handshake.getServerVersion(), handshake.getThreadId(), handshake.getCapabilities(), handshake.getServerStatus(), handshake.isMariaDBServer(), clientCapabilities, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel());
        this.decoder.setContext(this.context);
        this.encoder.setContext(this.context);
    }

    @Override
    public boolean isAutoCommit() {
        return (this.context.getServerStatus() & 2) > 0;
    }

    @Override
    public boolean isInTransaction() {
        return (this.context.getServerStatus() & 1) > 0;
    }

    @Override
    public boolean noBackslashEscapes() {
        return (this.context.getServerStatus() & 0x200) > 0;
    }

    @Override
    public ServerVersion getVersion() {
        return this.context != null ? this.context.getVersion() : ServerVersion.UNKNOWN_VERSION;
    }

    @Override
    public boolean isConnected() {
        if (this.isClosed.get()) {
            return false;
        }
        return this.connection.channel().isOpen();
    }

    @Override
    public boolean isCloseRequested() {
        return this.closeRequested;
    }

    @Override
    public void sendCommandWithoutResult(ClientMessage message) {
        try {
            this.lock.lock();
            this.requestSink.emitNext((Object)message, Sinks.EmitFailureHandler.FAIL_FAST);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, boolean canSafelyBeReExecuted) {
        return this.sendCommand(message, DecoderState.QUERY_RESPONSE, null, canSafelyBeReExecuted);
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, boolean canSafelyBeReExecuted) {
        return this.sendCommand(message, initialState, null, canSafelyBeReExecuted);
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, String sql, boolean canSafelyBeReExecuted) {
        return Flux.create(sink -> {
            if (!this.isConnected() || this.messageSubscriber.isClose()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, initialState, sql);
                if (this.exchangeQueue.offer(exchange)) {
                    if (message instanceof PreparePacket) {
                        this.decoder.addPrepare(((PreparePacket)message).getSql());
                    }
                    sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    this.requestSink.emitNext((Object)message, Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                }
            }
            catch (Throwable t) {
                sink.error(t);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public Mono<ServerPrepareResult> sendPrepare(ClientMessage requests, ExceptionFactory factory, String sql) {
        return this.sendCommand(requests, DecoderState.PREPARE_RESPONSE, sql, true).handle((it, sink) -> {
            if (it instanceof ErrorPacket) {
                sink.error((Throwable)factory.from((ErrorPacket)it));
                return;
            }
            if (it instanceof CompletePrepareResult) {
                sink.next((Object)((CompletePrepareResult)it).getPrepare());
            }
            if (it.ending()) {
                sink.complete();
            }
        }).cast(ServerPrepareResult.class).singleOrEmpty();
    }

    @Override
    public Flux<ServerMessage> sendCommand(PreparePacket preparePacket, ExecutePacket executePacket, boolean canSafelyBeReExecuted) {
        return Flux.create(sink -> {
            if (!this.isConnected() || this.messageSubscriber.isClose()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            try {
                this.lock.lock();
                Exchange exchange = new Exchange((FluxSink<ServerMessage>)sink, DecoderState.PREPARE_AND_EXECUTE_RESPONSE, preparePacket.getSql());
                if (this.exchangeQueue.offer(exchange)) {
                    sink.onRequest(value -> this.messageSubscriber.onRequest(exchange, value));
                    this.decoder.addPrepare(preparePacket.getSql());
                    this.requestSink.emitNext((Object)preparePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                    this.requestSink.emitNext((Object)executePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    sink.error((Throwable)new R2dbcTransientResourceException("Request queue limit reached"));
                }
            }
            catch (Throwable t) {
                t.printStackTrace();
                sink.error(t);
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public HostAddress getHostAddress() {
        return this.hostAddress;
    }

    @Override
    public PrepareCache getPrepareCache() {
        return this.prepareCache;
    }

    public String toString() {
        return "Client{isClosed=" + this.isClosed + ", context=" + this.context + '}';
    }

    protected class ServerMessageSubscriber
    implements CoreSubscriber<ServerMessage> {
        private Subscription upstream;
        private volatile boolean close;
        private final AtomicLong receiverDemands = new AtomicLong(0L);
        private final ReentrantLock lock;
        private final Queue<Exchange> exchangeQueue;
        private final Queue<ServerMessage> receiverQueue;

        public ServerMessageSubscriber(ReentrantLock lock, Queue<Exchange> exchangeQueue, Queue<ServerMessage> receiverQueue) {
            this.lock = lock;
            this.receiverQueue = receiverQueue;
            this.exchangeQueue = exchangeQueue;
        }

        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
        }

        public void onError(Throwable t) {
            t.printStackTrace();
            if (this.close) {
                Operators.onErrorDropped((Throwable)t, (reactor.util.context.Context)this.currentContext());
                return;
            }
            SimpleClient.this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            SimpleClient.this.handleConnectionError(t);
            SimpleClient.this.quitOrClose().subscribe();
        }

        public void onComplete() {
            this.close((R2dbcException)new R2dbcNonTransientResourceException(String.format("Connection %s", SimpleClient.this.closeChannelIfNeeded() ? "unexpected error" : "error"), "08000"));
            SimpleClient.this.quitOrClose().subscribe();
        }

        public void onNext(ServerMessage message) {
            if (this.close) {
                Operators.onNextDropped((Object)message, (reactor.util.context.Context)this.currentContext());
                return;
            }
            this.receiverDemands.decrementAndGet();
            Exchange exchange = this.exchangeQueue.peek();
            ReferenceCountUtil.retain((Object)message);
            if (this.receiverQueue.isEmpty() && exchange != null && exchange.hasDemand()) {
                if (exchange.emit(message)) {
                    this.exchangeQueue.poll();
                }
                if (exchange.hasDemand() || exchange.isCancelled()) {
                    this.requestQueueFilling();
                }
                return;
            }
            if (!this.receiverQueue.offer(message)) {
                message.release();
                Operators.onNextDropped((Object)message, (reactor.util.context.Context)this.currentContext());
                this.onError((Throwable)new R2dbcNonTransientResourceException("unexpected : server message queue is full"));
                return;
            }
            this.tryDrainQueue();
        }

        public void onRequest(Exchange exchange, long n) {
            exchange.incrementDemand(n);
            this.requestQueueFilling();
            this.tryDrainQueue();
        }

        private void requestQueueFilling() {
            if (this.receiverQueue.isEmpty() && this.receiverDemands.compareAndSet(0L, Queues.SMALL_BUFFER_SIZE)) {
                this.upstream.request((long)Queues.SMALL_BUFFER_SIZE);
            }
        }

        private void tryDrainQueue() {
            while (!this.receiverQueue.isEmpty()) {
                Exchange exchange;
                if (!this.lock.tryLock()) {
                    return;
                }
                try {
                    while (!this.receiverQueue.isEmpty()) {
                        exchange = this.exchangeQueue.peek();
                        if (exchange == null || !exchange.hasDemand()) {
                            return;
                        }
                        ServerMessage srvMsg = this.receiverQueue.poll();
                        if (srvMsg == null) {
                            return;
                        }
                        if (!exchange.emit(srvMsg)) continue;
                        this.exchangeQueue.poll();
                    }
                }
                finally {
                    this.lock.unlock();
                }
                exchange = this.exchangeQueue.peek();
                if (exchange != null && !exchange.hasDemand()) continue;
                this.requestQueueFilling();
            }
        }

        public void close(R2dbcException error) {
            Exchange exchange;
            this.close = true;
            while ((exchange = this.exchangeQueue.poll()) != null) {
                exchange.onError((Throwable)error);
            }
            while (!this.receiverQueue.isEmpty()) {
                this.receiverQueue.poll().release();
            }
        }

        public boolean isClose() {
            return this.close;
        }
    }
}

