View Javadoc
1   /*
2    * Copyright (C) 2019 eXo Platform SAS.
3    *
4    * This is free software; you can redistribute it and/or modify it
5    * under the terms of the GNU Lesser General Public License as
6    * published by the Free Software Foundation; either version 2.1 of
7    * the License, or (at your option) any later version.
8    *
9    * This software is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12   * Lesser General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this software; if not, write to the Free
16   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
17   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
18   */
19  
20  package org.exoplatform.social.core.jpa.updater;
21  
22  import java.io.ByteArrayInputStream;
23  import java.util.ArrayList;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  
28  import javax.jcr.Node;
29  import javax.jcr.NodeIterator;
30  import javax.jcr.PropertyIterator;
31  import javax.jcr.Session;
32  
33  import org.chromattic.ext.ntdef.NTFile;
34  import org.chromattic.ext.ntdef.Resource;
35  
36  import org.exoplatform.commons.api.event.EventManager;
37  import org.exoplatform.commons.persistence.impl.EntityManagerService;
38  import org.exoplatform.commons.search.index.IndexingService;
39  import org.exoplatform.commons.utils.CommonsUtils;
40  import org.exoplatform.container.PortalContainer;
41  import org.exoplatform.container.component.RequestLifeCycle;
42  import org.exoplatform.container.xml.InitParams;
43  import org.exoplatform.management.annotations.Managed;
44  import org.exoplatform.management.annotations.ManagedDescription;
45  import org.exoplatform.management.jmx.annotations.NameTemplate;
46  import org.exoplatform.management.jmx.annotations.Property;
47  import org.exoplatform.social.core.identity.provider.SpaceIdentityProvider;
48  import org.exoplatform.social.core.jpa.search.ProfileIndexingServiceConnector;
49  import org.exoplatform.social.core.jpa.storage.RDBMSIdentityStorageImpl;
50  import org.exoplatform.social.core.chromattic.entity.DisabledEntity;
51  import org.exoplatform.social.core.chromattic.entity.IdentityEntity;
52  import org.exoplatform.social.core.chromattic.entity.ProfileEntity;
53  import org.exoplatform.social.core.identity.model.Identity;
54  import org.exoplatform.social.core.identity.model.Profile;
55  import org.exoplatform.social.core.identity.provider.OrganizationIdentityProvider;
56  import org.exoplatform.social.core.jpa.storage.RDBMSSpaceStorageImpl;
57  import org.exoplatform.social.core.jpa.updater.utils.IdentityUtil;
58  import org.exoplatform.social.core.model.AvatarAttachment;
59  import org.exoplatform.social.core.space.model.Space;
60  import org.exoplatform.social.core.storage.api.SpaceStorage;
61  import org.exoplatform.social.core.storage.exception.NodeNotFoundException;
62  import org.exoplatform.social.core.storage.impl.IdentityStorageImpl;
63  
64  /**
65   * @author <a href="mailto:tuyennt@exoplatform.com">Tuyen Nguyen The</a>.
66   */
67  @Managed
68  @ManagedDescription("Social migration Identities from JCR to RDBMS.")
69  @NameTemplate({@Property(key = "service", value = "social"), @Property(key = "view", value = "migration-identities") })
70  public class IdentityMigrationService extends AbstractMigrationService<Identity> {
71  
72    public static final String EVENT_LISTENER_KEY = "SOC_IDENTITY_MIGRATION";
73  
74    protected final static String REMOVE_LIMIT_THRESHOLD_KEY = "REMOVE_LIMIT_THRESHOLD";
75  
76    private int REMOVE_LIMIT_THRESHOLD = 20;
77  
78    private long totalNumberIdentites = 0;
79  
80    private final RDBMSIdentityStorageImpl identityStorage;
81    private final IdentityStorageImpl jcrIdentityStorage;
82  
83    private SpaceStorage spaceStorage;
84  
85    private String identityQuery;
86  
87    private Set<String> identitiesMigrateFailed = new HashSet<>();
88    private Set<String> identitiesCleanupFailed = new HashSet<>();
89  
90    public IdentityMigrationService(InitParams initParams,
91                                    RDBMSIdentityStorageImpl identityStorage,
92                                    IdentityStorageImpl jcrIdentityStorage,
93                                    RDBMSSpaceStorageImpl spaceStorage,
94                                    EventManager<Identity, String> eventManager, EntityManagerService entityManagerService) {
95      super(initParams, jcrIdentityStorage, eventManager, entityManagerService);
96      this.LIMIT_THRESHOLD = getInteger(initParams, LIMIT_THRESHOLD_KEY, 200);
97      this.REMOVE_LIMIT_THRESHOLD = getInteger(initParams, REMOVE_LIMIT_THRESHOLD_KEY, 20);
98      this.identityStorage = identityStorage;
99      this.jcrIdentityStorage = jcrIdentityStorage;
100     this.spaceStorage = spaceStorage;
101   }
102 
103   @Override
104   protected void beforeMigration() throws Exception {
105     MigrationContext.setIdentityDone(false);
106     identitiesMigrateFailed = new HashSet<>();
107   }
108 
109   @Override
110   @Managed
111   @ManagedDescription("Manual to start run migration data of identities from JCR to RDBMS.")
112   public void doMigration() throws Exception {
113     long t = System.currentTimeMillis();
114 
115     long totalIdentities = getTotalNumberIdentities();
116 
117     //endTx(begunTx);
118 
119     LOG.info("|\\ START::Identity migration ---------------------------------");
120 
121     RequestLifeCycle.end();
122 
123     long offset = 0;
124     boolean cont = true;
125     boolean begunTx = false;
126     List<String> transactionList = new ArrayList<>();
127 
128     long numberSuccessful = 0;
129     long batchSize = 0;
130 
131     while(cont && !forceStop) {
132       try {
133 
134         try {
135 
136           RequestLifeCycle.begin(PortalContainer.getInstance());
137           begunTx = startTx();
138           transactionList = new ArrayList<>();
139           NodeIterator nodeIter = getIdentityNodes(offset, LIMIT_THRESHOLD);
140           batchSize = nodeIter.getSize();
141 
142           if (nodeIter == null || batchSize == 0) {
143             cont = false;
144 
145           } else {
146 
147             while (nodeIter.hasNext() && !forceStop) {
148               offset++;
149               Node identityNode = nodeIter.nextNode();
150               String identityName = identityNode.getName();
151               transactionList.add(identityName);
152 
153               LOG.info(String.format("|  \\ START::identity number: %s/%s (%s identity)", offset, totalIdentities, identityName));
154               long t1 = System.currentTimeMillis();
155 
156               String jcrid = identityNode.getUUID();
157               if((identityNode.hasProperty("soc:providerId") && identityNode.getProperty("soc:providerId").getString().equals(OrganizationIdentityProvider.NAME)) ||
158                 !identityNode.hasProperty("soc:isDeleted") || !identityNode.getProperty("soc:isDeleted").getBoolean()) {
159                 //SOC-5828 : if identity is user, we migrated it even if the user is deleted
160                 //if identity is space and deleted, we don't migrated it
161                 try {
162                   Identity identity = migrateIdentity(identityNode, jcrid);
163 
164                   if (identity != null) {
165                     String newId = identity.getId();
166                     identity.setId(jcrid);
167                     broadcastListener(identity, newId);
168                   }
169                   numberSuccessful++;
170                 } catch (Exception ex) {
171                   identitiesMigrateFailed.add(identityName);
172                   LOG.error("Error while migrate identity " + identityName, ex);
173                 }
174               } else {
175                 LOG.info("Ignore migration of identity with id {} since it's marked as deleted", identityName);
176               }
177               LOG.info(String.format("|  / END::identity number %s (%s identity) consumed %s(ms)", offset, identityNode.getName(), System.currentTimeMillis() - t1));
178             }
179           }
180 
181         } finally {
182           try {
183             endTx(begunTx);
184           } catch (Exception ex) {
185             // If commit was failed, all identities are failed also
186             identitiesMigrateFailed.addAll(transactionList);
187             numberSuccessful -= batchSize;
188           }
189           RequestLifeCycle.end();
190         }
191       } catch (Exception ex) {
192         LOG.error(ex);
193       }
194     }
195 
196     if (identitiesMigrateFailed.size() > 0) {
197       LOG.info(String.format("| / END::Identity migration failed for (%s) identity(s)", identitiesMigrateFailed.size()));
198     }
199     LOG.info(String.format("|// END::Identity migration for (%s) identity(s) consumed %s(ms)", numberSuccessful, System.currentTimeMillis() - t));
200 
201     LOG.info("| \\ START::Re-indexing identity(s) ---------------------------------");
202     IndexingService indexingService = CommonsUtils.getService(IndexingService.class);
203     indexingService.reindexAll(ProfileIndexingServiceConnector.TYPE);
204     LOG.info("| / END::Re-indexing identity(s) ---------------------------------");
205 
206     RequestLifeCycle.begin(PortalContainer.getInstance());
207   }
208 
209   @Override
210   protected void afterMigration() throws Exception {
211     MigrationContext.setIdentitiesMigrateFailed(identitiesMigrateFailed);
212     if (!forceStop && identitiesMigrateFailed.isEmpty()) {
213       MigrationContext.setIdentityDone(true);
214     }
215   }
216 
217   @Override
218   public void doRemove() throws Exception {
219     identitiesCleanupFailed = new HashSet<>();
220 
221     long totalIdentities = getTotalNumberIdentities();
222 
223     LOG.info("| \\ START::cleanup Identities ---------------------------------");
224     long t = System.currentTimeMillis();
225     long timePerIdentity = System.currentTimeMillis();
226     int offset = 0;
227     long failed = 0;
228     List<String> transactionList = new ArrayList<>();
229 
230     try {
231       boolean cont = true;
232       while (cont) {
233         try {
234 
235           RequestLifeCycle.begin(PortalContainer.getInstance());
236           failed = identitiesCleanupFailed.size();
237           transactionList = new ArrayList<>();
238 
239           NodeIterator nodeIter  = getIdentityNodes(failed, REMOVE_LIMIT_THRESHOLD);
240           if(nodeIter == null || nodeIter.getSize() == 0) {
241             cont = false;
242 
243           } else {
244 
245             if (nodeIter.getSize() < REMOVE_LIMIT_THRESHOLD) {
246               cont = false;
247             }
248             while (nodeIter.hasNext()) {
249               offset++;
250 
251               if (offset > totalIdentities) {
252                 cont = false;
253               }
254 
255               Node node = nodeIter.nextNode();
256               timePerIdentity = System.currentTimeMillis();
257               LOG.info(String.format("|  \\ START::cleanup Identity number: %s/%s (%s identity)", offset, totalIdentities, node.getName()));
258 
259               String name = node.getName();
260               if (!MigrationContext.isForceCleanup() && (MigrationContext.getIdentitiesCleanupConnectionFailed().contains(name)
261                       || MigrationContext.getIdentitiesCleanupActivityFailed().contains(name))) {
262                 identitiesCleanupFailed.add(name);
263                 LOG.warn("Will not remove this identity because the cleanup connection or activities were failed for it");
264                 continue;
265               }
266 
267               IdentityEntity identityEntity = _findById(IdentityEntity.class, node.getUUID());
268               String provider = identityEntity.getProviderId();
269               String activityMigrated = identityEntity.getProperty(MigrationContext.KEY_MIGRATE_ACTIVITIES);
270               String connectionMigrated = identityEntity.getProperty(MigrationContext.KEY_MIGRATE_CONNECTION);
271               if (!MigrationContext.TRUE_STRING.equalsIgnoreCase(activityMigrated)
272                       || (OrganizationIdentityProvider.NAME.equals(provider) && !MigrationContext.TRUE_STRING.equalsIgnoreCase(connectionMigrated))) {
273                 LOG.warn("Can not remove identity " + name + " due to migration was not successful for activities and connections");
274                 identitiesCleanupFailed.add(name);
275                 continue;
276               }
277 
278               //transactionList.add(name);
279 
280               try {
281                 PropertyIterator pit = node.getReferences();
282                 if (pit != null && pit.getSize() > 0) {
283                   int num = 0;
284                   while (pit.hasNext()) {
285                     num++;
286                     pit.nextProperty().remove();
287                     if (num % REMOVE_LIMIT_THRESHOLD == 0) {
288                       getSession().save();
289                     }
290                   }
291                   getSession().save();
292                 }
293                 node.remove();
294                 getSession().save();
295               } catch (Exception ex) {
296                 LOG.error("Error when cleanup the identity: " + name, ex);
297                 identitiesCleanupFailed.add(name);
298                 // Discard all change if there is any error
299                 getSession().getJCRSession().refresh(false);
300               }
301 
302               LOG.info(String.format("|  / END::cleanup (%s identity) consumed time %s(ms)", node.getName(), System.currentTimeMillis() - timePerIdentity));
303             }
304           }
305 
306         } finally {
307           RequestLifeCycle.end();
308         }
309       }
310 
311     } finally {
312       MigrationContext.setIdentitiesCleanupFailed(identitiesCleanupFailed);
313       if (identitiesCleanupFailed.size() > 0) {
314         LOG.warn("Cleanup failed for " + identitiesCleanupFailed.size() + " identities");
315       }
316       LOG.info(String.format("| / END::cleanup Identities migration for (%s) identity consumed %s(ms)", offset, System.currentTimeMillis() - t));
317     }
318   }
319 
320   private Identity migrateIdentity(Node node, String jcrId) throws Exception {
321     String providerId = node.getProperty("soc:providerId").getString();
322     // The node name is the identity id.
323     // Node name is soc:<name>, only the <name> is relevant
324     // Node name should equals to remoteId on all identities
325     String name = IdentityUtil.getIdentityName(node.getName());
326     String remoteId = node.getProperty("soc:remoteId").getString();
327 
328     if (!name.equals(remoteId)) {
329       LOG.info(String.format("Node name(%s) does not equals to remoteId(%s), need to adjust and make them equally before migrate", name, remoteId));
330       boolean needUpdateRemoteId = true;
331 
332       if (SpaceIdentityProvider.NAME.equals(providerId)) {
333         Space space = spaceStorage.getSpaceByPrettyName(name);
334         if (space == null) {
335           space = spaceStorage.getSpaceByPrettyName(remoteId);
336           // If we can not find the space for this identity, we could ignore to migrate
337           if (space == null) {
338             LOG.warn("The space with prettyName=" + remoteId + " does not exists, this identity will not be migrated.");
339             return null;
340           } else {
341             needUpdateRemoteId = false;
342             name = remoteId;
343           }
344         }
345       }
346 
347       if (needUpdateRemoteId) {
348         // Node name is more accurate in case user identity. Will update the remoteId and make it the same
349         LOG.info("Update remoteId to " + name + " to make it equals to node name");
350         node.setProperty("soc:remoteId", name);
351         node.getSession().save();
352 
353       } else {
354         // RemoteId is more accurate in case space identity and we found a space for this remoteName, we have to update node name to remoteId
355         LOG.info("Update node name to soc:" + name + " to make it equals to remoteId because we found a space with prettyName=" + name);
356         Session session = node.getSession();
357         Node parent = node.getParent();
358 
359         String parentId = parent.getUUID();
360         String path = node.getPath();
361         String parentPath = parent.getPath();
362         session.move(path, parentPath + "/soc:" + name);
363         session.save();
364 
365         node = session.getNodeByUUID(parentId).getNode("soc:" + name);
366       }
367     }
368 
369     Identity identity = identityStorage.findIdentity(providerId, name);
370     if (identity != null) {
371       LOG.info("Identity with providerId = " + identity.getProviderId() + " and remoteId=" + identity.getRemoteId() + " has already been migrated.");
372       return identity;
373     }
374 
375     identity = new Identity(providerId, name);
376     identity.setDeleted(node.getProperty("soc:isDeleted").getBoolean());
377 
378     if (node.isNodeType("soc:isDisabled")) {
379       identity.setEnable(false);
380     }
381 
382     identityStorage.saveIdentity(identity);
383 
384     //
385     String id = identity.getId();
386     identity.setId(jcrId);
387 
388     // Migrate profile
389     //TODO: please check the way to load profile data from JCR
390     Profile profile = new Profile(identity);
391     jcrIdentityStorage.loadProfile(profile);
392     String oldProfileId = profile.getId();
393     profile.setId("0");
394     identity.setId(id);
395 
396     // Process profile
397     ProfileEntity entity = _findById(ProfileEntity.class, oldProfileId);
398     NTFile avatar = entity.getAvatar();
399     if (avatar != null) {
400       Resource resource = avatar.getContentResource();
401       AvatarAttachment attachment = new AvatarAttachment();
402       attachment.setMimeType(resource.getMimeType());
403       attachment.setInputStream(new ByteArrayInputStream(resource.getData()));
404 
405       profile.setProperty(Profile.AVATAR, attachment);
406     }
407 
408 
409     identityStorage.saveProfile(profile);
410 
411     identity.setProfile(profile);
412 
413     return identity;
414   }
415 
416   public Identity migrateIdentity(String oldId) {
417     boolean begun = false;
418     try {
419       RequestLifeCycle.begin(PortalContainer.getInstance());
420       begun = startTx();
421       IdentityEntity jcrEntity = _findById(IdentityEntity.class, oldId);
422 
423       String providerId = jcrEntity.getProviderId();
424       String remoteId = jcrEntity.getRemoteId();
425 
426       Identity identity = identityStorage.findIdentity(providerId, remoteId);
427 
428       if (identity == null) {
429         identity = new Identity(providerId, remoteId);
430         identity.setDeleted(jcrEntity.isDeleted());
431         identity.setEnable(_getMixin(jcrEntity, DisabledEntity.class, false) == null);
432 
433         identityStorage.saveIdentity(identity);
434 
435         //
436         String id = identity.getId();
437         identity.setId(oldId);
438 
439         // Migrate profile
440         Profile profile = new Profile(identity);
441         jcrIdentityStorage.loadProfile(profile);
442         String oldProfileId = profile.getId();
443         profile.setId("0");
444         identity.setId(id);
445 
446         // Process profile
447         ProfileEntity entity = _findById(ProfileEntity.class, oldProfileId);
448         NTFile avatar = entity.getAvatar();
449         if (avatar != null) {
450           Resource resource = avatar.getContentResource();
451           AvatarAttachment attachment = new AvatarAttachment();
452           attachment.setMimeType(resource.getMimeType());
453           attachment.setInputStream(new ByteArrayInputStream(resource.getData()));
454 
455           profile.setProperty(Profile.AVATAR, attachment);
456         }
457 
458 
459         identityStorage.saveProfile(profile);
460 
461         identity.setProfile(profile);
462 
463       }
464 
465       if (identity != null) {
466         String newId = identity.getId();
467         identity.setId(oldId);
468         broadcastListener(identity, newId);
469       }
470 
471       return identity;
472     } catch (NodeNotFoundException ex) {
473       LOG.error("Can not find indentity with oldId: " + oldId, ex);
474       return null;
475     } catch (Exception ex) {
476       LOG.error("Exception while migrate identity with oldId: " + oldId, ex);
477       return null;
478     } finally {
479       try {
480         endTx(begun);
481       } catch (Exception ex) {
482         LOG.error("Error while commit transaction", ex);
483       }
484       RequestLifeCycle.end();
485     }
486   }
487 
488   private NodeIterator getIdentityNodes(long offset, int limit) {
489     if(identityQuery == null) {
490       identityQuery = new StringBuilder().append("SELECT * FROM soc:identitydefinition").toString();
491     }
492     return nodes(identityQuery, offset, limit);
493   }
494 
495   private long getTotalNumberIdentities() {
496     if (this.totalNumberIdentites == 0) {
497       if(identityQuery == null) {
498         identityQuery = new StringBuilder().append("SELECT * FROM soc:identitydefinition").toString();
499       }
500       this.totalNumberIdentites = nodes(identityQuery).getSize();
501     }
502     return this.totalNumberIdentites;
503   }
504 
505 
506   @Override
507   @Managed
508   @ManagedDescription("Manual to stop run migration data of identities from JCR to RDBMS.")
509   public void stop() {
510     super.stop();
511   }
512 
513   @Override
514   protected String getListenerKey() {
515     return EVENT_LISTENER_KEY;
516   }
517 }