// QueueMessageImpl.java
/*
* Copyright (C) 2003-2019 eXo Platform SAS.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.exoplatform.commons.notification.impl.service;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.Session;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
import javax.jcr.query.QueryResult;
import org.json.JSONObject;
import org.picocontainer.Startable;
import org.exoplatform.commons.api.notification.model.MessageInfo;
import org.exoplatform.commons.api.notification.service.QueueMessage;
import org.exoplatform.commons.api.settings.SettingService;
import org.exoplatform.commons.notification.NotificationConfiguration;
import org.exoplatform.commons.notification.NotificationContextFactory;
import org.exoplatform.commons.notification.NotificationUtils;
import org.exoplatform.commons.notification.impl.AbstractService;
import org.exoplatform.commons.notification.impl.NotificationSessionManager;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.commons.utils.StringCommonUtils;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.jcr.ext.common.SessionProvider;
import org.exoplatform.services.jcr.impl.core.query.QueryImpl;
import org.exoplatform.services.listener.Event;
import org.exoplatform.services.listener.ListenerService;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.mail.MailService;
/**
 * Implementation of {@link QueueMessage} that keeps queued mails as nodes of a JCR workspace.
 * <p>
 * {@code put} persists a message as a node, {@code send} loads at most {@code maxToSend}
 * stored messages, hands each of them to the {@link MailService} and removes the ones
 * that were sent. Queue events are broadcast through the {@link ListenerService}.
 */
