UserActivityStreamMigration.java
/*
* Copyright (C) 2003-2013 eXo Platform SAS.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.exoplatform.social.core.updater;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import org.exoplatform.container.PortalContainer;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.social.common.service.AsyncCallback;
import org.exoplatform.social.common.service.ProcessContext;
import org.exoplatform.social.common.service.SocialServiceContext;
import org.exoplatform.social.common.service.impl.SocialServiceContextImpl;
import org.exoplatform.social.common.service.utils.ConsoleUtils;
import org.exoplatform.social.common.service.utils.ObjectHelper;
import org.exoplatform.social.core.chromattic.entity.IdentityEntity;
import org.exoplatform.social.core.chromattic.entity.ProfileEntity;
import org.exoplatform.social.core.identity.model.Identity;
import org.exoplatform.social.core.identity.provider.OrganizationIdentityProvider;
import org.exoplatform.social.core.storage.api.IdentityStorage;
import org.exoplatform.social.core.storage.impl.AbstractStorage;
import org.exoplatform.social.core.storage.impl.StorageUtils;
import org.exoplatform.social.core.storage.query.JCRProperties;
import org.exoplatform.social.core.storage.streams.SocialChromatticSyncProcessor;
import org.exoplatform.social.core.storage.streams.StreamProcessContext;
/**
 * Upgrades the activity streams of organization identities.
 *
 * <p>Identities are looked up with a JCR SQL query on
 * {@code soc:identitydefinition}; for each one the feed, connection,
 * my-spaces and user streams are upgraded through
 * {@link StreamUpgradeProcessor}.
 */
public class UserActivityStreamMigration extends AbstractStorage {

  private static final Log LOG = ExoLogger.getLogger(UserActivityStreamMigration.class);

  /** Number of identities processed between two forced JCR persists in {@link #upgrade(int)}. */
  private static final int BATCH_FLUSH_LIMIT = 10;

  /** Number of identities processed so far; static, hence shared by every instance and every run. */
  private static final AtomicInteger currentNumber = new AtomicInteger(0);

  /** Lazily resolved from the portal container, see {@link #getIdentityStorage()}. */
  private IdentityStorage identityStorage = null;

  /**
   * Returns the identity storage, resolving it from the current
   * {@link PortalContainer} on first use.
   */
  private IdentityStorage getIdentityStorage() {
    if (this.identityStorage == null) {
      this.identityStorage = (IdentityStorage) PortalContainer.getInstance().getComponentInstanceOfType(IdentityStorage.class);
    }
    return identityStorage;
  }

  /**
   * Builds the query selecting every non-deleted identity stored under the
   * organization identity provider. Callers may append further predicates.
   */
  private StringBuilder newIdentityQuery() {
    StringBuilder sb = new StringBuilder("SELECT * FROM soc:identitydefinition WHERE ");
    sb.append(JCRProperties.path.getName()).append(" LIKE '").append(getProviderRoot().getProviders().get(OrganizationIdentityProvider.NAME).getPath() + StorageUtils.SLASH_STR + StorageUtils.PERCENT_STR);
    sb.append("' AND NOT ").append(ProfileEntity.deleted.getName()).append(" = ").append("true");
    return sb;
  }

  /**
   * Upgrades the streams of the identities whose remote id is
   * {@code prefix + i} for every {@code i} in the inclusive range
   * {@code [from, to]}.
   *
   * <p>When {@code from > to} no remote-id filter is added, so every
   * identity matched by the base query is processed.
   *
   * @param limit  limit handed to each stream upgrade
   * @param from   first index of the range (inclusive)
   * @param to     last index of the range (inclusive)
   * @param prefix remote-id prefix the index is appended to
   */
  public void upgrade(int limit, int from, int to, String prefix) {
    StringBuilder sb = newIdentityQuery();
    boolean hasRange = from <= to;

    // A one-element range (from == to) needs the "AND (...)" group as well;
    // without it the predicate would be glued onto the previous one.
    if (hasRange) {
      sb.append(" AND (");
      for (int i = from; i <= to; i++) {
        if (i > from) {
          sb.append(" OR ");
        }
        // Double any single quote so the name cannot terminate the SQL literal.
        String name = (prefix + i).replace("'", "''");
        sb.append(IdentityEntity.remoteId.getName()).append(" = '").append(name).append("'");
      }
      sb.append(") ");
    }

    String query = sb.toString();
    LOG.warn("SQL : " + query);
    NodeIterator it = nodes(query);

    // The range is inclusive on both ends, hence the +1. Without a range
    // filter, fall back to the size reported by the iterator.
    long totalOfIdentity = hasRange ? (long) to - from + 1 : it.getSize();

    try {
      while (it.hasNext()) {
        Node node = (Node) it.next();
        Identity owner = getIdentityStorage().findIdentityById(node.getUUID());
        doUpgrade(owner, totalOfIdentity, limit);
      }
    } catch (Exception e) {
      LOG.warn("Failed to migration for Activity Stream.", e);
    }
  }

