/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.messaging.netty;

import java.io.IOException;
import org.apache.storm.messaging.netty.ControlMessage;
import org.apache.storm.messaging.netty.ISaslServer;
import org.apache.storm.messaging.netty.SaslMessageToken;
import org.apache.storm.messaging.netty.SaslNettyServer;
import org.apache.storm.messaging.netty.SaslNettyServerState;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslStormServerHandler
extends SimpleChannelUpstreamHandler {
    ISaslServer server;
    private byte[] token;
    private String topologyName;
    private static final Logger LOG = LoggerFactory.getLogger(SaslStormServerHandler.class);

    public SaslStormServerHandler(ISaslServer server) throws IOException {
        this.server = server;
        this.getSASLCredentials();
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object msg = e.getMessage();
        if (msg == null) {
            return;
        }
        Channel channel = ctx.getChannel();
        if (msg instanceof ControlMessage && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
            SaslNettyServer saslNettyServer = (SaslNettyServer)SaslNettyServerState.getSaslNettyServer.get(channel);
            if (saslNettyServer == null) {
                LOG.debug("No saslNettyServer for " + channel + " yet; creating now, with topology token: ");
                try {
                    saslNettyServer = new SaslNettyServer(this.topologyName, this.token);
                }
                catch (IOException ioe) {
                    LOG.error("Error occurred while creating saslNettyServer on server " + channel.getLocalAddress() + " for client " + channel.getRemoteAddress());
                    saslNettyServer = null;
                }
                SaslNettyServerState.getSaslNettyServer.set(channel, (Object)saslNettyServer);
            } else {
                LOG.debug("Found existing saslNettyServer on server:" + channel.getLocalAddress() + " for client " + channel.getRemoteAddress());
            }
            LOG.debug("processToken:  With nettyServer: " + saslNettyServer + " and token length: " + this.token.length);
            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(saslNettyServer.response(new byte[0]));
            channel.write((Object)saslTokenMessageRequest);
            return;
        }
        if (msg instanceof SaslMessageToken) {
            SaslNettyServer saslNettyServer = (SaslNettyServer)SaslNettyServerState.getSaslNettyServer.get(channel);
            if (saslNettyServer == null) {
                throw new Exception("saslNettyServer was unexpectedly null for channel: " + channel);
            }
            SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(saslNettyServer.response(((SaslMessageToken)msg).getSaslToken()));
            channel.write((Object)saslTokenMessageRequest);
            if (saslNettyServer.isComplete()) {
                LOG.debug("SASL authentication is complete for client with username: " + saslNettyServer.getUserName());
                channel.write((Object)ControlMessage.SASL_COMPLETE_REQUEST);
                LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
                ctx.getPipeline().remove((ChannelHandler)this);
                this.server.authenticated(channel);
            }
        } else {
            LOG.warn("Sending upstream an unexpected non-SASL message :  " + msg);
            Channels.fireMessageReceived((ChannelHandlerContext)ctx, (Object)msg);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        if (this.server != null) {
            this.server.closeChannel(e.getChannel());
        }
    }

    private void getSASLCredentials() throws IOException {
        this.topologyName = this.server.name();
        String secretKey = this.server.secretKey();
        if (secretKey != null) {
            this.token = secretKey.getBytes();
        }
        LOG.debug("SASL credentials for storm topology {} is {}", (Object)this.topologyName, (Object)secretKey);
    }
}