public class QueueMessageImpl extends AbstractService implements QueueMessage, Startable {
private static final Log LOG = ExoLogger.getExoLogger(QueueMessageImpl.class);
/**
 * Keys handed to NotificationUtils.getSystemValue, together with the init params, to resolve
 * {@code maxToSend}. NOTE(review): presumably the first one names a system/configuration
 * property and the second one an init-param; confirm against NotificationUtils.
 */
private static final String MAX_TO_SEND_SYS_KEY = "conf.notification.service.QueueMessage.numberOfMailPerBatch";
private static final String MAX_TO_SEND_KEY = "numberOfMailPerBatch";
/** Default passed to NotificationUtils.getSystemValue when resolving {@code maxToSend}. */
private static final int MAX_TO_SEND_DEFAULT = 20;
/** When false, sendMessage does not call the mail service; changed through enable(boolean). */
private boolean enabled = true;
/** Limit applied to the query that loads the stored messages, i.e. the size of one batch. */
private int maxToSend;
/** Service used by sendMessage to deliver the mails. */
private MailService mailService;
/** Service used to broadcast the "added", "sent" and "deleted" queue events. */
private ListenerService listenerService;
/** Supplies the name of the workspace in which the messages are stored. */
private NotificationConfiguration notificationConfiguration;
/** Gives access to the statistics flag and to the statistics collector. */
private NotificationContextFactory notificationContextFactory;
/** Lock acquired by saveMessageInfo and removeMessageInfo while they modify the repository. */
transient final ReentrantLock lock = new ReentrantLock();
/**
 * Messages loaded from the repository by load() and waiting to be sent; emptied at the end of
 * removeMessageInfo(). The set is wrapped with Collections.synchronizedSet.
 */
private Set<MessageInfo> messages = Collections.synchronizedSet(new HashSet<MessageInfo>());
/**
 * Per-thread set of the ids of the messages that send() has sent and that
 * removeMessageInfo() still has to delete from the repository.
 */
private ThreadLocal<Set<String>> idsRemovingLocal = new ThreadLocal<Set<String>>();
/**
 * Wires the collaborating services and resolves the size of a sending batch.
 *
 * @param notificationConfiguration supplies the workspace in which messages are stored
 * @param mailService service used to deliver the mails
 * @param listenerService service used to broadcast the queue events
 * @param settingService not used by this constructor
 * @param notificationContextFactory gives access to the statistics flag and collector
 * @param params init params handed to NotificationUtils.getSystemValue to resolve the batch size
 */
public QueueMessageImpl(NotificationConfiguration notificationConfiguration,
                        MailService mailService,
                        ListenerService listenerService,
                        SettingService settingService,
                        NotificationContextFactory notificationContextFactory,
                        InitParams params) {
  // Collaborators, in the order in which they are received.
  this.notificationConfiguration = notificationConfiguration;
  this.mailService = mailService;
  this.listenerService = listenerService;
  this.notificationContextFactory = notificationContextFactory;
  // Batch size: resolved from the two keys, falling back to MAX_TO_SEND_DEFAULT.
  this.maxToSend = NotificationUtils.getSystemValue(params,
                                                    MAX_TO_SEND_SYS_KEY,
                                                    MAX_TO_SEND_KEY,
                                                    MAX_TO_SEND_DEFAULT);
}
/**
 * Queues a message: validates its recipient, stores it through saveMessageInfo and
 * broadcasts MESSAGE_ADDED_IN_QUEUE with the id of the message.
 *
 * @param message the message to queue
 * @return false when the message is null, has no recipient, or has a recipient rejected by
 *         NotificationUtils.isValidEmailAddresses; true otherwise. saveMessageInfo logs and
 *         swallows its own failures, so true does not prove that the message was stored.
 * @throws Exception whatever the statistics lookup or the event broadcast throws
 */
@Override
public boolean put(MessageInfo message) throws Exception {
  final boolean statisticsEnabled = notificationContextFactory.getStatistics().isStatisticsEnabled();

  // Nothing can be queued without a recipient.
  if (message == null) {
    return false;
  }
  String recipient = message.getTo();
  if (recipient == null || recipient.length() == 0) {
    return false;
  }

  if (!NotificationUtils.isValidEmailAddresses(recipient)) {
    LOG.warn(String.format("The email %s is not valid for sending notification", recipient));
    return false;
  }

  if (statisticsEnabled) {
    String tenantName = CommonsUtils.getRepository().getConfiguration().getName();
    LOG.info("Tenant Name:: " + tenantName);
    LOG.info("Message::From: " + message.getFrom() + " To: " + message.getTo() + " body: " + message.getBody());
  }

  saveMessageInfo(message);

  Event<QueueMessage, String> addedEvent =
      new Event<QueueMessage, String>(MESSAGE_ADDED_IN_QUEUE, this, message.getId());
  listenerService.broadcast(addedEvent);
  return true;
}
/**
 * Loads the next batch of stored messages, tries to mail each of them and finally removes
 * the ones that were sent from the repository.
 * <p>
 * Failures while loading or sending are logged and not propagated; the removal step runs in
 * every case.
 *
 * @throws Exception whatever the statistics lookup or the creation of the session provider throws
 */
@Override
public void send() throws Exception {
  final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();
  // Initialised before anything that can fail: removeMessageInfo(), called from the finally
  // block below, reads this thread-local.
  Set<String> sentIds = idsRemovingLocal.get();
  if (sentIds == null) {
    sentIds = new HashSet<String>();
    idsRemovingLocal.set(sentIds);
  }
  SessionProvider sProvider = SessionProvider.createSystemProvider();
  try {
    //
    load(sProvider);
    // Iterate over a snapshot: 'messages' is a synchronized set shared between threads, and
    // iterating it directly without holding its monitor can fail with a
    // ConcurrentModificationException when another thread loads or clears it meanwhile.
    // The copy constructor goes through the synchronized toArray() of the wrapper.
    List<MessageInfo> batch = new ArrayList<MessageInfo>(messages);
    //
    if (!batch.isEmpty()) {
      LOG.info(batch.size() + " message(s) will be sent.");
    }
    for (MessageInfo messageInfo : batch) {
      if (messageInfo != null && !sentIds.contains(messageInfo.getId())
          && sendMessage(messageInfo)) {
        LOG.debug("Message sent to user: " + messageInfo.getTo());
        // remember the message so that removeMessageInfo() deletes it
        sentIds.add(messageInfo.getId());
        if (stats) {
          notificationContextFactory.getStatisticsCollector().pollQueue(messageInfo.getPluginId());
        }
      }
    }
  } catch (Exception e) {
    LOG.warn("Failed to send message.");
    LOG.debug(e.getMessage(), e);
  } finally {
    try {
      sProvider.close();
    } finally {
      // must run even when closing the provider fails, otherwise the messages that were
      // just sent stay in the queue
      removeMessageInfo();
    }
  }
}
/**
 * Loads the stored messages returned by getMessageInfoNodes (at most {@code maxToSend} of
 * them) into the {@code messages} buffer, using the UUID of each node as message id.
 * <p>
 * A node that cannot be mapped to a message is skipped, so that a single unreadable node
 * does not prevent the following ones from being loaded. Failures are logged and not
 * propagated.
 *
 * @param sProvider the session provider used to read the repository
 */
private void load(SessionProvider sProvider) {
  try {
    NodeIterator iterator = getMessageInfoNodes(sProvider);
    if (iterator == null) {
      // the query failed; getMessageInfoNodes has already logged the reason
      return;
    }
    while (iterator.hasNext()) {
      Node node = iterator.nextNode();
      MessageInfo messageInfo = getMessageInfo(node);
      if (messageInfo == null) {
        // mapping failed and was logged by getMessageInfo; carry on with the next node
        continue;
      }
      messageInfo.setId(node.getUUID());
      messages.add(messageInfo);
    }
  } catch (Exception e) {
    LOG.warn("Failed to load message.");
    LOG.debug(e.getMessage(), e);
  }
}
/**
 * Stores a message as a new node under the message home node.
 * <p>
 * The creation time of the message is set to the current time and is also used as the name
 * of the node; the JSON form of the message is saved compressed below that node. The whole
 * operation runs under {@code lock}. Failures while saving are logged and not propagated.
 *
 * @param message the message to store
 */
private void saveMessageInfo(MessageInfo message) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // Everything after lock() is inside the try so that the lock is always released, even
    // when the session provider cannot be created.
    boolean created = NotificationSessionManager.createSystemProvider();
    try {
      SessionProvider sProvider = NotificationSessionManager.getSessionProvider();
      message.setCreatedTime(System.currentTimeMillis());
      Node messageInfoHome = getMessageInfoHomeNode(sProvider, notificationConfiguration.getWorkspace());
      Node messageInfoNode = messageInfoHome.addNode(String.valueOf(message.getCreatedTime()), NTF_MESSAGE_INFO);
      // load() identifies the node by its UUID
      if (messageInfoNode.canAddMixin("mix:referenceable")) {
        messageInfoNode.addMixin("mix:referenceable");
      }
      //
      saveData(messageInfoNode, StringCommonUtils.compress(message.toJSON()));
      sessionSave(messageInfoHome);
    } catch (Exception e) {
      LOG.warn("Failed to save message.");
      LOG.debug(e.getMessage() + message.toJSON(), e);
    } finally {
      NotificationSessionManager.closeSessionProvider(created);
    }
  } finally {
    lock.unlock();
  }
}
/**
 * Deletes from the repository the messages whose ids were recorded by the current thread,
 * broadcasting MESSAGE_DELETED_FROM_QUEUE for each of them, then empties the
 * {@code messages} buffer and forgets the processed ids.
 * <p>
 * Each message is removed independently: an id that cannot be removed (for instance because
 * its node no longer exists) is logged and skipped, so that the other removals are still
 * saved and the corresponding mails are not sent a second time. Failures are logged and
 * not propagated.
 */
