MailNotificationStorageImpl.java
/*
* Copyright (C) 2003-2019 eXo Platform SAS.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, see<http://www.gnu.org/licenses/>.
*/
package org.exoplatform.commons.notification.impl.service.storage;
import org.exoplatform.commons.api.notification.NotificationContext;
import org.exoplatform.commons.api.notification.model.NotificationInfo;
import org.exoplatform.commons.api.notification.model.PluginKey;
import org.exoplatform.commons.api.notification.model.UserSetting;
import org.exoplatform.commons.api.notification.service.storage.MailNotificationStorage;
import org.exoplatform.commons.notification.NotificationConfiguration;
import org.exoplatform.commons.notification.NotificationContextFactory;
import org.exoplatform.commons.notification.NotificationUtils;
import org.exoplatform.commons.notification.impl.AbstractService;
import org.exoplatform.commons.notification.impl.NotificationSessionManager;
import org.exoplatform.commons.notification.job.NotificationJob;
import org.exoplatform.services.jcr.ext.common.SessionProvider;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.Session;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
public class MailNotificationStorageImpl extends AbstractService implements MailNotificationStorage {
private static final Log LOG = ExoLogger.getLogger(MailNotificationStorageImpl.class);
private static final String REMOVE_ALL = "removeAll";
private static final String REMOVE_DAILY = "removeDaily";
private NotificationContextFactory notificationContextFactory;
private String workspace;
private final ReentrantLock lock = new ReentrantLock();
private Map<String, Set<String>> removeByCallBack = new ConcurrentHashMap<String, Set<String>>();
public MailNotificationStorageImpl(NotificationConfiguration configuration, NotificationContextFactory notificationContextFactory) {
this.notificationContextFactory = notificationContextFactory;
this.workspace = configuration.getWorkspace();
}
@Override
public void save(NotificationInfo message) throws Exception {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
final ReentrantLock localLock = lock;
try {
localLock.lock();
Node messageHomeNode = getOrCreateMessageParent(sProvider, workspace, message.getKey().getId());
Node messageNode = messageHomeNode.addNode(message.getId(), NTF_MESSAGE);
messageNode.setProperty(NTF_FROM, message.getFrom());
messageNode.setProperty(NTF_ORDER, message.getOrder());
messageNode.setProperty(NTF_PROVIDER_TYPE, message.getKey().getId());
messageNode.setProperty(NTF_OWNER_PARAMETER, message.getArrayOwnerParameter());
messageNode.setProperty(NTF_SEND_TO_DAILY, message.getSendToDaily());
messageNode.setProperty(NTF_SEND_TO_WEEKLY, message.getSendToWeekly());
messageHomeNode.getSession().save();
//record statistics insert entity
if (notificationContextFactory.getStatistics().isStatisticsEnabled()) {
notificationContextFactory.getStatisticsCollector().insertEntity(NTF_MESSAGE);
}
} catch (Exception e) {
LOG.error("Failed to save the NotificationMessage", e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
localLock.unlock();
}
}
@Override
public Map<PluginKey, List<NotificationInfo>> getByUser(NotificationContext context, UserSetting setting) {
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
Map<PluginKey, List<NotificationInfo>> notificationData = new LinkedHashMap<PluginKey, List<NotificationInfo>>();
try {
boolean isWeekly = context.value(NotificationJob.JOB_WEEKLY);
if (isWeekly) {
for (String pluginId : setting.getWeeklyPlugins()) {
putMap(notificationData, PluginKey.key(pluginId), getWeeklyNotifs(sProvider, pluginId, setting.getUserId()));
}
}
//
boolean isDaily = context.value(NotificationJob.JOB_DAILY);
if (isDaily) {
for (String pluginId : setting.getDailyPlugins()) {
putMap(notificationData, PluginKey.key(pluginId), getDailyNotifs(sProvider, context, pluginId, setting.getUserId()));
}
}
} catch (Exception e) {
LOG.error("Failed to get the NotificationMessage by user: " + setting.getUserId(), e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
return notificationData;
}
private static void putMap(Map<PluginKey, List<NotificationInfo>> notificationData, PluginKey key, List<NotificationInfo> values) {
if (notificationData.containsKey(key)) {
List<NotificationInfo> messages = notificationData.get(key);
for (NotificationInfo notificationMessage : values) {
if (messages.size() == 0 || messages.contains(notificationMessage) == false) {
messages.add(notificationMessage);
}
}
//
if(messages.size() > 0 ) {
notificationData.put(key, messages);
}
} else if (values.size() > 0) {
notificationData.put(key, values);
}
}
private List<NotificationInfo> getDailyNotifs(SessionProvider sProvider,
NotificationContext context,
String pluginId,
String userId) throws Exception {
List<NotificationInfo> messages = new ArrayList<NotificationInfo>();
Node plugInDayNode = getParentNodeByDate(sProvider, workspace, context, pluginId);
NodeIterator iter = getDailyNodes(plugInDayNode, userId);
Session session = plugInDayNode.getSession();
while (iter.hasNext()) {
Node node = iter.nextNode();
NotificationInfo model = fillModel(node);
messages.add(model.setTo(userId));
removeDaily(session, model, node.getPath());
}
return messages;
}
/**
* Makes the node path for MessageHome node '/eXoNotification/messageHome/<pluginId>/<DAY_OF_MONTH>/'
*
* @param sProvider
* @param workspace
* @param context keeping the day if in the daily job context
* @param pluginId
* @return
* @throws Exception
*/
private Node getParentNodeByDate(SessionProvider sProvider,
String workspace,
NotificationContext context,
String pluginId) throws Exception {
Node providerNode = getMessageNodeByPluginId(sProvider, workspace, pluginId);
String dayName = context.value(NotificationJob.DAY_OF_JOB);
return getOrCreateMessageNode(providerNode, DAY + dayName);
}
private List<NotificationInfo> getWeeklyNotifs(SessionProvider sProvider,
String pluginId,
String userId) throws Exception {
List<NotificationInfo> messages = new ArrayList<NotificationInfo>();
Node messageHomeNode = getMessageNodeByPluginId(sProvider, workspace, pluginId);
NodeIterator iter = getWeeklyNodes(messageHomeNode, userId);
Session session = messageHomeNode.getSession();
while (iter.hasNext()) {
Node node = iter.nextNode();
NotificationInfo model = fillModel(node);
messages.add(model.setTo(userId));
removeWeekly(session, model, node.getPath());
}
return messages;
}
private NodeIterator getWeeklyNodes(Node messageHomeNode, String userId) throws Exception {
final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();
long startTime = 0;
if ( stats ) startTime = System.currentTimeMillis();
//
userId = userId.replace("'", "''");
StringBuilder strQuery = new StringBuilder("SELECT * FROM ").append(NTF_MESSAGE).append(" WHERE ");
strQuery.append(" jcr:path LIKE '").append(messageHomeNode.getPath()).append("/%'");
strQuery.append(" AND (").append(NTF_SEND_TO_WEEKLY).append("='").append(userId).append("'");
strQuery.append(" OR ").append(NTF_SEND_TO_WEEKLY).append("='").append(NotificationInfo.FOR_ALL_USER)
.append("') AND (").append(NTF_FROM).append("<>'").append(userId).append("' OR ").append(NTF_FROM).append(" IS NULL)");
strQuery.append(" order by ").append(NTF_ORDER).append(ASCENDING).append(", exo:dateCreated").append(DESCENDING);
QueryManager qm = messageHomeNode.getSession().getWorkspace().getQueryManager();
Query query = qm.createQuery(strQuery.toString(), Query.SQL);
NodeIterator it = query.execute().getNodes();
//record statistics insert entity
if (stats) {
notificationContextFactory.getStatisticsCollector().queryExecuted(strQuery.toString(), it.getSize(), System.currentTimeMillis() - startTime);
}
return it;
}
private NodeIterator getDailyNodes(Node pluginDayNode, String userId) throws Exception {
final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();
long startTime = 0;
if ( stats ) startTime = System.currentTimeMillis();
//
userId = userId.replace("'", "''");
StringBuilder strQuery = new StringBuilder("SELECT * FROM ").append(NTF_MESSAGE).append(" WHERE ");
strQuery.append(" (jcr:path LIKE '").append(pluginDayNode.getPath()).append("/%'")
.append(" AND NOT jcr:path LIKE '").append(pluginDayNode.getPath()).append("/%/%')");
strQuery.append(" AND (").append(NTF_SEND_TO_DAILY).append("='").append(userId).append("'");
strQuery.append(" OR ").append(NTF_SEND_TO_DAILY).append("='").append(NotificationInfo.FOR_ALL_USER)
.append("') AND (").append(NTF_FROM).append("<>'").append(userId).append("' OR ").append(NTF_FROM).append(" IS NULL)");
strQuery.append(" order by ").append(NTF_ORDER).append(ASCENDING).append(", exo:dateCreated").append(DESCENDING);
QueryManager qm = pluginDayNode.getSession().getWorkspace().getQueryManager();
Query query = qm.createQuery(strQuery.toString(), Query.SQL);
NodeIterator it = query.execute().getNodes();
if (stats) {
notificationContextFactory.getStatisticsCollector().queryExecuted(strQuery.toString(), it.getSize(), System.currentTimeMillis() - startTime);
}
return it;
}
public NotificationInfo fillModel(Node node) throws Exception {
if(node == null) return null;
if(!node.hasProperty(EXO_LAST_MODIFIED_DATE)) {
if(node.isNodeType(EXO_MODIFY)) {
node.setProperty(EXO_LAST_MODIFIED_DATE, Calendar.getInstance());
node.save();
}
else if(node.canAddMixin(EXO_MODIFY)) {
node.addMixin(EXO_MODIFY);
node.setProperty(EXO_LAST_MODIFIED_DATE, Calendar.getInstance());
node.save();
}
else {
LOG.warn("Cannot add mixin to node '{}'.", node.getPath());
}
}
NotificationInfo message = NotificationInfo.instance()
.setOrder(Integer.valueOf(node.getProperty(NTF_ORDER).getString()))
.key(node.getProperty(NTF_PROVIDER_TYPE).getString())
.setOwnerParameter(node.getProperty(NTF_OWNER_PARAMETER).getValues())
.setSendToDaily(NotificationUtils.valuesToArray(node.getProperty(NTF_SEND_TO_DAILY).getValues()))
.setSendToWeekly(NotificationUtils.valuesToArray(node.getProperty(NTF_SEND_TO_WEEKLY).getValues()))
.setLastModifiedDate(node.getProperty(EXO_LAST_MODIFIED_DATE).getDate())
.setId(node.getName());
if (node.hasProperty(NTF_FROM)) {
message.setFrom(node.getProperty(NTF_FROM).getString());
}
return message;
}
private void putRemoveMap(String key, String value) {
Set<String> set = removeByCallBack.get(key);
if (set == null) {
set = new HashSet<String>();
removeByCallBack.put(key, set);
}
set.add(value);
}
/**
* In the case if the notification plug-in allows to impact all of user.
* In the case Daily, the notifiation_send_to_daily will be remove the sendAll value and still keep it for weekly
*
* @param session
* @param message
* @param path
* @throws Exception
*/
private void removeDaily(Session session, NotificationInfo message, String path) throws Exception {
if (message.getSendToDaily().length == 1 && message.getSendToWeekly().length == 0) {
putRemoveMap(REMOVE_ALL, path);
} if (message.getSendToDaily().length > 0 && NotificationInfo.FOR_ALL_USER.equals(message.getSendToDaily()[0])) {
putRemoveMap(REMOVE_DAILY, path);
} else {
removeProperty(session, path, NTF_SEND_TO_DAILY, message.getTo());
}
}
private void removeWeekly(Session session, NotificationInfo message, String path) throws Exception {
if (message.isSendAll() || message.getSendToWeekly().length == 1) {
putRemoveMap(REMOVE_ALL, path);
} else {
removeProperty(session, path, NTF_SEND_TO_WEEKLY, message.getTo());
}
}
private void removeProperty(Session session, String path, String property, String value) {
final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();
try {
Node node = (Node) session.getItem(path);
List<String> values = NotificationUtils.valuesToList(node.getProperty(property).getValues());
if (values.contains(value)) {
values.remove(value);
if (values.isEmpty()) {
values.add("");
}
node.setProperty(property, values.toArray(new String[values.size()]));
node.save();
//record entity update here
if (stats) {
notificationContextFactory.getStatisticsCollector().updateEntity(NTF_MESSAGE);
}
}
} catch (Exception e) {
LOG.warn(String.format("Failed to remove property %s of value %s on node ", property, value));
}
}
@Override
public void removeMessageAfterSent(NotificationContext ctx) throws Exception {
final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();
boolean created = NotificationSessionManager.createSystemProvider();
SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
try {
Node notificationHome = getNotificationHomeNode(sProvider, workspace);
Session session = notificationHome.getSession();
// remove all
Set<String> listPaths = removeByCallBack.get(REMOVE_ALL);
removeByCallBack.remove(REMOVE_ALL);
if (listPaths != null && listPaths.size() > 0) {
for (String nodePath : listPaths) {
try {
session.getItem(nodePath).remove();
//record entity delete here
if (stats) {
notificationContextFactory.getStatisticsCollector().deleteEntity(NTF_MESSAGE);
}
LOG.debug("Remove NotificationMessage " + nodePath);
} catch (Exception e) {
LOG.warn("Failed to remove node of NotificationMessage " + nodePath + "\n" + e.getMessage());
LOG.debug("Remove NotificationMessage " + nodePath, e);
}
}
session.save();
}
listPaths = removeByCallBack.get(REMOVE_DAILY);
if (listPaths != null && listPaths.size() > 0) {
for (String nodePath : listPaths) {
try {
Item item = session.getItem(nodePath);
if (item.isNode()) {
Node node = (Node) item;
node.setProperty(NTF_SEND_TO_DAILY, new String[] { "" });
}
LOG.debug("Remove SendToDaily property " + nodePath);
} catch (Exception e) {
LOG.warn("Failed to remove SendToDaily property of " + nodePath + "\n" + e.getMessage());
LOG.debug("Remove SendToDaily property " + nodePath, e);
}
}
session.save();
}
} catch (Exception e) {
LOG.warn("Failed to remove message after sent email notification", e);
} finally {
NotificationSessionManager.closeSessionProvider(created);
}
}
}