SpaceActivityStreamMigration.java

/*
 * Copyright (C) 2003-2013 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.exoplatform.social.core.updater;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jcr.Node;
import javax.jcr.NodeIterator;

import org.exoplatform.container.PortalContainer;
import org.exoplatform.container.component.RequestLifeCycle;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.social.common.service.AsyncCallback;
import org.exoplatform.social.common.service.ProcessContext;
import org.exoplatform.social.common.service.SocialServiceContext;
import org.exoplatform.social.common.service.impl.SocialServiceContextImpl;
import org.exoplatform.social.common.service.utils.ConsoleUtils;
import org.exoplatform.social.common.service.utils.ObjectHelper;
import org.exoplatform.social.core.chromattic.entity.ProfileEntity;
import org.exoplatform.social.core.chromattic.entity.ProviderEntity;
import org.exoplatform.social.core.identity.model.Identity;
import org.exoplatform.social.core.identity.provider.SpaceIdentityProvider;
import org.exoplatform.social.core.storage.api.IdentityStorage;
import org.exoplatform.social.core.storage.impl.AbstractStorage;
import org.exoplatform.social.core.storage.impl.StorageUtils;
import org.exoplatform.social.core.storage.query.JCRProperties;
import org.exoplatform.social.core.storage.streams.SocialChromatticAsyncProcessor;
import org.exoplatform.social.core.storage.streams.StreamProcessContext;

public class SpaceActivityStreamMigration extends AbstractStorage {
  
 private static final Log LOG = ExoLogger.getLogger(UserActivityStreamUpdaterPlugin.class);
  
 private IdentityStorage identityStorage = null;
 
 private static AtomicInteger currentNumber = new AtomicInteger(0);
 
 private static final int BATCH_FLUSH_LIMIT = 40;

  private IdentityStorage getIdentityStorage() {
    if (this.identityStorage == null) {
       this.identityStorage = (IdentityStorage) PortalContainer.getInstance().getComponentInstanceOfType(IdentityStorage.class);
    }
    
    return identityStorage;
  }
  
  public void upgrade(int limit) {
    StringBuffer sb = new StringBuffer().append("SELECT * FROM soc:identitydefinition WHERE ");
    ProviderEntity provider = getProviderRoot().getProviders().get(SpaceIdentityProvider.NAME);
    //nothing needs to migrate.
    if (provider == null) {
      return;
    }
    
    sb.append(JCRProperties.path.getName()).append(" LIKE '").append(provider.getPath() + StorageUtils.SLASH_STR + StorageUtils.PERCENT_STR);
    sb.append("' AND NOT ").append(ProfileEntity.deleted.getName()).append(" = ").append("true");
    
    LOG.warn("SQL : " + sb.toString());
    NodeIterator it = nodes(sb.toString());
    long totalOfIdentity = it.getSize();
    Identity owner = null; 
    Node node = null;
    int batchIndex = 0;
    int offset = 0;
    try {
      while (it.hasNext()) {
        node = (Node) it.next();
        owner = getIdentityStorage().findIdentityById(node.getUUID());
        doUpgrade(owner, totalOfIdentity, limit);
        batchIndex++;
        offset++;
        
        //
        if (batchIndex == BATCH_FLUSH_LIMIT) {
          LOG.warn("UPGRAGE SESSION FLUSH: " + offset);
          StorageUtils.persistJCR(true);
          it = nodes(sb.toString());
          _skip(it, offset);
          batchIndex = 0;
        }
      }
    } catch (Exception e) {
      LOG.warn("Failed to migration for Space Activity Stream.");
    } finally {
      StorageUtils.persistJCR(false);
      StorageUtils.endRequest();
    }
  }
  
  private ProcessContext doUpgrade(Identity owner, long total, int limit) {
    //
    SocialServiceContext ctx = SocialServiceContextImpl.getInstance();
    StreamProcessContext processCtx = StreamProcessContext.getIntance(String.format("%s-[%s limit=%s]", StreamProcessContext.UPGRADE_STREAM_PROCESS, owner.getRemoteId(), limit), ctx);
    processCtx.identity(owner).limit(limit).totalProcesses((int)total);
    
    try {
      //ctx.getServiceExecutor().async(upgradeProcessor(), processCtx, createAsyncCallback());
      processCtx.getTraceElement().start();
      upgradeProcessor().start(processCtx);
      upgradeProcessor().process(processCtx);
      upgradeProcessor().end(processCtx);
      processCtx.getTraceElement().end();
      createAsyncCallback().done(processCtx);
    } catch (Exception e) {
      processCtx.setException(e);
    } finally {
      if (processCtx.isFailed()) {
        LOG.warn("Failed to migration for Space Activity Stream.", processCtx.getException());
      } else {
        LOG.info(processCtx.getTraceLog());
      }
    }
    
    return processCtx;
  }
  
  private SocialChromatticAsyncProcessor upgradeProcessor() {
    return new SocialChromatticAsyncProcessor(SocialServiceContextImpl.getInstance()) {

      @Override
      protected ProcessContext execute(ProcessContext processContext) throws Exception {
        StreamProcessContext streamCtx = ObjectHelper.cast(StreamProcessContext.class, processContext);
        int limit = streamCtx.getLimit();
        
        //
        StreamUpgradeProcessor.space(streamCtx.getIdentity()).upgrade(0, limit);
        return processContext;
      }

    };
  }
  
  private AsyncCallback createAsyncCallback() {
    return new AsyncCallback() {
      @Override
      public void done(ProcessContext processContext) {
        int value = currentNumber.incrementAndGet();
        int percent = (value*100) / processContext.getTotalProcesses();
        ConsoleUtils.logProgBar(percent);
      }
    };
  }
  
}