/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.server.resp.BaseRespDecoder;
import org.infinispan.server.resp.ByteBufferUtils;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespDecoder;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.logging.AccessLoggerManager;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.server.resp.logging.RespAccessLogger;
import org.infinispan.util.logging.LogFactory;

public class RespHandler
extends ChannelInboundHandlerAdapter {
    protected static final Log log = (Log)LogFactory.getLog(RespHandler.class, Log.class);
    protected static final int MINIMUM_BUFFER_SIZE = Integer.parseInt(System.getProperty("infinispan.resp.minimum-buffer-size", "4096"));
    protected final BaseRespDecoder resumeHandler;
    protected RespRequestHandler requestHandler;
    protected ByteBuf outboundBuffer;
    protected boolean resumeAutoReadOnWritability;
    private final boolean traceAccess = RespAccessLogger.isEnabled();
    private AccessLoggerManager accessLogger;

    public RespHandler(BaseRespDecoder resumeHandler, RespRequestHandler requestHandler) {
        this.resumeHandler = resumeHandler;
        this.requestHandler = requestHandler;
    }

    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, int size) {
        assert (ctx.channel().eventLoop().inEventLoop()) : "Buffer allocation should occur in event loop, it was " + Thread.currentThread().getName();
        if (this.traceAccess) {
            this.accessLogger.accept(size);
        }
        if (this.outboundBuffer != null) {
            if (this.outboundBuffer.writableBytes() > size) {
                return this.outboundBuffer;
            }
            log.tracef("Writing buffer %s as request is larger than remaining", this.outboundBuffer);
            ctx.write((Object)this.outboundBuffer, ctx.voidPromise());
        }
        int allocatedSize = Math.max(size, MINIMUM_BUFFER_SIZE);
        this.outboundBuffer = ctx.alloc().buffer(allocatedSize, allocatedSize);
        return this.outboundBuffer;
    }

    private void flushBufferIfNeeded(ChannelHandlerContext ctx, boolean runOnEventLoop, CompletionStage<?> res) {
        if (this.outboundBuffer != null) {
            log.tracef("Writing and flushing buffer %s", this.outboundBuffer);
            if (runOnEventLoop) {
                ctx.channel().eventLoop().execute(() -> {
                    ChannelPromise p = this.newPromise(ctx);
                    ctx.writeAndFlush((Object)this.outboundBuffer, p);
                    this.flushAccessLog(ctx, p, res);
                    this.outboundBuffer = null;
                });
            } else {
                ChannelPromise p = this.newPromise(ctx);
                ctx.writeAndFlush((Object)this.outboundBuffer, p);
                this.flushAccessLog(ctx, p, res);
                this.outboundBuffer = null;
            }
        }
    }

    private ChannelPromise newPromise(ChannelHandlerContext ctx) {
        return this.traceAccess ? ctx.newPromise() : ctx.voidPromise();
    }

    private void flushAccessLog(ChannelHandlerContext ctx, ChannelPromise promise, CompletionStage<?> res) {
        if (this.accessLogger == null) {
            return;
        }
        this.accessLogger.flush(ctx, (ChannelFuture)promise, res);
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().attr(RespRequestHandler.BYTE_BUF_POOL_ATTRIBUTE_KEY).set(size -> this.allocateBuffer(ctx, size));
        this.accessLogger = this.traceAccess ? new AccessLoggerManager(ctx, this.requestHandler.respServer().getTimeService()) : null;
        super.channelRegistered(ctx);
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        this.requestHandler.handleChannelDisconnect(ctx);
        if (this.traceAccess) {
            this.accessLogger.close();
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().config().isAutoRead()) {
            this.flushBufferIfNeeded(ctx, false, null);
        }
        super.channelReadComplete(ctx);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (this.resumeAutoReadOnWritability && ctx.channel().isWritable()) {
            this.resumeAutoReadOnWritability = false;
            log.tracef("Re-enabling auto read for channel %s as channel is now writeable", ctx.channel());
            this.resumeAutoRead(ctx);
        }
        super.channelWritabilityChanged(ctx);
    }

    protected void resumeAutoRead(ChannelHandlerContext ctx) {
        ctx.channel().config().setAutoRead(true);
        this.resumeHandler.resumeRead();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RespDecoder arg = (RespDecoder)((Object)msg);
        this.handleCommandAndArguments(ctx, arg.getCommand(), arg.getArguments());
    }

    protected void handleCommandAndArguments(ChannelHandlerContext ctx, RespCommand command, List<byte[]> arguments) {
        CompletionStage<RespRequestHandler> stage;
        if (log.isTraceEnabled()) {
            log.tracef("Received command: %s with arguments %s for %s", command, Util.toStr(arguments), ctx.channel());
        }
        if (this.traceAccess) {
            this.accessLogger.track(command, arguments);
        }
        if (CompletionStages.isCompletedSuccessfully(stage = this.requestHandler.handleRequest(ctx, command, arguments))) {
            this.requestHandler = (RespRequestHandler)CompletionStages.join(stage);
            if (this.outboundBuffer != null && (long)this.outboundBuffer.readableBytes() > ctx.channel().bytesBeforeUnwritable()) {
                log.tracef("Buffer will cause channel %s to be unwriteable - forcing flush", ctx.channel());
                this.flushBufferIfNeeded(ctx, true, stage);
                ctx.channel().config().setAutoRead(false);
                this.resumeAutoReadOnWritability = true;
                return;
            }
            if (this.traceAccess) {
                this.accessLogger.register(stage);
            }
            return;
        }
        log.tracef("Disabling auto read for channel %s until previous command is complete", ctx.channel());
        ctx.channel().config().setAutoRead(false);
        stage.whenComplete((handler, t) -> {
            assert (ctx.channel().eventLoop().inEventLoop()) : "Command should complete only in event loop thread, it was " + Thread.currentThread().getName();
            if (t != null) {
                this.exceptionCaught(ctx, (Throwable)t);
                return;
            }
            this.requestHandler = handler;
            this.flushBufferIfNeeded(ctx, false, stage);
            log.tracef("Re-enabling auto read for channel %s as previous command is complete", ctx.channel());
            this.resumeAutoRead(ctx);
        });
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.unexpectedException(cause);
        ByteBufferUtils.stringToByteBuf("-ERR Server Error Encountered: " + cause.getMessage() + "\\r\\n", this.requestHandler.allocatorToUse);
        this.flushBufferIfNeeded(ctx, false, null);
        ctx.close();
    }
}

