NotificationServiceImpl.java

/*
 * Copyright (C) 2003-2019 eXo Platform SAS.
 *
 * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
package org.exoplatform.commons.notification.impl.service;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.exoplatform.commons.api.notification.NotificationContext;
import org.exoplatform.commons.api.notification.channel.AbstractChannel;
import org.exoplatform.commons.api.notification.channel.ChannelManager;
import org.exoplatform.commons.api.notification.lifecycle.AbstractNotificationLifecycle;
import org.exoplatform.commons.api.notification.model.ChannelKey;
import org.exoplatform.commons.api.notification.model.MessageInfo;
import org.exoplatform.commons.api.notification.model.NotificationInfo;
import org.exoplatform.commons.api.notification.model.PluginKey;
import org.exoplatform.commons.api.notification.model.UserSetting;
import org.exoplatform.commons.api.notification.model.UserSetting.FREQUENCY;
import org.exoplatform.commons.api.notification.service.QueueMessage;
import org.exoplatform.commons.api.notification.service.setting.PluginSettingService;
import org.exoplatform.commons.api.notification.service.setting.UserSettingService;
import org.exoplatform.commons.api.notification.service.storage.MailNotificationStorage;
import org.exoplatform.commons.api.notification.service.storage.NotificationService;
import org.exoplatform.commons.api.notification.service.template.DigestorService;
import org.exoplatform.commons.api.settings.SettingService;
import org.exoplatform.commons.api.settings.data.Context;
import org.exoplatform.commons.notification.NotificationContextFactory;
import org.exoplatform.commons.notification.NotificationUtils;
import org.exoplatform.commons.notification.channel.MailChannel;
import org.exoplatform.commons.notification.impl.AbstractService;
import org.exoplatform.commons.notification.impl.NotificationContextImpl;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;

/**
 * {@link NotificationService} implementation which hands every incoming
 * notification over to the lifecycle of each channel the originating plugin is
 * active on, and which builds the digest messages of the users and puts them in
 * the message queue.
 */
public class NotificationServiceImpl extends AbstractService implements NotificationService {
  private static final Log                 LOG       = ExoLogger.getLogger(NotificationServiceImpl.class);

  /** Number of users, or of user settings, requested per query when loading them page by page. */
  private static final int                 PAGE_SIZE = 100;

  /** Storage the pending notifications of a user are read from, and which is purged at the end of a digest. */
  private final MailNotificationStorage    storage;

  /** Builds the digest message of a user out of his pending notifications. */
  private final DigestorService            digestorService;

  /** Source of the default settings and of the digest settings of the users. */
  private final UserSettingService         userService;

  /** Gives access to the statistics service and to the statistics collector. */
  private final NotificationContextFactory notificationContextFactory;

  /** Provides the channels and the lifecycle attached to each of them. */
  private final ChannelManager             channelManager;

  public NotificationServiceImpl(ChannelManager channelManager,
                                 UserSettingService userService,
                                 DigestorService digestorService,
                                 MailNotificationStorage storage,
                                 NotificationContextFactory notificationContextFactory) {
    this.channelManager = channelManager;
    this.userService = userService;
    this.digestorService = digestorService;
    this.storage = storage;
    this.notificationContextFactory = notificationContextFactory;
  }

