MailNotificationsMigration.java

package org.exoplatform.commons.migration;

import java.io.InputStream;
import java.util.concurrent.Callable;

import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.servlet.ServletContext;

import org.apache.commons.lang.StringUtils;
import org.json.JSONObject;

import org.exoplatform.commons.api.notification.model.MessageInfo;
import org.exoplatform.commons.api.settings.SettingService;
import org.exoplatform.commons.api.settings.SettingValue;
import org.exoplatform.commons.api.settings.data.Context;
import org.exoplatform.commons.api.settings.data.Scope;
import org.exoplatform.commons.notification.NotificationConfiguration;
import org.exoplatform.commons.notification.impl.jpa.email.JPAMailNotificationStorage;
import org.exoplatform.commons.notification.impl.jpa.email.JPAQueueMessageImpl;
import org.exoplatform.commons.notification.impl.service.storage.MailNotificationStorageImpl;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.commons.utils.RDBMSMigrationUtils;
import org.exoplatform.commons.utils.StringCommonUtils;
import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.PortalContainer;
import org.exoplatform.container.RootContainer;
import org.exoplatform.container.component.RequestLifeCycle;
import org.exoplatform.services.jcr.RepositoryService;
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.ext.hierarchy.NodeHierarchyCreator;
import org.exoplatform.services.jcr.impl.core.SessionImpl;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.scheduler.JobSchedulerService;
import org.exoplatform.services.transaction.TransactionService;

public class MailNotificationsMigration {
  //scope of mail notification migration status
  public static final String MAIL_NOTIFICATION_MIGRATION_DONE_KEY = "MAIL_NOTIFICATION_MIGRATION_DONE";
  //status of mail notifications migration (true if migration completed successfully)
  public static final String MAIL_NOTIFICATION_RDBMS_MIGRATION_DONE = "MAIL_NOTIFICATION_RDBMS_MIGRATION_DONE";
  //status of mail notifications cleanup from JCR (true if cleanup is completed successfully)
  public static final String MAIL_NOTIFICATION_RDBMS_CLEANUP_DONE = "MAIL_NOTIFICATION_RDBMS_CLEANUP_DONE";

  private static final Log LOG = ExoLogger.getLogger(MailNotificationsMigration.class);

  //JPA Storage
  private JPAMailNotificationStorage jpaMailNotificationStorage;
  private JPAQueueMessageImpl jpaQueueMessage;

  //JCR storage
  private MailNotificationStorageImpl jcrNotificationDataStorage;

  private String jcrWorkspace;

  private Session session;
  private NodeHierarchyCreator nodeHierarchyCreator;
  private JobSchedulerService schedulerService;
  private RepositoryService repositoryService;
  private SettingService settingService;
  private TransactionService transactionService;

  public MailNotificationsMigration(MailNotificationStorageImpl jcrNotificationDataStorage,
                                    JPAMailNotificationStorage jpaMailNotificationStorage,
                                    JobSchedulerService schedulerService,
                                    SettingService settingService,
                                    NotificationConfiguration notificationConfiguration,
                                    RepositoryService repositoryService,
                                    TransactionService transactionService,
                                    NodeHierarchyCreator nodeHierarchyCreator) {
    this.jpaMailNotificationStorage = jpaMailNotificationStorage;
    this.jcrNotificationDataStorage = jcrNotificationDataStorage;
    this.nodeHierarchyCreator = nodeHierarchyCreator;
    this.schedulerService = schedulerService;
    this.repositoryService = repositoryService;
    this.settingService = settingService;
    this.transactionService = transactionService;
    this.jcrWorkspace = notificationConfiguration.getWorkspace();

    this.jpaQueueMessage = CommonsUtils.getService(JPAQueueMessageImpl.class);
  }

  private Session getJCRSession() {
    Session jcrSession = null;
    try {
      jcrSession = repositoryService.getCurrentRepository().getSystemSession(jcrWorkspace);
      if (jcrSession instanceof SessionImpl) {
        ((SessionImpl) jcrSession).setTimeout(JPAAsynMigrationService.ONE_DAY_IN_MS);
      }
      transactionService.setTransactionTimeout(JPAAsynMigrationService.ONE_DAY_IN_SECONDS);
    } catch (Exception e) {
      LOG.error("Error while getting Notification nodes for Notifications migration - Cause : " + e.getMessage(), e);
    }
    finally {
      if (jcrSession != null) {
        jcrSession.logout();
      }
    }
    return jcrSession;
  }

