JPAQueueMessageImpl.java
package org.exoplatform.commons.notification.impl.jpa.email;
import static org.exoplatform.commons.notification.impl.jpa.EntityConverter.convertQueueEntityToMessageInfo;
import java.util.Calendar;
import java.util.HashSet;
import java.util.Set;
import org.picocontainer.Startable;
import org.exoplatform.commons.api.notification.model.MessageInfo;
import org.exoplatform.commons.api.notification.service.QueueMessage;
import org.exoplatform.commons.api.persistence.DataInitializer;
import org.exoplatform.commons.api.persistence.ExoTransactional;
import org.exoplatform.commons.notification.NotificationContextFactory;
import org.exoplatform.commons.notification.NotificationUtils;
import org.exoplatform.commons.notification.impl.jpa.email.dao.MailQueueDAO;
import org.exoplatform.commons.notification.impl.jpa.email.entity.MailQueueEntity;
import org.exoplatform.commons.notification.impl.service.MailQueueMessageManager;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.management.annotations.ManagedBy;
import org.exoplatform.services.listener.Event;
import org.exoplatform.services.listener.ListenerService;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.mail.MailService;
@ManagedBy(MailQueueMessageManager.class)
public class JPAQueueMessageImpl implements QueueMessage, Startable {
private static final Log LOG = ExoLogger.getExoLogger(JPAQueueMessageImpl.class);
private static final String MAX_TO_SEND_SYS_KEY = "conf.notification.service.QueueMessage.numberOfMailPerBatch";
private static final String MAX_TO_SEND_KEY = "numberOfMailPerBatch";
private static final int MAX_TO_SEND_DEFAULT = 20;
private boolean enabled = true;
private int maxToSend;
private MailService mailService;
private MailQueueDAO mailQueueDAO;
private ListenerService listenerService;
private NotificationContextFactory notificationContextFactory;
public JPAQueueMessageImpl(MailService mailService,
MailQueueDAO mailQueueDAO,
ListenerService listenerService,
DataInitializer dataInitializer,
NotificationContextFactory notificationContextFactory,
InitParams params) {
this.mailService = mailService;
this.mailQueueDAO = mailQueueDAO;
this.listenerService = listenerService;
this.notificationContextFactory = notificationContextFactory;
maxToSend = NotificationUtils.getSystemValue(params, MAX_TO_SEND_SYS_KEY, MAX_TO_SEND_KEY, MAX_TO_SEND_DEFAULT);
}
/**
* {@inheritDoc}
*/
@Override
public void enable(boolean enabled) {
this.enabled = enabled;
}
/**
* {@inheritDoc}
*/
@Override
public void start() {
}
/**
* {@inheritDoc}
*/
@Override
public void stop() {
}
/**
* {@inheritDoc}
*/
@Override
@ExoTransactional
public boolean put(MessageInfo message) throws Exception {
if (message == null || message.getTo() == null || message.getTo().length() == 0) {
return false;
}
//
if (NotificationUtils.isValidEmailAddresses(message.getTo()) == false) {
LOG.warn(String.format("The email %s is not valid for sending notification", message.getTo()));
return false;
}
//
saveMessageInfo(message);
//
listenerService.broadcast(new Event<QueueMessage, String>(MESSAGE_SENT_FROM_QUEUE, this, message.getId()));
return true;
}
/**
* {@inheritDoc}
*/
@Override
@ExoTransactional
public void send() {
final boolean statsEnabled = notificationContextFactory.getStatistics().isStatisticsEnabled();
int messagesSize = 0;
//
Set<MessageInfo> messages = load();
//
int originalMessagesSize = messages.size();
messagesSize = originalMessagesSize;
if (messagesSize > 0) {
LOG.info(messagesSize + " message(s) will be sent.");
}
for (MessageInfo messageInfo : messages) {
if (messageInfo == null) {
continue;
}
try {
if (sendMessage(messageInfo)) {
LOG.debug("Mail message '{}' sent to user: {}", messageInfo.getId(), messageInfo.getTo());
removeMessageInfo(messageInfo.getId());
LOG.debug("Mail message '{}' removed from queue, to user: {}", messageInfo.getId(), messageInfo.getTo());
//
if (statsEnabled) {
notificationContextFactory.getStatisticsCollector().pollQueue(messageInfo.getPluginId());
}
}
} catch (Exception e) {
messagesSize--;
LOG.error("Error sending message from = '" + messageInfo.getFrom() + "', to = '" + messageInfo.getTo() + "', id = '"
+ messageInfo.getId() + "'", e);
}
}
if (originalMessagesSize > 0) {
LOG.info("{}/{} message(s) are loaded, sent and deleted from queue.", messagesSize, originalMessagesSize);
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean sendMessage(MessageInfo message) throws Exception {
if (message == null) {
throw new IllegalArgumentException("Message is null");
}
if (message.getFrom() == null) {
throw new IllegalStateException("Message with id '" + message.getId() + "' has an empty 'from' field");
}
if (this.enabled) {
// ensure the message is valid
mailService.sendMessage(message.makeEmailNotification());
return true;
}
//
listenerService.broadcast(new Event<QueueMessage, String>(MESSAGE_SENT_FROM_QUEUE, this, message.getId()));
return true;
}
/**
* {@inheritDoc}
*/
@Override
public void removeAll() {
//
LOG.debug("Removing messages: ");
mailQueueDAO.deleteAll();
LOG.debug("Done to removed messages! ");
}
private void saveMessageInfo(MessageInfo message) {
MailQueueEntity mailQueueEntity = new MailQueueEntity();
mailQueueEntity.setType(message.getPluginId());
mailQueueEntity.setFrom(message.getFrom());
mailQueueEntity.setTo(message.getTo());
mailQueueEntity.setSubject(message.getSubject());
mailQueueEntity.setBody(message.getBody());
mailQueueEntity.setFooter(message.getFooter());
mailQueueEntity.setCreationDate(Calendar.getInstance());
mailQueueDAO.create(mailQueueEntity);
}
private Set<MessageInfo> load() {
Set<MessageInfo> messages = new HashSet<>();
for (MailQueueEntity mailQueueEntity : mailQueueDAO.findAll(0, maxToSend)) {
try {
messages.add(convertQueueEntityToMessageInfo(mailQueueEntity));
} catch (Exception e) {
LOG.error("Failed to load message with id = " + mailQueueEntity.getId(), e);
}
}
return messages;
}
private void removeMessageInfo(String id) throws Exception {
LOG.debug("Removing messageId: " + id);
mailQueueDAO.delete(mailQueueDAO.find(Long.parseLong(id)));
//
listenerService.broadcast(new Event<QueueMessage, String>(MESSAGE_DELETED_FROM_QUEUE, this, id));
}
}