/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.web.reactive.socket.adapter;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public abstract class AbstractListenerWebSocketSession<T>
extends AbstractWebSocketSession<T>
implements Subscriber<Void> {
    private static final int RECEIVE_BUFFER_SIZE = 8192;
    private final MonoProcessor<Void> completionMono;
    private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();
    private volatile WebSocketSendProcessor sendProcessor;
    private final AtomicBoolean sendCalled = new AtomicBoolean();

    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory) {
        this(delegate, id, handshakeInfo, bufferFactory, null);
    }

    public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, MonoProcessor<Void> completionMono) {
        super(delegate, id, handshakeInfo, bufferFactory);
        this.completionMono = completionMono;
    }

    protected WebSocketSendProcessor getSendProcessor() {
        return this.sendProcessor;
    }

    @Override
    public Flux<WebSocketMessage> receive() {
        return this.canSuspendReceiving() ? Flux.from((Publisher)this.receivePublisher) : Flux.from((Publisher)this.receivePublisher).onBackpressureBuffer(8192);
    }

    @Override
    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        if (this.sendCalled.compareAndSet(false, true)) {
            this.sendProcessor = new WebSocketSendProcessor();
            return Mono.from(subscriber -> {
                messages.subscribe((Subscriber)this.sendProcessor);
                this.sendProcessor.subscribe(subscriber);
            });
        }
        return Mono.error((Throwable)new IllegalStateException("send() has already been called"));
    }

    protected abstract boolean canSuspendReceiving();

    protected abstract void suspendReceiving();

    protected abstract void resumeReceiving();

    protected abstract boolean sendMessage(WebSocketMessage var1) throws IOException;

    void handleMessage(WebSocketMessage.Type type, WebSocketMessage message) {
        this.receivePublisher.handleMessage(message);
    }

    void handleError(Throwable ex) {
        this.receivePublisher.onError(ex);
        if (this.sendProcessor != null) {
            this.sendProcessor.cancel();
            this.sendProcessor.onError(ex);
        }
    }

    void handleClose(CloseStatus reason) {
        this.receivePublisher.onAllDataRead();
        if (this.sendProcessor != null) {
            this.sendProcessor.cancel();
            this.sendProcessor.onComplete();
        }
    }

    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    public void onNext(Void aVoid) {
    }

    public void onError(Throwable ex) {
        if (this.completionMono != null) {
            this.completionMono.onError(ex);
        }
        int code = CloseStatus.SERVER_ERROR.getCode();
        this.close(new CloseStatus(code, ex.getMessage()));
    }

    public void onComplete() {
        if (this.completionMono != null) {
            this.completionMono.onComplete();
        }
        this.close();
    }

    protected final class WebSocketSendProcessor
    extends AbstractListenerWriteProcessor<WebSocketMessage> {
        private volatile boolean isReady = true;

        protected WebSocketSendProcessor() {
        }

        protected boolean write(WebSocketMessage message) throws IOException {
            return AbstractListenerWebSocketSession.this.sendMessage(message);
        }

        protected void releaseData() {
            this.currentData = null;
        }

        protected boolean isDataEmpty(WebSocketMessage message) {
            return message.getPayload().readableByteCount() == 0;
        }

        protected boolean isWritePossible() {
            return this.isReady && this.currentData != null;
        }

        public void setReadyToSend(boolean ready) {
            this.isReady = ready;
        }
    }

    private final class WebSocketReceivePublisher
    extends AbstractListenerReadPublisher<WebSocketMessage> {
        private volatile WebSocketMessage webSocketMessage;

        private WebSocketReceivePublisher() {
        }

        protected void checkOnDataAvailable() {
            if (this.webSocketMessage != null) {
                this.onDataAvailable();
            }
        }

        protected WebSocketMessage read() throws IOException {
            if (this.webSocketMessage != null) {
                WebSocketMessage result = this.webSocketMessage;
                this.webSocketMessage = null;
                AbstractListenerWebSocketSession.this.resumeReceiving();
                return result;
            }
            return null;
        }

        void handleMessage(WebSocketMessage webSocketMessage) {
            this.webSocketMessage = webSocketMessage;
            AbstractListenerWebSocketSession.this.suspendReceiving();
            this.onDataAvailable();
        }
    }
}

