StartableClusterService.java
package org.exoplatform.commons.cluster;
import org.exoplatform.commons.api.settings.SettingService;
import org.exoplatform.commons.api.settings.SettingValue;
import org.exoplatform.commons.api.settings.data.Context;
import org.exoplatform.commons.api.settings.data.Scope;
import org.exoplatform.commons.utils.PropertyManager;
import org.exoplatform.commons.utils.SecurityHelper;
import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.component.RequestLifeCycle;
import org.exoplatform.container.spi.ComponentAdapter;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.picocontainer.Startable;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
/**
* This service allow to running a specific service (implements StartableClusterAware interface) on one cluster node.
*/
public class StartableClusterService implements Startable {
private static final Log LOG = ExoLogger.getLogger(StartableClusterService.class);
private ExoContainer container;
private SettingService settingService;
/**
* Registers shutdown hook.
*/
private final Thread hook = new ShutdownThread();
/**
* Timer to schedule check runnable cluster aware service after some idle time.
*/
private static final Timer CLUSTER_AWARE_TIMER = new Timer("CLUSTER AWARE Timer", true);
/**
* Task that is periodically called by {@link #CLUSTER_AWARE_TIMER} and checks if
* service should be started
*/
private TimerTask task;
private static final String CLUSTER_NODE_NAME = "exo.cluster.node.name";
private static final String CLUSTER_CHECK_PERIOD = "exo.cluster.startable.check.period";
private static final String CLUSTER_SERVICE_SETTING_GLOBAL_KEY = "CLUSTER_SERVICE_CLUSTER_STARTABLE_SERVICE";
private static Map<Object,Boolean> services = new HashMap<>();
private final static String nodeName;
private final static boolean clusterEnabled;
/***
* Check service period
*/
private static long checkPeriod = 120000;
static {
nodeName = PropertyManager.getProperty(CLUSTER_NODE_NAME);
clusterEnabled = (PropertyManager.getProperty(PropertyManager.RUNTIME_PROFILES) != null ) ? PropertyManager.getProperty(PropertyManager.RUNTIME_PROFILES).contains("cluster") : false;
}
public StartableClusterService(InitParams initParams, ExoContainerContext containerContext, SettingService settingService) {
this.container = containerContext.getContainer();
this.settingService = settingService;
if (initParams.getValueParam(CLUSTER_CHECK_PERIOD) != null) {
this.checkPeriod = Long.parseLong(initParams.getValueParam(CLUSTER_CHECK_PERIOD).getValue());
}
if (clusterEnabled && (nodeName == null || nodeName.isEmpty())) {
LOG.error("Cluster node name cannot be empty, exo.cluster.node.name should be configured");
throw new IllegalArgumentException("Cluster node name cannot be empty");
}
}
@Override
public void start() {
/**Select all service implement StartableClusterAware**/
for (ComponentAdapter componentAdapter : container.getComponentAdaptersOfType(StartableClusterAware.class)) {
if (componentAdapter != null) {
Object key = componentAdapter.getComponentKey();
StartableClusterAware service = (StartableClusterAware) container.getComponentInstance(key);
services.put(key,false);
/**Start the service if is not done and is not yet started**/
if (!service.isDone() && canStart(key)) {
if (clusterEnabled) {
LOG.info("Start service {} on node {} mode cluster aware ", key, nodeName);
}
service.start();
services.put(key,true);
}
}
}
/** Unregister node name , If System.exit() is called before Thread migration is done.**/
SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>() {
public Void run() {
Runtime.getRuntime().addShutdownHook(hook);
return null;
}
});
/**Register checker task, verify service state**/
if (clusterEnabled) {
boolean initTimer = false;
/**Start Timer only if exist services already started by other node**/
for(Object key : services.keySet()){
if(!services.get(key)){
initTimer = true;
break;
}
}
//existing services already started by other node or all services is done
if(initTimer && !checkAllIsDone()) {
task = new TimerTask() {
@Override
public void run() {
//check if exist at least one service is not done
//If all cluster aware services is done, cancel the actual timer Task
if(checkAllIsDone()){
cancel();
}
for (Object key : services.keySet()) {
if (services.get(key))
continue; //This services is already started by current node
SettingValue<String> serviceSetting = currentSetting(key.toString());
StartableClusterAware service = (StartableClusterAware) container.getComponentInstance(key);
if (serviceSetting != null && serviceSetting.getValue().isEmpty()) {
if (!service.isDone() && canStart(key)) {
LOG.info("Start service {} on node {} mode cluster aware ", key, nodeName);
service.start();
services.put(key, true);
}
}
}
}
};
CLUSTER_AWARE_TIMER.schedule(task, 0, checkPeriod);
}
}
}
@Override
public void stop() {
resetSetting();
for (ComponentAdapter<?> componentAdapter : container.getComponentAdaptersOfType(StartableClusterAware.class)) {
if (componentAdapter != null) {
Object key = componentAdapter.getComponentKey();
StartableClusterAware service = (StartableClusterAware) container.getComponentInstance(key);
service.stop();
}
}
}
//***** Internal Methods *****//
/***
* Check if current node can start the specific service
* @param key service key
* @return
*/
private boolean canStart(Object key) {
if (!clusterEnabled)
return true;
String name = getServiceSettings(key.toString(), nodeName);
if (name == null) {
LOG.error("Unable to get service setting {} ", key.toString());
return false;
}
return nodeName.equalsIgnoreCase(name);
}
/**
* Get or update current value of service
* @param key service key
* @param defaultValue current node name
* @return
*/
private String getServiceSettings(String key, String defaultValue) {
try {
SettingValue<String> serviceSetting = currentSetting(key);
if (serviceSetting != null && !serviceSetting.getValue().isEmpty()) {
return serviceSetting.getValue();
} else {
updateServiceSettings(key, defaultValue);
//double checking of state
serviceSetting = currentSetting(key);
return serviceSetting.getValue();
}
} finally {
Scope.GLOBAL.id(null);
}
}
/**
* Get current service state
* @param key service key
* @return
*/
private SettingValue<String> currentSetting(String key) {
return (SettingValue<String>) settingService.get(Context.GLOBAL, Scope.GLOBAL.id(CLUSTER_SERVICE_SETTING_GLOBAL_KEY), key);
}
/**
* Update service setting
* @param key service key
* @param value node name
*/
private void updateServiceSettings(String key, String value) {
try {
RequestLifeCycle.begin(ExoContainerContext.getCurrentContainer());
settingService.set(Context.GLOBAL, Scope.GLOBAL.id(CLUSTER_SERVICE_SETTING_GLOBAL_KEY), key, SettingValue.create(value));
} finally {
RequestLifeCycle.end();
Scope.GLOBAL.id(null);
}
}
/**
* Reset all service setting running by the current node
*/
private void resetSetting() {
for (Object key : services.keySet()) {
SettingValue<String> serviceSetting = currentSetting(key.toString());
if (serviceSetting != null && serviceSetting.getValue().equalsIgnoreCase(nodeName)) {
updateServiceSettings(key.toString(), "");
}
}
}
/**
* Check if all cluster aware services is done
*/
private boolean checkAllIsDone(){
for (Object key : services.keySet()) {
StartableClusterAware service = (StartableClusterAware) container.getComponentInstance(key);
if (!service.isDone()){
return false;
}
}
return true;
}
private class ShutdownThread extends Thread {
@Override
public void run() {
SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>() {
public Void run() {
resetSetting();
return null;
}
});
}
}
}