/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.BadRequestException;
import org.apache.flink.queryablestate.network.ChunkedByteBuf;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@ChannelHandler.Sharable
public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody>
extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);
    private static final long UNKNOWN_REQUEST_ID = -1L;
    private final AbstractServerBase<REQ, RESP> server;
    private final MessageSerializer<REQ, RESP> serializer;
    protected final ExecutorService queryExecutor;
    private final KvStateRequestStats stats;

    public AbstractServerHandler(AbstractServerBase<REQ, RESP> server, MessageSerializer<REQ, RESP> serializer, KvStateRequestStats stats) {
        this.server = (AbstractServerBase)Preconditions.checkNotNull(server);
        this.serializer = (MessageSerializer)Preconditions.checkNotNull(serializer);
        this.queryExecutor = server.getQueryExecutor();
        this.stats = (KvStateRequestStats)Preconditions.checkNotNull((Object)stats);
    }

    protected String getServerName() {
        return this.server.getServerName();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.stats.reportActiveConnection();
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.stats.reportInactiveConnection();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object request = null;
        long requestId = -1L;
        try {
            ByteBuf buf = (ByteBuf)msg;
            MessageType msgType = MessageSerializer.deserializeHeader(buf);
            requestId = MessageSerializer.getRequestId(buf);
            LOG.trace("Handling request with ID {}", (Object)requestId);
            if (msgType == MessageType.REQUEST) {
                request = this.serializer.deserializeRequest(buf);
                this.stats.reportRequest();
                this.queryExecutor.submit(new AsyncRequestTask(this, ctx, requestId, request, this.stats));
            } else {
                String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + ".";
                ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg));
                LOG.debug(errMsg);
                ctx.writeAndFlush((Object)failure);
            }
        }
        catch (Throwable t) {
            ByteBuf err;
            LOG.error("Error while handling request with ID [{}]", requestId == -1L ? "unknown" : Long.valueOf(requestId), (Object)t);
            String stringifiedCause = ExceptionUtils.stringifyException((Throwable)t);
            if (request != null) {
                String errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause;
                err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg));
                this.stats.reportFailedRequest();
            } else {
                String errMsg = "Failed incoming message. Caused by: " + stringifiedCause;
                err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg));
            }
            ctx.writeAndFlush((Object)err);
        }
        finally {
            ReferenceCountUtil.release((Object)msg);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException((Throwable)cause);
        ByteBuf err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg));
        LOG.debug(msg);
        ctx.writeAndFlush((Object)err).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
    }

    public abstract CompletableFuture<RESP> handleRequest(long var1, REQ var3);

    public abstract CompletableFuture<Void> shutdown();

    private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody>
    implements Runnable {
        private final AbstractServerHandler<REQ, RESP> handler;
        private final ChannelHandlerContext ctx;
        private final long requestId;
        private final REQ request;
        private final KvStateRequestStats stats;
        private final long creationNanos;

        AsyncRequestTask(AbstractServerHandler<REQ, RESP> handler, ChannelHandlerContext ctx, long requestId, REQ request, KvStateRequestStats stats) {
            this.handler = (AbstractServerHandler)((Object)Preconditions.checkNotNull(handler));
            this.ctx = (ChannelHandlerContext)Preconditions.checkNotNull((Object)ctx);
            this.requestId = requestId;
            this.request = (MessageBody)Preconditions.checkNotNull(request);
            this.stats = (KvStateRequestStats)Preconditions.checkNotNull((Object)stats);
            this.creationNanos = System.nanoTime();
        }

        @Override
        public void run() {
            if (!this.ctx.channel().isActive()) {
                return;
            }
            this.handler.handleRequest(this.requestId, this.request).whenComplete((resp, throwable) -> {
                try {
                    if (throwable != null) {
                        throw throwable instanceof CompletionException ? throwable.getCause() : throwable;
                    }
                    if (resp == null) {
                        throw new BadRequestException(this.handler.getServerName(), "NULL returned for request with ID " + this.requestId + ".");
                    }
                    ByteBuf serialResp = MessageSerializer.serializeResponse(this.ctx.alloc(), this.requestId, resp);
                    int highWatermark = this.ctx.channel().config().getWriteBufferHighWaterMark();
                    ChannelFuture write = serialResp.readableBytes() <= highWatermark ? this.ctx.writeAndFlush((Object)serialResp) : this.ctx.writeAndFlush((Object)new ChunkedByteBuf(serialResp, highWatermark));
                    write.addListener((GenericFutureListener)new RequestWriteListener());
                }
                catch (BadRequestException e) {
                    LOG.debug("Bad request (request ID = {})", (Object)this.requestId, (Object)e);
                    try {
                        this.stats.reportFailedRequest();
                        ByteBuf err = MessageSerializer.serializeRequestFailure(this.ctx.alloc(), this.requestId, e);
                        this.ctx.writeAndFlush((Object)err);
                    }
                    catch (IOException io) {
                        LOG.error("Failed to respond with the error after failed request", (Throwable)io);
                    }
                }
                catch (Throwable t) {
                    LOG.error("Error while handling request with ID {}", (Object)this.requestId, (Object)t);
                    try {
                        this.stats.reportFailedRequest();
                        String errMsg = "Failed request " + this.requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException((Throwable)t);
                        ByteBuf err = MessageSerializer.serializeRequestFailure(this.ctx.alloc(), this.requestId, new RuntimeException(errMsg));
                        this.ctx.writeAndFlush((Object)err);
                    }
                    catch (IOException io) {
                        LOG.error("Failed to respond with the error after failed request", (Throwable)io);
                    }
                }
            });
        }

        public String toString() {
            return "AsyncRequestTask{requestId=" + this.requestId + ", request=" + this.request + "}";
        }

        private class RequestWriteListener
        implements ChannelFutureListener {
            private RequestWriteListener() {
            }

            public void operationComplete(ChannelFuture future) throws Exception {
                long durationNanos = System.nanoTime() - AsyncRequestTask.this.creationNanos;
                long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
                if (future.isSuccess()) {
                    LOG.debug("Request {} was successfully answered after {} ms.", AsyncRequestTask.this.request, (Object)durationMillis);
                    AsyncRequestTask.this.stats.reportSuccessfulRequest(durationMillis);
                } else {
                    LOG.debug("Request {} failed after {} ms", new Object[]{AsyncRequestTask.this.request, durationMillis, future.cause()});
                    AsyncRequestTask.this.stats.reportFailedRequest();
                }
            }
        }
    }
}