private void removeMessageInfo() {
  // The thread-local is null when this thread has never recorded an id.
  Set<String> pendingIds = idsRemovingLocal.get();
  List<String> ids = (pendingIds == null) ? new ArrayList<String>() : new ArrayList<String>(pendingIds);
  SessionProvider sProvider = SessionProvider.createSystemProvider();
  final ReentrantLock lock = this.lock;
  // Acquired before the try: unlock() must only run when the lock is actually held.
  lock.lock();
  try {
    Session session = getSession(sProvider, notificationConfiguration.getWorkspace());
    for (String messageId : ids) {
      try {
        session.getNodeByUUID(messageId).remove();
        //
        listenerService.broadcast(new Event<QueueMessage, String>(MESSAGE_DELETED_FROM_QUEUE, this, messageId));
        LOG.debug("Removing messageId: " + messageId);
      } catch (Exception e) {
        LOG.warn("Failed to remove message.");
        LOG.debug("messageId: " + messageId + " - " + e.getMessage(), e);
      }
    }
    session.save();
  } catch (Exception e) {
    LOG.warn("Failed to remove message.");
    LOG.debug(e.getMessage(), e);
  } finally {
    try {
      messages.clear();
      if (pendingIds != null) {
        pendingIds.removeAll(ids);
      }
    } finally {
      lock.unlock();
      sProvider.close();
    }
  }
}
/**
 * Queries the direct children of the message home node that have the message node type,
 * ordered by exo:name and limited to {@code maxToSend} results.
 *
 * @param sProvider the session provider used to reach the message home node
 * @return the matching nodes, or null when the query could not be built or executed
 *         (the failure is logged)
 */
