WebNotificationStorageImpl.java
package org.exoplatform.commons.notification.impl.service.storage;
import org.apache.commons.lang.StringUtils;
import org.exoplatform.commons.api.notification.NotificationMessageUtils;
import org.exoplatform.commons.api.notification.model.NotificationInfo;
import org.exoplatform.commons.api.notification.model.PluginKey;
import org.exoplatform.commons.api.notification.model.WebNotificationFilter;
import org.exoplatform.commons.api.notification.service.setting.UserSettingService;
import org.exoplatform.commons.api.notification.service.storage.WebNotificationStorage;
import org.exoplatform.commons.notification.impl.AbstractService;
import org.exoplatform.commons.notification.impl.NotificationSessionManager;
import org.exoplatform.commons.notification.impl.service.storage.cache.CachedWebNotificationStorage;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.commons.utils.XPathUtils;
import org.exoplatform.services.jcr.core.ManageableRepository;
import org.exoplatform.services.jcr.ext.common.SessionProvider;
import org.exoplatform.services.jcr.ext.distribution.DataDistributionManager;
import org.exoplatform.services.jcr.ext.distribution.DataDistributionMode;
import org.exoplatform.services.jcr.ext.hierarchy.NodeHierarchyCreator;
import org.exoplatform.services.jcr.impl.core.query.QueryImpl;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import javax.jcr.*;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
public class WebNotificationStorageImpl extends AbstractService implements WebNotificationStorage {
private static final Log LOG = ExoLogger.getLogger(WebNotificationStorageImpl.class);
private static final String NOTIFICATIONS = "notifications";
private static final String NT_UNSTRUCTURED = "nt:unstructured";
private final ReentrantLock lock = new ReentrantLock();
private final NodeHierarchyCreator nodeHierarchyCreator;
private final UserSettingService userSettingService;
private final DataDistributionManager distributionManager;
private WebNotificationStorage webNotificationStorage;
public WebNotificationStorageImpl(NodeHierarchyCreator nodeHierarchyCreator, DataDistributionManager distributionManager,
UserSettingService userSettingService) {
this.nodeHierarchyCreator = nodeHierarchyCreator;
this.distributionManager = distributionManager;
this.userSettingService = userSettingService;
}
private Session getSession(SessionProvider sProvider) throws Exception {
ManageableRepository repository = CommonsUtils.getRepository();
return sProvider.getSession(repository.getConfiguration().getDefaultWorkspaceName(), repository);
}
private String converDateToNodeName(Calendar cal) {
return new SimpleDateFormat(DATE_NODE_PATTERN).format(cal.getTime());
}
/**
* Gets or create the Web Date Node on Collaboration workspace.
*/
private Node getOrCreateDataNode(Node rootNode, String nodeName, String nodeType) throws RepositoryException {
return distributionManager.getDataDistributionType(DataDistributionMode.NONE)
.getOrCreateDataNode(rootNode, nodeName, nodeType);
}
/**
* Gets or create the Web Date Node on Collaboration workspace.
*
* For example: The web date node has the path as bellow:
* User1: /Users/U___/Us___/Use___/User1/ApplicationData/notifications/web/20141224/
*
* @param sProvider
* @param notification
* @return
* @throws Exception
*/
private Node getOrCreateWebDateNode(SessionProvider sProvider, NotificationInfo notification) throws Exception {
String dateNodeName = converDateToNodeName(notification.getDateCreated());
Node channelNode = getOrCreateChannelNode(sProvider, notification.getTo());
try {
return channelNode.getNode(dateNodeName);
} catch (PathNotFoundException e) {
Node dateNode = getOrCreateDataNode(channelNode, dateNodeName, NTF_NOTIF_DATE);
dateNode.setProperty(NTF_LAST_MODIFIED_DATE, notification.getDateCreated().getTimeInMillis());
channelNode.save();
return dateNode;
}
}
/**
* Gets or create the Channel Node by NodeHierarchyCreator on Collaboration workspace.
*
* For example: The channel node has the path as bellow:
* User1: /Users/U___/Us___/Use___/User1/ApplicationData/notifications/web
*
* @param sProvider
* @param userId the remoteId
* @return the channel node
* @throws Exception
*/
private Node getOrCreateChannelNode(SessionProvider sProvider, String userId) throws Exception {
Node userNodeApp = nodeHierarchyCreator.getUserApplicationNode(sProvider, userId);
Node parentNode = getOrCreateDataNode(userNodeApp, NOTIFICATIONS, NT_UNSTRUCTURED);
Node channelNode = getOrCreateDataNode(parentNode, WEB_CHANNEL, NTF_CHANNEL);
return channelNode;
}
private void addMixinCountItemOnPopover(Node notifyNode, String userId) throws Exception {
if (!notifyNode.isNodeType(MIX_NEW_NODE)) {
notifyNode.addMixin(MIX_NEW_NODE);
}
}
@Override
public void save(NotificationInfo notification) {
save(notification, true);
}
/**
* Creates the notification message to the specified user.
*
* @param notification The notification to save
* @param isCountOnPopover The status to update count on Popover or not
*/
private void save(NotificationInfo notification, boolean isCountOnPopover) {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
try {
lock.lock();
//
Node userNode = getOrCreateWebDateNode(sProvider, notification);
Node notifyNode = getOrCreateDataNode(userNode, notification.getId(), NTF_NOTIF_INFO);
//
notifyNode.setProperty(NTF_PLUGIN_ID, notification.getKey().getId());
notifyNode.setProperty(NTF_TEXT, notification.getTitle());
notifyNode.setProperty(NTF_SENDER, notification.getFrom());
notifyNode.setProperty(NTF_OWNER, notification.getTo());
notifyNode.setProperty(NTF_LAST_MODIFIED_DATE, notification.getLastModifiedDate());
//NTF_NAME_SPACE
Map<String, String> ownerParameter = notification.getOwnerParameter();
if(ownerParameter != null && !ownerParameter.isEmpty()) {
for (String key : ownerParameter.keySet()) {
String propertyName = (key.indexOf(NTF_NAME_SPACE) != 0) ? NTF_NAME_SPACE + key : key;
notifyNode.setProperty(propertyName, ownerParameter.get(key));
}
}
//
if (isCountOnPopover) {
addMixinCountItemOnPopover(notifyNode, notification.getTo());
}
//
userNode.getSession().save();
} catch (Exception e) {
LOG.error("Failed to save the notificaton.", e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
lock.unlock();
}
}
@Override
public List<NotificationInfo> get(WebNotificationFilter filter, int offset, int limit) {
List<NotificationInfo> result = new ArrayList<NotificationInfo>();
if (limit > 0) {
SessionProvider sProvider = CommonsUtils.getSystemSessionProvider();
try {
Node userWebNotificationNode = getOrCreateChannelNode(sProvider, filter.getUserId());
NotificationIterator notificationIterator = new NotificationIterator(filter, userWebNotificationNode, offset, limit);
//nodes order by lastUpdated DESC
List<Node> nodes = notificationIterator.nodes();
//
for (Node node : nodes) {
result.add(getWebNotificationStorage().get(node.getName()));
}
Collections.sort(result, new NotificationInfoUpdateDateComparator());
} catch (Exception e) {
LOG.error("Notifications not found by filter: " + filter.toString(), e);
}
}
return result;
}
@Override
public NotificationInfo get(String id) {
try {
return fillModel(getNodeNotification(CommonsUtils.getSystemSessionProvider(), id));
} catch (Exception e) {
LOG.error("Notification not found by id: " + id, e);
return null;
}
}
@Override
public boolean remove(String notificationId) {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
try {
Node node = getNodeNotification(sProvider, notificationId);
if (node != null) {
distributionManager.getDataDistributionType(DataDistributionMode.NONE)
.removeDataNode(node.getParent(), notificationId);
return true;
}
} catch (Exception e) {
LOG.error("Failed to remove the notification id: " + notificationId, e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
return false;
}
@Override
public boolean remove(long seconds) {
boolean removed = false;
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
Calendar cal = Calendar.getInstance();
long delayTime = System.currentTimeMillis() - (seconds * 1000);
cal.setTimeInMillis(delayTime);
try {
Session session = getSession(sProvider);
StringBuilder strQuery = new StringBuilder("SELECT * FROM ").append(NTF_NOTIF_DATE).append(" WHERE ");
strQuery.append(NTF_LAST_MODIFIED_DATE).append(" < ").append(delayTime);
QueryManager qm = session.getWorkspace().getQueryManager();
Query query = qm.createQuery(strQuery.toString(), Query.SQL);
NodeIterator it = query.execute().getNodes();
while (it.hasNext()) {
Node node = it.nextNode();
node.remove();
removed = true;
//
session.save();
}
} catch (Exception e) {
LOG.error("Failed to remove all notifications and delay date: " + converDateToNodeName(cal), e);
return false;
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
return removed;
}
@Override
public boolean remove(String userId, long seconds) {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
try {
//
Node userNode = getOrCreateChannelNode(sProvider, userId);
String userPath = XPathUtils.escapeIllegalSQLName(userNode.getPath());
Session session = userNode.getSession();
long delayTime = System.currentTimeMillis() - (seconds * 1000);
StringBuilder strQuery = new StringBuilder("SELECT * FROM ");
strQuery.append(NTF_NOTIF_DATE).append(" WHERE (").append("jcr:path LIKE '").append(userPath)
.append("/%' AND NOT jcr:path LIKE '").append(userPath).append("/%/%'").append(") AND (")
.append(NTF_LAST_MODIFIED_DATE).append(" < ").append(delayTime).append(")");
QueryManager qm = session.getWorkspace().getQueryManager();
Query query = qm.createQuery(strQuery.toString(), Query.SQL);
NodeIterator it = query.execute().getNodes();
while (it.hasNext()) {
Node node = it.nextNode();
node.remove();
//
session.save();
}
} catch (Exception e) {
LOG.error("Failed to remove all notifications for the user id: " + userId, e);
return false;
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
return true;
}
@Override
public void markRead(String notificationId) {
Node node = getNodeNotification(CommonsUtils.getSystemSessionProvider(), notificationId);
if (node != null) {
Session session;
try {
session = node.getSession();
node.setProperty(NTF_READ, "true");
session.save();
} catch (Exception e) {
LOG.error("Failed to update the read notification Id: " + notificationId, e);
}
}
}
@Override
public void hidePopover(String notificationId) {
Node node = getNodeNotification(CommonsUtils.getSystemSessionProvider(), notificationId);
if (node != null) {
Session session;
try {
session = node.getSession();
node.setProperty(NTF_SHOW_POPOVER, "false");
session.save();
} catch (Exception e) {
LOG.error("Failed to hide the notification Id: " + notificationId + " on the popover list.", e);
}
}
}
@Override
public void markAllRead(String userId) {
try {
//
userSettingService.saveLastReadDate(userId, System.currentTimeMillis());
//
if (getWebNotificationStorage() instanceof CachedWebNotificationStorage) {
CachedWebNotificationStorage cacheStorage = (CachedWebNotificationStorage) getWebNotificationStorage();
cacheStorage.updateAllRead(userId);
}
} catch (Exception e) {
LOG.error("Failed to update the all read for userId:" + userId, e);
}
}
/**
* Fill data from JCR node to model object
* @param node
* @return
* @throws Exception
*/
public NotificationInfo fillModel(Node node) throws Exception {
if(node == null) return null;
NotificationInfo notifiInfo = NotificationInfo.instance()
.setTo(node.getProperty(NTF_OWNER).getString()) // owner of notification NTF_OWNER
.key(node.getProperty(NTF_PLUGIN_ID).getString())//pluginId
.setTitle(node.getProperty(NTF_TEXT).getString())
.setOnPopOver(node.getProperty(NTF_SHOW_POPOVER).getBoolean())
.setResetOnBadge(!node.isNodeType(MIX_NEW_NODE))
//
.setLastModifiedDate(node.getProperty(NTF_LAST_MODIFIED_DATE).getLong())
.setId(node.getName())
.end();
if (node.hasProperty(NTF_FROM)) {
notifiInfo.setFrom(node.getProperty(NTF_SENDER).getString()); // user make event of notification
}
if (node.hasProperty(EXO_DATE_CREATED)) {
notifiInfo.setDateCreated(node.getProperty(EXO_DATE_CREATED).getDate());
}
List<String> ignoreProperties = Arrays.asList(NTF_PLUGIN_ID, NTF_TEXT, NTF_OWNER, NTF_LAST_MODIFIED_DATE);
PropertyIterator iterator = node.getProperties();
while (iterator.hasNext()) {
Property p = iterator.nextProperty();
if (p.getName().indexOf(NTF_NAME_SPACE) == 0) {
if (ignoreProperties.contains(p.getName())) {
continue;
}
try {
notifiInfo.with(p.getName().replace(NTF_NAME_SPACE, ""), p.getString());
} catch (Exception e) {
LOG.error("Failed to get the property value.", e);
}
}
}
/**
* Comparison the read time point to decide the read status of message.
* + If less than the read time point, Read = TRUE
* + Else depends on the the status of the message
*/
long lastReadDate = getLastReadDateOfUser(notifiInfo.getTo());
if (notifiInfo.getLastModifiedDate() <= lastReadDate) {
notifiInfo.with(NotificationMessageUtils.READ_PORPERTY.getKey(), "true");
}
//
return notifiInfo;
}
private Node getNodeNotification(SessionProvider sProvider, String notificationId) {
try {
Session session = getSession(sProvider);
StringBuilder strQuery = new StringBuilder("SELECT * FROM ");
strQuery.append(NTF_NOTIF_INFO).append(" WHERE fn:name() = '").append(notificationId).append("'");
QueryManager qm = session.getWorkspace().getQueryManager();
QueryImpl query = (QueryImpl) qm.createQuery(strQuery.toString(), Query.SQL);
query.setOffset(0);
query.setLimit(1);
NodeIterator it = query.execute().getNodes();
if (it.hasNext()) {
return it.nextNode();
}
} catch (Exception e) {
LOG.error("Failed to get web notification node: " + notificationId, e);
}
return null;
}
/**
* Gets {@link WebNotificationStorage}
* @return
*/
private WebNotificationStorage getWebNotificationStorage() {
if (webNotificationStorage == null) {
webNotificationStorage = CommonsUtils.getService(WebNotificationStorage.class);
}
return webNotificationStorage;
}
@Override
public NotificationInfo getUnreadNotification(String pluginId, String activityId, String owner) {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
try {
String userPath = getOrCreateChannelNode(sProvider, owner).getPath();
long lastReadDate = getLastReadDateOfUser(owner);
StringBuilder strQuery = new StringBuilder("SELECT * FROM ").append(NTF_NOTIF_INFO);
strQuery.append(" WHERE jcr:path LIKE '").append(XPathUtils.escapeIllegalSQLName(userPath)).append("/%'")
.append(" AND ntf:pluginId = '").append(pluginId).append("'")
.append(" AND ntf:activityId = '").append(activityId).append("'")
.append(" AND ntf:lastModifiedDate > ").append(lastReadDate)
.append(" AND ntf:read = 'false'");
Session session = getSession(sProvider);
QueryManager qm = session.getWorkspace().getQueryManager();
QueryImpl query = (QueryImpl) qm.createQuery(strQuery.toString(), Query.SQL);
query.setOffset(0);
query.setLimit(1);
NodeIterator iter = query.execute().getNodes();
if (iter.hasNext()) {
return getWebNotificationStorage().get(iter.nextNode().getName());
}
} catch (Exception e) {
LOG.debug("Failed to getUnreadNotification ", e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
return null;
}
@Override
public void update(NotificationInfo notification, boolean moveTop) {
try {
lock.lock();
//only remove the old notification and create a new one in case of update and move top
//else just update it
if (moveTop) {
remove(notification.getId());
Calendar calendar = Calendar.getInstance();
notification.setLastModifiedDate(calendar);
}
// if moveTop == true, the number on badge will increase
// else the number on badge will not increase
save(notification, moveTop);
} finally {
lock.unlock();
}
}
@Override
public int getNumberOnBadge(String userId) {
if (StringUtils.isNotBlank(userId)) {
try {
SessionProvider sProvider = CommonsUtils.getSystemSessionProvider();
NodeIterator iter = getNewMessage(sProvider, userId, 0);
return (int) iter.getSize();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.error("Failed to getNumberOnBadge() ", e);
} else {
LOG.warn("Exception raising when getNumberOnBadge() associated to the userId " + userId);
}
}
return 0;
} else {
LOG.warn("Can't getNumberOnBadge(). The userId is null");
return 0;
}
}
@Override
public void resetNumberOnBadge(String userId) {
try {
SessionProvider sProvider = CommonsUtils.getSystemSessionProvider();
NodeIterator iter = getNewMessage(sProvider, userId, 0);
while (iter.hasNext()) {
Node node = iter.nextNode();
node.removeMixin(MIX_NEW_NODE);
}
getSession(sProvider).save();
} catch (Exception e) {
LOG.error("Failed to resetNumberOnBadge() ", e);
}
}
private NodeIterator getNewMessage(SessionProvider sProvider, String userId, int limit) throws Exception {
Session session = getSession(sProvider);
String userPath = getOrCreateChannelNode(sProvider, userId).getPath();
StringBuilder strQuery = new StringBuilder("SELECT * FROM ").append(MIX_NEW_NODE)
.append(" WHERE jcr:path LIKE '").append(XPathUtils.escapeIllegalSQLName(userPath)).append("/%' ");
QueryManager qm = session.getWorkspace().getQueryManager();
QueryImpl query = (QueryImpl) qm.createQuery(strQuery.toString(), Query.SQL);
if (limit > 0) {
query.setOffset(0);
query.setLimit(limit);
}
//
return query.execute().getNodes();
}
/**
* @param userId
* @return
*/
private long getLastReadDateOfUser(String userId) {
return userSettingService.get(userId).getLastReadDate();
}
public static final class NotificationInfoUpdateDateComparator implements Comparator<NotificationInfo> {
public int compare(NotificationInfo n1, NotificationInfo n2) {
return (int) (n2.getLastModifiedDate() - n1.getLastModifiedDate());
}
}
}