  /**
   * Upgrades the streams of every non-deleted organization identity.
   *
   * <p>Every {@link #BATCH_FLUSH_LIMIT} identities the JCR changes are
   * persisted, the query is executed again and the iterator is advanced past
   * the identities already handled. A final persist and end-of-request are
   * always issued, even on failure.
   *
   * @param limit limit handed to each stream upgrade
   */
  public void upgrade(int limit) {
    String query = newIdentityQuery().toString();
    LOG.warn("SQL : " + query);
    NodeIterator it = nodes(query);
    long totalOfIdentity = it.getSize();

    int batchIndex = 0;
    int offset = 0;
    try {
      while (it.hasNext()) {
        Node node = (Node) it.next();
        Identity owner = getIdentityStorage().findIdentityById(node.getUUID());
        doUpgrade(owner, totalOfIdentity, limit);
        batchIndex++;
        offset++;
        //
        if (batchIndex == BATCH_FLUSH_LIMIT) {
          LOG.warn("UPGRAGE SESSION FLUSH: " + offset);
          StorageUtils.persistJCR(true);
          // Re-run the query after the persist and resume where we stopped.
          it = nodes(query);
          _skip(it, offset);
          batchIndex = 0;
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to migration for Activity Stream.", e);
    } finally {
      StorageUtils.persistJCR(false);
      StorageUtils.endRequest();
    }
  }

  /**
   * Runs the start/process/end cycle of the stream upgrade for one identity
   * and reports progress. Any exception raised by the cycle is stored on the
   * returned context and logged rather than rethrown.
   *
   * @param owner identity whose streams are upgraded
   * @param total total number of identities, used for the progress percentage
   * @param limit limit handed to each stream upgrade
   * @return the process context holding the outcome
   */
  private ProcessContext doUpgrade(Identity owner, long total, int limit) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(String.format("%s-[%s limit=%s]", StreamProcessContext.UPGRADE_STREAM_PROCESS, owner.getRemoteId(), limit), ctx);
    processCtx.identity(owner).limit(limit).totalProcesses((int) total);

    // Drive the whole lifecycle through one processor instance.
    SocialChromatticSyncProcessor processor = upgradeProcessor();
    try {
      processor.start(processCtx);
      processor.process(processCtx);
      processor.end(processCtx);
      createAsyncCallback().done(processCtx);
    } catch (Exception e) {
      processCtx.setException(e);
    } finally {
      if (processCtx.isFailed()) {
        LOG.warn("Failed to migration for Activity Stream.", processCtx.getException());
      } else {
        LOG.info(processCtx.getTraceLog());
      }
    }
    return processCtx;
  }

  /**
   * Creates a processor that upgrades the feed, connection, my-spaces and
   * user streams of the identity carried by the process context.
   */
  private SocialChromatticSyncProcessor upgradeProcessor() {
    return new SocialChromatticSyncProcessor(SocialServiceContextImpl.getInstance()) {
      @Override
      protected ProcessContext execute(ProcessContext processContext) throws Exception {
        StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, processContext);
        int limit = streamCtx.getLimit();
        //
        StreamUpgradeProcessor.feed(streamCtx.getIdentity()).upgrade(0, limit);
        StreamUpgradeProcessor.connection(streamCtx.getIdentity()).upgrade(0, limit);
        StreamUpgradeProcessor.myspaces(streamCtx.getIdentity()).upgrade(0, limit);
        StreamUpgradeProcessor.user(streamCtx.getIdentity()).upgrade(0, limit);
        return processContext;
      }
    };
  }

  /**
   * Creates the callback that counts one more processed identity and prints
   * the progress bar. The bar is skipped when the total is not positive, so
   * an unknown or empty total can never cause a division by zero.
   */
  private AsyncCallback createAsyncCallback() {
    return new AsyncCallback() {
      @Override
      public void done(ProcessContext processContext) {
        int value = currentNumber.incrementAndGet();
        int total = processContext.getTotalProcesses();
        if (total <= 0) {
          return;
        }
        // Widen to long so value * 100 cannot overflow.
        int percent = (int) (value * 100L / total);
        ConsoleUtils.logProgBar(percent);
      }
    };
  }
}