/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.messaging.netty;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.netty.ISaslServer;
import org.apache.storm.messaging.netty.MessageBatch;
import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
import org.apache.storm.messaging.netty.SaslUtils;
import org.apache.storm.messaging.netty.StormServerPipelineFactory;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.serialization.KryoValuesSerializer;
import org.apache.storm.utils.ObjectReader;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Server
extends ConnectionWithStatus
implements IStatefulObject,
ISaslServer {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    Map<String, Object> topoConf;
    int port;
    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap();
    private final AtomicInteger messagesDequeued = new AtomicInteger(0);
    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private volatile boolean closing = false;
    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    private KryoValuesSerializer _ser;
    private IConnectionCallback _cb = null;

    Server(Map<String, Object> topoConf, int port) {
        this.topoConf = topoConf;
        this.port = port;
        this._ser = new KryoValuesSerializer(topoConf);
        int buffer_size = ObjectReader.getInt(topoConf.get("storm.messaging.netty.buffer_size"));
        int backlog = ObjectReader.getInt(topoConf.get("storm.messaging.netty.socket.backlog"), 500);
        int maxWorkers = ObjectReader.getInt(topoConf.get("storm.messaging.netty.server_worker_threads"));
        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory(this.netty_name() + "-boss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory(this.netty_name() + "-worker");
        this.factory = maxWorkers > 0 ? new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory), maxWorkers) : new NioServerSocketChannelFactory((Executor)Executors.newCachedThreadPool(bossFactory), (Executor)Executors.newCachedThreadPool(workerFactory));
        LOG.info("Create Netty Server " + this.netty_name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("child.tcpNoDelay", (Object)true);
        this.bootstrap.setOption("child.receiveBufferSize", (Object)buffer_size);
        this.bootstrap.setOption("child.keepAlive", (Object)true);
        this.bootstrap.setOption("backlog", (Object)backlog);
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)new StormServerPipelineFactory(this));
        Channel channel = this.bootstrap.bind((SocketAddress)new InetSocketAddress(port));
        this.allChannels.add((Object)channel);
    }

    private void addReceiveCount(String from, int amount) {
        AtomicInteger i = this.messagesEnqueued.get(from);
        if (i == null) {
            i = new AtomicInteger(amount);
            AtomicInteger prev = this.messagesEnqueued.putIfAbsent(from, i);
            if (prev != null) {
                prev.addAndGet(amount);
            }
        } else {
            i.addAndGet(amount);
        }
    }

    protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException {
        if (null == msgs || msgs.size() == 0 || this.closing) {
            return;
        }
        this.addReceiveCount(from, msgs.size());
        if (this._cb != null) {
            this._cb.recv(msgs);
        }
    }

    @Override
    public void registerRecv(IConnectionCallback cb) {
        this._cb = cb;
    }

    protected void addChannel(Channel channel) {
        this.allChannels.add((Object)channel);
    }

    @Override
    public void closeChannel(Channel channel) {
        channel.close().awaitUninterruptibly();
        this.allChannels.remove((Object)channel);
    }

    @Override
    public synchronized void close() {
        if (this.allChannels != null) {
            this.allChannels.close().awaitUninterruptibly();
            this.factory.releaseExternalResources();
            this.allChannels = null;
        }
    }

    @Override
    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
        try {
            MessageBatch mb = new MessageBatch(1);
            mb.add(new TaskMessage(-1, this._ser.serialize(Arrays.asList(taskToLoad))));
            this.allChannels.write((Object)mb);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
        throw new RuntimeException("Server connection cannot get load");
    }

    @Override
    public void send(int task, byte[] message) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    @Override
    public void send(Iterator<TaskMessage> msgs) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    public String netty_name() {
        return "Netty-server-localhost-" + this.port;
    }

    @Override
    public ConnectionWithStatus.Status status() {
        if (this.closing) {
            return ConnectionWithStatus.Status.Closed;
        }
        if (!this.connectionEstablished(this.allChannels)) {
            return ConnectionWithStatus.Status.Connecting;
        }
        return ConnectionWithStatus.Status.Ready;
    }

    private boolean connectionEstablished(Channel channel) {
        return channel != null && channel.isBound();
    }

    private boolean connectionEstablished(ChannelGroup allChannels) {
        boolean allEstablished = true;
        for (Channel channel : allChannels) {
            if (this.connectionEstablished(channel)) continue;
            allEstablished = false;
            break;
        }
        return allEstablished;
    }

    @Override
    public Object getState() {
        LOG.debug("Getting metrics for server on port {}", (Object)this.port);
        HashMap<String, Serializable> ret = new HashMap<String, Serializable>();
        ret.put("dequeuedMessages", Integer.valueOf(this.messagesDequeued.getAndSet(0)));
        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
        Iterator<Map.Entry<String, AtomicInteger>> it = this.messagesEnqueued.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AtomicInteger> ent = it.next();
            AtomicInteger i = ent.getValue();
            if (i.get() == 0) {
                it.remove();
                continue;
            }
            enqueued.put(ent.getKey(), i.getAndSet(0));
        }
        ret.put("enqueued", enqueued);
        return ret;
    }

    @Override
    public void channelConnected(Channel c) {
        this.addChannel(c);
    }

    @Override
    public void received(Object message, String remote, Channel channel) throws InterruptedException {
        List msgs = (List)message;
        this.enqueue(msgs, remote);
    }

    @Override
    public String name() {
        return (String)this.topoConf.get("topology.name");
    }

    @Override
    public String secretKey() {
        return SaslUtils.getSecretKey(this.topoConf);
    }

    @Override
    public void authenticated(Channel c) {
    }

    public String toString() {
        return String.format("Netty server listening on port %s", this.port);
    }
}

