1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.exoplatform.social.core.jpa.updater;
21
22 import java.io.ByteArrayInputStream;
23 import java.util.ArrayList;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27
28 import javax.jcr.Node;
29 import javax.jcr.NodeIterator;
30 import javax.jcr.PropertyIterator;
31 import javax.jcr.Session;
32
33 import org.chromattic.ext.ntdef.NTFile;
34 import org.chromattic.ext.ntdef.Resource;
35
36 import org.exoplatform.commons.api.event.EventManager;
37 import org.exoplatform.commons.persistence.impl.EntityManagerService;
38 import org.exoplatform.commons.search.index.IndexingService;
39 import org.exoplatform.commons.utils.CommonsUtils;
40 import org.exoplatform.container.PortalContainer;
41 import org.exoplatform.container.component.RequestLifeCycle;
42 import org.exoplatform.container.xml.InitParams;
43 import org.exoplatform.management.annotations.Managed;
44 import org.exoplatform.management.annotations.ManagedDescription;
45 import org.exoplatform.management.jmx.annotations.NameTemplate;
46 import org.exoplatform.management.jmx.annotations.Property;
47 import org.exoplatform.social.core.identity.provider.SpaceIdentityProvider;
48 import org.exoplatform.social.core.jpa.search.ProfileIndexingServiceConnector;
49 import org.exoplatform.social.core.jpa.storage.RDBMSIdentityStorageImpl;
50 import org.exoplatform.social.core.chromattic.entity.DisabledEntity;
51 import org.exoplatform.social.core.chromattic.entity.IdentityEntity;
52 import org.exoplatform.social.core.chromattic.entity.ProfileEntity;
53 import org.exoplatform.social.core.identity.model.Identity;
54 import org.exoplatform.social.core.identity.model.Profile;
55 import org.exoplatform.social.core.identity.provider.OrganizationIdentityProvider;
56 import org.exoplatform.social.core.jpa.storage.RDBMSSpaceStorageImpl;
57 import org.exoplatform.social.core.jpa.updater.utils.IdentityUtil;
58 import org.exoplatform.social.core.model.AvatarAttachment;
59 import org.exoplatform.social.core.space.model.Space;
60 import org.exoplatform.social.core.storage.api.SpaceStorage;
61 import org.exoplatform.social.core.storage.exception.NodeNotFoundException;
62 import org.exoplatform.social.core.storage.impl.IdentityStorageImpl;
63
64
65
66
/**
 * Migration service for Social identities: copies identities and their profiles from the JCR
 * storage to the RDBMS storage, and removes the JCR identity nodes afterwards.
 * The class carries management/JMX annotations so that migration can be started and stopped
 * manually.
 */
@Managed
@ManagedDescription("Social migration Identities from JCR to RDBMS.")
@NameTemplate({@Property(key = "service", value = "social"), @Property(key = "view", value = "migration-identities") })
public class IdentityMigrationService extends AbstractMigrationService<Identity> {

  /** Listener key returned by {@code getListenerKey()}. */
  public static final String EVENT_LISTENER_KEY = "SOC_IDENTITY_MIGRATION";

  /** Name of the init param read in the constructor to configure the cleanup batch size. */
  protected final static String REMOVE_LIMIT_THRESHOLD_KEY = "REMOVE_LIMIT_THRESHOLD";

  /**
   * Number of identity nodes fetched per cleanup batch; also the interval (in removed
   * references) at which the session is saved while removing references to a node.
   */
  private int REMOVE_LIMIT_THRESHOLD = 20;

  /** Cached total number of JCR identity nodes; 0 means "not computed yet". */
  private long totalNumberIdentites = 0;

  /** Target storage: identities and profiles are saved here. */
  private final RDBMSIdentityStorageImpl identityStorage;
  /** Source storage: used to load the profile of an identity from JCR. */
  private final IdentityStorageImpl jcrIdentityStorage;

  /** Used to look up spaces by pretty name when a space identity's node name and remoteId differ. */
  private SpaceStorage spaceStorage;

  /** Lazily initialised query selecting all identity nodes. */
  private String identityQuery;