private NodeIterator getMessageInfoNodes(SessionProvider sProvider) {
  NodeIterator found = null;
  try {
    Node home = getMessageInfoHomeNode(sProvider, notificationConfiguration.getWorkspace());
    QueryManager queryManager = home.getSession().getWorkspace().getQueryManager();

    // "<home>/%" keeps the descendants, "NOT <home>/%/%" drops everything below the first level
    String homePath = home.getPath();
    String statement = "SELECT * FROM " + NTF_MESSAGE_INFO
        + " WHERE jcr:path LIKE '" + homePath + "/%' AND NOT jcr:path LIKE '" + homePath + "/%/%'"
        + " ORDER BY exo:name";

    QueryImpl limitedQuery = (QueryImpl) queryManager.createQuery(statement, Query.SQL);
    limitedQuery.setOffset(0);
    limitedQuery.setLimit(maxToSend);

    QueryResult queryResult = limitedQuery.execute();
    found = queryResult.getNodes();
  } catch (Exception e) {
    LOG.warn("Failed to get message from node.");
    LOG.debug(e.getMessage(), e);
  }
  return found;
}
/**
 * Rebuilds a message from the JSON document stored below the given node.
 * <p>
 * "from", "to", "subject", "body" and "createdTime" are required in the JSON document;
 * "pluginId" and "footer" are read with optString and may therefore be absent. The id of
 * the message is not set here.
 *
 * @param messageInfoNode the node that holds the stored message
 * @return the message, or null when the node could not be read or mapped (the failure is logged)
 */
public MessageInfo getMessageInfo(Node messageInfoNode) {
  MessageInfo result = null;
  try {
    JSONObject json = new JSONObject(getDataJson(messageInfoNode));
    MessageInfo mapped = new MessageInfo();
    mapped.pluginId(json.optString("pluginId"))
          .from(json.getString("from"))
          .to(json.getString("to"))
          .subject(json.getString("subject"))
          .body(json.getString("body"))
          .footer(json.optString("footer"))
          .setCreatedTime(json.getLong("createdTime"));
    // only handed back once every field has been mapped
    result = mapped;
  } catch (Exception e) {
    LOG.warn("Failed to map message between node and model.");
    LOG.debug(e.getMessage(), e);
  }
  return result;
}
/**
 * Sends one message.
 * <p>
 * While the service is enabled the message is handed to the mail service; a message without
 * sender, or any failure of the mail service, yields false. While the service is disabled
 * no mail is sent: MESSAGE_SENT_FROM_QUEUE is broadcast with the id of the message and true
 * is returned.
 * <p>
 * NOTE(review): the event is broadcast on the disabled path only; the enabled path returns
 * without broadcasting. Confirm that this is intended.
 *
 * @param message the message to send
 * @return true when the mail service accepted the message or when the service is disabled,
 *         false otherwise
 * @throws IllegalArgumentException when the message is null
 * @throws Exception whatever the event broadcast throws on the disabled path
 */
@Override
public boolean sendMessage(MessageInfo message) throws Exception {
  if (message == null) {
    throw new IllegalArgumentException("Message is null");
  }

  // Disabled: do not mail anything, only notify the listeners.
  if (!this.enabled) {
    Event<QueueMessage, String> sentEvent =
        new Event<QueueMessage, String>(MESSAGE_SENT_FROM_QUEUE, this, message.getId());
    listenerService.broadcast(sentEvent);
    return true;
  }

  boolean delivered;
  try {
    if (message.getFrom() == null) {
      // a message without sender is not valid
      delivered = false;
    } else {
      mailService.sendMessage(message.makeEmailNotification());
      delivered = true;
    }
  } catch (Exception e) {
    LOG.error("Error while sending a message - Cause : " + e.getMessage(), e);
    delivered = false;
  }
  return delivered;
}
/**
 * {@inheritDoc}
 * <p>
 * The flag is read by sendMessage: while it is false, no message is handed to the mail service.
 */
@Override
public void enable(boolean enabled) {
this.enabled = enabled;
}
/**
 * Startable callback; this implementation performs no work.
 */
@Override
public void start() {
}
/**
 * Startable callback; this implementation performs no work.
 */
@Override
public void stop() {
}
/**
 * Attaches the given content to a node as an nt:file child named "datajson", whose
 * jcr:content resource is flagged "application/x-gzip" and stamped with the current time.
 * The change is not saved here.
 *
 * @param node the node that receives the "datajson" child
 * @param is the content to store as jcr:data
 * @throws Exception when the repository rejects one of the additions
 */
private void saveData(Node node, InputStream is) throws Exception {
  Node resource = node.addNode("datajson", "nt:file").addNode("jcr:content", "nt:resource");

  resource.setProperty("jcr:mimeType", "application/x-gzip");
  resource.setProperty("jcr:data", is);

  long lastModified = Calendar.getInstance().getTimeInMillis();
  resource.setProperty("jcr:lastModified", lastModified);
}
/**
 * Reads the content stored by saveData below the given node and decompresses it.
 *
 * @param node the node that owns the "datajson" child
 * @return the result of StringCommonUtils.decompress applied to the stored jcr:data
 * @throws Exception when the child nodes or the property are missing, or when the content
 *         cannot be decompressed
 */
