/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.messaging.netty;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.netty.Client;
import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
import org.apache.storm.messaging.netty.Server;
import org.apache.storm.utils.ObjectReader;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;

/**
 * Netty-based {@link IContext} that creates server and client connections and
 * keeps track of them so they can all be closed by {@link #term()}.
 *
 * <p>All access to the connection registry goes through {@code synchronized}
 * methods on this instance. {@link #prepare(Map)} must be called before
 * {@link #bind(String, int)} or {@link #connect(String, String, int)}.
 */
public class Context
implements IContext {
    /** Configuration handed to {@link #prepare(Map)}; forwarded to every connection created here. */
    private Map<String, Object> topoConf;
    /** Open connections by registry key (see {@link #key(String, int)}); {@code null} before prepare and after term. */
    private Map<String, IConnection> connections;
    /** Channel factory shared by all clients created through {@link #connect(String, String, int)}. */
    private NioClientSocketChannelFactory clientChannelFactory;
    /** Timer shared by all clients created through {@link #connect(String, String, int)}. */
    private HashedWheelTimer clientScheduleService;

    /**
     * Stores the configuration and creates the shared client-side resources
     * (channel factory and timer).
     *
     * @param topoConf configuration map; {@code storm.messaging.netty.client_worker_threads}
     *                 is read from it and, when positive, passed to the channel factory
     */
    @Override
    public void prepare(Map<String, Object> topoConf) {
        this.topoConf = topoConf;
        this.connections = new HashMap<String, IConnection>();
        int maxWorkers = ObjectReader.getInt(topoConf.get("storm.messaging.netty.client_worker_threads"));

        Executor bossExecutor = Executors.newCachedThreadPool(new NettyRenameThreadFactory("client-boss"));
        Executor workerExecutor = Executors.newCachedThreadPool(new NettyRenameThreadFactory("client-worker"));
        // A non-positive setting means "no explicit limit": use the factory's own default.
        if (maxWorkers > 0) {
            this.clientChannelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, maxWorkers);
        } else {
            this.clientChannelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
        }

        ThreadFactory timerThreads = new NettyRenameThreadFactory("client-schedule-service");
        this.clientScheduleService = new HashedWheelTimer(timerThreads);
    }

    /**
     * Creates a server on the given port and registers it under
     * {@code storm_id:port}, replacing any entry already stored under that key.
     *
     * @param storm_id identifier used (together with the port) as the registry key
     * @param port     port handed to the new {@link Server}
     * @return the newly created server connection
     */
    @Override
    public synchronized IConnection bind(String storm_id, int port) {
        Server server = new Server(this.topoConf, port);
        this.connections.put(this.key(storm_id, port), server);
        return server;
    }

    /**
     * Returns the connection registered under {@code host:port}, creating and
     * registering a new {@link Client} when there is none yet.
     *
     * @param storm_id not used by this implementation
     * @param host     remote host
     * @param port     remote port
     * @return the existing or newly created connection
     */
    @Override
    public synchronized IConnection connect(String storm_id, String host, int port) {
        String connectionKey = this.key(host, port);
        IConnection existing = this.connections.get(connectionKey);
        if (existing != null) {
            return existing;
        }
        // The client receives this context, which lets it deregister itself via removeClient.
        Client client = new Client(this.topoConf, (ChannelFactory)this.clientChannelFactory, this.clientScheduleService, host, port, this);
        this.connections.put(connectionKey, client);
        return client;
    }

    /**
     * Drops the registry entry for {@code host:port}. Does nothing when the
     * registry is {@code null}, which is the case while and after {@link #term()} runs.
     *
     * @param host remote host of the connection to forget
     * @param port remote port of the connection to forget
     */
    synchronized void removeClient(String host, int port) {
        if (this.connections != null) {
            this.connections.remove(this.key(host, port));
        }
    }

    /**
     * Stops the shared timer, closes every registered connection and releases
     * the channel factory's resources. Calling it again, or before
     * {@link #prepare(Map)}, is a no-op.
     */
    @Override
    public synchronized void term() {
        if (this.connections == null) {
            // Never prepared or already terminated: nothing to tear down.
            return;
        }
        this.clientScheduleService.stop();

        // Detach the registry before closing anything. Clients hold a reference to this
        // context and presumably call removeClient() from close(); doing that against the
        // map being iterated would make HashMap's fail-fast iterator throw
        // ConcurrentModificationException and leave the remaining connections open.
        // With the field already null, removeClient() takes its null-guard path instead.
        Map<String, IConnection> toClose = this.connections;
        this.connections = null;
        for (IConnection conn : toClose.values()) {
            conn.close();
        }

        this.clientChannelFactory.releaseExternalResources();
    }

    /**
     * Builds the registry key {@code name:port}. Plain concatenation is used so the
     * port is always rendered with ASCII digits regardless of the default locale.
     */
    private String key(String host, int port) {
        return host + ":" + port;
    }
}