  /**
   * Passes the notification to the lifecycle of every channel on which the
   * plugin of the notification is active. When the notification is flagged
   * "send all", the lifecycle is invoked with the users of the USER setting
   * context, loaded page by page; otherwise it is invoked with the receivers
   * carried by the notification (possibly none).
   *
   * @param notification the notification to process
   * @throws IllegalArgumentException if the notification is null
   * @throws Exception any exception raised by the invoked services is propagated
   */
  @Override
  public void process(NotificationInfo notification) throws Exception {
    if (notification == null) {
      throw new IllegalArgumentException("Notification argument shouldn't be null");
    }
    String pluginId = notification.getKey().getId();

    // statistic metrics
    if (this.notificationContextFactory.getStatisticsService().isStatisticsEnabled()) {
      this.notificationContextFactory.getStatisticsCollector().createNotificationInfoCount(pluginId);
    }

    NotificationContext ctx = NotificationContextImpl.cloneInstance();
    ctx.setNotificationInfo(notification);

    List<AbstractChannel> channels = channelManager.getChannels();
    if (channels.isEmpty()) {
      // nothing to dispatch to, so the service lookup below is not needed
      return;
    }

    // the service does not depend on the channel: look it up once for the whole loop
    PluginSettingService pluginSettingService = CommonsUtils.getService(PluginSettingService.class);
    for (AbstractChannel channel : channels) {
      if (!pluginSettingService.isActive(channel.getId(), pluginId)) {
        continue;
      }

      AbstractNotificationLifecycle lifecycle = channelManager.getLifecycle(ChannelKey.key(channel.getId()));
      if (notification.isSendAll()) {
        SettingService settingService = CommonsUtils.getService(SettingService.class);
        long usersCount = settingService.countContextsByType(Context.USER.getName());
        for (int offset = 0; offset < usersCount; offset += PAGE_SIZE) {
          List<String> users = settingService.getContextNamesByType(Context.USER.getName(), offset, PAGE_SIZE);
          lifecycle.process(ctx, users.toArray(new String[0]));
        }
      } else {
        List<String> receivers = getReceivers(notification);
        // the lifecycle is still invoked when there is no receiver, with an empty array
        lifecycle.process(ctx, receivers.toArray(new String[0]));
      }
    }
  }

  /**
   * Processes the given notifications one after the other, skipping the null
   * entries.
   *
   * @param messages the notifications to process
   * @throws Exception any exception raised while processing a notification is
   *           propagated; the remaining notifications are then not processed
   */
  @Override
  public void process(Collection<NotificationInfo> messages) throws Exception {
    for (NotificationInfo message : messages) {
      if (message != null) {
        process(message);
      }
    }
  }

  /**
   * Queues the digest messages: first for the users returned by
   * {@link UserSettingService#getDigestSettingForAllUser}, then - only when at
   * least one plugin is configured daily or weekly by default - for the users
   * returned by {@link UserSettingService#getDigestDefaultSettingForAllUser}.
   * Both lists are read page by page until an empty page is returned. The
   * stored messages are removed at the end.
   *
   * @param notifContext the context of the digest
   * @throws Exception any exception raised by the invoked services is propagated
   */
  @Override
  public void digest(NotificationContext notifContext) throws Exception {
    UserSetting defaultConfigPlugins = getDefaultUserSetting(notifContext.getPluginSettingService()
                                                                         .getActivePluginIds(UserSetting.EMAIL_CHANNEL));

    // process for users used setting
    long startTime = System.currentTimeMillis();
    int offset = 0;
    List<UserSetting> userDigestSettings = this.userService.getDigestSettingForAllUser(notifContext, offset, PAGE_SIZE);
    while (!userDigestSettings.isEmpty()) {
      send(notifContext, userDigestSettings);
      offset += PAGE_SIZE;
      userDigestSettings = this.userService.getDigestSettingForAllUser(notifContext, offset, PAGE_SIZE);
    }
    LOG.info("Time spent to send mail messages for users having personal settings: {}ms.",
             System.currentTimeMillis() - startTime);

    // process for users used default setting
    startTime = System.currentTimeMillis();
    if (!defaultConfigPlugins.getDailyPlugins().isEmpty() || !defaultConfigPlugins.getWeeklyPlugins().isEmpty()) {
      offset = 0;
      List<UserSetting> usersWithDefaultSettings = this.userService.getDigestDefaultSettingForAllUser(offset, PAGE_SIZE);
      while (!usersWithDefaultSettings.isEmpty()) {
        sendDefault(notifContext, usersWithDefaultSettings, defaultConfigPlugins);
        offset += PAGE_SIZE;
        usersWithDefaultSettings = this.userService.getDigestDefaultSettingForAllUser(offset, PAGE_SIZE);
      }
    }

    // Clear all stored message
    storage.removeMessageAfterSent(notifContext);
    LOG.info("Time spent to send mail messages for users having default settings: {}ms.",
             System.currentTimeMillis() - startTime);
  }

  /**
   * Returns the receivers carried by the notification, or an empty list (after
   * a debug trace) when the notification has none.
   *
   * @param notification the notification being processed
   * @return the receivers of the notification, never null
   */
  private List<String> getReceivers(NotificationInfo notification) {
    List<String> receivers = notification.getSendToUserIds();
    if (receivers == null || receivers.isEmpty()) {
      LOG.debug("Notification with id '{}' and parameters = '{}' not sent because receivers are empty",
                notification.getId(),
                notification.getOwnerParameter());
      return Collections.emptyList();
    }
    return receivers;
  }

