StreamInvocationHelper.java

/*
 * Copyright (C) 2003-2013 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.exoplatform.social.core.storage.streams;

import java.util.List;

import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.social.common.service.ProcessContext;
import org.exoplatform.social.common.service.SocialServiceContext;
import org.exoplatform.social.common.service.impl.SocialServiceContextImpl;
import org.exoplatform.social.core.activity.model.ExoSocialActivity;
import org.exoplatform.social.core.chromattic.entity.ActivityEntity;
import org.exoplatform.social.core.identity.model.Identity;
import org.exoplatform.social.core.storage.impl.StorageUtils;

public class StreamInvocationHelper {
  
  private static final Log LOG = ExoLogger.getLogger(StreamInvocationHelper.class);
  
  private static SocialServiceContext ctx = SocialServiceContextImpl.getInstance();

  /**
   * Invokes to records the activity to Stream
   * 
   * @param owner
   * @param entity
   * @param mentioners NULL is empty mentioner.
   * @return
   */
  public static ProcessContext save(Identity owner, ActivityEntity entity, String[] mentioners) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.NEW_ACTIVITY_RELATIONS_PROCESS, ctx);
    processCtx.identity(owner).activityEntity(entity).posterId(entity.getPosterIdentity().getId()).mentioners(mentioners);
    
    try {
      if (ctx.isAsync()) {
        //
        ctx.getServiceExecutor().async(StreamProcessorFactory.saveStream(), processCtx);
      } else {
        ctx.getServiceExecutor().execute(StreamProcessorFactory.saveStream(), processCtx);
      }
      
    } finally {
      if (ctx.isTraced()) {
        LOG.debug(processCtx.getTraceLog());
      }
      
    }
    
    return processCtx;
  }
  
  /**
   * Invokes to records the activity to Stream
   * 
   * @param owner
   * @param entity
   * @return
   */
  public static ProcessContext savePoster(Identity owner, ActivityEntity entity) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.NEW_ACTIVITY_PROCESS, ctx);
    processCtx.identity(owner).activityEntity(entity);
    
    try {
      //beforeAsync(); Why do we need to save here? Can make the problem with ADD_PROPERTY
      ctx.getServiceExecutor().execute(StreamProcessorFactory.savePoster(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  private static boolean beforeAsync() {
    if (ctx.isAsync()) {
      return StorageUtils.persist();
    }
    return false;
  }
  
  public static ProcessContext update(ExoSocialActivity activity, long oldUpdated) {
    //
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.UPDATE_ACTIVITY_PROCESS, ctx);
    processCtx.activity(activity).oldLastUpdated(oldUpdated);
    
    try {
      if (ctx.isAsync()) {
        processCtx.getTraceElement().start();
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.updateStream(), processCtx);
        processCtx.getTraceElement().end();
      } else {
        ctx.getServiceExecutor().execute(StreamProcessorFactory.updateStream(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext updateHidable(Identity owner, ActivityEntity entity, ExoSocialActivity activity) {
    //
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.UPDATE_ACTIVITY_REF, ctx);
    processCtx.activity(activity).mentioners(entity.getMentioners()).identity(owner);
    
    try {
      if (ctx.isAsync()) {
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.updateHidable(), processCtx);
      } else {
        ctx.getServiceExecutor().execute(StreamProcessorFactory.updateHidable(), processCtx);
      }
      
    } finally {
      if (ctx.isTraced()) {
        LOG.debug(processCtx.getTraceLog());
      }
      
    }
    
    return processCtx;
  }
  
  public static ProcessContext updateCommenter(Identity commenter, ActivityEntity entity, String[] commenters, long oldUpdated) {
    //
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.UPDATE_ACTIVITY_COMMENTER_PROCESS, ctx);
    processCtx.identity(commenter).activityEntity(entity).commenters(commenters).oldLastUpdated(oldUpdated);
    
    try {
      //beforeAsync(); this point can make the problem with ADD_PROPERTY exception
      //
      ctx.getServiceExecutor().execute(StreamProcessorFactory.updateCommenter(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  //(parentActivity, mentioners, commenters);
  public static ProcessContext deleteComment(ExoSocialActivity activity, String[] mentioners, String[] commenters) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.DELETE_COMMENT_PROCESS, ctx);
    processCtx.activity(activity).mentioners(mentioners).commenters(commenters);
    
    try {
      //beforeAsync(); //this point can make the problem with ADD_PROPERTY exception
      //
      ctx.getServiceExecutor().execute(StreamProcessorFactory.deleteCommentStream(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  
  public static ProcessContext unLike(Identity removedLike, ExoSocialActivity activity) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.UNLIKE_ACTIVITY_PROCESS, ctx);
    processCtx.identity(removedLike).activity(activity);
    
    try {
      //beforeAsync();
      ctx.getServiceExecutor().execute(StreamProcessorFactory.unlikeActivity(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext like(Identity liker, ExoSocialActivity activity) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LIKE_ACTIVITY_PROCESS, ctx);
    processCtx.identity(liker).activity(activity);
    
    try {
      //beforeAsync(); this point can make the problem with ADD_PROPERTY exception
      //don't use asynchronous because there is problem to get SessionProvider on Ecms side
      //org.exoplatform.services.cms.impl.Utils
      ctx.getServiceExecutor().execute(StreamProcessorFactory.likeActivity(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext deleteConnect(Identity sender, Identity receiver) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.DELETE_CONNECT_ACTIVITY_PROCESS, ctx);
    processCtx.sender(sender).receiver(receiver);
    
    try {
      //beforeAsync(); //this point can make the problem with ADD_PROPERTY exception
      //
      ctx.getServiceExecutor().execute(StreamProcessorFactory.deleteConnectStream(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext connect(Identity sender, Identity receiver) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.CONNECT_ACTIVITY_PROCESS, ctx);
    processCtx.sender(sender).receiver(receiver);
    
    try {
      if(ctx.isAsync()) {
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.connectStream(), processCtx);
      } else {
        ctx.getServiceExecutor().execute(StreamProcessorFactory.connectStream(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext addSpaceMember(Identity owner, Identity spaceIdentity) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.ADD_SPACE_MEMBER_ACTIVITY_PROCESS, ctx);
    processCtx.identity(owner).spaceIdentity(spaceIdentity);
    
    try {
//      if(ctx.isAsync()) {
//        beforeAsync();
//        //
//        ctx.getServiceExecutor().async(StreamProcessorFactory.addSpaceMemberStream(), processCtx);
//      } else {
//        ctx.getServiceExecutor().execute(StreamProcessorFactory.addSpaceMemberStream(), processCtx);
//      }
      ctx.getServiceExecutor().execute(StreamProcessorFactory.addSpaceMemberStream(), processCtx);
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext removeSpaceMember(Identity owner, Identity spaceIdentity) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.REMOVE_SPACE_MEMBER_ACTIVITY_PROCESS, ctx);
    processCtx.identity(owner).spaceIdentity(spaceIdentity);
    
    try {
      //beforeAsync(); //this point can make the problem with ADD_PROPERTY exception
      //
      ctx.getServiceExecutor().execute(StreamProcessorFactory.removeSpaceMemberStream(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createFeedActivityRef(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      if(ctx.isAsync()) {
        beforeAsync();
        //
        ctx.getServiceExecutor().async(StreamProcessorFactory.createFeedActivityRef(), processCtx);
      } else {
        //
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createFeedActivityRef(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createFeedActivityRefSynchronous(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      ctx.getServiceExecutor().execute(StreamProcessorFactory.createFeedActivityRef(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createConnectionsActivityRef(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      if(ctx.isAsync()) {
        //
        beforeAsync();     
        ctx.getServiceExecutor().async(StreamProcessorFactory.createConnectionsActivityRef(), processCtx);
      } else {
        //
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createConnectionsActivityRef(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createConnectionsActivityRefSynchronous(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createConnectionsActivityRef(), processCtx);
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createMySpacesActivityRef(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      if(ctx.isAsync()) {
        //
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.createMySpacesActivityRef(), processCtx);
      } else {
        //
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createMySpacesActivityRef(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createMyActivitiesActivityRef(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      if(ctx.isAsync()) {
        processCtx.getTraceElement().start();
        //
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.createMyActivitiesActivityRef(), processCtx);
        processCtx.getTraceElement().end();
      } else {
        //
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createMyActivitiesActivityRef(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext createSpaceActivityRef(Identity owner, List<ExoSocialActivity> list) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LAZY_UPGRADE_STREAM_PROCESS, ctx);
    processCtx.identity(owner).activities(list);
    
    try {
      if(ctx.isAsync()) {
        //
        beforeAsync();
        ctx.getServiceExecutor().async(StreamProcessorFactory.createSpaceActivityRef(), processCtx);
      } else {
        //
        ctx.getServiceExecutor().execute(StreamProcessorFactory.createSpaceActivityRef(), processCtx);
      }
      
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext loadFeed(Identity owner) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.LOAD_ACTIVITIES_STREAM_PROCESS, ctx);
    processCtx.identity(owner);
    
    try {
      processCtx.getTraceElement().start();
      ctx.getServiceExecutor().async(StreamProcessorFactory.loadFeed(), processCtx);
      processCtx.getTraceElement().end();
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
  
  public static ProcessContext addMentioners(ExoSocialActivity activity, String[] mentioners) {
    //
    StreamProcessContext processCtx = StreamProcessContext.getIntance(StreamProcessContext.UPDATE_ACTIVITY_MENTIONER_PROCESS, ctx);
    processCtx.activity(activity).mentioners(mentioners);
    
    try {
      processCtx.getTraceElement().start();
      ctx.getServiceExecutor().async(StreamProcessorFactory.addMentioners(), processCtx);
      processCtx.getTraceElement().end();
    } finally {
      LOG.debug(processCtx.getTraceLog());
    }
    
    return processCtx;
  }
}