/*
 * Decompiled with CFR 0.152.
 */
package org.exoplatform.services.rpc.impl;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.net.URL;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import org.exoplatform.commons.utils.PropertyManager;
import org.exoplatform.commons.utils.SecurityHelper;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.configuration.ConfigurationManager;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.container.xml.ValueParam;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.rpc.RPCException;
import org.exoplatform.services.rpc.RPCService;
import org.exoplatform.services.rpc.RemoteCommand;
import org.exoplatform.services.rpc.TopologyChangeEvent;
import org.exoplatform.services.rpc.TopologyChangeListener;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.conf.ProtocolStackConfigurator;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.picocontainer.Startable;

public abstract class AbstractRPCService
implements RPCService,
Startable,
RequestHandler,
MembershipListener {
    private static final Log LOG = ExoLogger.getLogger((String)"exo.kernel.component.common.RPCServiceImpl");
    protected static final String PARAM_JGROUPS_CONFIG = "jgroups-configuration";
    protected static final String PARAM_CLUSTER_NAME = "jgroups-cluster-name";
    protected static final String PARAM_DEFAULT_TIMEOUT = "jgroups-default-timeout";
    protected static final String PARAM_ALLOW_FAILOVER = "allow-failover";
    protected static final String PARAM_RETRY_TIMEOUT = "retry-timeout";
    protected static final int DEFAULT_TIMEOUT = 0;
    protected static final int DEFAULT_RETRY_TIMEOUT = 20000;
    protected static final String CLUSTER_NAME = "RPCService-Cluster";
    protected final ProtocolStackConfigurator configurator;
    private final Object topologyChangeLock = new Object();
    private final String clusterName;
    protected Channel channel;
    protected volatile List<Address> members;
    protected volatile Address coordinator;
    protected volatile boolean isCoordinator;
    private long defaultTimeout = 0L;
    private long retryTimeout = 20000L;
    private boolean allowFailover = true;
    protected MessageDispatcher dispatcher;
    private final CountDownLatch startSignal = new CountDownLatch(1);
    private final List<TopologyChangeListener> listeners = new CopyOnWriteArrayList<TopologyChangeListener>();
    private volatile State state;
    private volatile Map<String, RemoteCommand> commands = Collections.unmodifiableMap(new HashMap());

    public AbstractRPCService(ExoContainerContext ctx, InitParams params, ConfigurationManager configManager) {
        String sAllowFailover;
        String sTimeout;
        if (params == null) {
            throw new IllegalArgumentException("The RPCServiceImpl requires some parameters");
        }
        final URL properties = AbstractRPCService.getProperties(params, configManager);
        if (LOG.isInfoEnabled()) {
            LOG.info((Object)("The JGroups configuration used for the RPCServiceImpl will be loaded from " + properties));
        }
        try {
            this.configurator = (ProtocolStackConfigurator)SecurityHelper.doPrivilegedExceptionAction((PrivilegedExceptionAction)new PrivilegedExceptionAction<ProtocolStackConfigurator>(){

                @Override
                public ProtocolStackConfigurator run() throws Exception {
                    return ConfiguratorFactory.getStackConfigurator((URL)properties);
                }
            });
        }
        catch (PrivilegedActionException pae) {
            throw new RuntimeException("Cannot load the JGroups configuration from " + properties, pae.getCause());
        }
        this.clusterName = AbstractRPCService.getClusterName(ctx, params);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("The cluster name of the RPCServiceImpl has been set to " + this.clusterName));
        }
        if ((sTimeout = AbstractRPCService.getValueParam(params, PARAM_DEFAULT_TIMEOUT)) != null) {
            this.defaultTimeout = Integer.parseInt(sTimeout);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The default timeout of the RPCServiceImpl has been set to " + this.defaultTimeout));
            }
        }
        if ((sAllowFailover = AbstractRPCService.getValueParam(params, PARAM_ALLOW_FAILOVER)) != null) {
            this.allowFailover = Boolean.valueOf(sAllowFailover);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The parameter 'allow-failover' of the RPCServiceImpl has been set to " + this.allowFailover));
            }
        }
        if ((sTimeout = AbstractRPCService.getValueParam(params, PARAM_RETRY_TIMEOUT)) != null) {
            this.retryTimeout = Integer.parseInt(sTimeout);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The retry timeout of the RPCServiceImpl has been set to " + this.retryTimeout));
            }
        }
        this.state = State.INITIALIZED;
    }

    @Override
    public List<Object> executeCommandOnAllNodes(RemoteCommand command, boolean synchronous, Serializable ... args) throws RPCException {
        return this.executeCommandOnAllNodesMain(command, synchronous, this.defaultTimeout, args);
    }

    @Override
    public List<Object> executeCommandOnAllNodes(RemoteCommand command, long timeout, Serializable ... args) throws RPCException {
        return this.executeCommandOnAllNodesMain(command, true, timeout, args);
    }

    protected List<Object> executeCommandOnAllNodesMain(RemoteCommand command, boolean synchronous, long timeout, Serializable ... args) throws RPCException {
        return this.excecuteCommand(this.members, command, synchronous, timeout, args);
    }

    @Override
    public Object executeCommandOnCoordinator(RemoteCommand command, boolean synchronous, Serializable ... args) throws RPCException {
        return this.executeCommandOnCoordinatorMain(command, synchronous, this.defaultTimeout, args);
    }

    @Override
    public Object executeCommandOnCoordinator(RemoteCommand command, long timeout, Serializable ... args) throws RPCException {
        return this.executeCommandOnCoordinatorMain(command, true, timeout, args);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Object executeCommandOnCoordinatorMain(RemoteCommand command, boolean synchronous, long timeout, Serializable ... args) throws RPCException {
        Object result;
        Address coordinator = this.coordinator;
        Vector<Address> v = new Vector<Address>(1);
        v.add(coordinator);
        List<Object> lResults = this.excecuteCommand(v, command, synchronous, timeout, args);
        Object object = result = lResults == null || lResults.size() == 0 ? null : lResults.get(0);
        if (this.allowFailover && result instanceof MemberHasLeftException) {
            if (coordinator.equals(this.coordinator)) {
                Object object2 = this.topologyChangeLock;
                synchronized (object2) {
                    if (coordinator.equals(this.coordinator)) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace((Object)("The coordinator did not change yet, we will relaunch the command after " + this.retryTimeout + " ms or once a topology change has been detected"));
                        }
                        try {
                            this.topologyChangeLock.wait(this.retryTimeout);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)"The coordinator has changed, we will automatically retry with the new coordinator");
            }
            return this.executeCommandOnCoordinator(command, synchronous, Long.valueOf(timeout), args);
        }
        if (result instanceof RPCException) {
            throw (RPCException)result;
        }
        return result;
    }

    protected List<Object> excecuteCommand(final List<Address> dests, RemoteCommand command, final boolean synchronous, final long timeout, Serializable ... args) throws RPCException {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        if (this.state != State.STARTED) {
            throw new RPCException("Cannot execute any commands if the service is not started, the current state of the service is " + (Object)((Object)this.state));
        }
        final String commandId = command.getId();
        if (this.commands.get(commandId) != command) {
            throw new RPCException("Command " + commandId + " unknown, please register your command first");
        }
        final Message msg = new Message();
        this.setObject(msg, new MessageBody(dests.size() == 1 && dests != this.members ? dests.get(0) : null, commandId, args));
        RspList rsps = (RspList)SecurityHelper.doPrivilegedAction((PrivilegedAction)new PrivilegedAction<RspList>(){

            @Override
            public RspList run() {
                try {
                    return AbstractRPCService.this.castMessage(dests, msg, synchronous, timeout);
                }
                catch (Exception e) {
                    LOG.error((Object)("Could not cast the message corresponding to the command " + commandId + "."), (Throwable)e);
                    return null;
                }
            }
        });
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("responses: " + rsps));
        }
        if (rsps == null) {
            throw new RPCException("Could not get the responses for command " + commandId + ".");
        }
        if (!synchronous) {
            return Collections.emptyList();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("(" + this.getLocalAddress() + "): responses for command " + commandId + ":\n" + rsps));
        }
        ArrayList<Object> retval = new ArrayList<Object>(rsps.size());
        for (Address dest : dests) {
            Rsp rsp = rsps.get((Object)dest);
            if (rsp == null || rsp.wasSuspected() && !rsp.wasReceived()) {
                retval.add(new MemberHasLeftException("No response for the member " + dest + ", this member has probably left the cluster."));
                continue;
            }
            if (!rsp.wasReceived()) {
                retval.add(new RPCException("Replication timeout for " + rsp.getSender() + ", rsp=" + rsp));
                continue;
            }
            Object value = rsp.getValue();
            if (value instanceof RPCException && LOG.isTraceEnabled()) {
                LOG.trace((Object)("Recieved exception'" + value + "' from " + rsp.getSender()), (Throwable)((RPCException)value));
            }
            retval.add(value);
        }
        return retval;
    }

    public Object handle(Message msg) {
        String commandId = null;
        try {
            this.startSignal.await();
            MessageBody body = (MessageBody)msg.getObject();
            commandId = body.getCommandId();
            if (!body.accept(this.getLocalAddress())) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Command : " + commandId + " needs to be executed on the coordinator only and the local node is not the coordinator, the command will be ignored"));
                }
                return null;
            }
            RemoteCommand command = this.getCommand(commandId);
            if (command == null) {
                return new RPCException("Command " + commandId + " unkown, please register your command first");
            }
            Serializable execResult = command.execute(body.getArgs());
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Command : " + commandId + " executed, result is: " + execResult));
            }
            return execResult;
        }
        catch (Throwable x) {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)"Problems invoking command.", x);
            }
            return new RPCException("Cannot execute the command " + (commandId == null ? "" : commandId), x);
        }
    }

    public void block() {
    }

    public void suspect(Address suspectedMbr) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void viewAccepted(View view) {
        boolean coordinatorHasChanged;
        Object object = this.topologyChangeLock;
        synchronized (object) {
            this.members = this.getMembers(view);
            Address currentCoordinator = this.coordinator;
            this.coordinator = this.members != null && this.members.size() > 0 ? this.members.get(0) : null;
            this.isCoordinator = this.coordinator != null && this.coordinator.equals(this.getLocalAddress());
            coordinatorHasChanged = currentCoordinator != null && !currentCoordinator.equals(this.coordinator);
            this.topologyChangeLock.notifyAll();
        }
        this.onTopologyChange(coordinatorHasChanged);
    }

    private void onTopologyChange(boolean coordinatorHasChanged) {
        TopologyChangeEvent event = new TopologyChangeEvent(coordinatorHasChanged, this.isCoordinator);
        for (TopologyChangeListener listener : this.listeners) {
            try {
                listener.onChange(event);
            }
            catch (Exception e) {
                LOG.warn((Object)("An error occurs with the listener of type " + listener.getClass()), (Throwable)e);
            }
        }
    }

    @Override
    public synchronized RemoteCommand registerCommand(RemoteCommand command) {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        if (command != null) {
            String commandId = command.getId();
            if (commandId == null) {
                throw new IllegalArgumentException("The command Id cannot be null");
            }
            HashMap<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
            RemoteCommand oldCommand = tmpCommands.put(commandId, command);
            if (oldCommand != null && PropertyManager.isDevelopping()) {
                LOG.warn((Object)("A command has already been registered with the id " + commandId + ", this command will be replaced with the new one"));
            }
            this.commands = Collections.unmodifiableMap(tmpCommands);
            return command;
        }
        return null;
    }

    @Override
    public synchronized void unregisterCommand(RemoteCommand command) {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        if (command != null) {
            String commandId = command.getId();
            if (commandId == null) {
                throw new IllegalArgumentException("The command Id cannot be null");
            }
            if (this.commands.get(commandId) != command) {
                if (PropertyManager.isDevelopping()) {
                    LOG.warn((Object)("Cannot unregister an unknown RemoteCommand, either the command id " + commandId + " is unknown or the instance of RemoteCommand to unregister is unknown"));
                }
                return;
            }
            HashMap<String, RemoteCommand> tmpCommands = new HashMap<String, RemoteCommand>(this.commands);
            tmpCommands.remove(commandId);
            this.commands = Collections.unmodifiableMap(tmpCommands);
        }
    }

    @Override
    public boolean isCoordinator() throws RPCException {
        if (this.state != State.STARTED) {
            throw new RPCException("Cannot know whether the local node is a coordinator or not if the service is not started, the current state of the service is " + (Object)((Object)this.state));
        }
        return this.isCoordinator;
    }

    @Override
    public void registerTopologyChangeListener(TopologyChangeListener listener) throws SecurityException {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        if (listener == null) {
            return;
        }
        this.listeners.add(listener);
    }

    @Override
    public void unregisterTopologyChangeListener(TopologyChangeListener listener) throws SecurityException {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        if (listener == null) {
            return;
        }
        this.listeners.remove(listener);
    }

    protected RemoteCommand getCommand(String commandId) {
        return this.commands.get(commandId);
    }

    public void start() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        try {
            SecurityHelper.doPrivilegedExceptionAction((PrivilegedExceptionAction)new PrivilegedExceptionAction<Void>(){

                @Override
                public Void run() throws Exception {
                    AbstractRPCService.this.channel = AbstractRPCService.this.createChannel();
                    AbstractRPCService.this.dispatcher = new MessageDispatcher(AbstractRPCService.this.channel, null, (MembershipListener)AbstractRPCService.this, (RequestHandler)AbstractRPCService.this);
                    AbstractRPCService.this.channel.connect(AbstractRPCService.this.clusterName);
                    return null;
                }
            });
        }
        catch (PrivilegedActionException pae) {
            throw new RuntimeException("Cannot initialize the Channel needed for the RPCServiceImpl", pae.getCause());
        }
        finally {
            this.state = State.STARTED;
            this.startSignal.countDown();
        }
    }

    public void stop() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(RPCService.ACCESS_RPC_SERVICE_PERMISSION);
        }
        this.state = State.STOPPED;
        this.isCoordinator = false;
        if (this.channel != null && this.channel.isOpen()) {
            if (LOG.isInfoEnabled()) {
                LOG.info((Object)"Disconnecting and closing the Channel");
            }
            SecurityHelper.doPrivilegedAction((PrivilegedAction)new PrivilegedAction<Void>(){

                @Override
                public Void run() {
                    AbstractRPCService.this.channel.disconnect();
                    AbstractRPCService.this.channel.close();
                    return null;
                }
            });
            this.channel = null;
        }
        if (this.dispatcher != null) {
            this.dispatcher.stop();
            this.dispatcher = null;
        }
    }

    protected long getDefaultTimeout() {
        return this.defaultTimeout;
    }

    protected String getClusterName() {
        return this.clusterName;
    }

    protected long getRetryTimeout() {
        return this.retryTimeout;
    }

    protected boolean isAllowFailover() {
        return this.allowFailover;
    }

    protected abstract Address getLocalAddress();

    protected abstract RspList castMessage(List<Address> var1, Message var2, boolean var3, long var4) throws Exception;

    protected abstract Channel createChannel() throws Exception;

    protected abstract List<Address> getMembers(View var1);

    protected abstract void setObject(Message var1, Object var2);

    private static String getValueParam(InitParams params, String parameterKey) {
        String parameterKeyValueString;
        ValueParam parameterKeyValue;
        if (params != null && (parameterKeyValue = params.getValueParam(parameterKey)) != null && (parameterKeyValueString = parameterKeyValue.getValue()) != null) {
            return parameterKeyValueString.trim();
        }
        return null;
    }

    private static URL getProperties(InitParams params, ConfigurationManager configManager) {
        URL properties;
        String configPath = AbstractRPCService.getValueParam(params, PARAM_JGROUPS_CONFIG);
        if (configPath == null) {
            throw new IllegalArgumentException("The parameter 'jgroups-configuration' of RPCServiceImpl is mandatory");
        }
        try {
            properties = configManager.getResource(configPath);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath, e);
        }
        if (properties == null) {
            throw new IllegalArgumentException("Cannot find the JGroups configuration at " + configPath);
        }
        return properties;
    }

    private static String getClusterName(ExoContainerContext ctx, InitParams params) {
        String clusterName = AbstractRPCService.getValueParam(params, PARAM_CLUSTER_NAME);
        if (clusterName == null) {
            clusterName = CLUSTER_NAME;
        }
        clusterName = clusterName + "-" + ctx.getName();
        return clusterName;
    }

    public static class MemberHasLeftException
    extends RPCException {
        private static final long serialVersionUID = 3558158913564367637L;

        public MemberHasLeftException(String message) {
            super(message);
        }
    }

    public static enum State {
        INITIALIZED,
        STARTED,
        STOPPED;

    }

    public static class MessageBody
    implements Externalizable {
        private String commandId;
        private Serializable[] args;
        private int destination;

        public MessageBody() {
        }

        public MessageBody(Address dest, String commandId, Serializable[] args) {
            this.commandId = commandId;
            this.args = args;
            this.destination = dest == null ? 0 : dest.hashCode();
        }

        public String getCommandId() {
            return this.commandId;
        }

        public Serializable[] getArgs() {
            return this.args;
        }

        public boolean accept(Address address) {
            return this.destination == 0 || this.destination == address.hashCode();
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            boolean unicast = in.readBoolean();
            if (unicast) {
                this.destination = in.readInt();
            }
            this.commandId = in.readUTF();
            int size = in.readInt();
            if (size == -1) {
                this.args = null;
            } else {
                this.args = new Serializable[size];
                for (int i = 0; i < size; ++i) {
                    this.args[i] = (Serializable)in.readObject();
                }
            }
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            boolean unicast = this.destination != 0;
            out.writeBoolean(unicast);
            if (unicast) {
                out.writeInt(this.destination);
            }
            out.writeUTF(this.commandId);
            if (this.args == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(this.args.length);
                for (int i = 0; i < this.args.length; ++i) {
                    out.writeObject(this.args[i]);
                }
            }
        }
    }
}

