View Javadoc
1   /*
2    * Copyright (C) 2003-2015 eXo Platform SAS.
3    *
4    * This program is free software; you can redistribute it and/or
5   * modify it under the terms of the GNU Affero General Public License
6   * as published by the Free Software Foundation; either version 3
7   * of the License, or (at your option) any later version.
8   *
9   * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, see<http://www.gnu.org/licenses/>.
16   */
17  package org.exoplatform.social.core.jpa.storage;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.Collections;
21  import java.util.Comparator;
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.LinkedHashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.regex.Matcher;
33  import java.util.regex.Pattern;
34  import java.util.stream.Collectors;
35  
36  import javax.persistence.LockModeType;
37  
38  import org.apache.commons.lang.ArrayUtils;
39  
40  import org.exoplatform.commons.api.persistence.ExoTransactional;
41  import org.exoplatform.commons.persistence.impl.EntityManagerHolder;
42  import org.exoplatform.commons.utils.CommonsUtils;
43  import org.exoplatform.commons.utils.PropertyManager;
44  import org.exoplatform.services.log.ExoLogger;
45  import org.exoplatform.services.log.Log;
46  import org.exoplatform.social.core.ActivityProcessor;
47  import org.exoplatform.social.core.BaseActivityProcessorPlugin;
48  import org.exoplatform.social.core.activity.filter.ActivityFilter;
49  import org.exoplatform.social.core.activity.filter.ActivityUpdateFilter;
50  import org.exoplatform.social.core.activity.model.ActivityStream;
51  import org.exoplatform.social.core.activity.model.ActivityStreamImpl;
52  import org.exoplatform.social.core.activity.model.ExoSocialActivity;
53  import org.exoplatform.social.core.activity.model.ExoSocialActivityImpl;
54  import org.exoplatform.social.core.identity.model.Identity;
55  import org.exoplatform.social.core.identity.provider.OrganizationIdentityProvider;
56  import org.exoplatform.social.core.identity.provider.SpaceIdentityProvider;
57  import org.exoplatform.social.core.jpa.storage.dao.ActivityDAO;
58  import org.exoplatform.social.core.jpa.storage.dao.ConnectionDAO;
59  import org.exoplatform.social.core.jpa.storage.dao.StreamItemDAO;
60  import org.exoplatform.social.core.jpa.storage.entity.ActivityEntity;
61  import org.exoplatform.social.core.jpa.storage.entity.StreamItemEntity;
62  import org.exoplatform.social.core.jpa.storage.entity.StreamType;
63  import org.exoplatform.social.core.storage.ActivityStorageException;
64  import org.exoplatform.social.core.storage.ActivityStorageException.Type;
65  import org.exoplatform.social.core.storage.api.ActivityStorage;
66  import org.exoplatform.social.core.storage.api.IdentityStorage;
67  import org.exoplatform.social.core.storage.api.RelationshipStorage;
68  import org.exoplatform.social.core.storage.api.SpaceStorage;
69  import org.exoplatform.social.core.storage.impl.ActivityBuilderWhere;
70  
71  public class RDBMSActivityStorageImpl implements ActivityStorage {
72  
73    private static final Log LOG = ExoLogger.getLogger(RDBMSActivityStorageImpl.class);
74    private final ActivityDAO activityDAO;
75    private IdentityStorage identityStorage;
76    private final SpaceStorage spaceStorage;
77    private final SortedSet<ActivityProcessor> activityProcessors;
78    private static final Pattern MENTION_PATTERN = Pattern.compile("@([^\\s]+)|@([^\\s]+)$");
79    public final static String COMMENT_PREFIX = "comment";
80    private ActivityStorage activityStorage;
81  
82    public RDBMSActivityStorageImpl(RelationshipStorage relationshipStorage, 
83                                        IdentityStorage identityStorage, 
84                                        SpaceStorage spaceStorage,
85                                        ActivityDAO activityDAO,
86                                        ConnectionDAO connectionDAO,
87                                        StreamItemDAO streamItemDAO) {
88      this.identityStorage = identityStorage;
89      this.activityProcessors = new TreeSet<ActivityProcessor>(processorComparator());
90      this.activityDAO = activityDAO;
91      this.spaceStorage = spaceStorage;
92    }
93    
94    private static Comparator<ActivityProcessor> processorComparator() {
95      return new Comparator<ActivityProcessor>() {
96  
97        public int compare(ActivityProcessor p1, ActivityProcessor p2) {
98          if (p1 == null || p2 == null) {
99            throw new IllegalArgumentException("Cannot compare null ActivityProcessor");
100         }
101         return p1.getPriority() - p2.getPriority();
102       }
103     };
104   }
105 
106   private ExoSocialActivity fillActivityFromEntity(ActivityEntity activityEntity, ExoSocialActivity activity) {
107     if (activity == null) {
108       activity = new ExoSocialActivityImpl(activityEntity.getPosterId(), activityEntity.getType(),
109               activityEntity.getTitle(), activityEntity.getBody(), false);
110     } else {
111       activity.setPosterId(activityEntity.getPosterId());
112       activity.setType(activityEntity.getType());
113       activity.setTitle(activity.getTitle());
114       activity.setBody(activity.getBody());
115       activity.isComment(false);
116     }
117 
118     activity.setId(String.valueOf(activityEntity.getId()));
119     activity.setLikeIdentityIds(activityEntity.getLikerIds().toArray(new String[]{}));
120     activity.setTemplateParams(activityEntity.getTemplateParams() != null ? new LinkedHashMap<String, String>(activityEntity.getTemplateParams())
121             : new HashMap<String, String>());
122 
123     String ownerIdentityId = activityEntity.getOwnerId();
124     ActivityStream stream = new ActivityStreamImpl();
125     Identity owner = identityStorage.findIdentityById(ownerIdentityId);
126     if(owner != null) {
127       stream.setType(owner.getProviderId());
128       stream.setPrettyId(owner.getRemoteId());
129       stream.setId(owner.getId());
130       activity.setStreamOwner(owner.getRemoteId());
131     } else {
132       LOG.warn("Cannot find stream of activity " + activityEntity.getId() + " since identity " + ownerIdentityId + " does not exist");
133     }
134     //
135     activity.setActivityStream(stream);
136     activity.setPosterId(activityEntity.getPosterId());
137     //
138     activity.isLocked(activityEntity.getLocked());
139     activity.isHidden(activityEntity.getHidden());
140     activity.setTitleId(activityEntity.getTitleId());
141     activity.setPostedTime(activityEntity.getPosted() != null ? activityEntity.getPosted().getTime() : 0);
142     activity.setUpdated(activityEntity.getUpdatedDate().getTime());
143     //
144     List<String> commentPosterIds = new ArrayList<String>();
145     List<String> replyToIds = new ArrayList<String>();
146     List<ActivityEntity> comments = activityEntity.getComments() != null ? activityEntity.getComments() : new ArrayList<ActivityEntity>();
147     fillCommentsIdsAndPosters(comments, commentPosterIds, replyToIds, false);
148     activity.setCommentedIds(commentPosterIds.toArray(new String[commentPosterIds.size()]));
149     activity.setReplyToId(replyToIds.toArray(new String[replyToIds.size()]));
150     activity.setMentionedIds(activityEntity.getMentionerIds().toArray(new String[activityEntity.getMentionerIds().size()]));
151 
152     return activity;
153   }
154 
155   private void fillCommentsIdsAndPosters(List<ActivityEntity> comments,
156                                          List<String> commentPosterIds,
157                                          List<String> replyToIds,
158                                          boolean isSubComment) {
159     if (comments == null || comments.isEmpty()) {
160       return;
161     }
162     List<Long> commentIds = new ArrayList<>();
163     for (ActivityEntity comment : comments) {
164       if (!commentPosterIds.contains(comment.getPosterId())) {
165         commentPosterIds.add(comment.getPosterId());
166       }
167       replyToIds.add(getExoCommentID(comment.getId()));
168       commentIds.add(comment.getId());
169     }
170     if (!isSubComment) {
171       List<ActivityEntity> subComments = activityDAO.findCommentsOfActivities(commentIds);
172       fillCommentsIdsAndPosters(subComments, commentPosterIds, replyToIds, true);
173     }
174   }
175 
176   private ExoSocialActivity convertActivityEntityToActivity(ActivityEntity activityEntity) {
177     if(activityEntity == null) return null;
178     //
179     ExoSocialActivity activity = fillActivityFromEntity(activityEntity, null);
180     //
181     
182     //
183     processActivity(activity);
184     
185     return activity;
186   }
187 
188   private ActivityEntity convertActivityToActivityEntity(ExoSocialActivity activity, String ownerId) {
189     ActivityEntity activityEntity  =  new ActivityEntity();
190     if (activity.getId() != null) {
191       activityEntity = activityDAO.find(Long.valueOf(activity.getId()));
192     }
193     activityEntity.setTitle(activity.getTitle());
194     activityEntity.setTitleId(activity.getTitleId());
195     activityEntity.setType(activity.getType());
196     activityEntity.setBody(activity.getBody());
197     if (ownerId != null) {
198       activityEntity.setPosterId(activity.getUserId() != null ? activity.getUserId() : ownerId);
199     }
200     if(activity.getLikeIdentityIds() != null) {
201       activityEntity.setLikerIds(new HashSet<String>(Arrays.asList(activity.getLikeIdentityIds())));
202     }
203     Map<String, String> params = activity.getTemplateParams();
204     if (params != null) {
205       activityEntity.setTemplateParams(params);
206     }
207 
208     //
209     if (activity.getPostedTime() == null || activity.getPostedTime() <= 0) {
210       activity.setPostedTime(System.currentTimeMillis());
211     }
212     activityEntity.setPosted(new Date(activity.getPostedTime()));
213     activityEntity.setLocked(activity.isLocked());
214     activityEntity.setHidden(activity.isHidden());
215     activityEntity.setUpdatedDate(activity.getUpdated());
216     activityEntity.setMentionerIds(new HashSet<String>(Arrays.asList(processMentions(activity.getTitle(), activity.getTemplateParams()))));
217     //
218     return activityEntity;
219   }
220   
221   private ExoSocialActivity convertCommentEntityToComment(ActivityEntity comment) {
222     ExoSocialActivity exoComment = new ExoSocialActivityImpl(comment.getPosterId(), null,
223         comment.getTitle(), comment.getBody(), false);
224     exoComment.setId(getExoCommentID(comment.getId()));
225     exoComment.setTitle(comment.getTitle());
226     exoComment.setType(comment.getType());
227     exoComment.setTitleId(comment.getTitleId());
228     exoComment.setBody(comment.getBody());
229     exoComment.setTemplateParams(comment.getTemplateParams() != null ? new LinkedHashMap<String, String>(comment.getTemplateParams())
230                                                                     : new HashMap<String, String>());
231     exoComment.setPosterId(comment.getPosterId());
232     exoComment.isComment(true);
233     //
234     exoComment.isLocked(comment.getLocked() != null ? comment.getLocked().booleanValue() : false);
235     exoComment.isHidden(comment.getHidden() != null ? comment.getHidden().booleanValue() : false);
236     exoComment.setUpdated(comment.getUpdatedDate() != null ? comment.getUpdatedDate().getTime() : null);
237     //
238     ActivityEntity parentActivity = getTopParentActivity(comment);
239     exoComment.setParentId(parentActivity == null ? null : String.valueOf(parentActivity.getId()));
240     exoComment.setParentCommentId(isSubComment(comment) ? getExoCommentID(comment.getParent().getId()) : null);
241     //
242     exoComment.setPostedTime(comment.getPosted() != null ? comment.getPosted().getTime() : 0);
243     exoComment.setUpdated(comment.getUpdatedDate() != null ? comment.getUpdatedDate().getTime() : null);
244     //
245     Set<String> mentioned = comment.getMentionerIds();
246     if (mentioned != null && !mentioned.isEmpty()) {
247       exoComment.setMentionedIds(comment.getMentionerIds().toArray(new String[mentioned.size()]));
248     }
249 
250     //
251     Set<String> likers = comment.getLikerIds();
252     if (likers != null && !likers.isEmpty() ) {
253       exoComment.setLikeIdentityIds(comment.getLikerIds().toArray(new String[likers.size()]));
254     }
255     processActivity(exoComment);
256     
257     return exoComment;
258   }
259 
260   private ActivityEntity getTopParentActivity(ActivityEntity activity) {
261     ActivityEntity parent = activity.getParent();
262     if (parent == null || (parent.getId() == activity.getId())) {
263       return activity;
264     } else {
265       return getTopParentActivity(parent);
266     }
267   }
268 
269   private boolean isSubComment(ActivityEntity comment) {
270     return comment.getParent() != null && comment.getParent().isComment();
271   }
272 
273   private List<ExoSocialActivity> convertCommentEntitiesToComments(List<ActivityEntity> comments) {
274     return convertCommentEntitiesToComments(comments, false);
275   }
276 
277   private List<ExoSocialActivity> convertCommentEntitiesToComments(List<ActivityEntity> comments, boolean loadSubComments) {
278     if (comments == null || comments.isEmpty()) return Collections.emptyList();
279     if(loadSubComments) {
280       // Add subComments to the list
281       List<Long> ids = new ArrayList<>();
282       for (ActivityEntity activityEntity : comments) {
283         ids.add(activityEntity.getId());
284       }
285       List<ActivityEntity> subComments = activityDAO.findCommentsOfActivities(ids);
286 
287       if(subComments != null && !subComments.isEmpty()) {
288         comments.addAll(subComments);
289       }
290 
291       // Sort sub comments just after the comment
292       Collections.sort(comments, new CommentComparator());
293     }
294 
295     return comments.stream().map(comment -> convertCommentEntityToComment(comment)).collect(Collectors.toList());
296   }
297   
298   private ActivityEntity convertCommentToCommentEntity(ActivityEntity activityEntity, ExoSocialActivity comment) {
299     ActivityEntity commentEntity = new ActivityEntity();
300     if (comment.getId() != null) {
301       commentEntity = activityDAO.find(getCommentID(comment.getId()));
302     }
303     if (comment.getParentCommentId() != null) {
304       ActivityEntity parentCommentEntity = activityDAO.find(getCommentID(comment.getParentCommentId()));
305       parentCommentEntity.addComment(commentEntity);
306     } else {
307       activityEntity.addComment(commentEntity);
308     }
309     commentEntity.setComment(true);
310     commentEntity.setTitle(comment.getTitle());
311     commentEntity.setTitleId(comment.getTitleId());
312     commentEntity.setType(comment.getType());
313     commentEntity.setBody(comment.getBody());
314     commentEntity.setPosterId(comment.getPosterId() != null ? comment.getPosterId() : comment.getUserId());
315     if (comment.getTemplateParams() != null) {
316       commentEntity.setTemplateParams(comment.getTemplateParams());
317     }
318     //
319     commentEntity.setLocked(comment.isLocked());
320     commentEntity.setHidden(comment.isHidden());
321     //
322     Date today = new Date();
323     Date commentTime = (comment.getPostedTime() != null ? new Date(comment.getPostedTime()) : today);
324     commentEntity.setPosted(commentTime);
325     //update time may be different from post time
326     Date updateCommentTime = (comment.getUpdated() != null ? comment.getUpdated() : today);
327     commentEntity.setUpdatedDate(updateCommentTime);
328     commentEntity.setMentionerIds(new HashSet<>(Arrays.asList(processMentions(comment.getTitle(), comment.getTemplateParams()))));
329     //
330     return commentEntity;
331   }
332 
333   /**
334    * Help to update mentionIds of activity
335    * Help to update corresponding StreamItem
336    * This method must be called before updating title and template param of activity entity
337    *
338    * activity's mentionIds include :
339    * - mentioned ids processed from its title and template param
340    * - mentioned ids from its comments
341    *
342    * @param activityEntity
343    * @param activity
344    */
345   private void updateActivityMentions(ActivityEntity activityEntity, ExoSocialActivity activity) {
346     Set<String> commentMentions = new HashSet<>();
347     if (activityEntity.getComments() != null) {
348       activityEntity.getComments().forEach(comment -> {
349         String[] mentions = processMentions(comment.getTitle(), comment.getTemplateParams());
350         commentMentions.addAll(Arrays.asList(mentions));
351       });
352     }
353     Set<String> mentionsToRemove = new HashSet<>(Arrays.asList(processMentions(activityEntity.getTitle(), activityEntity.getTemplateParams())));
354     Set<String> mentionToAdd = new HashSet<>(Arrays.asList(processMentions(activity.getTitle(), activity.getTemplateParams())));
355 
356     mentionsToRemove.forEach(mentionedId -> {
357       if (!commentMentions.contains(mentionedId) && !mentionToAdd.contains(mentionedId)) {
358         StreamItemEntity item = new StreamItemEntity(StreamType.MENTIONER);
359         item.setOwnerId(Long.parseLong(mentionedId));
360         activityEntity.removeStreamItem(item);
361       }
362     });
363 
364     mentionToAdd.forEach(mentionedId -> {
365       if (!commentMentions.contains(mentionedId) && !mentionsToRemove.contains(mentionedId)) {
366         mention(null, activityEntity, new String[] {mentionedId});
367       }
368     });
369 
370     mentionToAdd.addAll(commentMentions);
371     activityEntity.setMentionerIds(mentionToAdd);
372   }
373 
374   private List<ExoSocialActivity> convertActivityIdsToActivities(List<Long> activityIds) {
375     if (activityIds == null)
376       return Collections.emptyList();
377     // Use getActivityStorage to benifit from Cached Storage
378     return activityIds.stream().map(activityId -> getActivityStorage().getActivity(String.valueOf(activityId))).collect(Collectors.toList());
379   }
380 
381   private List<ExoSocialActivity> convertActivityEntitiesToActivities(List<ActivityEntity> activities) {
382     if (activities == null)
383       return Collections.emptyList();
384     return activities.stream().map(activity -> convertActivityEntityToActivity(activity)).collect(Collectors.toList());
385   }
386 
387   @Override
388   @ExoTransactional
389   public ExoSocialActivity getActivity(String activityId) throws ActivityStorageException {
390     if (activityId == null || activityId.isEmpty()) {
391       return null;
392     }
393     if (activityId != null && activityId.startsWith(COMMENT_PREFIX)) {
394       return getComment(activityId);
395     }
396     try {
397       ActivityEntity entity = activityDAO.find(Long.valueOf(activityId));
398       return convertActivityEntityToActivity(entity);
399     } catch (Exception e) {
400       if (PropertyManager.isDevelopping()) {
401         throw new ActivityStorageException(Type.FAILED_TO_GET_ACTIVITY, e.getMessage(), e);
402       }
403       return null;
404     }
405   }
406 
407   public ExoSocialActivity getComment(String commentId) throws ActivityStorageException {
408     try {
409       ActivityEntity entity = activityDAO.find(getCommentID(commentId));
410       if (entity != null && entity.isComment()) {
411         return convertCommentEntityToComment(entity);
412       } else {
413         return null;
414       }
415     } catch (Exception e) {
416       if (PropertyManager.isDevelopping()) {
417         throw new ActivityStorageException(Type.FAILED_TO_GET_ACTIVITY, e.getMessage(), e);
418       }
419       return null;
420     }
421   }
422 
423   @Override
424   public List<ExoSocialActivity> getUserActivities(Identity owner) throws ActivityStorageException {
425     return getUserActivities(owner, 0, -1);
426   }
427 
428   @Override
429   public List<ExoSocialActivity> getUserActivities(Identity owner, long offset, long limit) throws ActivityStorageException {
430     return getUserActivitiesForUpgrade(owner, offset, limit);
431   }
432   
433   @Override
434   public List<String> getUserSpacesActivityIds(Identity ownerIdentity, int offset, int limit) {
435     return activityDAO.getUserSpacesActivityIds(ownerIdentity, offset, limit, memberOfSpaceIds(ownerIdentity));
436   }
437   
438   @Override
439   public List<String> getUserIdsActivities(Identity owner, long offset, long limit) throws ActivityStorageException {
440     return activityDAO.getUserIdsActivities(owner, offset, limit);
441   }
442 
443   @Override
444   @ExoTransactional
445   public List<ExoSocialActivity> getUserActivitiesForUpgrade(Identity owner, long offset, long limit) throws ActivityStorageException {
446     return convertActivityIdsToActivities(activityDAO.getUserActivities(owner, offset, limit));
447   }
448 
449   @Override
450   @ExoTransactional
451   public List<ExoSocialActivity> getActivities(Identity owner, Identity viewer, long offset, long limit) throws ActivityStorageException {
452     return convertActivityIdsToActivities(activityDAO.getActivities(owner, viewer, offset, limit));
453   }
454 
455   @Override
456   public List<ExoSocialActivity> getAllActivities(int index, int limit) {
457     return convertActivityEntitiesToActivities(activityDAO.getAllActivities());
458   }
459 
460   @Override
461   @ExoTransactional
462   public void saveComment(ExoSocialActivity activity, ExoSocialActivity eXoComment) throws ActivityStorageException {
463     ActivityEntity activityEntity = activityDAO.find(Long.valueOf(activity.getId()));
464     EntityManagerHolder.get().lock(activityEntity, LockModeType.PESSIMISTIC_WRITE);
465     try {
466       ActivityEntity commentEntity = convertCommentToCommentEntity(activityEntity, eXoComment);
467       commentEntity = activityDAO.create(commentEntity);
468 
469       eXoComment.setId(getExoCommentID(commentEntity.getId()));
470       eXoComment.setPosterId(commentEntity.getPosterId());
471       eXoComment.setParentId(String.valueOf(activity.getId()));
472       eXoComment.isComment(true);
473 
474       Set<String> mentioned = commentEntity.getMentionerIds();
475       if (mentioned != null && !mentioned.isEmpty()) {
476         eXoComment.setMentionedIds(mentioned.toArray(new String[mentioned.size()]));
477       }
478 
479         //
480       Identity commenter = identityStorage.findIdentityById(commentEntity.getPosterId());
481       saveStreamItemForCommenter(commenter, activityEntity);
482 
483       //
484       String[] mentioners = processMentions(eXoComment.getTitle(), eXoComment.getTemplateParams());
485       if (mentioners != null && mentioners.length > 0) {
486         mention(commenter, activityEntity, mentioners);
487         activityEntity.setMentionerIds(processMentionOfComment(activityEntity, commentEntity, activityEntity.getMentionerIds().toArray(new String[activityEntity.getMentionerIds().size()]), mentioners, true));
488       }
489 
490       //
491       processActivityStreamUpdatedTime(activityEntity);
492       activityDAO.update(activityEntity);
493 
494     } finally {
495       if (EntityManagerHolder.get().isOpen() && EntityManagerHolder.get().getLockMode(activityEntity) != null
496           && EntityManagerHolder.get().getLockMode(activityEntity) != LockModeType.NONE) {
497         EntityManagerHolder.get().lock(activityEntity, LockModeType.NONE);
498       }
499     }
500 
501   }
502 
503   /**
504    * Creates the StreamItem for commenter
505    * @param commenter
506    * @param activityEntity
507    */
508   private void saveStreamItemForCommenter(Identity commenter, ActivityEntity activityEntity) {
509     Identity ownerActivity = identityStorage.findIdentityById(activityEntity.getOwnerId());
510     if (! SpaceIdentityProvider.NAME.equals(ownerActivity.getProviderId())) {
511       createStreamItem(StreamType.COMMENTER, activityEntity, Long.parseLong(commenter.getId()));
512     }
513   }
514 
515   private Set<String> processMentionOfComment(ActivityEntity activityEntity, ActivityEntity commentEntity, String[] activityMentioners, String[] commentMentioners, boolean isAdded) {
516     Set<String> mentioners = new HashSet<String>(Arrays.asList(activityMentioners));
517     if (commentMentioners.length == 0) return mentioners;
518     //
519     for (String mentioner : commentMentioners) {
520       if (!mentioners.contains(mentioner) && isAdded) {
521         mentioners.add(mentioner);
522       }
523       if (mentioners.contains(mentioner) && !isAdded) {
524         if (isAllowedToRemove(activityEntity, commentEntity, mentioner)) {
525           mentioners.remove(mentioner);
526           //remove stream item
527           StreamItemEntity item = new StreamItemEntity(StreamType.MENTIONER);
528           item.setOwnerId(Long.parseLong(mentioner));
529           activityEntity.removeStreamItem(item);
530         }
531       }
532     }
533     return mentioners;
534   }
535   
536   private boolean isAllowedToRemove(ActivityEntity activity, ActivityEntity comment, String mentioner) {
537     if (ArrayUtils.contains(processMentions(activity.getTitle(), activity.getTemplateParams()), mentioner)) {
538       return false;
539     }
540     List<ActivityEntity> comments = activity.getComments();
541     comments.remove(comment);
542     for (ActivityEntity cmt : comments) {
543       if (ArrayUtils.contains(processMentions(cmt.getTitle(), cmt.getTemplateParams()), mentioner)) {
544         return false;
545       }
546     }
547     return true;
548   }
549   
550   @Override
551   public ExoSocialActivity saveActivity(Identity owner, ExoSocialActivity activity) throws ActivityStorageException {
552     boolean isNew = (activity.getId() == null);
553     ActivityEntity entity = convertActivityToActivityEntity(activity, owner.getId());
554     //
555     entity.setOwnerId(owner.getId());
556     entity.setProviderId(owner.getProviderId());
557     saveStreamItem(owner, entity);
558     //
559     entity = activityDAO.create(entity);
560     activity.setId(Long.toString(entity.getId()));
561     //
562 
563     if (isNew) {
564       fillActivityFromEntity(entity, activity);
565     }
566     
567     return activity;
568   }
569   /**
570    * Creates the StreamIteam for Space Member
571    * @param spaceOwner
572    * @param activity
573    */
574   private void spaceMembers(Identity spaceOwner, ActivityEntity activity) {
575     createStreamItem(StreamType.SPACE, activity, Long.parseLong(spaceOwner.getId()));
576     createStreamItem(StreamType.SPACE, activity, Long.parseLong(activity.getPosterId()));
577   }
578   
579   private void saveStreamItem(Identity owner, ActivityEntity activity) {
580     //create StreamItem    
581     if (OrganizationIdentityProvider.NAME.equals(owner.getProviderId())) {
582       //poster
583       poster(owner, activity);
584       
585     } else {
586       //for SPACE
587       spaceMembers(owner, activity);
588     }
589     //mention
590     mention(owner, activity, processMentions(activity.getTitle(), activity.getTemplateParams()));
591   }
592   
593   /**
594    * Creates the StreamItem for poster and stream owner
595    * @param owner
596    * @param activity
597    */
598   private void poster(Identity owner, ActivityEntity activity) {
599     createStreamItem(StreamType.POSTER, activity, Long.parseLong(activity.getPosterId()));
600     //User A posts a new activity on user B stream (A connected B)
601     if (!owner.getId().equals(activity.getPosterId())) {
602       createStreamItem(StreamType.POSTER, activity, Long.parseLong(owner.getId()));
603     }
604   }
605   
606   /**
607    * Creates StreamItem for each user who has mentioned on the activity
608    * 
609    * @param owner
610    * @param activity
611    */
612   private void mention(Identity owner, ActivityEntity activity, String [] mentions) {
613     for (String mentioner : mentions) {
614       Identity identity = identityStorage.findIdentityById(mentioner);
615       if(identity != null) {
616         createStreamItem(StreamType.MENTIONER, activity, Long.parseLong(identity.getId()));
617       }
618     }
619   }
620   
621   private void createStreamItem(StreamType streamType, ActivityEntity activity, Long ownerId){
622     StreamItemEntity streamItem = new StreamItemEntity(streamType);
623     streamItem.setOwnerId(ownerId);
624     if (streamType == StreamType.POSTER || streamType == StreamType.SPACE || streamType == StreamType.MENTIONER) {
625       streamItem.setUpdatedDate(activity.getUpdatedDate());
626     } else {
627       streamItem.setUpdatedDate(null);
628     }
629     boolean isExist = false;
630     if (activity.getId() != null) {
631       //TODO need to improve it
632       for (StreamItemEntity item : activity.getStreamItems()) {
633         if (item.getOwnerId().equals(ownerId) && streamType.equals(item.getStreamType())) {
634           isExist = true;
635           break;
636         }
637       }
638     }
639     if (!isExist) {
640       activity.addStreamItem(streamItem);
641     }
642   }
643   
644 
645   /**
646    * Processes Mentioners who has been mentioned via the Activity.
647    * 
648    * @param title
649    */
650   private String[] processMentions(String title, Map<String, String> templateParams) {
651     Set<String> mentions = new HashSet<>();
652     mentions.addAll(parseMention(title));
653 
654     getTemplateParamToProcess(templateParams).forEach(
655             param -> mentions.addAll(parseMention(param)));
656 
657     return mentions.toArray(new String[mentions.size()]);
658   }
659 
660   private Set<String> parseMention(String str) {
661     if (str == null || str.length() == 0) {
662       return Collections.emptySet();
663     }
664 
665     Set<String> mentions = new HashSet<>();
666     Matcher matcher = MENTION_PATTERN.matcher(str);
667     while (matcher.find()) {
668       String remoteId = matcher.group().substring(1);
669       Identity identity = identityStorage.findIdentity(OrganizationIdentityProvider.NAME, remoteId);
670       // if not the right mention then ignore
671       if (identity != null && !mentions.contains(identity.getId())) {
672         mentions.add(identity.getId());
673       }
674     }
675     return mentions;
676   }
677 
678 
679   private List<String> getTemplateParamToProcess(Map<String, String> templateParams){
680     List<String> params = new ArrayList<String>();
681 
682     if(templateParams != null && templateParams.containsKey(BaseActivityProcessorPlugin.TEMPLATE_PARAM_TO_PROCESS)){
683       String[] templateParamKeys = templateParams
684               .get(BaseActivityProcessorPlugin.TEMPLATE_PARAM_TO_PROCESS)
685               .split(BaseActivityProcessorPlugin.TEMPLATE_PARAM_LIST_DELIM);
686       for(String key : templateParamKeys){
687         if(templateParams.containsKey(key)){
688           params.add(templateParams.get(key));
689         }
690       }
691     }
692     return params;
693   }
694   
695   @Override
696   @ExoTransactional
697   public ExoSocialActivity getParentActivity(ExoSocialActivity comment) throws ActivityStorageException {
698     try {
699       Long commentId = getCommentID(comment.getId());
700       return convertActivityEntityToActivity(activityDAO.getParentActivity(commentId));
701     } catch (NumberFormatException e) {
702       LOG.warn("The input ExoSocialActivity is not comment, it is Activity");
703       return null;
704     }
705   }
706 
707   @Override
708   public void deleteActivity(String activityId) throws ActivityStorageException {
709     ActivityEntity a = activityDAO.find(Long.valueOf(activityId));
710     if (a != null) {
711       activityDAO.delete(a);
712     } else {
713       LOG.warn("The activity's " + activityId + " is not found!" );
714     }
715   }
716 
717   @Override
718   @ExoTransactional
719   public void deleteComment(String activityId, String commentId) throws ActivityStorageException {
720     ActivityEntity comment = activityDAO.find(getCommentID(commentId));
721     activityDAO.delete(comment);
722     //
723     ActivityEntity activity = activityDAO.find(Long.valueOf(activityId));
724     activity.getComments().remove(comment);
725     //
726     activity.setMentionerIds(processMentionOfComment(activity, comment, activity.getMentionerIds().toArray(new String[activity.getMentionerIds().size()]), processMentions(comment.getTitle(), comment.getTemplateParams()), false));
727     //
728     if (!hasOtherComment(activity, comment.getPosterId())) {
729       StreamItemEntity item = new StreamItemEntity(StreamType.COMMENTER);
730       item.setOwnerId(Long.parseLong(comment.getPosterId()));
731       activity.removeStreamItem(item);
732     }
733     //
734     activityDAO.update(activity);
735   }
736   
737   private boolean hasOtherComment(ActivityEntity activity, String poster) {
738     for (ActivityEntity comment : activity.getComments()) {
739       if (poster.equals(comment.getPosterId())) {
740         return true;
741       }
742     }
743     return false;
744   }
745 
746   @Override
747   public List<ExoSocialActivity> getActivitiesOfIdentities(List<Identity> connectionList, long offset, long limit) throws ActivityStorageException {
748     return null;
749   }
750 
751   @Override
752   public List<ExoSocialActivity> getActivitiesOfIdentities(List<Identity> connectionList, TimestampType type, long offset, long limit) throws ActivityStorageException {
753     return null;
754   }
755 
756   @Override
757   public int getNumberOfUserActivities(Identity owner) throws ActivityStorageException {
758     return getNumberOfUserActivitiesForUpgrade(owner);
759   }
760 
761   @Override
762   public int getNumberOfUserActivitiesForUpgrade(Identity owner) throws ActivityStorageException {
763     return activityDAO.getNumberOfUserActivities(owner);
764   }
765 
766   @Override
767   public int getNumberOfNewerOnUserActivities(Identity ownerIdentity, ExoSocialActivity baseActivity) {
768     return getNumberOfNewerOnUserActivities(ownerIdentity, baseActivity.getUpdated().getTime());
769   }
770 
771   @Override
772   public List<ExoSocialActivity> getNewerOnUserActivities(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
773     return getNewerUserActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
774   }
775 
776   @Override
777   public int getNumberOfOlderOnUserActivities(Identity ownerIdentity, ExoSocialActivity baseActivity) {
778     return getNumberOfOlderOnUserActivities(ownerIdentity, baseActivity.getUpdated().getTime());
779   }
780 
781   @Override
782   public List<ExoSocialActivity> getOlderOnUserActivities(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
783     return getOlderUserActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
784   }
785 
786   @Override
787   public List<ExoSocialActivity> getActivityFeed(Identity ownerIdentity, int offset, int limit) {
788     return getActivityFeedForUpgrade(ownerIdentity, offset, limit);
789   }
790   
791   @Override
792   public List<String> getActivityIdsFeed(Identity ownerIdentity, int offset, int limit) {
793     return activityDAO.getActivityIdsFeed(ownerIdentity, offset, limit, memberOfSpaceIds(ownerIdentity));
794   }
795 
796   @Override
797   @ExoTransactional
798   public List<ExoSocialActivity> getActivityFeedForUpgrade(Identity ownerIdentity, int offset, int limit) {
799     return convertActivityIdsToActivities(activityDAO.getActivityFeed(ownerIdentity, offset, limit, memberOfSpaceIds(ownerIdentity)));
800   }
801 
802   @Override
803   public int getNumberOfActivitesOnActivityFeed(Identity ownerIdentity) {
804     return getNumberOfActivitesOnActivityFeedForUpgrade(ownerIdentity);
805   }
806 
807   @Override
808   public int getNumberOfActivitesOnActivityFeedForUpgrade(Identity ownerIdentity) {
809     return activityDAO.getNumberOfActivitesOnActivityFeed(ownerIdentity, memberOfSpaceIds(ownerIdentity));
810   }
811 
812   @Override
813   public int getNumberOfNewerOnActivityFeed(Identity ownerIdentity, ExoSocialActivity baseActivity) {
814     return getNumberOfNewerOnActivityFeed(ownerIdentity, baseActivity.getUpdated().getTime());
815   }
816 
817   @Override
818   public List<ExoSocialActivity> getNewerOnActivityFeed(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
819     return getNewerFeedActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
820   }
821 
822   @Override
823   public int getNumberOfOlderOnActivityFeed(Identity ownerIdentity, ExoSocialActivity baseActivity) {
824     return getNumberOfOlderOnActivityFeed(ownerIdentity, baseActivity.getUpdated().getTime());
825   }
826 
827   @Override
828   public List<ExoSocialActivity> getOlderOnActivityFeed(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
829     return getOlderFeedActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
830   }
831 
832   @Override
833   public List<ExoSocialActivity> getActivitiesOfConnections(Identity ownerIdentity, int offset, int limit) {
834     return getActivitiesOfConnectionsForUpgrade(ownerIdentity, offset, limit);
835   }
836   
837   @Override
838   public List<String> getActivityIdsOfConnections(Identity ownerIdentity, int offset, int limit) {
839     return activityDAO.getActivityIdsOfConnections(ownerIdentity, offset, limit);
840   }
841 
842   @Override
843   @ExoTransactional
844   public List<ExoSocialActivity> getActivitiesOfConnectionsForUpgrade(Identity ownerIdentity, int offset, int limit) {
845     return convertActivityIdsToActivities(activityDAO.getActivitiesOfConnections(ownerIdentity, offset, limit));
846   }
847 
848   @Override
849   public int getNumberOfActivitiesOfConnections(Identity ownerIdentity) {
850     return getNumberOfActivitiesOfConnectionsForUpgrade(ownerIdentity);
851   }
852 
853   @Override
854   public int getNumberOfActivitiesOfConnectionsForUpgrade(Identity ownerIdentity) {
855     return activityDAO.getNumberOfActivitiesOfConnections(ownerIdentity);
856   }
857 
858   @Override
859   public List<ExoSocialActivity> getActivitiesOfIdentity(Identity ownerIdentity, long offset, long limit) {
860     return getUserActivities(ownerIdentity, offset, limit);
861   }
862 
863   @Override
864   public int getNumberOfNewerOnActivitiesOfConnections(Identity ownerIdentity, ExoSocialActivity baseActivity) {
865     return getNumberOfNewerOnActivitiesOfConnections(ownerIdentity, baseActivity.getUpdated().getTime());
866   }
867 
868   @Override
869   public List<ExoSocialActivity> getNewerOnActivitiesOfConnections(Identity ownerIdentity, ExoSocialActivity baseActivity, long limit) {
870     return getNewerActivitiesOfConnections(ownerIdentity, baseActivity.getUpdated().getTime(), (int) limit);
871   }
872 
873   @Override
874   public int getNumberOfOlderOnActivitiesOfConnections(Identity ownerIdentity, ExoSocialActivity baseActivity) {
875     return getNumberOfOlderOnActivitiesOfConnections(ownerIdentity, baseActivity.getUpdated().getTime());
876   }
877 
878   @Override
879   public List<ExoSocialActivity> getOlderOnActivitiesOfConnections(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
880     return getOlderActivitiesOfConnections(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
881   }
882 
883   @Override
884   public List<ExoSocialActivity> getUserSpacesActivities(Identity ownerIdentity, int offset, int limit) {
885     return getUserSpacesActivitiesForUpgrade(ownerIdentity, offset, limit);
886   }
887   
888   @Override
889   public List<String> getSpaceActivityIds(Identity spaceIdentity, int offset, int limit) {
890     return activityDAO.getSpaceActivityIds(spaceIdentity, offset, limit);
891   }
892 
893   @Override
894   @ExoTransactional
895   public List<ExoSocialActivity> getUserSpacesActivitiesForUpgrade(Identity ownerIdentity, int offset, int limit) {
896     return convertActivityIdsToActivities(activityDAO.getUserSpacesActivities(ownerIdentity, offset, limit, memberOfSpaceIds(ownerIdentity)));
897   }
898 
899   @Override
900   public int getNumberOfUserSpacesActivities(Identity ownerIdentity) {
901     return getNumberOfUserSpacesActivitiesForUpgrade(ownerIdentity);
902   }
903 
904   @Override
905   public int getNumberOfUserSpacesActivitiesForUpgrade(Identity ownerIdentity) {
906     return activityDAO.getNumberOfUserSpacesActivities(ownerIdentity, memberOfSpaceIds(ownerIdentity));
907   }
908 
909   @Override
910   public int getNumberOfNewerOnUserSpacesActivities(Identity ownerIdentity, ExoSocialActivity baseActivity) {
911     return getNumberOfNewerOnUserSpacesActivities(ownerIdentity, baseActivity.getUpdated().getTime());
912   }
913 
914   @Override
915   public List<ExoSocialActivity> getNewerOnUserSpacesActivities(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
916     return getNewerUserSpacesActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
917   }
918 
919   @Override
920   public int getNumberOfOlderOnUserSpacesActivities(Identity ownerIdentity, ExoSocialActivity baseActivity) {
921     return getNumberOfOlderOnUserSpacesActivities(ownerIdentity, baseActivity.getUpdated().getTime());
922   }
923 
924   @Override
925   public List<ExoSocialActivity> getOlderOnUserSpacesActivities(Identity ownerIdentity, ExoSocialActivity baseActivity, int limit) {
926     return getOlderUserSpacesActivities(ownerIdentity, baseActivity.getUpdated().getTime(), limit);
927   }
928 
929   @Override
930   public List<ExoSocialActivity> getComments(ExoSocialActivity existingActivity, boolean loadSubComments, int offset, int limit) {
931     long activityId = 0;
932     try {
933       activityId = Long.parseLong(existingActivity.getId());
934     } catch (NumberFormatException ex) {
935       activityId = 0;
936     }
937 
938     List<ActivityEntity> comments;
939     if (activityId > 0) {
940       comments = activityDAO.getComments(activityId, offset, limit);
941     } else {
942       comments = null;
943     }
944     
945     return convertCommentEntitiesToComments(comments, loadSubComments);
946   }
947 
948   @Override
949   public int getNumberOfComments(ExoSocialActivity existingActivity) {
950     return (int)activityDAO.getNumberOfComments(Long.valueOf(existingActivity.getId()));
951   }
952 
953   @Override
954   public int getNumberOfNewerComments(ExoSocialActivity existingActivity, ExoSocialActivity baseComment) {
955     return getNewerComments(existingActivity, baseComment, 0).size();
956   }
957 
958   @Override
959   public List<ExoSocialActivity> getNewerComments(ExoSocialActivity existingActivity, ExoSocialActivity baseComment, int limit) {
960     return getNewerComments(existingActivity, baseComment.getPostedTime(), limit);
961   }
962 
963   @Override
964   public int getNumberOfOlderComments(ExoSocialActivity existingActivity, ExoSocialActivity baseComment) {
965     return getOlderComments(existingActivity, baseComment, 0).size();
966   }
967 
968   @Override
969   public List<ExoSocialActivity> getOlderComments(ExoSocialActivity existingActivity, ExoSocialActivity baseComment, int limit) {
970     return getOlderComments(existingActivity, baseComment.getPostedTime(), limit);
971   }
972 
973   @Override
974   @ExoTransactional
975   public List<ExoSocialActivity> getNewerComments(ExoSocialActivity existingActivity, Long sinceTime, int limit) {
976     List<ActivityEntity> comments = activityDAO.getNewerComments(Long.valueOf(existingActivity.getId()), sinceTime > 0 ? new Date(sinceTime) : null, 0, limit);
977     //
978     return convertCommentEntitiesToComments(comments);
979   }
980 
981   @Override
982   @ExoTransactional
983   public List<ExoSocialActivity> getOlderComments(ExoSocialActivity existingActivity, Long sinceTime, int limit) {
984     List<ActivityEntity> comments = activityDAO.getOlderComments(Long.valueOf(existingActivity.getId()), sinceTime > 0 ? new Date(sinceTime) : null, 0, limit);
985     return convertCommentEntitiesToComments(comments);
986   }
987 
988   @Override
989   public int getNumberOfNewerComments(ExoSocialActivity existingActivity, Long sinceTime) {
990     return getNewerComments(existingActivity, sinceTime, 0).size();
991   }
992 
993   @Override
994   public int getNumberOfOlderComments(ExoSocialActivity existingActivity, Long sinceTime) {
995     return getOlderComments(existingActivity, sinceTime, 0).size();
996   }
997 
998   @Override
999   public SortedSet<ActivityProcessor> getActivityProcessors() {
1000     return activityProcessors;
1001   }
1002 
1003   @Override
1004   public void updateActivity(ExoSocialActivity existingActivity) throws ActivityStorageException {
1005     if(existingActivity == null) {
1006       throw new IllegalArgumentException("Activity to update cannot be null");
1007     }
1008     ActivityEntity parentActivity = null;
1009     ActivityEntity updatedActivity = null;
1010     boolean isComment = existingActivity.getId().startsWith(COMMENT_PREFIX);
1011     if (isComment) {
1012       long id = getCommentID(existingActivity.getId());
1013       updatedActivity = activityDAO.find(id);
1014       parentActivity = getTopParentActivity(updatedActivity);
1015     } else {
1016       parentActivity = updatedActivity = activityDAO.find(Long.valueOf(existingActivity.getId()));
1017     }
1018 
1019     if(updatedActivity != null) {
1020       if(isComment) {
1021         // update comment
1022         updatedActivity.setUpdatedDate(new Date());
1023       }
1024       // only raise the activity in stream when activity date updated
1025       if (existingActivity.getUpdated() != null && updatedActivity.getUpdatedDate() != null
1026           && existingActivity.getUpdated().getTime() != updatedActivity.getUpdatedDate().getTime()) {
1027         processActivityStreamUpdatedTime(updatedActivity);
1028       }
1029       //create or remove liker if exist
1030       processLikerActivityInStreams(new HashSet<>(Arrays.asList(existingActivity.getLikeIdentityIds())), new HashSet<>(updatedActivity.getLikerIds()), parentActivity, isComment);
1031       //this method must be called before updating title and template params
1032       updateActivityMentions(updatedActivity, existingActivity);
1033 
1034       if (existingActivity.getTitleId() != null) updatedActivity.setTitleId(existingActivity.getTitleId());
1035       if (existingActivity.getTitle() != null) updatedActivity.setTitle(existingActivity.getTitle());
1036       if (existingActivity.getBody() != null) updatedActivity.setBody(existingActivity.getBody());
1037       if (existingActivity.getUpdated() != null) updatedActivity.setUpdatedDate(existingActivity.getUpdated());
1038       if (existingActivity.getLikeIdentityIds() != null) updatedActivity.setLikerIds(new HashSet<>(Arrays.asList(existingActivity.getLikeIdentityIds())));
1039       if (existingActivity.getPermaLink() != null) updatedActivity.setPermaLink(existingActivity.getPermaLink());
1040       if (existingActivity.getTemplateParams() != null) updatedActivity.setTemplateParams(existingActivity.getTemplateParams());
1041       updatedActivity.setHidden(existingActivity.isHidden());
1042       updatedActivity.setComment(existingActivity.isComment());
1043       updatedActivity.setLocked(existingActivity.isLocked());
1044       processActivity(existingActivity);
1045 
1046       activityDAO.update(updatedActivity);
1047     } else {
1048       throw new ActivityStorageException(Type.FAILED_TO_UPDATE_ACTIVITY, "Cannot find activity with id=" + existingActivity.getId());
1049     }
1050   }
1051 
1052   private void processLikerActivityInStreams(Set<String> newLikerList, Set<String> oldLikerList, ActivityEntity activity, boolean commentLike) {
1053     for (String id : newLikerList) {
1054       if (!oldLikerList.contains(id)) {//new like ==> create stream item
1055         createStreamItem(commentLike ? StreamType.COMMENT_LIKER : StreamType.LIKER, activity, Long.parseLong(id));
1056       } else {
1057         oldLikerList.remove(id);
1058       }
1059     }
1060     if (oldLikerList.size() > 0) {//unlike ==> remove stream item
1061       for (String id : oldLikerList) {
1062         StreamItemEntity item = new StreamItemEntity(StreamType.LIKER);
1063         item.setOwnerId(Long.parseLong(id));
1064         activity.removeStreamItem(item);
1065         item.setStreamType(StreamType.COMMENT_LIKER);
1066         activity.removeStreamItem(item);
1067       }
1068     }
1069   }
1070 
1071   @Override
1072   public int getNumberOfNewerOnActivityFeed(Identity ownerIdentity, Long sinceTime) {
1073     return activityDAO.getNumberOfNewerOnActivityFeed(ownerIdentity, sinceTime, memberOfSpaceIds(ownerIdentity));
1074   }
1075 
1076   @Override
1077   public int getNumberOfNewerOnUserActivities(Identity ownerIdentity, Long sinceTime) {
1078     return activityDAO.getNumberOfNewerOnUserActivities(ownerIdentity, sinceTime);
1079   }
1080 
1081   @Override
1082   public int getNumberOfNewerOnActivitiesOfConnections(Identity ownerIdentity, Long sinceTime) {
1083     return activityDAO.getNumberOfNewerOnActivitiesOfConnections(ownerIdentity, sinceTime);
1084   }
1085 
1086   @Override
1087   public int getNumberOfNewerOnUserSpacesActivities(Identity ownerIdentity, Long sinceTime) {
1088     return activityDAO.getNumberOfNewerOnUserSpacesActivities(ownerIdentity, sinceTime, memberOfSpaceIds(ownerIdentity));
1089   }
1090 
1091   @Override
1092   public List<ExoSocialActivity> getActivitiesOfIdentities(ActivityBuilderWhere where, ActivityFilter filter, long offset, long limit) throws ActivityStorageException {
1093     return null;
1094   }
1095 
1096   @Override
1097   public int getNumberOfSpaceActivities(Identity spaceIdentity) {
1098     return getNumberOfSpaceActivitiesForUpgrade(spaceIdentity);
1099   }
1100 
1101   @Override
1102   public int getNumberOfSpaceActivitiesForUpgrade(Identity spaceIdentity) {
1103     return activityDAO.getNumberOfSpaceActivities(spaceIdentity);
1104   }
1105 
1106   @Override
1107   @ExoTransactional
1108   public List<ExoSocialActivity> getSpaceActivities(Identity spaceIdentity, int offset, int limit) {
1109     return convertActivityIdsToActivities(activityDAO.getSpaceActivities(spaceIdentity, offset, limit));
1110   }
1111 
1112   @Override
1113   @ExoTransactional
1114   public List<ExoSocialActivity> getSpaceActivitiesForUpgrade(Identity spaceIdentity, int offset, int limit) {
1115     return convertActivityIdsToActivities(activityDAO.getSpaceActivities(spaceIdentity, offset, limit));
1116   }
1117 
1118   @Override
1119   public List<ExoSocialActivity> getActivitiesByPoster(Identity posterIdentity, int offset, int limit) {
1120     return getActivitiesByPoster(posterIdentity, offset, limit, new String[]{});
1121   }
1122 
1123   @Override
1124   public List<ExoSocialActivity> getActivitiesByPoster(Identity posterIdentity, int offset, int limit, String... activityTypes) {
1125     return convertActivityIdsToActivities(activityDAO.getActivitiesByPoster(posterIdentity, offset, limit, activityTypes));
1126   }
1127 
1128   @Override
1129   public int getNumberOfActivitiesByPoster(Identity posterIdentity) {
1130     return activityDAO.getNumberOfActivitiesByPoster(posterIdentity, new String[]{});
1131   }
1132 
1133   @Override
1134   public int getNumberOfActivitiesByPoster(Identity ownerIdentity, Identity viewerIdentity) {
1135     return 0;
1136   }
1137 
1138   @Override
1139   public List<ExoSocialActivity> getNewerOnSpaceActivities(Identity spaceIdentity, ExoSocialActivity baseActivity, int limit) {
1140     return getNewerSpaceActivities(spaceIdentity, baseActivity.getUpdated().getTime(), limit);
1141   }
1142 
1143   @Override
1144   public int getNumberOfNewerOnSpaceActivities(Identity spaceIdentity, ExoSocialActivity baseActivity) {
1145     return getNumberOfNewerOnSpaceActivities(spaceIdentity, baseActivity.getUpdated().getTime());
1146   }
1147 
1148   @Override
1149   public List<ExoSocialActivity> getOlderOnSpaceActivities(Identity spaceIdentity, ExoSocialActivity baseActivity, int limit) {
1150     return getOlderSpaceActivities(spaceIdentity, baseActivity.getUpdated().getTime(), limit);
1151   }
1152 
1153   @Override
1154   public int getNumberOfOlderOnSpaceActivities(Identity spaceIdentity, ExoSocialActivity baseActivity) {
1155     return getNumberOfOlderOnSpaceActivities(spaceIdentity, baseActivity.getUpdated().getTime());
1156   }
1157 
1158   @Override
1159   public int getNumberOfNewerOnSpaceActivities(Identity spaceIdentity, Long sinceTime) {
1160     return activityDAO.getNumberOfNewerOnSpaceActivities(spaceIdentity, sinceTime);
1161   }
1162 
1163   @Override
1164   public int getNumberOfUpdatedOnActivityFeed(Identity owner, ActivityUpdateFilter filter) {
1165     return 0;
1166   }
1167 
1168   @Override
1169   public int getNumberOfUpdatedOnUserActivities(Identity owner, ActivityUpdateFilter filter) {
1170     return 0;
1171   }
1172 
1173   @Override
1174   public int getNumberOfUpdatedOnActivitiesOfConnections(Identity owner, ActivityUpdateFilter filter) {
1175     return 0;
1176   }
1177 
1178   @Override
1179   public int getNumberOfUpdatedOnUserSpacesActivities(Identity owner, ActivityUpdateFilter filter) {
1180     return 0;
1181   }
1182 
1183   @Override
1184   public int getNumberOfUpdatedOnSpaceActivities(Identity owner, ActivityUpdateFilter filter) {
1185     return 0;
1186   }
1187 
1188   @Override
1189   public int getNumberOfMultiUpdated(Identity owner, Map<String, Long> sinceTimes) {
1190     return 0;
1191   }
1192 
1193   @Override
1194   @ExoTransactional
1195   public List<ExoSocialActivity> getNewerFeedActivities(Identity owner, Long sinceTime, int limit) {
1196     return convertActivityIdsToActivities(activityDAO.getNewerOnActivityFeed(owner, sinceTime, limit, memberOfSpaceIds(owner)));
1197   }
1198 
1199   @Override
1200   @ExoTransactional
1201   public List<ExoSocialActivity> getNewerUserActivities(Identity owner, Long sinceTime, int limit) {
1202     return convertActivityIdsToActivities(activityDAO.getNewerOnUserActivities(owner, sinceTime, limit));
1203   }
1204 
1205   @Override
1206   @ExoTransactional
1207   public List<ExoSocialActivity> getNewerUserSpacesActivities(Identity owner, Long sinceTime, int limit) {
1208     return convertActivityIdsToActivities(activityDAO.getNewerOnUserSpacesActivities(owner, sinceTime, limit, memberOfSpaceIds(owner)));
1209   }
1210 
1211   @Override
1212   @ExoTransactional
1213   public List<ExoSocialActivity> getNewerActivitiesOfConnections(Identity owner, Long sinceTime, int limit) {
1214     return convertActivityIdsToActivities(activityDAO.getNewerOnActivitiesOfConnections(owner, sinceTime, limit));
1215   }
1216 
1217   @Override
1218   @ExoTransactional
1219   public List<ExoSocialActivity> getNewerSpaceActivities(Identity owner, Long sinceTime, int limit) {
1220     return convertActivityIdsToActivities(activityDAO.getNewerOnSpaceActivities(owner, sinceTime, limit));
1221   }
1222 
1223   @Override
1224   @ExoTransactional
1225   public List<ExoSocialActivity> getOlderFeedActivities(Identity owner, Long sinceTime, int limit) {
1226     return convertActivityIdsToActivities(activityDAO.getOlderOnActivityFeed(owner, sinceTime, limit, memberOfSpaceIds(owner)));
1227   }
1228 
1229   @Override
1230   @ExoTransactional
1231   public List<ExoSocialActivity> getOlderUserActivities(Identity owner, Long sinceTime, int limit) {
1232     return convertActivityIdsToActivities(activityDAO.getOlderOnUserActivities(owner, sinceTime, limit));
1233   }
1234 
1235   @Override
1236   @ExoTransactional
1237   public List<ExoSocialActivity> getOlderUserSpacesActivities(Identity owner, Long sinceTime, int limit) {
1238     return convertActivityIdsToActivities(activityDAO.getOlderOnUserSpacesActivities(owner, sinceTime, limit, memberOfSpaceIds(owner)));
1239   }
1240 
1241   @Override
1242   @ExoTransactional
1243   public List<ExoSocialActivity> getOlderActivitiesOfConnections(Identity owner, Long sinceTime, int limit) {
1244     return convertActivityIdsToActivities(activityDAO.getOlderOnActivitiesOfConnections(owner, sinceTime, limit));
1245   }
1246 
1247   @Override
1248   @ExoTransactional
1249   public List<ExoSocialActivity> getOlderSpaceActivities(Identity owner, Long sinceTime, int limit) {
1250     return convertActivityIdsToActivities(activityDAO.getOlderOnSpaceActivities(owner, sinceTime, limit));
1251   }
1252 
1253   @Override
1254   public int getNumberOfOlderOnActivityFeed(Identity ownerIdentity, Long sinceTime) {
1255     return activityDAO.getNumberOfOlderOnActivityFeed(ownerIdentity, sinceTime, memberOfSpaceIds(ownerIdentity));
1256   }
1257 
1258   @Override
1259   public int getNumberOfOlderOnUserActivities(Identity ownerIdentity, Long sinceTime) {
1260     return activityDAO.getNumberOfOlderOnUserActivities(ownerIdentity, sinceTime);
1261   }
1262 
1263   @Override
1264   public int getNumberOfOlderOnActivitiesOfConnections(Identity ownerIdentity, Long sinceTime) {
1265     return activityDAO.getNumberOfOlderOnActivitiesOfConnections(ownerIdentity, sinceTime);
1266   }
1267 
1268   @Override
1269   public int getNumberOfOlderOnUserSpacesActivities(Identity ownerIdentity, Long sinceTime) {
1270     return activityDAO.getNumberOfOlderOnUserSpacesActivities(ownerIdentity, sinceTime, memberOfSpaceIds(ownerIdentity));
1271   }
1272 
1273   @Override
1274   public int getNumberOfOlderOnSpaceActivities(Identity ownerIdentity, Long sinceTime) {
1275     return activityDAO.getNumberOfOlderOnSpaceActivities(ownerIdentity, sinceTime);
1276   }
1277 
1278   @Override
1279   public List<ExoSocialActivity> getSubComments(ExoSocialActivity comment) {
1280     long commentId = getCommentID(comment.getId());
1281     List<ActivityEntity> subComments = activityDAO.getComments(commentId, 0, -1);
1282     return convertCommentEntitiesToComments(subComments, false);
1283   }
1284 
1285   private Long getCommentID(String commentId) {
1286     return (commentId == null || commentId.trim().isEmpty()) ? null : Long.valueOf(commentId.replace(COMMENT_PREFIX, ""));
1287   }
1288 
1289   private String getExoCommentID(Long commentId) {
1290     return String.valueOf(COMMENT_PREFIX + commentId);
1291   }
1292 
1293   private void processActivity(ExoSocialActivity existingActivity) {
1294     Iterator<ActivityProcessor> it = activityProcessors.iterator();
1295     while (it.hasNext()) {
1296       try {
1297         it.next().processActivity(existingActivity);
1298       } catch (Exception e) {
1299         LOG.debug("activity processing failed ");
1300       }
1301     }
1302   }
1303 
1304   /**
1305    * This method will update StreamItem updatedDate field,
1306    * which is used to sort activies in Activity Stream
1307    * We only update this in 3 cases:
1308    * 1. edit activity message
1309    * 2. edit activity's comment message
1310    * 3. add new comment
1311    * @param activityEntity
1312    * @return
1313    */
1314   private ActivityEntity processActivityStreamUpdatedTime(ActivityEntity activityEntity) {
1315     if (activityEntity.getStreamItems() != null) {
1316       List<StreamItemEntity> items = activityEntity.getStreamItems().stream()
1317               .filter(item -> item.getStreamType() == StreamType.POSTER || item.getStreamType() == StreamType.SPACE)
1318               .collect(Collectors.toList());
1319       if (!items.isEmpty()) {
1320         items.get(0).setUpdatedDate(new Date());
1321       }
1322     }
1323     return activityEntity;
1324   }
1325   
1326   /**
1327    * Gets the list of spaceIds what the given identify is member
1328    * 
1329    * @param ownerIdentity
1330    * @return
1331    */
1332   private List<String> memberOfSpaceIds(Identity ownerIdentity) {
1333     return spaceStorage.getMemberSpaceIds(ownerIdentity.getId(), 0, -1);
1334  
1335    }
1336 
1337 
1338   public void setIdentityStorage(IdentityStorage identityStorage) {
1339     this.identityStorage = identityStorage;
1340   }
1341 
1342   public static final class CommentComparator implements Comparator<ActivityEntity> {
1343     public int compare(ActivityEntity o1, ActivityEntity o2) {
1344       ActivityEntity parent1 = o1.getParent();
1345       ActivityEntity parent2 = o2.getParent();
1346 
1347       boolean isParentActivity1 = parent1 == null || !parent1.isComment();
1348       boolean isParentActivity2 = parent2 == null || !parent2.isComment();
1349 
1350       if (isParentActivity1 && isParentActivity2) {
1351         return o1.getPosted().compareTo(o2.getPosted());
1352       } else if (isParentActivity1) {
1353         return compare(o1, parent2);
1354       } else if (isParentActivity2) {
1355         return compare(parent1, o2);
1356       } else if (parent1.getId() == parent2.getId()) {
1357         return o1.getPosted().compareTo(o2.getPosted());
1358       } else {
1359         return compare(parent1, parent2);
1360       }
1361     }
1362   }
1363 
1364   public ActivityStorage getActivityStorage() {
1365     if (activityStorage == null) {
1366       activityStorage = CommonsUtils.getService(ActivityStorage.class);
1367       // This may happen in Test context
1368       if (activityStorage == null) {
1369         activityStorage = this;
1370       }
1371     }
1372     return activityStorage;
1373   }
1374 
1375   @Override
1376   public List<ExoSocialActivity> getActivities(List<String> activityIdList) {
1377     if (activityIdList == null || activityIdList.isEmpty()) {
1378       return Collections.emptyList();
1379     }
1380     List<Long> activityIds = new ArrayList<>();
1381     for (String activityId : activityIdList) {
1382       if (activityId == null || activityId.isEmpty()) {
1383         continue;
1384       }
1385 
1386       if (activityId != null && activityId.startsWith(COMMENT_PREFIX)) {
1387         activityIds.add(getCommentID(activityId));
1388       } else {
1389         activityIds.add(Long.valueOf(activityId));
1390       }
1391     }
1392     List<ActivityEntity> activityEntities = activityDAO.findActivities(activityIds);
1393     if (activityEntities == null || activityEntities.isEmpty()) {
1394       return Collections.emptyList();
1395     }
1396     List<ExoSocialActivity> activityDTOs = new ArrayList<>();
1397     for (ActivityEntity activityEntity : activityEntities) {
1398       activityDTOs.add(convertActivityEntityToActivity(activityEntity));
1399     }
1400     return activityDTOs;
1401   }
1402 }