  public void migrate() {
    session = getJCRSession();
    //migration of mail notifications data from JCR to RDBMS is done as a background task
    PortalContainer.addInitTask(PortalContainer.getInstance().getPortalContext(), new RootContainer.PortalContainerPostInitTask() {
      @Override
      public void execute(ServletContext context, PortalContainer portalContainer) {
        RDBMSMigrationUtils.getExecutorService().submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            if (!isMailNotifMigrationDone()) {
              try {
                // pause job of sending digest mails
                schedulerService.pauseJob("NotificationDailyJob", "Notification");
                schedulerService.pauseJob("NotificationWeeklyJob", "Notification");
                ExoContainerContext.setCurrentContainer(PortalContainer.getInstance());
                LOG.info("=== Start migration of Mail Notifications data from JCR");
                long startTime = System.currentTimeMillis();
                migrateMailNotifData();
                setMailNotifMigrationDone();
                long endTime = System.currentTimeMillis();
                LOG.info("=== Migration of Mail Notification data done in " + (endTime - startTime) + " ms");
              } catch (Exception e) {
                LOG.error("Error while migrating Mail Notification data from JCR to RDBMS - Cause : " + e.getMessage(), e);
              } finally {
                schedulerService.resumeJob("NotificationDailyJob", "Notification");
                schedulerService.resumeJob("NotificationWeeklyJob", "Notification");
              }
            } else {
              LOG.info("No mail notification data to migrate from JCR to RDBMS");
            }
            cleanupMailNotifications();
            return null;
          }
        });
      }
    });

    //migration of queue messages data from JCR to RDBMS is done as a background task
    PortalContainer.addInitTask(PortalContainer.getInstance().getPortalContext(), new RootContainer.PortalContainerPostInitTask() {
      @Override
      public void execute(ServletContext context, PortalContainer portalContainer) {
        RDBMSMigrationUtils.getExecutorService().submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            ExoContainerContext.setCurrentContainer(PortalContainer.getInstance());
            if (hasQueueMessagesDataToMigrate()) {
              try {
                LOG.info("=== Start migration of Mail messages stored in the queue from JCR");
                long startTime = System.currentTimeMillis();
                migrateQueueMessages();
                long endTime = System.currentTimeMillis();
                LOG.info("=== Migration of Mail messages data done in " + (endTime - startTime) + " ms");
              } catch (Exception e) {
                LOG.error("Error while migrating Mail messages data from JCR to RDBMS - Cause : " + e.getMessage(), e);
              }
            } else {
              LOG.info("No queue messages data to migrate from JCR to RDBMS");
            }
            cleanupQueue();
            return null;
          }
        });
      }
    });
  }

  public void cleanupMailNotifications() {
    //migration of mail notifications data from JCR to RDBMS is done as a background task
    RDBMSMigrationUtils.getExecutorService().submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        RequestLifeCycle.begin(PortalContainer.getInstance());
        if (isMailNotifMigrationDone() && !isMailNotifCleanupDone()) {
          // pause job of sending digest mails
          schedulerService.pauseJob("NotificationDailyJob", "Notification");
          schedulerService.pauseJob("NotificationWeeklyJob", "Notification");
          try {
              LOG.info("=== Start cleaning Mail notifications data from JCR");
              long startTime = System.currentTimeMillis();
              deleteJcrMailNotifications();
              setMailNotifCleanupDone();
              long endTime = System.currentTimeMillis();
              LOG.info("=== Mail notifications JCR data cleaning due to RDBMS migration done in " + (endTime - startTime) + " ms");
          } catch (Exception e) {
            LOG.error("Error while cleaning Mail notifications JCR data", e);
          } finally {
            RequestLifeCycle.end();
          }
          try {
            LOG.info("=== Start cleaning Mail messages data from JCR");
            long startTime = System.currentTimeMillis();
            deleteJcrMailMessages();
            long endTime = System.currentTimeMillis();
            LOG.info("=== Mail messages JCR data cleaning due to RDBMS migration done in " + (endTime - startTime) + " ms");

          } catch (Exception e) {
            LOG.error("Error while cleaning Mail messages JCR data to RDBMS - Cause : " + e.getMessage(), e);
          }
        }
        return null;
      }
    });
  }

  public void cleanupQueue() {
    //migration of mail notifications data from JCR to RDBMS is done as a background task
    RDBMSMigrationUtils.getExecutorService().submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        RequestLifeCycle.begin(PortalContainer.getInstance());
        if (isMailNotifMigrationDone() && !isMailNotifCleanupDone()) {
          // pause job of sending digest mails
          schedulerService.pauseJob("NotificationDailyJob", "Notification");
          schedulerService.pauseJob("NotificationWeeklyJob", "Notification");
          try {
            LOG.info("=== Start cleaning Mail messages data from JCR");
            long startTime = System.currentTimeMillis();
            deleteJcrMailMessages();
            long endTime = System.currentTimeMillis();
            LOG.info("=== Mail messages JCR data cleaning due to RDBMS migration done in " + (endTime - startTime) + " ms");

          } catch (Exception e) {
            LOG.error("Error while cleaning Mail messages JCR data to RDBMS - Cause : " + e.getMessage(), e);
          }
        }
        return null;
      }
    });
  }

  private void setMailNotifMigrationDone() {
    settingService.set(Context.GLOBAL, Scope.APPLICATION.id(MAIL_NOTIFICATION_MIGRATION_DONE_KEY), MAIL_NOTIFICATION_RDBMS_MIGRATION_DONE, SettingValue.create("true"));
  }

  private void setMailNotifCleanupDone() {
    settingService.set(Context.GLOBAL, Scope.APPLICATION.id(MAIL_NOTIFICATION_MIGRATION_DONE_KEY), MAIL_NOTIFICATION_RDBMS_CLEANUP_DONE, SettingValue.create("true"));
  }

  private boolean isMailNotifMigrationDone() {
    SettingValue<?> setting = settingService.get(Context.GLOBAL, Scope.APPLICATION.id(MAIL_NOTIFICATION_MIGRATION_DONE_KEY), MAIL_NOTIFICATION_RDBMS_MIGRATION_DONE);
    return (setting != null && setting.getValue().equals("true"));
  }

  private boolean isMailNotifCleanupDone() {
    SettingValue<?> setting = settingService.get(Context.GLOBAL, Scope.APPLICATION.id(MAIL_NOTIFICATION_MIGRATION_DONE_KEY), MAIL_NOTIFICATION_RDBMS_CLEANUP_DONE);
    return (setting != null && setting.getValue().equals("true"));
  }

  private void deleteJcrMailMessages() throws RepositoryException {
    Node parentMsgHome = getNode("eXoNotification", "messageInfoHome");
    if (parentMsgHome != null) {
      parentMsgHome.remove();
      session.save();
    }
  }

  private void deleteJcrMailNotifications() throws RepositoryException {
    ExoContainer currentContainer = ExoContainerContext.getCurrentContainer();

    NodeIterator pluginNodesIterator = getMailNotificationNodes();
    if(pluginNodesIterator != null) {
      while (pluginNodesIterator.hasNext()) {
        Node pluginNode = pluginNodesIterator.nextNode();
        NodeIterator dayNodesIterator = pluginNode.getNodes();
        while (dayNodesIterator.hasNext()) {
          Node dayNode = dayNodesIterator.nextNode();
          NodeIterator notifNodes = dayNode.getNodes();
          LOG.info("    Removing JCR mail notifications for plugin: " + pluginNode.getName() + " - day: " + dayNode.getName());
          int i = 0;
          while (notifNodes.hasNext()) {
            i++;
            notifNodes.nextNode().remove();
            if (i % 100 == 0) {
              session.save();
              RequestLifeCycle.end();
              RequestLifeCycle.begin(currentContainer);
            }
          }
          if (i > 0) {
            session.save();
            LOG.info("=== done removed " + i + " mail notifications from JCR for plugin: " + pluginNode.getName());
          }
        }
        pluginNode.remove();
        session.save();
      }
    }
    Node parentMsgHome = getNode("eXoNotification", "messageHome");
    if (parentMsgHome != null) {
      parentMsgHome.remove();
      session.save();
    }
  }

  private void migrateQueueMessages() throws Exception {
    NodeIterator iterator = getMessageInfoNodes();
    while (iterator.hasNext()) {
      Node node = iterator.nextNode();
      jpaQueueMessage.put(getMessageInfo(node));
    }
  }

  private MessageInfo getMessageInfo(Node node) {
    String path = null;
    try {
      path = node.getPath();

      String messageJson = getDataJson(node);
      JSONObject object = new JSONObject(messageJson);
      MessageInfo info = new MessageInfo();
      info.pluginId(object.optString("pluginId"))
          .from(object.getString("from"))
          .to(object.getString("to"))
          .subject(object.getString("subject"))
          .body(object.getString("body"))
          .footer(object.optString("footer"))
          .setCreatedTime(object.getLong("createdTime"));
      //
      return info;
    } catch (Exception e) {
      LOG.warn("Failed to map message from node " + path + "", e);
    }
    return null;
  }

  private String getDataJson(Node node) throws Exception {
    Node fileNode = node.getNode("datajson");
    Node nodeContent = fileNode.getNode("jcr:content");
    InputStream stream = nodeContent.getProperty("jcr:data").getStream();
    return StringCommonUtils.decompress(stream);
  }

  private NodeIterator getMessageInfoNodes() {
    return getSubNodes("eXoNotification", "messageInfoHome");
  }

  private NodeIterator getMailNotificationNodes() {
    return getSubNodes("eXoNotification", "messageHome");
  }

  private NodeIterator getSubNodes(String jcrPathAlias, String relPath) {
    Node parentNode = getNode(jcrPathAlias, relPath);
    if (parentNode != null) {
      try {
        return parentNode.getNodes();
      } catch (RepositoryException e) {
        LOG.error("Error while getting sub nodes with path '" + relPath + "' from node with alias'" + jcrPathAlias + "'", e);
      }
    }
    return null;
  }

  private Node getNode(String jcrPathAlias, String relPath) {
    Node parentNode = null;
    try {
      String eXoNotificationJCRPath = nodeHierarchyCreator.getJcrPath(jcrPathAlias);
      if (StringUtils.isNotBlank(eXoNotificationJCRPath) && session.itemExists(eXoNotificationJCRPath)) {
        Node msgInfoHome = (Node) session.getItem(eXoNotificationJCRPath);
        if (msgInfoHome.hasNode(relPath)) {
          parentNode = msgInfoHome.getNode(relPath);
        }
      }
    } catch (Exception e) {
      LOG.error("Error while getting Path '" + relPath + "' from node with alias'" + jcrPathAlias + "'", e);
    }
    return parentNode;
  }

  private void migrateMailNotifData() throws RepositoryException, RepositoryConfigurationException {
    NodeIterator pluginNodesIterator = getMailNotificationNodes();
    if(pluginNodesIterator != null) {
      while (pluginNodesIterator.hasNext()) {
        Node pluginNode = pluginNodesIterator.nextNode();
        NodeIterator dayNodesIterator = pluginNode.getNodes();
        while (dayNodesIterator.hasNext()) {
          Node dayNode = dayNodesIterator.nextNode();
          NodeIterator notifNodes = dayNode.getNodes();
          if (notifNodes.getSize() > 0) {
            LOG.info("    Progression mail notifications migration for plugin: " + pluginNode.getName() + " - day: "
                    + dayNode.getName());
            while (notifNodes.hasNext()) {
              migrateMailNotifNodeToRDBMS(notifNodes.nextNode());
            }
          }
        }
      }
    }
  }

  private void migrateMailNotifNodeToRDBMS(Node node) {
    try {
      jpaMailNotificationStorage.save(jcrNotificationDataStorage.fillModel(node));
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
    }
  }

  private boolean hasQueueMessagesDataToMigrate() {
    return (getMessageInfoNodes() != null && getMessageInfoNodes().hasNext());
  }
}