View Javadoc
1   /*
2    * Copyright (C) 2003-2015 eXo Platform SAS.
3    *
4    * This program is free software: you can redistribute it and/or modify
5    * it under the terms of the GNU Affero General Public License as published by
6    * the Free Software Foundation, either version 3 of the License, or
7    * (at your option) any later version.
8    *
9    * This program is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU Affero General Public License for more details.
13   *
14   * You should have received a copy of the GNU Affero General Public License
15   * along with this program. If not, see <http://www.gnu.org/licenses/>.
16   */
17  package org.exoplatform.social.core.jpa.storage;
18  
19  import java.util.*;
20  
21  import org.exoplatform.commons.api.persistence.ExoTransactional;
22  import org.exoplatform.commons.utils.ListAccess;
23  import org.exoplatform.services.log.ExoLogger;
24  import org.exoplatform.services.log.Log;
25  import org.exoplatform.social.core.jpa.search.ExtendProfileFilter;
26  import org.exoplatform.social.core.jpa.search.ProfileSearchConnector;
27  import org.exoplatform.social.core.jpa.storage.dao.ConnectionDAO;
28  import org.exoplatform.social.core.jpa.storage.dao.IdentityDAO;
29  import org.exoplatform.social.core.jpa.storage.entity.ConnectionEntity;
30  import org.exoplatform.social.core.identity.model.Identity;
31  import org.exoplatform.social.core.identity.model.Profile;
32  import org.exoplatform.social.core.jpa.storage.entity.IdentityEntity;
33  import org.exoplatform.social.core.profile.ProfileFilter;
34  import org.exoplatform.social.core.relationship.model.Relationship;
35  import org.exoplatform.social.core.relationship.model.Relationship.Type;
36  import org.exoplatform.social.core.search.Sorting;
37  import org.exoplatform.social.core.storage.RelationshipStorageException;
38  import org.exoplatform.social.core.storage.api.RelationshipStorage;
39  import org.exoplatform.social.core.storage.impl.RelationshipStorageImpl;
40  
41  /**
42   * Created by The eXo Platform SAS
43   * Author : eXoPlatform
44   *          exo@exoplatform.com
45   * Jun 3, 2015  
46   */
47  public class RDBMSRelationshipStorageImpl implements RelationshipStorage {
48  
49    private static final Log LOG = ExoLogger.getLogger(RDBMSRelationshipStorageImpl.class);
50    
51    private final ConnectionDAO connectionDAO;
52    private final IdentityDAO identityDAO;
53    private final RDBMSIdentityStorageImpl identityStorage;
54  
55    public RDBMSRelationshipStorageImpl(RDBMSIdentityStorageImpl identityStorage, ConnectionDAO connectionDAO, IdentityDAO identityDAO) {
56      this.connectionDAO = connectionDAO;
57      this.identityDAO = identityDAO;
58      this.identityStorage = identityStorage;
59    }
60  
61    @Override
62    @ExoTransactional
63    public Relationship saveRelationship(Relationship relationship) throws RelationshipStorageException {
64      if (relationship.getId() == null) {//create new relationship
65  
66        ConnectionEntity entity = connectionDAO.getConnection(relationship.getSender(), relationship.getReceiver());
67        if (entity == null) {
68          entity = new ConnectionEntity();
69        }
70  
71        entity.setReceiver(identityDAO.find(Long.valueOf(relationship.getReceiver().getId())));
72        entity.setSender(identityDAO.find(Long.valueOf(relationship.getSender().getId())));
73        entity.setStatus(relationship.getStatus());
74        entity.setUpdatedDate(new Date());
75        //
76        connectionDAO.create(entity);
77        relationship.setId(Long.toString(entity.getId()));
78  
79      } else {//update an relationship
80        ConnectionEntity entity = connectionDAO.getConnection(relationship.getSender(), relationship.getReceiver());
81        entity.setStatus(relationship.getStatus());
82        entity.setUpdatedDate(new Date());
83        connectionDAO.update(entity);
84      }
85      //
86      return relationship;
87    }
88    
89    @Override
90    @ExoTransactional
91    public void removeRelationship(Relationship relationship) throws RelationshipStorageException {
92      ConnectionEntity connection = connectionDAO.getConnection(relationship.getSender(), relationship.getReceiver());
93      if (connection != null) {
94        connectionDAO.delete(connection);
95      }
96    }
97    
98    @Override
99    @ExoTransactional
100   public Relationship getRelationship(Identity identity1, Identity identity2) throws RelationshipStorageException {
101     if (identity1==null) {
102       throw new IllegalArgumentException("Argument identity1 is null");
103     }
104     if (identity2==null) {
105       throw new IllegalArgumentException("Argument identity2 is null");
106     }
107     ConnectionEntity item = connectionDAO.getConnection(identity1, identity2);
108     if (item == null) {
109       item = connectionDAO.getConnection(identity2, identity1);
110     }
111     return EntityConverterUtils.convertRelationshipItemToRelationship(item);
112   }
113   
114   @Override
115   public Relationship getRelationship(String relationshipId) throws RelationshipStorageException {
116     return EntityConverterUtils.convertRelationshipItemToRelationship(connectionDAO.find(Long.valueOf(relationshipId)));
117   }
118   
119   @Override
120   public List<Identity> getConnections(Identity identity) throws RelationshipStorageException {
121     return getConnections(identity, 0, -1);
122   }
123   
124   @Override
125   public List<Identity> getConnections(Identity identity, long offset, long limit) throws RelationshipStorageException {
126     return convertRelationshipEntitiesToIdentities(connectionDAO.getConnections(identity, Relationship.Type.CONFIRMED, offset, limit), identity.getId());
127   }
128 
129   @Override
130   @ExoTransactional
131   public int getConnectionsCount(Identity identity) throws RelationshipStorageException {
132     return connectionDAO.getConnectionsCount(identity, Relationship.Type.CONFIRMED);
133   }
134 
135   @Override
136   public int getRelationshipsCount(Identity identity) throws RelationshipStorageException {
137     return connectionDAO.getConnectionsCount(identity, null);
138   }
139 
140   @Override//TODO need review again
141   public List<Relationship> getRelationships(Identity identity, Relationship.Type type, List<Identity> listCheckIdentity) throws RelationshipStorageException {
142     return getRelationships(identity, type);
143   }
144 
145   @Override
146   public List<Relationship> getReceiverRelationships(Identity receiver, Relationship.Type type, List<Identity> listCheckIdentity) throws RelationshipStorageException {
147     return getRelationships(null, receiver, type);
148   }
149 
150   @Override
151   public List<Relationship> getSenderRelationships(Identity sender, Relationship.Type type, List<Identity> listCheckIdentity) throws RelationshipStorageException {
152     return getRelationships(sender, null, type);
153   }
154 
155   public List<Relationship> getRelationships(Identity identity, Relationship.Type type) {
156     return convertRelationshipEntitiesToRelationships(connectionDAO.getConnections(identity, type, 0, -1));
157   }
158 
159   public List<Relationship> getRelationships(Identity sender, Identity receiver, Type type) {
160     return convertRelationshipEntitiesToRelationships(connectionDAO.getConnections(sender, receiver, type));
161   }
162 
163   @Override
164   public List<Identity> getOutgoingRelationships(Identity sender, long offset, long limit) throws RelationshipStorageException {
165     return convertRelationshipEntitiesToIdentities(connectionDAO.getConnections(sender, Relationship.Type.OUTGOING, offset, limit), sender.getId());
166   }
167 
168   @Override
169   public int getOutgoingRelationshipsCount(Identity sender) throws RelationshipStorageException {
170     return connectionDAO.getConnectionsCount(sender, Relationship.Type.OUTGOING);
171   }
172 
173   @Override
174   public List<Identity> getIncomingRelationships(Identity receiver, long offset, long limit) throws RelationshipStorageException {
175     return searchConnections(receiver, Relationship.Type.INCOMING, offset, limit, null);
176   }
177 
178   @Override
179   public int getIncomingRelationshipsCount(Identity receiver) throws RelationshipStorageException {
180     return connectionDAO.getConnectionsCount(receiver, Relationship.Type.INCOMING);
181   }
182 
183   @Override
184   public List<Identity> getLastConnections(Identity identity, int limit) throws RelationshipStorageException {
185     return convertRelationshipEntitiesToIdentities(connectionDAO.getLastConnections(identity, limit), identity.getId());
186   }
187 
188   private List<Identity> convertRelationshipEntitiesToIdentities(List<ConnectionEntity> connections, String ownerId) {
189     List<Identity> identities = new ArrayList<Identity>();
190     if (connections == null) return identities;
191     for (ConnectionEntity item : connections) {
192       identities.add(getIdentityFromRelationshipItem(item, ownerId));
193     }
194     return identities;
195   }
196 
197   private List<Relationship> convertRelationshipEntitiesToRelationships(List<ConnectionEntity> connections) {
198     List<Relationship> relationships = new ArrayList<Relationship>();
199     if (connections == null) return relationships;
200     for (ConnectionEntity item : connections) {
201       relationships.add(EntityConverterUtils.convertRelationshipItemToRelationship(item));
202     }
203     return relationships;
204   }
205 
206   private Identity getIdentityFromRelationshipItem(ConnectionEntity item, String ownerId) {
207     Identity identity = null;
208     if (ownerId.equals(item.getSender().getStringId())) {
209       identity = identityStorage.findIdentityById(item.getReceiver().getStringId());
210     } else {
211       identity = identityStorage.findIdentityById(item.getSender().getStringId());
212     }
213     if (identity == null) return null;
214     //load profile
215     Profile profile = identityStorage.loadProfile(identity.getProfile());
216     identity.setProfile(profile);
217     return identity;
218   }
219 
220   @Override
221   public List<Relationship> getSenderRelationships(String senderId, Type type, List<Identity> listCheckIdentity) throws RelationshipStorageException {
222     Identity sender = identityStorage.findIdentityById(senderId);
223     return getSenderRelationships(sender, type, listCheckIdentity);
224   }
225 
226   @Override
227   public boolean hasRelationship(Identity identity1, Identity identity2, String relationshipPath) {
228     Relationship r = getRelationship(identity1, identity2);
229     return r != null && Relationship.Type.CONFIRMED.equals(r.getStatus());
230   }
231 
232   @Override
233   public List<Identity> getRelationships(Identity identity, long offset, long limit) throws RelationshipStorageException {
234     return convertRelationshipEntitiesToIdentities(connectionDAO.getConnections(identity, null, offset, limit), identity.getId());
235   }
236 
237   @Override
238   public List<Identity> getConnectionsByFilter(Identity existingIdentity, ProfileFilter profileFilter, long offset, long limit) throws RelationshipStorageException {
239     //return profileESConnector.search(existingIdentity, profileFilter, Relationship.Type.CONFIRMED, offset, limit);
240     return searchConnectionByFilter(existingIdentity, Type.CONFIRMED, profileFilter, offset, limit);
241   }
242 
243   @Override
244   public List<Identity> getIncomingByFilter(Identity existingIdentity, ProfileFilter profileFilter, long offset, long limit) throws RelationshipStorageException {
245     return searchConnectionByFilter(existingIdentity, Type.INCOMING, profileFilter, offset, limit);
246   }
247 
248   @Override
249   public List<Identity> getOutgoingByFilter(Identity existingIdentity, ProfileFilter profileFilter, long offset, long limit) throws RelationshipStorageException {
250     return searchConnectionByFilter(existingIdentity, Type.OUTGOING, profileFilter, offset, limit);
251   }
252 
253   @Override
254   public int getConnectionsCountByFilter(Identity existingIdentity, ProfileFilter profileFilter) throws RelationshipStorageException {
255     return countConnectionByFilter(existingIdentity, Type.CONFIRMED, profileFilter);
256   }
257 
258   @Override
259   public int getIncomingCountByFilter(Identity existingIdentity, ProfileFilter profileFilter) throws RelationshipStorageException {
260     return countConnectionByFilter(existingIdentity, Type.INCOMING, profileFilter);
261   }
262 
263   @Override
264   public int getOutgoingCountByFilter(Identity existingIdentity, ProfileFilter profileFilter) throws RelationshipStorageException {
265     return countConnectionByFilter(existingIdentity, Type.OUTGOING, profileFilter);
266   }
267 
268   @Override
269   public List<Relationship> getRelationshipsByStatus(Identity identity, Type type, long offset, long limit) {
270     return getRelationships(identity, type);
271   }
272 
273   @Override
274   public int getRelationshipsCountByStatus(Identity identity, Type type) {
275     return countConnectionByFilter(identity, type, null);
276   }
277 
278   @Override
279   public Map<Identity, Integer> getSuggestions(Identity identity, int maxConnections, int maxConnectionsToLoad, int maxSuggestions) throws RelationshipStorageException {
280     if (maxConnectionsToLoad > 0 && maxConnections > maxConnectionsToLoad)
281       maxConnectionsToLoad = maxConnections;
282     // Get identities level 1
283    Set<Identity> relationIdLevel1 = new HashSet<Identity>();
284    int size = getConnectionsCount(identity);
285    // The ideal limit of connection to treat however we could need to go beyond this limit
286    // if we cannot reach the expected amount of suggestions
287    int endIndex;
288    Random random = new Random();
289    List<Identity> connections;
290    if (size > maxConnectionsToLoad && maxConnectionsToLoad > 0 && maxConnections > 0) {
291      // The total amount of connections is bigger than the maximum allowed
292      // We will then load only a random sample to reduce the best we can the
293      // required time for this task
294      int startIndex = random.nextInt(size - maxConnectionsToLoad);
295      endIndex = maxConnections;
296      connections= getConnections(identity, startIndex, maxConnectionsToLoad);
297    } else {
298      // The total amount of connections is less than the maximum allowed
299      // We call load everything
300      endIndex = size;
301      connections= getConnections(identity, 0, size);
302    }
303    // we need to load all the connections
304    for (int i = 0; i < connections.size(); i++) {
305      Identity id = connections.get(i);
306      relationIdLevel1.add(id);
307    }
308    relationIdLevel1.remove(identity);
309 
310    // Get identities level 2 (suggested Identities)
311    Map<Identity, Integer> suggestedIdentities = new HashMap<Identity, Integer>();
312    Iterator<Identity> it = relationIdLevel1.iterator();
313    for (int j = 0; j < size && it.hasNext(); j++) {
314      Identity id = it.next();
315      // We check if we reach the limit of connections to treat and if we have enough suggestions
316      if (j >= endIndex && suggestedIdentities.size() > maxSuggestions && maxSuggestions > 0)
317        break;
318      int allConnSize = getConnectionsCount(id);
319      int allConnStartIndex = 0;
320      if (allConnSize > maxConnections && maxConnections > 0) {
321        // The current identity has more connections that the allowed amount so we will treat a sample
322        allConnStartIndex = random.nextInt(allConnSize - maxConnections);
323        connections = getConnections(id, allConnStartIndex, maxConnections);
324      } else {
325        // The current identity doesn't have more connections that the allowed amount so we will
326        // treat all of them
327        connections = getConnections(id, 0, allConnSize);
328      }
329      for (int i = 0; i < connections.size(); i++) {
330        Identity ids = connections.get(i);
331        // We check if the current connection is not already part of the connections of the identity
332        // for which we seek some suggestions
333        if (!relationIdLevel1.contains(ids) && !ids.equals(identity) && !ids.isDeleted()
334             && getRelationship(ids, identity) == null) {
335          Integer commonIdentities = suggestedIdentities.get(ids);
336          if (commonIdentities == null) {
337            commonIdentities = new Integer(1);
338          } else {
339            commonIdentities = new Integer(commonIdentities.intValue() + 1);
340          }
341          suggestedIdentities.put(ids, commonIdentities);
342        }
343      }
344    }
345    NavigableMap<Integer, List<Identity>> groupByCommonConnections = new TreeMap<Integer, List<Identity>>();
346    // This for loop allows to group the suggestions by total amount of common connections
347    for (Identity id : suggestedIdentities.keySet()) {
348      Integer commonIdentities = suggestedIdentities.get(id);
349      List<Identity> ids = groupByCommonConnections.get(commonIdentities);
350      if (ids == null) {
351        ids = new ArrayList<Identity>();
352        groupByCommonConnections.put(commonIdentities, ids);
353      }
354      ids.add(id);
355    }
356    Map<Identity, Integer> suggestions = new LinkedHashMap<Identity, Integer>();
357    int suggestionLeft = maxSuggestions;
358    // We iterate over the suggestions starting from the suggestions with the highest amount of common
359    // connections
360    main: for (Integer key : groupByCommonConnections.descendingKeySet()) {
361      List<Identity> ids = groupByCommonConnections.get(key);
362      for (Identity id : ids) {
363        suggestions.put(id, key);
364        // We stop once we have enough suggestions
365        if (maxSuggestions > 0 && --suggestionLeft == 0)
366          break main;
367      }
368    }
369    return suggestions;
370   }
371 
372   private List<Identity> searchConnectionByFilter(Identity owner, Relationship.Type status, ProfileFilter profileFilter, long offset, long limit) {
373     ExtendProfileFilter xFilter = new ExtendProfileFilter(profileFilter);
374     if(xFilter.isEmpty()) {
375       return searchConnections(owner, status, offset, limit, xFilter.getSorting());
376     }
377     xFilter.setConnection(owner);
378     xFilter.setConnectionStatus(status);
379 
380     ListAccess<Identity> list = identityStorage.findByFilter(xFilter);
381     try {
382       return Arrays.asList(list.load((int)offset, (int)limit));
383     } catch (Exception ex) {
384       LOG.error(ex.getMessage(), ex);
385       return Collections.emptyList();
386     }
387   }
388 
389   private List<Identity> searchConnections(Identity owner, Type status, long offset, long limit, Sorting sorting) {
390     long ownerId = Long.valueOf(owner.getId());
391 
392     List<ConnectionEntity> connections = connectionDAO.getConnections(owner, status, offset, limit);
393     List<Identity> identities = new ArrayList<Identity>();
394 
395     for (ConnectionEntity connectionEntity : connections) {
396       IdentityEntity receiver = connectionEntity.getReceiver();
397       IdentityEntity sender = connectionEntity.getSender();
398       if (ownerId == sender.getId() && !receiver.isDeleted() && receiver.isEnabled()) {
399         Identity identity = EntityConverterUtils.convertToIdentity(receiver);
400         identities.add(identity);
401       } else if (ownerId == receiver.getId() && !sender.isDeleted() && sender.isEnabled()) {
402         Identity identity = EntityConverterUtils.convertToIdentity(sender);
403         identities.add(identity);
404       } else {
405         LOG.warn("Incompatible returned connection entity, the ownerId {} is not receiver, neither sender", ownerId);
406       }
407     }
408     return identities;
409   }
410 
411   private int countConnectionByFilter(Identity owner, Relationship.Type status, ProfileFilter profileFilter) {
412     ExtendProfileFilter xFilter = new ExtendProfileFilter(profileFilter);
413     xFilter.setConnection(owner);
414     xFilter.setConnectionStatus(status);
415 
416     ListAccess<Identity> list = identityStorage.findByFilter(xFilter);
417     try {
418       return list.getSize();
419     } catch (Exception ex) {
420       LOG.error(ex.getMessage(), ex);
421       return 0;
422     }
423   }
424 }