  /** Node names of the identities whose migration failed during the current run. */
  private Set<String> identitiesMigrateFailed = new HashSet<>();
  /** Node names of the identities that could not be removed from JCR during the current cleanup. */
  private Set<String> identitiesCleanupFailed = new HashSet<>();
89
/**
 * Builds the identity migration service.
 * <p>
 * The migration batch size is read from the {@code LIMIT_THRESHOLD_KEY} init param
 * (default 200) and the cleanup batch size from {@code REMOVE_LIMIT_THRESHOLD_KEY} (default 20).
 *
 * @param initParams           service init params holding the optional batch sizes
 * @param identityStorage      RDBMS identity storage, target of the migration
 * @param jcrIdentityStorage   JCR identity storage, source of the migration
 * @param spaceStorage         space storage used to resolve spaces by pretty name
 * @param eventManager         event manager handed over to the parent class
 * @param entityManagerService entity manager service handed over to the parent class
 */
public IdentityMigrationService(InitParams initParams,
                                RDBMSIdentityStorageImpl identityStorage,
                                IdentityStorageImpl jcrIdentityStorage,
                                RDBMSSpaceStorageImpl spaceStorage,
                                EventManager<Identity, String> eventManager,
                                EntityManagerService entityManagerService) {
  super(initParams, jcrIdentityStorage, eventManager, entityManagerService);

  // Collaborators first ...
  this.identityStorage = identityStorage;
  this.jcrIdentityStorage = jcrIdentityStorage;
  this.spaceStorage = spaceStorage;

  // ... then the configurable batch sizes.
  this.LIMIT_THRESHOLD = getInteger(initParams, LIMIT_THRESHOLD_KEY, 200);
  this.REMOVE_LIMIT_THRESHOLD = getInteger(initParams, REMOVE_LIMIT_THRESHOLD_KEY, 20);
}
102
103 @Override
104 protected void beforeMigration() throws Exception {
105 MigrationContext.setIdentityDone(false);
106 identitiesMigrateFailed = new HashSet<>();
107 }
108
109 @Override
110 @Managed
111 @ManagedDescription("Manual to start run migration data of identities from JCR to RDBMS.")
112 public void doMigration() throws Exception {
113 long t = System.currentTimeMillis();
114
115 long totalIdentities = getTotalNumberIdentities();
116
117
118
119 LOG.info("|\\ START::Identity migration ---------------------------------");
120
121 RequestLifeCycle.end();
122
123 long offset = 0;
124 boolean cont = true;
125 boolean begunTx = false;
126 List<String> transactionList = new ArrayList<>();
127
128 long numberSuccessful = 0;
129 long batchSize = 0;
130
131 while(cont && !forceStop) {
132 try {
133
134 try {
135
136 RequestLifeCycle.begin(PortalContainer.getInstance());
137 begunTx = startTx();
138 transactionList = new ArrayList<>();
139 NodeIterator nodeIter = getIdentityNodes(offset, LIMIT_THRESHOLD);
140 batchSize = nodeIter.getSize();
141
142 if (nodeIter == null || batchSize == 0) {
143 cont = false;
144
145 } else {
146
147 while (nodeIter.hasNext() && !forceStop) {
148 offset++;
149 Node identityNode = nodeIter.nextNode();
150 String identityName = identityNode.getName();
151 transactionList.add(identityName);
152
153 LOG.info(String.format("| \\ START::identity number: %s/%s (%s identity)", offset, totalIdentities, identityName));
154 long t1 = System.currentTimeMillis();
155
156 String jcrid = identityNode.getUUID();
157 if((identityNode.hasProperty("soc:providerId") && identityNode.getProperty("soc:providerId").getString().equals(OrganizationIdentityProvider.NAME)) ||
158 !identityNode.hasProperty("soc:isDeleted") || !identityNode.getProperty("soc:isDeleted").getBoolean()) {
159
160
161 try {
162 Identity identity = migrateIdentity(identityNode, jcrid);
163
164 if (identity != null) {
165 String newId = identity.getId();
166 identity.setId(jcrid);
167 broadcastListener(identity, newId);
168 }
169 numberSuccessful++;
170 } catch (Exception ex) {
171 identitiesMigrateFailed.add(identityName);
172 LOG.error("Error while migrate identity " + identityName, ex);
173 }
174 } else {
175 LOG.info("Ignore migration of identity with id {} since it's marked as deleted", identityName);
176 }
177 LOG.info(String.format("| / END::identity number %s (%s identity) consumed %s(ms)", offset, identityNode.getName(), System.currentTimeMillis() - t1));
178 }
179 }
180
181 } finally {
182 try {
183 endTx(begunTx);
184 } catch (Exception ex) {
185
186 identitiesMigrateFailed.addAll(transactionList);
187 numberSuccessful -= batchSize;
188 }
189 RequestLifeCycle.end();
190 }
191 } catch (Exception ex) {
192 LOG.error(ex);
193 }
194 }
195
196 if (identitiesMigrateFailed.size() > 0) {
197 LOG.info(String.format("| / END::Identity migration failed for (%s) identity(s)", identitiesMigrateFailed.size()));
198 }
199 LOG.info(String.format("|// END::Identity migration for (%s) identity(s) consumed %s(ms)", numberSuccessful, System.currentTimeMillis() - t));
200
201 LOG.info("| \\ START::Re-indexing identity(s) ---------------------------------");
202 IndexingService indexingService = CommonsUtils.getService(IndexingService.class);
203 indexingService.reindexAll(ProfileIndexingServiceConnector.TYPE);
204 LOG.info("| / END::Re-indexing identity(s) ---------------------------------");
205
206 RequestLifeCycle.begin(PortalContainer.getInstance());
207 }
208
209 @Override
210 protected void afterMigration() throws Exception {
211 MigrationContext.setIdentitiesMigrateFailed(identitiesMigrateFailed);
212 if (!forceStop && identitiesMigrateFailed.isEmpty()) {
213 MigrationContext.setIdentityDone(true);
214 }
215 }
216
217 @Override
218 public void doRemove() throws Exception {
219 identitiesCleanupFailed = new HashSet<>();
220
221 long totalIdentities = getTotalNumberIdentities();
222
223 LOG.info("| \\ START::cleanup Identities ---------------------------------");
224 long t = System.currentTimeMillis();
225 long timePerIdentity = System.currentTimeMillis();
226 int offset = 0;
227 long failed = 0;
228 List<String> transactionList = new ArrayList<>();
229
230 try {
231 boolean cont = true;
232 while (cont) {
233 try {
234
235 RequestLifeCycle.begin(PortalContainer.getInstance());
236 failed = identitiesCleanupFailed.size();
237 transactionList = new ArrayList<>();
238
239 NodeIterator nodeIter = getIdentityNodes(failed, REMOVE_LIMIT_THRESHOLD);
240 if(nodeIter == null || nodeIter.getSize() == 0) {
241 cont = false;
242
243 } else {
244
245 if (nodeIter.getSize() < REMOVE_LIMIT_THRESHOLD) {
246 cont = false;
247 }
248 while (nodeIter.hasNext()) {
249 offset++;
250
251 if (offset > totalIdentities) {
252 cont = false;
253 }
254
255 Node node = nodeIter.nextNode();
256 timePerIdentity = System.currentTimeMillis();
257 LOG.info(String.format("| \\ START::cleanup Identity number: %s/%s (%s identity)", offset, totalIdentities, node.getName()));
258
259 String name = node.getName();
260 if (!MigrationContext.isForceCleanup() && (MigrationContext.getIdentitiesCleanupConnectionFailed().contains(name)
261 || MigrationContext.getIdentitiesCleanupActivityFailed().contains(name))) {
262 identitiesCleanupFailed.add(name);
263 LOG.warn("Will not remove this identity because the cleanup connection or activities were failed for it");
264 continue;
265 }
266
267 IdentityEntity identityEntity = _findById(IdentityEntity.class, node.getUUID());
268 String provider = identityEntity.getProviderId();
269 String activityMigrated = identityEntity.getProperty(MigrationContext.KEY_MIGRATE_ACTIVITIES);
270 String connectionMigrated = identityEntity.getProperty(MigrationContext.KEY_MIGRATE_CONNECTION);
271 if (!MigrationContext.TRUE_STRING.equalsIgnoreCase(activityMigrated)
272 || (OrganizationIdentityProvider.NAME.equals(provider) && !MigrationContext.TRUE_STRING.equalsIgnoreCase(connectionMigrated))) {
273 LOG.warn("Can not remove identity " + name + " due to migration was not successful for activities and connections");
274 identitiesCleanupFailed.add(name);
275 continue;
276 }
277
278
279
280 try {
281 PropertyIterator pit = node.getReferences();
282 if (pit != null && pit.getSize() > 0) {
283 int num = 0;
284 while (pit.hasNext()) {
285 num++;
286 pit.nextProperty().remove();
287 if (num % REMOVE_LIMIT_THRESHOLD == 0) {
288 getSession().save();
289 }
290 }
291 getSession().save();
292 }
293 node.remove();
294 getSession().save();
295 } catch (Exception ex) {
296 LOG.error("Error when cleanup the identity: " + name, ex);
297 identitiesCleanupFailed.add(name);
298
299 getSession().getJCRSession().refresh(false);
300 }
301
302 LOG.info(String.format("| / END::cleanup (%s identity) consumed time %s(ms)", node.getName(), System.currentTimeMillis() - timePerIdentity));
303 }
304 }
305
306 } finally {
307 RequestLifeCycle.end();
308 }
309 }
310
311 } finally {
312 MigrationContext.setIdentitiesCleanupFailed(identitiesCleanupFailed);
313 if (identitiesCleanupFailed.size() > 0) {
314 LOG.warn("Cleanup failed for " + identitiesCleanupFailed.size() + " identities");
315 }
316 LOG.info(String.format("| / END::cleanup Identities migration for (%s) identity consumed %s(ms)", offset, System.currentTimeMillis() - t));
317 }
318 }
319
320 private Identity migrateIdentity(Node node, String jcrId) throws Exception {
321 String providerId = node.getProperty("soc:providerId").getString();
322
323
324
325 String name = IdentityUtil.getIdentityName(node.getName());
326 String remoteId = node.getProperty("soc:remoteId").getString();
327
328 if (!name.equals(remoteId)) {
329 LOG.info(String.format("Node name(%s) does not equals to remoteId(%s), need to adjust and make them equally before migrate", name, remoteId));
330 boolean needUpdateRemoteId = true;
331
332 if (SpaceIdentityProvider.NAME.equals(providerId)) {
333 Space space = spaceStorage.getSpaceByPrettyName(name);
334 if (space == null) {
335 space = spaceStorage.getSpaceByPrettyName(remoteId);
336
337 if (space == null) {
338 LOG.warn("The space with prettyName=" + remoteId + " does not exists, this identity will not be migrated.");
339 return null;
340 } else {
341 needUpdateRemoteId = false;
342 name = remoteId;
343 }
344 }
345 }
346
347 if (needUpdateRemoteId) {
348
349 LOG.info("Update remoteId to " + name + " to make it equals to node name");
350 node.setProperty("soc:remoteId", name);
351 node.getSession().save();
352
353 } else {
354
355 LOG.info("Update node name to soc:" + name + " to make it equals to remoteId because we found a space with prettyName=" + name);
356 Session session = node.getSession();
357 Node parent = node.getParent();
358
359 String parentId = parent.getUUID();
360 String path = node.getPath();
361 String parentPath = parent.getPath();
362 session.move(path, parentPath + "/soc:" + name);
363 session.save();
364
365 node = session.getNodeByUUID(parentId).getNode("soc:" + name);
366 }
367 }
368
369 Identity identity = identityStorage.findIdentity(providerId, name);
370 if (identity != null) {
371 LOG.info("Identity with providerId = " + identity.getProviderId() + " and remoteId=" + identity.getRemoteId() + " has already been migrated.");
372 return identity;
373 }
374
375 identity = new Identity(providerId, name);
376 identity.setDeleted(node.getProperty("soc:isDeleted").getBoolean());
377
378 if (node.isNodeType("soc:isDisabled")) {
379 identity.setEnable(false);
380 }
381
382 identityStorage.saveIdentity(identity);
383
384
385 String id = identity.getId();
386 identity.setId(jcrId);
387
388
389
390 Profile profile = new Profile(identity);
391 jcrIdentityStorage.loadProfile(profile);
392 String oldProfileId = profile.getId();
393 profile.setId("0");
394 identity.setId(id);
395
396
397 ProfileEntity entity = _findById(ProfileEntity.class, oldProfileId);
398 NTFile avatar = entity.getAvatar();
399 if (avatar != null) {
400 Resource resource = avatar.getContentResource();
401 AvatarAttachment attachment = new AvatarAttachment();
402 attachment.setMimeType(resource.getMimeType());
403 attachment.setInputStream(new ByteArrayInputStream(resource.getData()));
404
405 profile.setProperty(Profile.AVATAR, attachment);
406 }
407
408
409 identityStorage.saveProfile(profile);
410
411 identity.setProfile(profile);
412
413 return identity;
414 }
415
416 public Identity migrateIdentity(String oldId) {
417 boolean begun = false;
418 try {
419 RequestLifeCycle.begin(PortalContainer.getInstance());
420 begun = startTx();
421 IdentityEntity jcrEntity = _findById(IdentityEntity.class, oldId);
422
423 String providerId = jcrEntity.getProviderId();
424 String remoteId = jcrEntity.getRemoteId();
425
426 Identity identity = identityStorage.findIdentity(providerId, remoteId);
427
428 if (identity == null) {
429 identity = new Identity(providerId, remoteId);
430 identity.setDeleted(jcrEntity.isDeleted());
431 identity.setEnable(_getMixin(jcrEntity, DisabledEntity.class, false) == null);
432
433 identityStorage.saveIdentity(identity);
434
435
436 String id = identity.getId();
437 identity.setId(oldId);
438
439
440 Profile profile = new Profile(identity);
441 jcrIdentityStorage.loadProfile(profile);
442 String oldProfileId = profile.getId();
443 profile.setId("0");
444 identity.setId(id);
445
446
447 ProfileEntity entity = _findById(ProfileEntity.class, oldProfileId);
448 NTFile avatar = entity.getAvatar();
449 if (avatar != null) {
450 Resource resource = avatar.getContentResource();
451 AvatarAttachment attachment = new AvatarAttachment();
452 attachment.setMimeType(resource.getMimeType());
453 attachment.setInputStream(new ByteArrayInputStream(resource.getData()));
454
455 profile.setProperty(Profile.AVATAR, attachment);
456 }
457
458
459 identityStorage.saveProfile(profile);
460
461 identity.setProfile(profile);
462
463 }
464
465 if (identity != null) {
466 String newId = identity.getId();
467 identity.setId(oldId);
468 broadcastListener(identity, newId);
469 }
470
471 return identity;
472 } catch (NodeNotFoundException ex) {
473 LOG.error("Can not find indentity with oldId: " + oldId, ex);
474 return null;
475 } catch (Exception ex) {
476 LOG.error("Exception while migrate identity with oldId: " + oldId, ex);
477 return null;
478 } finally {
479 try {
480 endTx(begun);
481 } catch (Exception ex) {
482 LOG.error("Error while commit transaction", ex);
483 }
484 RequestLifeCycle.end();
485 }
486 }
487
488 private NodeIterator getIdentityNodes(long offset, int limit) {
489 if(identityQuery == null) {
490 identityQuery = new StringBuilder().append("SELECT * FROM soc:identitydefinition").toString();
491 }
492 return nodes(identityQuery, offset, limit);
493 }
494
495 private long getTotalNumberIdentities() {
496 if (this.totalNumberIdentites == 0) {
497 if(identityQuery == null) {
498 identityQuery = new StringBuilder().append("SELECT * FROM soc:identitydefinition").toString();
499 }
500 this.totalNumberIdentites = nodes(identityQuery).getSize();
501 }
502 return this.totalNumberIdentites;
503 }
504
505
506 @Override
507 @Managed
508 @ManagedDescription("Manual to stop run migration data of identities from JCR to RDBMS.")
509 public void stop() {
510 super.stop();
511 }
512
513 @Override
514 protected String getListenerKey() {
515 return EVENT_LISTENER_KEY;
516 }
517 }