/*
 * Decompiled with CFR 0.152.
 */
package io.leangen.graphql.spqr.spring.web.apollo;

import graphql.ExecutionResult;
import graphql.GraphQL;
import io.leangen.graphql.spqr.spring.web.apollo.ApolloMessage;
import io.leangen.graphql.spqr.spring.web.apollo.ApolloMessages;
import io.leangen.graphql.spqr.spring.web.apollo.StartMessage;
import io.leangen.graphql.spqr.spring.web.dto.ExecutorParams;
import io.leangen.graphql.spqr.spring.web.dto.GraphQLRequest;
import io.leangen.graphql.spqr.spring.web.dto.TransportType;
import io.leangen.graphql.spqr.spring.web.mvc.websocket.GraphQLWebSocketExecutor;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class ApolloProtocolHandler
extends TextWebSocketHandler {
    private final GraphQL graphQL;
    private final GraphQLWebSocketExecutor executor;
    private final TaskScheduler taskScheduler;
    private final int keepAliveInterval;
    private final Map<String, Disposable> subscriptions = new ConcurrentHashMap<String, Disposable>();
    private final AtomicReference<ScheduledFuture<?>> keepAlive = new AtomicReference();
    private static final Logger log = LoggerFactory.getLogger(ApolloProtocolHandler.class);

    public ApolloProtocolHandler(GraphQL graphQL, GraphQLWebSocketExecutor executor, TaskScheduler taskScheduler, int keepAliveInterval) {
        this.graphQL = graphQL;
        this.executor = executor;
        this.taskScheduler = taskScheduler;
        this.keepAliveInterval = keepAliveInterval;
    }

    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        if (this.taskScheduler != null) {
            this.keepAlive.compareAndSet(null, this.taskScheduler.scheduleWithFixedDelay(this.keepAliveTask(session), (long)Math.max(this.keepAliveInterval, 1000)));
        }
    }

    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        this.cancelAll();
        if (this.taskScheduler != null) {
            this.keepAlive.getAndUpdate(task -> {
                if (task != null) {
                    task.cancel(false);
                }
                return null;
            });
        }
    }

    public void handleTransportError(WebSocketSession session, Throwable exception) {
        this.fatalError(session, exception);
    }

    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        try {
            ApolloMessage apolloMessage;
            try {
                apolloMessage = ApolloMessages.from(message);
            }
            catch (IOException e) {
                session.sendMessage((WebSocketMessage)ApolloMessages.connectionError());
                return;
            }
            switch (apolloMessage.getType()) {
                case "connection_init": {
                    session.sendMessage((WebSocketMessage)ApolloMessages.connectionAck());
                    if (this.taskScheduler == null) break;
                    session.sendMessage((WebSocketMessage)ApolloMessages.keepAlive());
                    break;
                }
                case "start": {
                    GraphQLRequest request = (GraphQLRequest)((StartMessage)apolloMessage).getPayload();
                    ExecutorParams<WebSocketSession> params = new ExecutorParams<WebSocketSession>(request, session, TransportType.WEBSOCKET);
                    ExecutionResult result = this.executor.execute(this.graphQL, params);
                    if (result.getData() instanceof Publisher) {
                        this.handleSubscription(apolloMessage.getId(), result, session);
                        break;
                    }
                    this.handleQueryOrMutation(apolloMessage.getId(), result, session);
                    break;
                }
                case "stop": {
                    Disposable toStop = this.subscriptions.get(apolloMessage.getId());
                    if (toStop == null) break;
                    toStop.dispose();
                    this.subscriptions.remove(apolloMessage.getId(), toStop);
                    break;
                }
                case "connection_terminate": {
                    session.close();
                    this.cancelAll();
                }
            }
        }
        catch (Exception e) {
            this.fatalError(session, e);
        }
    }

    private void handleQueryOrMutation(String id, ExecutionResult result, WebSocketSession session) {
        try {
            session.sendMessage((WebSocketMessage)ApolloMessages.data(id, result));
            session.sendMessage((WebSocketMessage)ApolloMessages.complete(id));
        }
        catch (IOException e) {
            this.fatalError(session, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleSubscription(String id, ExecutionResult executionResult, WebSocketSession session) {
        Publisher events = (Publisher)executionResult.getData();
        Disposable subscription = Flux.from((Publisher)events).subscribe(result -> this.onNext((ExecutionResult)result, id, session), error -> this.onError((Throwable)error, id, session), () -> this.onComplete(id, session));
        Map<String, Disposable> map = this.subscriptions;
        synchronized (map) {
            this.subscriptions.put(id, subscription);
        }
    }

    private void onNext(ExecutionResult result, String id, WebSocketSession session) {
        try {
            if (result.getErrors().isEmpty()) {
                session.sendMessage((WebSocketMessage)ApolloMessages.data(id, result));
            } else {
                session.sendMessage((WebSocketMessage)ApolloMessages.error(id, result.getErrors()));
            }
        }
        catch (IOException e) {
            this.fatalError(session, e);
        }
    }

    private void onError(Throwable error, String id, WebSocketSession session) {
        try {
            session.sendMessage((WebSocketMessage)ApolloMessages.error(id, error));
            session.sendMessage((WebSocketMessage)ApolloMessages.complete(id));
        }
        catch (IOException e) {
            this.fatalError(session, e);
        }
    }

    private void onComplete(String id, WebSocketSession session) {
        try {
            session.sendMessage((WebSocketMessage)ApolloMessages.complete(id));
        }
        catch (IOException e) {
            this.fatalError(session, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cancelAll() {
        Map<String, Disposable> map = this.subscriptions;
        synchronized (map) {
            this.subscriptions.values().forEach(Disposable::dispose);
            this.subscriptions.clear();
        }
    }

    private void fatalError(WebSocketSession session, Throwable exception) {
        try {
            session.close(exception instanceof IOException ? CloseStatus.SESSION_NOT_RELIABLE : CloseStatus.SERVER_ERROR);
        }
        catch (Exception suppressed) {
            exception.addSuppressed(suppressed);
        }
        this.cancelAll();
        log.warn(String.format("WebSocket session %s (%s) closed due to an exception", session.getId(), session.getRemoteAddress()), exception);
    }

    private Runnable keepAliveTask(WebSocketSession session) {
        return () -> {
            try {
                if (session != null && session.isOpen()) {
                    session.sendMessage((WebSocketMessage)ApolloMessages.keepAlive());
                }
            }
            catch (IOException exception) {
                this.fatalError(session, exception);
            }
        };
    }
}