private String getDataJson(Node node) throws Exception {
  Node fileNode = node.getNode("datajson");
  Node nodeContent = fileNode.getNode("jcr:content");
  InputStream stream = nodeContent.getProperty("jcr:data").getStream();
  try {
    return StringCommonUtils.decompress(stream);
  } finally {
    // The caller of getStream() owns the stream. Closing is harmless if decompress has
    // already closed it, and a failure to close must not hide the result or the original
    // exception.
    try {
      stream.close();
    } catch (Exception e) {
      LOG.debug("Failed to close the data stream of the message.", e);
    }
  }
}
/**
 * Removes all the data stored by the notification feature: first the children of
 * "eXoNotification/messageInfoHome", then, for every child of "eXoNotification/messageHome"
 * (one per plugin, going by the log messages), the nodes found two levels below it.
 * <p>
 * Either home node may be absent, in which case the corresponding step does nothing.
 * A failure stops the removal; it is logged together with the counts reached so far and is
 * not propagated.
 */
public void removeAll() {
  SessionProvider sProvider = SessionProvider.createSystemProvider();
  int total = 0;
  int removedOnPlugin = 0;
  String pluginPath = "";
  try {
    Session session = getSession(sProvider, notificationConfiguration.getWorkspace());
    Node root = session.getRootNode();
    //
    LOG.debug("Removing messages: ");
    if (root.hasNode("eXoNotification/messageInfoHome")) {
      removeNodes(session, root.getNode("eXoNotification/messageInfoHome").getNodes());
    }
    LOG.debug("Done to removed messages! ");
    //
    LOG.debug("Removing notification info... ");
    // Only the paths are kept: each plugin is processed with its own session below.
    List<String> pluginPaths = new ArrayList<String>();
    if (root.hasNode("eXoNotification/messageHome")) {
      NodeIterator plugins = root.getNode("eXoNotification/messageHome").getNodes();
      while (plugins.hasNext()) {
        pluginPaths.add(plugins.nextNode().getPath());
      }
    }
    session.logout();
    for (String path : pluginPaths) {
      pluginPath = path;
      // restart the per-plugin count, which is summed over all the children of the plugin
      removedOnPlugin = 0;
      LOG.debug("Remove notification info on plugin: " + pluginPath);
      //
      session = getSession(sProvider, notificationConfiguration.getWorkspace());
      NodeIterator children = ((Node) session.getItem(path)).getNodes();
      while (children.hasNext()) {
        int removed = removeNodes(session, children.nextNode().getNodes());
        removedOnPlugin += removed;
        total += removed;
      }
      LOG.debug("Removed " + removedOnPlugin + " nodes info on plugin: " + pluginPath);
      session.logout();
    }
  } catch (Exception e) {
    LOG.warn("Failed to remove all data of feature notification.");
    LOG.debug("Removed " + removedOnPlugin + " nodes info on plugin: " + pluginPath);
    LOG.debug("Removed all " + total + " nodes.");
    LOG.debug(e.getMessage(), e);
  } finally {
    sProvider.close();
  }
}
/**
 * Removes every node returned by the iterator, saving the session each time a batch of
 * removals is complete and once more at the end.
 * <p>
 * The batch size is read from the system property "sizePersiter" and defaults to 200; a
 * value that is not a positive integer is ignored in favour of the default.
 *
 * @param session the session to which the nodes belong
 * @param it the nodes to remove
 * @return the number of nodes removed
 * @throws Exception when a removal or a save fails
 */
private int removeNodes(Session session, NodeIterator it) throws Exception {
  final int defaultSize = 200;
  int size = defaultSize;
  String configured = System.getProperty("sizePersiter", String.valueOf(defaultSize));
  try {
    int parsed = Integer.parseInt(configured.trim());
    if (parsed > 0) {
      size = parsed;
    } else {
      // zero would make the modulo below fail with an ArithmeticException
      LOG.warn("Invalid value of sizePersiter: " + configured + ", using " + defaultSize);
    }
  } catch (NumberFormatException e) {
    LOG.warn("Invalid value of sizePersiter: " + configured + ", using " + defaultSize);
  }
  int i = 0;
  LOG.debug("Starting to remove nodes...");
  while (it.hasNext()) {
    it.nextNode().remove();
    ++i;
    if (i % size == 0) {
      session.save();
    }
  }
  // persists the removals of the last, incomplete batch
  session.save();
  LOG.debug(String.format("Done to removed %s nodes", i));
  return i;
}
}