ActivityStreamStorageImpl.java
/*
* Copyright (C) 2003-2013 eXo Platform SAS.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.exoplatform.social.core.storage.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jcr.RepositoryException;
import org.apache.commons.lang.ArrayUtils;
import org.chromattic.api.ChromatticException;
import org.chromattic.api.query.Query;
import org.chromattic.api.query.QueryBuilder;
import org.chromattic.api.query.QueryResult;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.social.common.service.ProcessContext;
import org.exoplatform.social.common.service.utils.ObjectHelper;
import org.exoplatform.social.common.service.utils.TraceElement;
import org.exoplatform.social.core.activity.filter.ActivityFilter;
import org.exoplatform.social.core.activity.model.ExoSocialActivity;
import org.exoplatform.social.core.chromattic.entity.ActivityEntity;
import org.exoplatform.social.core.chromattic.entity.ActivityRef;
import org.exoplatform.social.core.chromattic.entity.ActivityRefListEntity;
import org.exoplatform.social.core.chromattic.entity.HidableEntity;
import org.exoplatform.social.core.chromattic.entity.IdentityEntity;
import org.exoplatform.social.core.chromattic.entity.StreamsEntity;
import org.exoplatform.social.core.chromattic.filter.JCRFilterLiteral;
import org.exoplatform.social.core.chromattic.utils.ActivityRefIterator;
import org.exoplatform.social.core.chromattic.utils.ActivityRefList;
import org.exoplatform.social.core.identity.model.ActiveIdentityFilter;
import org.exoplatform.social.core.identity.model.Identity;
import org.exoplatform.social.core.identity.provider.OrganizationIdentityProvider;
import org.exoplatform.social.core.identity.provider.SpaceIdentityProvider;
import org.exoplatform.social.core.relationship.model.Relationship;
import org.exoplatform.social.core.space.model.Space;
import org.exoplatform.social.core.storage.ActivityStorageException;
import org.exoplatform.social.core.storage.api.ActivityStorage;
import org.exoplatform.social.core.storage.api.ActivityStreamStorage;
import org.exoplatform.social.core.storage.api.IdentityStorage;
import org.exoplatform.social.core.storage.api.RelationshipStorage;
import org.exoplatform.social.core.storage.api.SpaceStorage;
import org.exoplatform.social.core.storage.exception.NodeNotFoundException;
import org.exoplatform.social.core.storage.query.ChromatticNameEncode;
import org.exoplatform.social.core.storage.query.JCRProperties;
import org.exoplatform.social.core.storage.streams.StreamConfig;
import org.exoplatform.social.core.storage.streams.StreamProcessContext;
public class ActivityStreamStorageImpl extends AbstractStorage implements ActivityStreamStorage {
/**
* The identity storage
*/
private final IdentityStorageImpl identityStorage;
/**
* The space storage
*/
private SpaceStorage spaceStorage;
/**
* The relationship storage
*/
private RelationshipStorage relationshipStorage;
/**
* The activity storage
*/
private ActivityStorage activityStorage;
/** Logger */
private static final Log LOG = ExoLogger.getLogger(ActivityStreamStorageImpl.class);
/** */
private Lock activityWriteLock;
/** */
private Lock activityReadLock;
/** */
private static int BATCH = 20;
public ActivityStreamStorageImpl(IdentityStorageImpl identityStorage) {
this.identityStorage = identityStorage;
ReadWriteLock activityLock = new ReentrantReadWriteLock();
activityWriteLock = activityLock.writeLock();
activityReadLock = activityLock.readLock();
}
private ActivityStorage getStorage() {
if (activityStorage == null) {
activityStorage = CommonsUtils.getService(ActivityStorage.class);
}
return activityStorage;
}
private SpaceStorage getSpaceStorage() {
if (spaceStorage == null) {
spaceStorage = CommonsUtils.getService(SpaceStorage.class);
}
return this.spaceStorage;
}
private RelationshipStorage getRelationshipStorage() {
if (relationshipStorage == null) {
relationshipStorage = CommonsUtils.getService(RelationshipStorage.class);
}
return this.relationshipStorage;
}
@Override
public void save(ProcessContext ctx) {
//must call with asynchronous
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
Identity owner = streamCtx.getIdentity();
//It has been invoked by Activity Service with the multi-threading.
//so that, gets Entity from JCR, prevent Session.logout exception.
ActivityEntity activityEntity = null;
try {
activityEntity = _findById(ActivityEntity.class, streamCtx.getActivityEntity().getId());
} catch (Exception e) {
activityEntity = streamCtx.getActivityEntity();
}
if (OrganizationIdentityProvider.NAME.equals(owner.getProviderId())) {
Identity poster = CommonsUtils.getService(IdentityStorage.class).findIdentityById(streamCtx.getPosterId());
user(poster, activityEntity);
//mention case
addMentioner(streamCtx.getMentioners(), activityEntity);
} else if (SpaceIdentityProvider.NAME.equals(owner.getProviderId())) {
//records to Space Streams for SpaceIdentity
space(owner, activityEntity);
//mention case
addMentioner(streamCtx.getMentioners(), activityEntity);
}
} catch (Exception e) {
ctx.setException(e);
LOG.warn("Failed to add Activity references.", e);
LOG.debug("Failed to add Activity references.", e);
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void savePoster(ProcessContext ctx) {
//call synchronous
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
Identity owner = streamCtx.getIdentity();
//It has been invoked by Activity Service with the same thread.
//so that, retrieves Entity directly from Stream context, don't spend time to get from JCR => impact performance.
ActivityEntity activityEntity = streamCtx.getActivityEntity();
if (OrganizationIdentityProvider.NAME.equals(owner.getProviderId())) {
//fixed for SOC-4494 post on viewer stream
Identity poster = CommonsUtils.getService(IdentityStorage.class).findIdentityById(activityEntity.getPosterIdentity().getId());
manageRefList(new UpdateContext(owner, null), activityEntity, ActivityRefType.MY_ACTIVITIES);
createOwnerRefs(poster, activityEntity);
} else if (SpaceIdentityProvider.NAME.equals(owner.getProviderId())) {
//
manageRefList(new UpdateContext(owner, null), activityEntity, ActivityRefType.SPACE_STREAM);
//
Identity ownerPosterOnSpace = CommonsUtils.getService(IdentityStorage.class).findIdentityById(activityEntity.getPosterIdentity().getId());
ownerSpaceMembersRefs(ownerPosterOnSpace, activityEntity);
}
} catch (NodeNotFoundException e) {
ctx.setException(e);
LOG.warn("Failed to add Activity references.");
LOG.debug("Failed to add Activity references.", e);
} finally {
this.activityWriteLock.unlock();
}
}
/**
* Making the Activity Reference for the user's connections
*
* @param owner
* @param activityEntity
* @throws NodeNotFoundException
*/
private void user(Identity poster, ActivityEntity activityEntity) throws NodeNotFoundException, RepositoryException {
//
TraceElement trace = TraceElement.getInstance("creating ref-" + poster.getRemoteId());
trace.start();
//
StreamConfig streamConfig = CommonsUtils.getService(StreamConfig.class);
//get multiple user groups separates by comma. Using StringTokenizer to split
String userGroups = streamConfig.getActiveUserGroups();
ActiveIdentityFilter filter = new ActiveIdentityFilter(userGroups);
Set<String> activeGroups = CommonsUtils.getService(IdentityStorage.class).getActiveUsers(filter);
//
int days = streamConfig.getLastLoginAroundDays();
filter = new ActiveIdentityFilter(days);
int i = createRefForActiveUsers(poster, activityEntity, filter, activeGroups);
trace.end();
if (i > 0) {
LOG.info("loop times = " + i + trace.toString());
}
}
/**
* Creates the activity ref for active users
*
* @param owner
* @param activityEntity
* @param filer the days filter
* @param activeGroups
* @return
* @throws NodeNotFoundException
*/
private int createRefForActiveUsers(Identity owner,
ActivityEntity activityEntity,
ActiveIdentityFilter filer, Set<String> activeGroups) throws NodeNotFoundException, RepositoryException {
Set<String> activeUsers = CommonsUtils.getService(IdentityStorage.class).getActiveUsers(filer);
//just for testing
/**
List<Identity> relationships = getRelationshipStorage().getConnections(owner, 0, 500);
Set<String> activeUsers = new HashSet<String>();
for(Identity identity : relationships) {
activeUsers.add(identity.getRemoteId());
}*/
if (activeUsers == null) {
activeUsers = new HashSet<String>();
}
//eliminate duplicate "active" user between N days list and group list
if (activeGroups.size() > 0) {
activeUsers.addAll(activeGroups);
}
int i = activeUsers.size() > 0 ? createRefWithActiveUser(owner, activityEntity, activeUsers) : createRefWithoutActiveUser(owner, activityEntity);
return i;
}
/**
* Creates the activity ref for connections in the case.
* Don't found any active users around days.
*
* @param owner
* @param activityEntity
* @return
* @throws NodeNotFoundException
*/
private int createRefWithoutActiveUser(Identity owner,
ActivityEntity activityEntity) throws NodeNotFoundException {
StreamConfig streamConfig = CommonsUtils.getService(StreamConfig.class);
int limitLoading = streamConfig.getLimitThresholdLoading();
int connectionsThreshold = streamConfig.getConnectionsThreshold();
int timesLoop = connectionsThreshold / limitLoading;
LOG.info("Identity:" + owner.getRemoteId());
int offset = 0;
int i = 0;
for (i = 0; i < timesLoop; i++) {
List<Identity> got = getRelationshipStorage().getConnections(owner, offset, limitLoading);
if (got.size() > 0) {
createConnectionsRefs(got, activityEntity);
} else {
break;
}
// increase offset
offset += limitLoading;
StorageUtils.persist();
}
return i;
}
/**
* Creates the activity ref for connections in the case.
* Don't found any active users around days.
*
* @param owner
* @param activityEntity
* @return
* @throws NodeNotFoundException
*/
private int createRefWithActiveUser(Identity owner, ActivityEntity activityEntity, Set<String> activeUsers) throws NodeNotFoundException, RepositoryException {
StreamConfig streamConfig = CommonsUtils.getService(StreamConfig.class);
int connectionsThreshold = streamConfig.getConnectionsThreshold();
int limitLoading = streamConfig.getLimitThresholdLoading();
LOG.debug("active users: " + (activeUsers.size() -1));
int offset = 0;
IdentityEntity ownerEntity = _findById(IdentityEntity.class, owner.getId());
String nodePath = ownerEntity.getPath();
StringBuilder relationshipPath = new StringBuilder();
int batchIndex = 0;
List<Identity> inputIdentities = new ArrayList<Identity>();
for(String userName : activeUsers) {
//userName is the same owner, ignore.
if (owner.getRemoteId().equals(userName)) continue;
//reset StringBuilder with delete(0, length)
relationshipPath.delete(0, relationshipPath.length());
//
relationshipPath.append(nodePath).append("/").append(JCRProperties.RELATIONSHIP_NODE_TYPE).append("/").append("soc:").append(ChromatticNameEncode.encodeNodeName(userName));
Identity identity2 = CommonsUtils.getService(IdentityStorage.class).findIdentity(OrganizationIdentityProvider.NAME, userName);
boolean hasRelationship = getRelationshipStorage().hasRelationship(owner, identity2, relationshipPath.toString());
if(hasRelationship) {
LOG.debug("creates activity ref: " + userName);
if (identity2 != null) {
inputIdentities.add(identity2);
batchIndex++;
//handle loading limit
if (batchIndex == limitLoading) {
createConnectionsRefs(inputIdentities, activityEntity);
batchIndex = 0;
inputIdentities.clear();
LOG.debug("start - persist to storage...");
StorageUtils.persist();
LOG.debug("end - persist to storage...");
}
}
offset++;
//handle connections threshold
if (offset == connectionsThreshold) {
break;
}
}//end if(relationshipNode != null)
}
if (batchIndex > 0 && inputIdentities.size() > 0) {
createConnectionsRefs(inputIdentities, activityEntity);
}
return offset;
}
/**
* Remove activity reference from "my activity stream" of an user and if he is not connected with the
* activity's owner, remove also this reference from his "feed activity stream"
*
* @param identityIds
* @param activityEntity
* @throws NodeNotFoundException
*/
private void removeActivityRefs(String[] identityIds, ActivityEntity activityEntity) throws NodeNotFoundException {
if (identityIds != null && identityIds.length > 0) {
Identity owner = CommonsUtils.getService(IdentityStorage.class).findIdentityById(activityEntity.getIdentity().getId());
for(String identityId : identityIds) {
if (identityId.equals(owner.getId()) || identityId.equals(activityEntity.getPosterIdentity().getId())) {
continue;
}
Identity identity = CommonsUtils.getService(IdentityStorage.class).findIdentityById(identityId);
manageRefList(new UpdateContext(null, identity), activityEntity, ActivityRefType.MY_ACTIVITIES);
Relationship relationship = relationshipStorage.getRelationship(owner, identity);
if (relationship == null || ! relationship.getStatus().equals(Relationship.Type.CONFIRMED)) {
manageRefList(new UpdateContext(null, identity), activityEntity, ActivityRefType.FEED);
}
}
}
}
private void addMentioner(String[] identityIds, ActivityEntity activityEntity) throws NodeNotFoundException {
if (identityIds != null && identityIds.length > 0) {
for(String identityId : identityIds) {
Identity identity = CommonsUtils.getService(IdentityStorage.class).findIdentityById(identityId);
createOwnerRefs(identity, activityEntity);
}
}
}
private void space(Identity owner, ActivityEntity activityEntity) throws NodeNotFoundException {
Space space = getSpaceStorage().getSpaceByPrettyName(owner.getRemoteId());
if (space == null) return;
//Don't create ActivityRef on space stream for given SpaceIdentity
List<Identity> identities = getMemberIdentities(space);
createSpaceMembersRefs(identities, activityEntity);
}
private List<Identity> getMemberIdentities(Space space) {
List<Identity> identities = new ArrayList<Identity>();
for(String remoteId : space.getMembers()) {
//improves performance here just load identity data without profile (UT will be failed if load profile)
identities.add(identityStorage._findIdentityEntity(OrganizationIdentityProvider.NAME, remoteId, false));
}
return identities;
}
@Override
public void delete(String activityId) {
this.activityWriteLock.lock();
try {
//
ActivityEntity activityEntity = _findById(ActivityEntity.class, activityId);
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
Collection<ActivityRef> references = activityEntity.getActivityRefs();
List<ActivityRefListEntity> refList = new ArrayList<ActivityRefListEntity>();
//
for(ActivityRef ref : references) {
//
refList.add(ref.getDay().getMonth().getYear().getList());
}
for(ActivityRefListEntity list : refList) {
list.remove(activityEntity, hidableActivity.getHidden(), null);
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to delete Activities references.", e);
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void like(Identity liker, ExoSocialActivity activity) {
this.activityWriteLock.lock();
try {
//
ActivityEntity entity = _findById(ActivityEntity.class, activity.getId());
manageRefList(new UpdateContext(liker, null), entity, ActivityRefType.FEED);
manageRefList(new UpdateContext(liker, null), entity, ActivityRefType.MY_ACTIVITIES);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to make Activity References for like case.");
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void unLike(Identity removedLike, ExoSocialActivity activity) {
this.activityWriteLock.lock();
try {
//
ActivityEntity entity = _findById(ActivityEntity.class, activity.getId());
//manageRefList(new UpdateContext(null, removedLike), entity, ActivityRefType.FEED);
boolean notDelete = ArrayUtils.contains(activity.getCommentedIds(), removedLike.getId());
notDelete |= hasMentioned(removedLike, activity);
if (notDelete) return;
manageRefList(new UpdateContext(null, removedLike), entity, ActivityRefType.MY_ACTIVITIES);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to delete Activity References for unlike case.");
} finally {
this.activityWriteLock.unlock();
}
}
private boolean hasMentioned(Identity removedLike, ExoSocialActivity activity) {
for(String id : activity.getMentionedIds()) {
if (id.indexOf(removedLike.getId()) > -1) {
return true;
}
}
return false;
}
@Override
public void updateCommenter(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
Identity commenter = streamCtx.getIdentity();
//It has been invoked by Activity Service with the same thread.
//so that, retrieves Entity directly from Stream context, don't spend time to get from JCR => impact performance.
ActivityEntity activityEntity = streamCtx.getActivityEntity();
//
long oldUpdated = streamCtx.getOldLastUpdated();
//activity's poster != comment's poster
//don't have on My Activity stream
updateCommenterActivityRefs(commenter, activityEntity, ActivityRefType.MY_ACTIVITIES, oldUpdated);
//post comment also put the activity on feed if have not any
updateCommenterActivityRefs(commenter, activityEntity, ActivityRefType.FEED, oldUpdated);
//create activityref for owner's activity for 3.5.x
createRefForPoster(activityEntity, oldUpdated);
} catch (NodeNotFoundException ex) {
LOG.warn("Probably was updated activity reference by another session");
LOG.debug(ex.getMessage(), ex);
} catch (ChromatticException ex) {
LOG.warn("Probably was updated activity reference by another session");
LOG.debug(ex.getMessage(), ex);
} finally {
this.activityWriteLock.unlock();
}
}
private void updateCommenterActivityRefs(Identity identity, ActivityEntity activityEntity, ActivityRefType type, long oldUpdated) throws NodeNotFoundException {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(identity.getProviderId(), identity.getRemoteId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
ActivityRef ref = refList.get(activityEntity, oldUpdated);
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
if (ref != null) {
LOG.trace("remove activityRefId " + ref.getId() +" for commenter: " + identityEntity.getRemoteId());
refList.remove(activityEntity, hidableActivity.getHidden(), oldUpdated);
}
refList.getOrCreated(activityEntity, hidableActivity.getHidden() );
}
private void createRefForPoster(ActivityEntity activityEntity, long oldUpdated) throws NodeNotFoundException {
boolean has;
//poster if not migration
IdentityEntity posterIdentity = activityEntity.getIdentity();
has = hasActivityRefs(posterIdentity, activityEntity, ActivityRefType.MY_ACTIVITIES, oldUpdated);
if (has == false) {
addRefList(posterIdentity, activityEntity, ActivityRefType.MY_ACTIVITIES, false);
LOG.debug("createRefForPoster::MyActivities stream :" + posterIdentity.getRemoteId());
}
//post comment also put the activity on feed if have not any
has = hasActivityRefs(posterIdentity, activityEntity, ActivityRefType.FEED, oldUpdated);
if (has == false) {
addRefList(posterIdentity, activityEntity, ActivityRefType.FEED, false);
LOG.debug("createRefForPoster::Feed stream :" + posterIdentity.getRemoteId());
}
}
@Override
public void update(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
//It has been invoked by Activity Service with the multi-threading.
//so that, gets Entity from JCR, prevent Session.logout exception when retrieves its references
ActivityEntity activityEntity = _findById(ActivityEntity.class, streamCtx.getActivity().getId());
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
//ActivityEntity activityEntity = streamCtx.getActivity();
Collection<ActivityRef> references = activityEntity.getActivityRefs();
long oldUpdated = streamCtx.getOldLastUpdated();
ActivityRef newRef = null;
synchronized (references) {
for (ActivityRef old : references) {
ActivityRefListEntity refList = old.getDay().getMonth().getYear().getList();
//ActivityRef.getName equals ActivityId or not
if (old.getName().equalsIgnoreCase(activityEntity.getId())) {
refList.update(activityEntity, old, oldUpdated, hidableActivity.getHidden());
} else {
newRef = refList.getOrCreated(activityEntity, hidableActivity.getHidden());
newRef.setLastUpdated(activityEntity.getLastUpdated());
newRef.setActivityEntity(activityEntity);
refList.remove(activityEntity, hidableActivity.getHidden(), oldUpdated);
}
}
}
} catch (NodeNotFoundException ex) {
LOG.warn("Probably was updated activity reference by another session");
LOG.debug(ex.getMessage(), ex);
//turnOnLock to avoid next exception
} catch (ChromatticException ex) {
LOG.warn("Probably was updated activity reference by another session");
LOG.debug(ex.getMessage(), ex);
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void deleteComment(ProcessContext ctx) {
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
ExoSocialActivity activity = streamCtx.getActivity();
//
ActivityEntity activityEntity = _findById(ActivityEntity.class, activity.getId());
//mentioners
removeActivityRefs(streamCtx.getMentioners(), activityEntity);
//commenter
removeActivityRefs(streamCtx.getCommenters(), activityEntity);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to delete Activity references for mentioner and commenter.");
}
}
@Override
public void addSpaceMember(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
createSpaceMemberRefs(streamCtx.getIdentity(), streamCtx.getSpaceIdentity());
} catch (NodeNotFoundException e) {
LOG.warn("Failed to addSpaceMember Activity references.");
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void removeSpaceMember(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
removeSpaceMemberRefs(streamCtx.getIdentity(), streamCtx.getSpaceIdentity());
} catch (NodeNotFoundException e) {
LOG.warn("Failed to removeSpaceMember Activity references.");
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public List<ExoSocialActivity> getFeed(Identity owner, int offset, int limit) {
return getActivitiesNotQuery(ActivityRefType.FEED, owner, offset, limit);
}
@Override
public List<String> getIdsFeed(Identity owner, int offset, int limit) {
return getIdsNotQuery(ActivityRefType.FEED, owner, offset, limit);
}
@Override
public int getNumberOfFeed(Identity owner) {
return getNumberOfActivities(ActivityRefType.FEED, owner);
}
@Override
public List<ExoSocialActivity> getConnections(Identity owner, int offset, int limit) {
return getActivitiesNotQuery(ActivityRefType.CONNECTION, owner, offset, limit);
}
@Override
public List<String> getIdsConnections(Identity owner, int offset, int limit) {
return getIdsNotQuery(ActivityRefType.CONNECTION, owner, offset, limit);
}
@Override
public int getNumberOfConnections(Identity owner) {
return getNumberOfActivities(ActivityRefType.CONNECTION, owner);
}
@Override
public List<ExoSocialActivity> getMySpaces(Identity owner, int offset, int limit) {
return getActivitiesNotQuery(ActivityRefType.MY_SPACES, owner, offset, limit);
}
@Override
public List<String> getIdsMySpaces(Identity owner, int offset, int limit) {
return getIdsNotQuery(ActivityRefType.MY_SPACES, owner, offset, limit);
}
@Override
public int getNumberOfMySpaces(Identity owner) {
return getNumberOfActivities(ActivityRefType.MY_SPACES, owner);
}
@Override
public List<ExoSocialActivity> getSpaceStream(Identity owner, int offset, int limit) {
return getActivitiesNotQuery(ActivityRefType.SPACE_STREAM, owner, offset, limit);
}
@Override
public List<String> getIdsSpaceStream(Identity owner, int offset, int limit) {
return getIdsNotQuery(ActivityRefType.SPACE_STREAM, owner, offset, limit);
}
@Override
public int getNumberOfSpaceStream(Identity owner) {
return getNumberOfActivities(ActivityRefType.SPACE_STREAM, owner);
}
@Override
public List<ExoSocialActivity> getMyActivities(Identity owner, int offset, int limit) {
return getActivitiesNotQuery(ActivityRefType.MY_ACTIVITIES, owner, offset, limit);
}
@Override
public List<String> getIdsMyActivities(Identity owner, int offset, int limit) {
return getIdsNotQuery(ActivityRefType.MY_ACTIVITIES, owner, offset, limit);
}
@Override
public int getNumberOfMyActivities(Identity owner) {
return getNumberOfActivities(ActivityRefType.MY_ACTIVITIES, owner);
}
@Override
public List<ExoSocialActivity> getViewerActivities(Identity owner, int offset, int limit) {
return getOwnerActivitiesNotQuery(ActivityRefType.MY_ACTIVITIES, owner, offset, limit);
}
@Override
public void connect(Identity sender, Identity receiver) {
try {
this.activityWriteLock.lock();
//
List<ActivityEntity> activities = getActivitiesByPoster(sender);
IdentityEntity receiverEntity = identityStorage._findIdentityEntity(receiver.getProviderId(), receiver.getRemoteId());
if (activities != null) {
Iterator<ActivityEntity> it = activities.iterator();
while(it.hasNext()) {
ActivityEntity entity = it.next();
//has on sender stream
if (isExistingActivityRef(receiverEntity, entity, ActivityRefType.CONNECTION)) continue;
//exclude activity of space
// for SOC-4525
if (entity.getPath().contains(SPACE_NODETYPE_PATH)) {
continue;
}
//
createConnectionsRefs(receiver, entity);
}
}
//
IdentityEntity senderEntity = identityStorage._findIdentityEntity(sender.getProviderId(), sender.getRemoteId());
activities = getActivitiesByPoster(receiver);
if (activities != null) {
Iterator<ActivityEntity> it = activities.iterator();
while(it.hasNext()) {
ActivityEntity entity = it.next();
//has on receiver stream
if (isExistingActivityRef(senderEntity, entity, ActivityRefType.CONNECTION)) continue;
// for SOC-4525
if (entity.getPath().contains(SPACE_NODETYPE_PATH)) {
continue;
}
//
createConnectionsRefs(sender, entity);
}
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to add Activity references when create relationship.");
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void deleteConnect(Identity sender, Identity receiver) {
try {
//
QueryResult<ActivityEntity> activities = getActivitiesOfConnections(sender);
if (activities != null) {
while(activities.hasNext()) {
ActivityEntity entity = activities.next();
removeRelationshipRefs(receiver, entity);
}
}
//
activities = getActivitiesOfConnections(receiver);
if (activities != null) {
while(activities.hasNext()) {
ActivityEntity entity = activities.next();
removeRelationshipRefs(sender, entity);
}
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to delete Activity references when delete relationship.");
}
}
/**
* The reference types.
*/
public enum ActivityRefType {
FEED() {
@Override
public ActivityRefListEntity refsOf(IdentityEntity identityEntity) {
if (identityEntity.getStreams() == null) return create(identityEntity);
if (identityEntity.getStreams().getOwner() == null) return create(identityEntity);
return identityEntity.getStreams().getAll();
}
@Override
public ActivityRefListEntity create(IdentityEntity identityEntity) {
StreamsEntity streams = identityEntity.getStreams();
if (streams == null) {
streams = identityEntity.createStreams();
identityEntity.setStreams(streams);
}
ActivityRefListEntity refList = streams.getAll() == null ? streams.createAllStream() : streams.getAll();
return refList;
}
},
CONNECTION() {
@Override
public ActivityRefListEntity refsOf(IdentityEntity identityEntity) {
if (identityEntity.getStreams() == null) return create(identityEntity);
if (identityEntity.getStreams().getOwner() == null) return create(identityEntity);
return identityEntity.getStreams().getConnections();
}
@Override
public ActivityRefListEntity create(IdentityEntity identityEntity) {
StreamsEntity streams = identityEntity.getStreams();
if (streams == null) {
streams = identityEntity.createStreams();
identityEntity.setStreams(streams);
}
ActivityRefListEntity refList = streams.getConnections() == null ? streams.createConnectionsStream() : streams.getConnections();
return refList;
}
},
MY_SPACES() {
@Override
public ActivityRefListEntity refsOf(IdentityEntity identityEntity) {
if (identityEntity.getStreams() == null) return create(identityEntity);
if (identityEntity.getStreams().getOwner() == null) return create(identityEntity);
return identityEntity.getStreams().getMySpaces();
}
@Override
public ActivityRefListEntity create(IdentityEntity identityEntity) {
StreamsEntity streams = identityEntity.getStreams();
if (streams == null) {
streams = identityEntity.createStreams();
identityEntity.setStreams(streams);
}
ActivityRefListEntity refList = streams.getMySpaces() == null ? streams.createMySpacesStream() : streams.getMySpaces();
return refList;
}
},
SPACE_STREAM() {
@Override
public ActivityRefListEntity refsOf(IdentityEntity identityEntity) {
if (identityEntity.getStreams() == null) return create(identityEntity);
if (identityEntity.getStreams().getOwner() == null) return create(identityEntity);
return identityEntity.getStreams().getSpace();
}
@Override
public ActivityRefListEntity create(IdentityEntity identityEntity) {
StreamsEntity streams = identityEntity.getStreams();
if (streams == null) {
streams = identityEntity.createStreams();
identityEntity.setStreams(streams);
}
ActivityRefListEntity refList = streams.getSpace() == null ? streams.createSpaceStream() : streams.getSpace();
return refList;
}
},
MY_ACTIVITIES() {
@Override
public ActivityRefListEntity refsOf(IdentityEntity identityEntity) {
if (identityEntity.getStreams() == null) return create(identityEntity);
if (identityEntity.getStreams().getOwner() == null) return create(identityEntity);
return identityEntity.getStreams().getOwner();
}
@Override
public ActivityRefListEntity create(IdentityEntity identityEntity) {
StreamsEntity streams = identityEntity.getStreams();
if (streams == null) {
streams = identityEntity.createStreams();
identityEntity.setStreams(streams);
}
ActivityRefListEntity refList = streams.getOwner() == null ? streams.createOwnerStream() : streams.getOwner();
return refList;
}
};
public abstract ActivityRefListEntity refsOf(IdentityEntity identityEntity);
public abstract ActivityRefListEntity create(IdentityEntity identityEntity);
}
private List<String> getIdsNotQuery(ActivityRefType type, Identity owner, int offset, int limit) {
List<String> got = new LinkedList<String>();
try {
IdentityEntity identityEntity = _findById(IdentityEntity.class, owner.getId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
ActivityRefList list = new ActivityRefList(refList);
int nb = 0;
ActivityRefIterator it = list.iterator();
_skip(it, offset);
int size = refList.getNumber() > 0 ? refList.getNumber(): 0;
boolean sizeIsZero = (size == 0);
boolean isHide = false;
while (it.hasNext()) {
if(sizeIsZero) size ++;
try {
ActivityRef current = it.next();
// take care in the case, current.getActivityEntity() = null the same
// SpaceRef, need to remove it out
if (current.getActivityEntity() == null) {
current.getDay().getActivityRefs().remove(current.getName());
continue;
}
ActivityEntity entity = current.getActivityEntity();
HidableEntity hidable = _getMixin(entity, HidableEntity.class, false);
if (hidable != null) {
isHide = hidable.getHidden();
} else {
isHide = false;
}
if (!got.contains(entity.getId())) {
if (!isHide) {
got.add(entity.getId());
if (++nb == limit) {
break;
}
}
}
} catch (Exception e) {
LOG.warn("Exception while loading activities for user: " + owner.getRemoteId());
}
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to activities!");
}
return got;
}
private List<ExoSocialActivity> getActivitiesNotQuery(ActivityRefType type, Identity owner, int offset, int limit) {
List<ExoSocialActivity> got = new LinkedList<ExoSocialActivity>();
//
this.activityReadLock.lock();
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(), owner.getRemoteId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
ActivityRefList list = new ActivityRefList(refList);
int nb = 0;
ActivityRefIterator it = list.iterator();
_skip(it, offset);
int size = refList.getNumber()>0? refList.getNumber(): 0;
boolean sizeIsZero = (size==0)?true:false;
while (it.hasNext()) {
if(sizeIsZero) size ++;
try {
ActivityRef current = it.next();
// take care in the case, current.getActivityEntity() = null the same
// SpaceRef, need to remove it out
if (current.getActivityEntity() == null) {
current.getDay().getActivityRefs().remove(current.getName());
continue;
}
ExoSocialActivity a = getStorage().getActivity(current.getActivityEntity().getId());
//SOC-4525 : exclude all space activities that owner is not member
if (SpaceIdentityProvider.NAME.equals(a.getActivityStream().getType().toString())) {
Space space = getSpaceStorage().getSpaceByPrettyName(a.getStreamOwner());
if(null == space){
IdentityEntity spaceIdentity = current.getActivityEntity().getIdentity();
LOG.info("SPACE PATH:" + spaceIdentity.getPath());
space = getSpaceStorage().getSpaceByPrettyName(spaceIdentity.getName());
if(space!=null){
LOG.info("SPACE was renamed before: " + space.getPrettyName());
}
}
if (ActivityRefType.CONNECTION.equals(type) || ActivityRefType.FEED.equals(type)) {
if (space != null && !ArrayUtils.contains(space.getMembers(), owner.getRemoteId())) {
LOG.info("Cleanup leak activities " + current.getName() + " of space: " + space.getPrettyName());
current.getDay().getActivityRefs().remove(current.getName());
getSession().save();
size--;
continue;
}
}
}
if (!got.contains(a)) {
if (!a.isHidden()) {
got.add(a);
if (++nb == limit) {
break;
}
}
} else {
//remove if we have duplicate activity on stream.
//some of cases on PLF 3.5.x migration has duplicated Activity
current.getDay().getActivityRefs().remove(current.getName());
}
} catch (Exception e) {
LOG.warn("Exception while loading activities for user: " + owner.getRemoteId());
}
}
//re-update size
refList.setNumber(size);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to activities!");
} finally {
this.activityReadLock.unlock();
}
return got;
}
private List<ExoSocialActivity> getOwnerActivitiesNotQuery(ActivityRefType type, Identity owner, int offset, int limit) {
List<ExoSocialActivity> got = new LinkedList<ExoSocialActivity>();
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(), owner.getRemoteId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
ActivityRefList list = new ActivityRefList(refList);
int nb = 0;
ActivityRefIterator it = list.iterator();
_skip(it, offset);
while (it.hasNext()) {
ActivityRef current = it.next();
// take care in the case, current.getActivityEntity() = null the same
// SpaceRef, need to remove it out
if (current.getActivityEntity() == null) {
current.getDay().getActivityRefs().remove(current.getName());
continue;
}
ExoSocialActivity a = getStorage().getActivity(current.getActivityEntity().getId());
if (!got.contains(a)) {
//only take these user's activities and ower is poster
if (!a.isHidden() && a.getStreamOwner().equals(owner.getRemoteId())) {
got.add(a);
if (++nb == limit) {
break;
}
}
} else {
//remove if we have duplicate activity on stream.
//some of cases on PLF 3.5.x migration has duplicated Activity
current.getDay().getActivityRefs().remove(current.getName());
}
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to activities!");
}
return got;
}
private int getNumberOfActivities(ActivityRefType type, Identity owner) {
this.activityReadLock.lock();
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(), owner.getRemoteId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
if (refList == null || refList.getNumber() < 0) return 0;
return refList.getNumber().intValue();
} catch (NodeNotFoundException e) {
LOG.warn("Failed to getNumberOfActivities()");
} finally {
this.activityReadLock.unlock();
}
return 0;
}
private QueryResult<ActivityEntity> getActivitiesOfConnections(Identity ownerIdentity) {
List<Identity> connections = new ArrayList<Identity>();
if (ownerIdentity == null ) {
return null;
}
connections.add(ownerIdentity);
//
ActivityFilter filter = ActivityFilter.newer();
//
return getActivitiesOfIdentities(ActivityBuilderWhere.simple().owners(connections).poster(ownerIdentity), filter, 0, -1);
}
private List<ActivityEntity> getActivitiesByPoster(Identity ownerIdentity) {
List<ActivityEntity> got = new LinkedList<ActivityEntity>();
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(ownerIdentity.getProviderId(),
ownerIdentity.getRemoteId());
ActivityRefListEntity refList = ActivityRefType.MY_ACTIVITIES.refsOf(identityEntity);
ActivityRefList list = new ActivityRefList(refList);
ActivityRefIterator it = list.iterator();
while (it.hasNext()) {
ActivityRef current = it.next();
got.add(current.getActivityEntity());
}
} catch (NodeNotFoundException e) {
LOG.warn("Failed to get the activities!");
}
return got;
}
private QueryResult<ActivityEntity> getActivitiesOfSpace(Identity spaceIdentity) {
if (spaceIdentity == null) {
return null;
}
//
ActivityFilter filter = ActivityFilter.space();
//
return getActivitiesOfIdentities(ActivityBuilderWhere.space().owners(spaceIdentity), filter, 0, -1);
}
/**
* {@inheritDoc}
*/
private QueryResult<ActivityEntity> getActivitiesOfIdentities(ActivityBuilderWhere where, ActivityFilter filter,
long offset, long limit) throws ActivityStorageException {
return getActivitiesOfIdentitiesQuery(where, filter).objects(offset, limit);
}
private Query<ActivityEntity> getActivitiesOfIdentitiesQuery(ActivityBuilderWhere whereBuilder,
JCRFilterLiteral filter) throws ActivityStorageException {
QueryBuilder<ActivityEntity> builder = getSession().createQueryBuilder(ActivityEntity.class);
builder.where(whereBuilder.build(filter));
whereBuilder.orderBy(builder, filter);
return builder.get();
}
private boolean isExistingActivityRef(IdentityEntity identityEntity, ActivityEntity activityEntity, ActivityRefType type) throws NodeNotFoundException {
ActivityRefListEntity refList = type.refsOf(identityEntity);
return refList.get(activityEntity, null) != null;
}
private boolean hasActivityRefs(IdentityEntity identityEntity, ActivityEntity activityEntity, ActivityRefType type, long oldUpdated) throws NodeNotFoundException {
ActivityRefListEntity refList = type.refsOf(identityEntity);
ActivityRef ref = refList.get(activityEntity, oldUpdated);
return ref != null && ref.getActivityEntity().getId() == activityEntity.getId();
}
private void createOwnerRefs(Identity owner, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(owner, null), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(owner, null), activityEntity, ActivityRefType.MY_ACTIVITIES);
}
private void createConnectionsRefs(List<Identity> identities, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(identities, null), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(identities, null), activityEntity, ActivityRefType.CONNECTION);
}
private void createConnectionsRefs(Identity identity, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(identity, null), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(identity, null), activityEntity, ActivityRefType.CONNECTION);
}
private void removeRelationshipRefs(Identity identity, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(null, identity), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(null, identity), activityEntity, ActivityRefType.CONNECTION);
}
private void createSpaceMembersRefs(List<Identity> identities, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(identities, null), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(identities, null), activityEntity, ActivityRefType.MY_SPACES);
//manageRefList(new UpdateContext(identities, null), activityEntity, ActivityRefType.SPACE_STREAM);
}
private void ownerSpaceMembersRefs(Identity identity, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(identity, null), activityEntity, ActivityRefType.MY_ACTIVITIES);
}
private void createSpaceMembersRefs(Identity identity, ActivityEntity activityEntity) throws NodeNotFoundException {
manageRefList(new UpdateContext(identity, null), activityEntity, ActivityRefType.FEED);
manageRefList(new UpdateContext(identity, null), activityEntity, ActivityRefType.MY_SPACES);
}
private void createSpaceMemberRefs(Identity member, Identity space) throws NodeNotFoundException {
QueryResult<ActivityEntity> spaceActivities = getActivitiesOfSpace(space);
if (spaceActivities != null) {
while(spaceActivities.hasNext()) {
createSpaceMembersRefs(member, spaceActivities.next());
}
}
}
private void removeSpaceMemberRefs(Identity removedMember, Identity space) throws NodeNotFoundException {
QueryResult<ActivityEntity> spaceActivities = getActivitiesOfSpace(space);
if (spaceActivities != null) {
while(spaceActivities.hasNext()) {
ActivityEntity entity = spaceActivities.next();
manageRefList(new UpdateContext(null, removedMember), entity, ActivityRefType.FEED);
manageRefList(new UpdateContext(null, removedMember), entity, ActivityRefType.MY_SPACES);
}
}
}
private void manageRefList(UpdateContext context, ActivityEntity activityEntity, ActivityRefType type) throws NodeNotFoundException {
manageRefList(context, activityEntity, type, false);
}
private void manageRefList(UpdateContext context, ActivityEntity activityEntity, ActivityRefType type, boolean mustCheck) throws NodeNotFoundException {
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
int counter = 0;
if (context.getAdded() != null) {
for (Identity identity : context.getAdded()) {
counter ++;
if (!identity.isEnable()) {
continue;
}
IdentityEntity identityEntity = identityStorage._findIdentityEntity(identity.getProviderId(), identity.getRemoteId());
if (identityEntity == null) {
continue;
}
//keep the latest activity posted time
if (type.equals(ActivityRefType.CONNECTION)) {
if (activityEntity.getLastUpdated() != null) {
identityEntity.setLatestActivityCreatedTime(activityEntity.getLastUpdated());
} else {
identityEntity.setLatestActivityCreatedTime(activityEntity.getPostedTime());
}
}
//
if (mustCheck) {
//to avoid add back activity to given stream what has already existing
if (isExistingActivityRef(identityEntity, activityEntity, type)) continue;
}
ActivityRefListEntity listRef = type.refsOf(identityEntity);
listRef.getOrCreated(activityEntity, hidableActivity.getHidden());
if(counter == BATCH){
StorageUtils.persist();
counter = 0;
}
}
}
if (context.getRemoved() != null) {
for (Identity identity : context.getRemoved()) {
if (!identity.isEnable()) {
continue;
}
IdentityEntity identityEntity = identityStorage._findIdentityEntity(identity.getProviderId(), identity.getRemoteId());
ActivityRefListEntity listRef = type.refsOf(identityEntity);
listRef.remove(activityEntity, hidableActivity.getHidden(), null);
}
}
}
private void addRefList(IdentityEntity identityEntity,
ActivityEntity activityEntity,
ActivityRefType type,
boolean mustCheck) throws NodeNotFoundException {
//
if (mustCheck) {
// to avoid add back activity to given stream what has already existing
if (isExistingActivityRef(identityEntity, activityEntity, type))
return;
}
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
ActivityRefListEntity listRef = type.refsOf(identityEntity);
ActivityRef ref = listRef.getOrCreated(activityEntity, hidableActivity.getHidden());
if (ref.getName() == null) {
ref.setName(activityEntity.getName());
}
if (ref.getLastUpdated() == null) {
ref.setLastUpdated(activityEntity.getLastUpdated());
}
ref.setActivityEntity(activityEntity);
}
@Override
public void createFeedActivityRef(Identity owner,
List<ExoSocialActivity> activities) {
createActivityRef(owner, activities, ActivityRefType.FEED);
}
@Override
public void createConnectionsActivityRef(Identity owner,
List<ExoSocialActivity> activities) {
createActivityRef(owner, activities, ActivityRefType.CONNECTION);
}
@Override
public void createMySpacesActivityRef(Identity owner,
List<ExoSocialActivity> activities) {
createActivityRef(owner, activities, ActivityRefType.MY_SPACES);
}
@Override
public void createSpaceActivityRef(Identity owner,
List<ExoSocialActivity> activities) {
createActivityRef(owner, activities, ActivityRefType.SPACE_STREAM);
}
@Override
public void createMyActivitiesActivityRef(Identity owner,
List<ExoSocialActivity> activities) {
createActivityRef(owner, activities, ActivityRefType.MY_ACTIVITIES);
}
@Override
public void createActivityRef(Identity owner,
List<ExoSocialActivity> activities,
ActivityRefType type) {
if (activities == null || activities.size() == 0) return;
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(),
owner.getRemoteId());
ActivityRefListEntity listRef = type.create(identityEntity);
// keep last migration
ExoSocialActivity entity = activities.get(activities.size() - 1);
Long value = entity.getUpdated() != null ? entity.getUpdated().getTime() : entity.getPostedTime();
Long oldLastMigration = listRef.getLastMigration();
listRef.setLastMigration(value.longValue());
//don't increase with lazy migration.
Integer numberOfStream = listRef.getNumber();
//
for (ExoSocialActivity a : activities) {
ActivityEntity activityEntity = getSession().findById(ActivityEntity.class, a.getId());
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
// migration 3.5.x => 4.x, lastUpdated of Activity is NULL, then use
// createdDate for replacement
ActivityRef ref = listRef.getOrCreated(activityEntity, hidableActivity.getHidden());
ref.setActivityEntity(activityEntity);
}
//StorageUtils.getNode(identityEntity).save();
//getSession().save();
//don't increase with lazy migration if has any migration before
if (oldLastMigration != null && oldLastMigration.longValue() > 0) {
listRef.setNumber(numberOfStream);
}
StorageUtils.persist();
} catch (NodeNotFoundException e) {
LOG.warn("Failed to create Activity references.");
}
}
@Override
public void migrateStreamSize(Identity owner, int size, ActivityRefType type) {
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(), owner.getRemoteId());
ActivityRefListEntity listRef = type.create(identityEntity);
listRef.setNumber(size);
} catch (NodeNotFoundException e) {
LOG.warn("Failed to migrateStreamSize.");
} finally {
StorageUtils.persist();
}
}
@Override
public boolean hasSizeOfFeed(Identity owner) {
return hasSizeOfActivities(ActivityRefType.FEED, owner);
}
@Override
public boolean hasSizeOfMySpaces(Identity owner) {
return hasSizeOfActivities(ActivityRefType.MY_SPACES, owner);
}
@Override
public boolean hasSizeOfSpaceStream(Identity owner) {
return hasSizeOfActivities(ActivityRefType.SPACE_STREAM, owner);
}
@Override
public boolean hasSizeOfMyActivities(Identity owner) {
return hasSizeOfActivities(ActivityRefType.MY_ACTIVITIES, owner);
}
@Override
public boolean hasSizeOfConnections(Identity owner) {
return hasSizeOfActivities(ActivityRefType.CONNECTION, owner);
}
private boolean hasSizeOfActivities(ActivityRefType type, Identity owner) {
try {
IdentityEntity identityEntity = identityStorage._findIdentityEntity(owner.getProviderId(), owner.getRemoteId());
ActivityRefListEntity refList = type.refsOf(identityEntity);
if (refList == null) return false;
return refList.getNumber().intValue() > 0;
//using this code
//return refList.getSize().intValue() > 0;
} catch (NodeNotFoundException e) {
LOG.warn("Failed to hasSizeOfActivities()");
}
return false;
}
@Override
public void updateHidable(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
ExoSocialActivity activity = streamCtx.getActivity();
ActivityEntity activityEntity = _findById(ActivityEntity.class, activity.getId());
Collection<ActivityRef> references = activityEntity.getActivityRefs();
//Case of update hidden activity after migration
if (references == null || references.size() == 0) {
streamCtx.activityEntity(activityEntity);
savePoster(streamCtx);
save(streamCtx);
}
HidableEntity hidableActivity = _getMixin(activityEntity, HidableEntity.class, true);
hidableActivity.setHidden(activity.isHidden());
for (ActivityRef ref : references) {
if (hidableActivity.getHidden() == false) {
ref.getDay().inc();
} else {
ref.getDay().desc();
}
}
} catch (Exception e) {
LOG.warn("Failed to update Activity references when change the visibility of activity.", e);
} finally {
this.activityWriteLock.unlock();
}
}
@Override
public void addMentioners(ProcessContext ctx) {
this.activityWriteLock.lock();
try {
StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, ctx);
//
if (streamCtx.getMentioners() == null || streamCtx.getMentioners().length == 0) {
return;
}
ActivityEntity activityEntity = _findById(ActivityEntity.class, streamCtx.getActivity().getId());
// mentioners
addMentioner(streamCtx.getMentioners(), activityEntity);
} catch (NodeNotFoundException ex) {
LOG.warn("Probably was updated activity reference by another session");
LOG.debug(ex.getMessage(), ex);
} finally {
this.activityWriteLock.unlock();
}
}
}