/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that dispatches {@link NettyMessage.BufferResponse} and
 * {@link NettyMessage.ErrorResponse} messages to the {@link RemoteInputChannel}
 * registered under the message's receiver ID.
 *
 * <p>If a data buffer arrives while the receiving channel's {@link BufferProvider}
 * has no buffer available, the response is parked in {@link BufferListenerTask},
 * auto-read is switched off on the Netty channel, and every message received in
 * the meantime is queued in {@code stagedMessages}. Once a buffer becomes
 * available the parked response is delivered, the queue is drained by
 * {@link StagedMessagesHandlerTask}, and auto-read is switched back on.
 *
 * <p>Any failure is propagated to all registered input channels exactly once and
 * closes the Netty channel (see {@code notifyAllChannelsOfErrorAndClose}).
 */
class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);

    /** Input channels served by this handler, keyed by their ID. */
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();

    /** Flipped to {@code true} by the first error; guards one-time error notification. */
    private final AtomicBoolean channelError = new AtomicBoolean(false);

    /** Holds the single response that is currently waiting for a free buffer. */
    private final BufferListenerTask bufferListener = new BufferListenerTask();

    /** Messages received while a response is parked; processed in arrival order. */
    private final Queue<Object> stagedMessages = new ArrayDeque<Object>();

    /** Reusable task that drains {@link #stagedMessages} on the event loop. */
    private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask();

    /** IDs for which a cancel request has already been sent (used as a concurrent set). */
    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();

    /** Context of the active channel; {@code null} until {@link #channelActive} has run. */
    private volatile ChannelHandlerContext ctx;

    PartitionRequestClientHandler() {
    }

    /**
     * Registers an input channel unless one with the same ID is already registered.
     *
     * @param listener the input channel to register
     * @throws IllegalStateException if this handler has already seen an error
     */
    void addInputChannel(RemoteInputChannel listener) {
        Preconditions.checkState(!this.channelError.get(), "There has been an error in the channel.");
        // Atomic "register if absent"; a separate containsKey()/put() pair could
        // overwrite a registration made concurrently for the same ID.
        this.inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
    }

    /**
     * Removes the registration of the given input channel, if any.
     *
     * @param listener the input channel to unregister
     */
    void removeInputChannel(RemoteInputChannel listener) {
        this.inputChannels.remove(listener.getInputChannelId());
    }

    /**
     * Sends a {@link NettyMessage.CancelPartitionRequest} for the given input channel.
     * At most one request is sent per ID. Does nothing if the ID is {@code null} or
     * the channel is not active yet.
     *
     * @param inputChannelId ID of the input channel whose request should be cancelled
     */
    void cancelRequestFor(InputChannelID inputChannelId) {
        // Read the volatile field once so the null check and the use see the same value.
        final ChannelHandlerContext context = this.ctx;
        if (inputChannelId == null || context == null) {
            return;
        }
        if (this.cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
            context.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
        }
    }

    /** Remembers the first context this handler becomes active with, then forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelActive(ctx);
    }

    /**
     * Reports a {@link RemoteTransportException} to all input channels if the
     * connection goes away while channels are still registered, then forwards the event.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.inputChannels.isEmpty()) {
            final SocketAddress remoteAddr = ctx.channel().remoteAddress();
            this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. " + "This might indicate that the remote task manager was lost.", remoteAddr));
        }
        super.channelInactive(ctx);
    }

    /**
     * Wraps the failure in a {@link TransportException} (unless it already is one)
     * and reports it to all input channels. An {@link IOException} with the message
     * "Connection reset by peer" is reported as a remote failure, everything else
     * as a local one.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof TransportException) {
            this.notifyAllChannelsOfErrorAndClose(cause);
            return;
        }

        final SocketAddress remoteAddr = ctx.channel().remoteAddress();
        final TransportException tex;
        // Constant-first equals(): getMessage() may be null, and an NPE here would
        // mask the real failure and skip the notification altogether.
        if (cause instanceof IOException && "Connection reset by peer".equals(cause.getMessage())) {
            tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. This indicates " + "that the remote task manager was lost.", remoteAddr, cause);
        } else {
            tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
        }
        this.notifyAllChannelsOfErrorAndClose(tex);
    }

    /**
     * Decodes the message right away, or queues it behind already waiting messages
     * if a response is currently parked or the queue is not empty (keeps ordering).
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            final boolean nothingPending = !this.bufferListener.hasStagedBufferOrEvent() && this.stagedMessages.isEmpty();
            if (nothingPending) {
                this.decodeMsg(msg);
            } else {
                this.stagedMessages.add(msg);
            }
        }
        catch (Throwable t) {
            this.notifyAllChannelsOfErrorAndClose(t);
        }
    }

    /**
     * Reports {@code cause} to every registered input channel, clears the
     * registrations and closes the Netty channel. Only the first call has an
     * effect; later calls return immediately.
     *
     * @param cause the error to report
     */
    private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        if (!this.channelError.compareAndSet(false, true)) {
            return;
        }
        try {
            for (RemoteInputChannel inputChannel : this.inputChannels.values()) {
                // Catch per channel: one misbehaving listener must not prevent the
                // remaining channels from learning about the error.
                try {
                    inputChannel.onError(cause);
                }
                catch (Throwable t) {
                    LOG.warn("An Exception was thrown during error notification of a remote input channel.", t);
                }
            }
        }
        finally {
            this.inputChannels.clear();
            final ChannelHandlerContext context = this.ctx;
            if (context != null) {
                context.close();
            }
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * Handles a single message received from the producer.
     *
     * @param msg a {@link NettyMessage.BufferResponse} or {@link NettyMessage.ErrorResponse}
     * @return {@code false} if the message was parked to wait for a buffer (processing
     *         of further messages must pause); {@code true} otherwise
     * @throws IllegalStateException if the message has any other type
     */
    private boolean decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        if (msgClazz == NettyMessage.BufferResponse.class) {
            final NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
            final RemoteInputChannel inputChannel = this.inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null) {
                // Nobody is registered for this data any more: drop it and tell the producer to stop.
                bufferOrEvent.releaseBuffer();
                this.cancelRequestFor(bufferOrEvent.receiverId);
                return true;
            }
            return this.decodeBufferOrEvent(inputChannel, bufferOrEvent);
        }

        if (msgClazz == NettyMessage.ErrorResponse.class) {
            final NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
            final SocketAddress remoteAddr = this.ctx.channel().remoteAddress();

            if (error.isFatalError()) {
                this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
                return true;
            }

            final RemoteInputChannel inputChannel = this.inputChannels.get(error.receiverId);
            if (inputChannel != null) {
                if (error.cause.getClass() == PartitionNotFoundException.class) {
                    inputChannel.onFailedPartitionRequest();
                } else {
                    inputChannel.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
                }
            }
            return true;
        }

        throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
    }

    /**
     * Hands the payload of a buffer response to the given input channel.
     *
     * <p>Data buffers are copied into a buffer requested from the channel's
     * {@link BufferProvider}; events are copied into a freshly allocated byte array.
     * The Netty buffer of the response is released before returning, except when
     * the response has been parked in {@link BufferListenerTask}.
     *
     * @param inputChannel  the receiving input channel
     * @param bufferOrEvent the response to deliver
     * @return {@code false} only if the response was parked to wait for a buffer;
     *         {@code true} if it was delivered or dropped
     */
    private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        boolean releaseNettyBuffer = true;
        try {
            if (bufferOrEvent.isBuffer()) {
                if (bufferOrEvent.getSize() == 0) {
                    // Nothing to copy for an empty buffer.
                    inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber);
                    return true;
                }

                final BufferProvider bufferProvider = inputChannel.getBufferProvider();
                if (bufferProvider == null) {
                    // No provider to copy into: drop the data and cancel the request.
                    // Return true because the message is done with. Returning false
                    // would make StagedMessagesHandlerTask stop draining although
                    // no listener is registered to resume it.
                    this.cancelRequestFor(bufferOrEvent.receiverId);
                    return true;
                }

                while (true) {
                    final Buffer buffer = bufferProvider.requestBuffer();
                    if (buffer != null) {
                        buffer.setSize(bufferOrEvent.getSize());
                        bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
                        inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
                        return true;
                    }
                    if (this.bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
                        // Parked: the listener task now owns the Netty buffer.
                        releaseNettyBuffer = false;
                        return false;
                    }
                    if (bufferProvider.isDestroyed()) {
                        // Message is dropped, not parked -> keep processing (see above).
                        return true;
                    }
                    // Could not register as listener and provider is still alive: retry the request.
                }
            }

            // Event: copy the payload into an array-backed buffer.
            final byte[] byteArray = new byte[bufferOrEvent.getSize()];
            bufferOrEvent.getNettyBuffer().readBytes(byteArray);
            final MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
            final Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
            inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
            return true;
        }
        finally {
            if (releaseNettyBuffer) {
                bufferOrEvent.releaseBuffer();
            }
        }
    }

    /**
     * Drains {@code stagedMessages} in order. Stops early when a message gets
     * parked; re-enables auto-read and triggers a read once the queue is empty.
     */
    public class StagedMessagesHandlerTask implements Runnable {
        @Override
        public void run() {
            try {
                Object msg;
                while ((msg = stagedMessages.poll()) != null) {
                    if (!decodeMsg(msg)) {
                        // Parked again; BufferListenerTask reschedules this task later.
                        return;
                    }
                }
                ctx.channel().config().setAutoRead(true);
                ctx.channel().read();
            }
            catch (Throwable t) {
                notifyAllChannelsOfErrorAndClose(t);
            }
        }
    }

    /**
     * Waits for a buffer on behalf of one parked {@link NettyMessage.BufferResponse}.
     *
     * <p>{@link #onEvent(Buffer)} receives the buffer notification and defers the
     * actual delivery to {@link #run()}, which is executed on the channel's event loop.
     */
    private class BufferListenerTask implements EventListener<Buffer>, Runnable {
        /** Buffer handed over by {@link #onEvent(Buffer)} and consumed by {@link #run()}. */
        private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();

        /** The parked response, or {@code null} if none is waiting. */
        private NettyMessage.BufferResponse stagedBufferResponse;

        private BufferListenerTask() {
        }

        /**
         * Parks the response and registers this task as listener at the provider.
         * On success auto-read is switched off so no further data is read.
         *
         * @return {@code true} if the listener was registered and the response is parked
         */
        private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
            // Set before registering so the response is in place when a notification arrives.
            this.stagedBufferResponse = bufferResponse;
            if (bufferProvider.addListener(this)) {
                if (ctx.channel().config().isAutoRead()) {
                    ctx.channel().config().setAutoRead(false);
                }
                return true;
            }
            this.stagedBufferResponse = null;
            return false;
        }

        /** @return {@code true} while a response is parked */
        private boolean hasStagedBufferOrEvent() {
            return this.stagedBufferResponse != null;
        }

        /**
         * Buffer notification callback.
         *
         * @param buffer the buffer that became available, or {@code null} if no
         *               buffer will be delivered (presumably because the provider
         *               was destroyed -- confirm against the BufferProvider contract)
         */
        @Override
        public void onEvent(Buffer buffer) {
            boolean success = false;
            try {
                if (buffer != null) {
                    if (!this.availableBuffer.compareAndSet(null, buffer)) {
                        throw new IllegalStateException("Received a buffer notification,  but the previous one has not been handled yet.");
                    }
                    ctx.channel().eventLoop().execute(this);
                    success = true;
                } else {
                    // No buffer will ever arrive for the parked response. Release its
                    // Netty buffer before dropping the reference; nothing else holds
                    // it, so it would otherwise leak.
                    final NettyMessage.BufferResponse parked = this.stagedBufferResponse;
                    this.stagedBufferResponse = null;
                    if (parked != null) {
                        parked.releaseBuffer();
                    }

                    if (stagedMessages.isEmpty()) {
                        ctx.channel().config().setAutoRead(true);
                        ctx.channel().read();
                    } else {
                        ctx.channel().eventLoop().execute(stagedMessagesHandler);
                    }
                }
            }
            catch (Throwable t) {
                // Report the error from the event loop rather than from the calling thread.
                ctx.channel().eventLoop().execute(new AsyncErrorNotificationTask(t));
            }
            finally {
                if (!success && buffer != null) {
                    buffer.recycle();
                }
            }
        }

        /**
         * Copies the parked response into the buffer provided by {@link #onEvent(Buffer)},
         * delivers it to its input channel and resumes message processing.
         */
        @Override
        public void run() {
            boolean success = false;
            Buffer buffer = null;
            try {
                buffer = this.availableBuffer.getAndSet(null);
                if (buffer == null) {
                    throw new IllegalStateException("Running buffer availability task w/o a buffer.");
                }

                final NettyMessage.BufferResponse response = this.stagedBufferResponse;
                buffer.setSize(response.getSize());
                response.getNettyBuffer().readBytes(buffer.getNioBuffer());
                response.releaseBuffer();

                final RemoteInputChannel inputChannel = inputChannels.get(response.receiverId);
                if (inputChannel != null) {
                    inputChannel.onBuffer(buffer, response.sequenceNumber);
                    success = true;
                } else {
                    // Receiver is gone; the buffer is recycled in the finally block.
                    cancelRequestFor(response.receiverId);
                }

                this.stagedBufferResponse = null;

                if (stagedMessages.isEmpty()) {
                    ctx.channel().config().setAutoRead(true);
                    ctx.channel().read();
                } else {
                    ctx.channel().eventLoop().execute(stagedMessagesHandler);
                }
            }
            catch (Throwable t) {
                notifyAllChannelsOfErrorAndClose(t);
            }
            finally {
                if (!success && buffer != null) {
                    buffer.recycle();
                }
            }
        }
    }

    /** Reports an error to all input channels when run; used to defer the report to the event loop. */
    private class AsyncErrorNotificationTask implements Runnable {
        private final Throwable error;

        public AsyncErrorNotificationTask(Throwable error) {
            this.error = error;
        }

        @Override
        public void run() {
            notifyAllChannelsOfErrorAndClose(this.error);
        }
    }
}

