/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.pacemaker;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.security.auth.login.Configuration;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.messaging.netty.ISaslServer;
import org.apache.storm.messaging.netty.IServer;
import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
import org.apache.storm.pacemaker.IServerMessageHandler;
import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec;
import org.apache.storm.security.auth.AuthUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PacemakerServer
implements ISaslServer {
    private static final long FIVE_MB_IN_BYTES = 0x500000L;
    private static final Logger LOG = LoggerFactory.getLogger(PacemakerServer.class);
    private final ServerBootstrap bootstrap;
    private int port;
    private IServerMessageHandler handler;
    private String secret;
    private String topo_name;
    private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    private ConcurrentSkipListSet<Channel> authenticated_channels = new ConcurrentSkipListSet();
    private ThriftNettyServerCodec.AuthMethod authMethod;

    public PacemakerServer(IServerMessageHandler handler, Map<String, Object> config) {
        String auth;
        int maxWorkers = (Integer)config.get("pacemaker.max.threads");
        this.port = (Integer)config.get("pacemaker.port");
        this.handler = handler;
        this.topo_name = "pacemaker_server";
        switch (auth = (String)config.get("pacemaker.auth.method")) {
            case "DIGEST": {
                Configuration login_conf = AuthUtils.GetConfiguration(config);
                this.authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
                this.secret = AuthUtils.makeDigestPayload((Configuration)login_conf, (String)"PacemakerDigest");
                if (this.secret != null) break;
                LOG.error("Can't start pacemaker server without digest secret.");
                throw new RuntimeException("Can't start pacemaker server without digest secret.");
            }
            case "KERBEROS": {
                this.authMethod = ThriftNettyServerCodec.AuthMethod.KERBEROS;
                break;
            }
            case "NONE": {
                this.authMethod = ThriftNettyServerCodec.AuthMethod.NONE;
                break;
            }
            default: {
                LOG.error("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD.");
                throw new RuntimeException("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD.");
            }
        }
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker");
        NioServerSocketChannelFactory factory = maxWorkers > 0 ? new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool((ThreadFactory)bossFactory), (Executor)Executors.newCachedThreadPool((ThreadFactory)workerFactory), maxWorkers) : new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool((ThreadFactory)bossFactory), (Executor)Executors.newCachedThreadPool((ThreadFactory)workerFactory));
        this.bootstrap = new ServerBootstrap((ChannelFactory)factory);
        this.bootstrap.setOption("tcpNoDelay", (Object)true);
        this.bootstrap.setOption("sendBufferSize", (Object)0x500000L);
        this.bootstrap.setOption("keepAlive", (Object)true);
        ChannelPipelineFactory pipelineFactory = new ThriftNettyServerCodec((IServer)this, config, this.authMethod).pipelineFactory();
        this.bootstrap.setPipelineFactory(pipelineFactory);
        Channel channel = this.bootstrap.bind((SocketAddress)new InetSocketAddress(this.port));
        this.allChannels.add((Object)channel);
        LOG.info("Bound server to port: {}", (Object)Integer.toString(this.port));
    }

    public void channelConnected(Channel c) {
        this.allChannels.add((Object)c);
    }

    public void cleanPipeline(Channel channel) {
        boolean authenticated = this.authenticated_channels.contains(channel);
        if (!authenticated) {
            if (channel.getPipeline().get("sasl-handler") != null) {
                channel.getPipeline().remove("sasl-handler");
            } else if (channel.getPipeline().get("kerberos-handler") != null) {
                channel.getPipeline().remove("kerberos-handler");
            }
        }
    }

    public void received(Object mesg, String remote, Channel channel) throws InterruptedException {
        this.cleanPipeline(channel);
        boolean authenticated = this.authMethod == ThriftNettyServerCodec.AuthMethod.NONE || this.authenticated_channels.contains(channel);
        HBMessage m = (HBMessage)mesg;
        LOG.debug("received message. Passing to handler. {} : {} : {}", new Object[]{this.handler.toString(), m.toString(), channel.toString()});
        HBMessage response = this.handler.handleMessage(m, authenticated);
        if (response != null) {
            LOG.debug("Got Response from handler: {}", (Object)response);
            channel.write((Object)response);
        } else {
            LOG.info("Got null response from handler handling message: {}", (Object)m);
        }
    }

    public void closeChannel(Channel c) {
        c.close().awaitUninterruptibly();
        this.allChannels.remove((Object)c);
        this.authenticated_channels.remove(c);
    }

    public String name() {
        return this.topo_name;
    }

    public String secretKey() {
        return this.secret;
    }

    public void authenticated(Channel c) {
        LOG.debug("Pacemaker server authenticated channel: {}", (Object)c.toString());
        this.authenticated_channels.add(c);
    }
}