  /**
   * Queues the digest message of every user of the page whose mail channel is
   * active, who is enabled and who is not a deleted member.
   *
   * @param context the context of the digest
   * @param userSettings a page of personal user settings
   */
  private void send(NotificationContext context, List<UserSetting> userSettings) {
    final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();

    for (UserSetting userSetting : userSettings) {
      if (!userSetting.isChannelActive(MailChannel.ID) || !userSetting.isEnabled()
          || NotificationUtils.isDeletedMember(userSetting.getUserId())) {
        continue;
      }

      putMessageToQueue(context, userSetting, stats);
    }
  }

  /**
   * Queues the digest message of every user of the page who is enabled and who
   * is not a deleted member. The message is built from a copy of the default
   * setting which receives the user id and the last update time of the user.
   *
   * @param context the context of the digest
   * @param userSettings a page of settings of users relying on the default setting
   * @param defaultConfigPlugins the default setting to apply to each of these users
   */
  private void sendDefault(NotificationContext context, List<UserSetting> userSettings, UserSetting defaultConfigPlugins) {
    final boolean stats = notificationContextFactory.getStatistics().isStatisticsEnabled();

    for (UserSetting userSetting : userSettings) {
      if (!userSetting.isEnabled() || NotificationUtils.isDeletedMember(userSetting.getUserId())) {
        continue;
      }

      // keep the loop variable untouched: the setting actually used is a dedicated copy
      UserSetting effectiveSetting = defaultConfigPlugins.clone()
                                                         .setUserId(userSetting.getUserId())
                                                         .setLastUpdateTime(userSetting.getLastUpdateTime());
      putMessageToQueue(context, effectiveSetting, stats);
    }
  }

  /**
   * Builds the digest message from the notifications stored for the user and
   * puts it in the message queue. Nothing is queued when no notification is
   * stored for the user or when no message could be built. A failure while
   * queuing is logged and not propagated, so the other users are still served.
   *
   * @param context the context of the digest
   * @param userSetting the setting of the user the message is built for
   * @param stats whether the statistics have to be updated once the message is queued
   */
  private void putMessageToQueue(NotificationContext context, UserSetting userSetting, final boolean stats) {
    Map<PluginKey, List<NotificationInfo>> notificationMessageMap = storage.getByUser(context, userSetting);
    if (notificationMessageMap.isEmpty()) {
      return;
    }

    MessageInfo messageInfo = this.digestorService.buildMessage(context, notificationMessageMap, userSetting);
    if (messageInfo == null) {
      return;
    }

    try {
      CommonsUtils.getService(QueueMessage.class).put(messageInfo);

      if (stats) {
        notificationContextFactory.getStatisticsCollector().createMessageInfoCount(messageInfo.getPluginId());
        notificationContextFactory.getStatisticsCollector().putQueue(messageInfo.getPluginId());
      }
    } catch (Exception e) {
      LOG.error("An error occurred while putting message " + messageInfo.getId() + " to user " + messageInfo.getTo()
          + " in queue", e);
    }
  }

  /**
   * Builds the setting applied to the users who rely on the default
   * configuration. Only the given plugins are considered: each of them is
   * registered with the frequency it has in the default settings returned by
   * the user setting service, weekly being checked before daily. A plugin which
   * is neither weekly nor daily in the default settings is left out.
   *
   * @param activatedPluginsByAdminSetting the ids of the plugins to consider
   *          (the caller passes the plugins active on the email channel)
   * @return a new setting holding the daily and weekly plugins retained
   */
  private UserSetting getDefaultUserSetting(List<String> activatedPluginsByAdminSetting) {
    UserSetting setting = UserSetting.getInstance();
    // default setting loaded from configuration xml file
    UserSetting defaultSetting = userService.getDefaultSettings();
    for (String pluginId : activatedPluginsByAdminSetting) {
      if (defaultSetting.isInWeekly(pluginId)) {
        setting.addPlugin(pluginId, FREQUENCY.WEEKLY);
      } else if (defaultSetting.isInDaily(pluginId)) {
        setting.addPlugin(pluginId, FREQUENCY.DAILY);
      }
    }

    return setting;
  }
}