StreamUpgradeProcessor.java

/*
 * Copyright (C) 2003-2013 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.exoplatform.social.core.updater;

import java.util.List;

import org.exoplatform.container.PortalContainer;
import org.exoplatform.social.core.activity.model.ExoSocialActivity;
import org.exoplatform.social.core.identity.model.Identity;
import org.exoplatform.social.core.storage.api.ActivityStorage;
import org.exoplatform.social.core.storage.api.ActivityStreamStorage;
import org.exoplatform.social.core.storage.api.IdentityStorage;
import org.exoplatform.social.core.storage.impl.ActivityStreamStorageImpl.ActivityRefType;

public abstract class StreamUpgradeProcessor {
  /**
   * Retrieves the activities list from JCR
   * @param owner
   * @param offset
   * @param limit
   * @return
   */
  abstract List<ExoSocialActivity> load(Identity owner, int offset, int limit);
  
  /**
   * Gets total of activities for given Identity
   * @param owner
   * @return
   */
  abstract int size(Identity owner);
  /**
   * Fetches all of activities and migrate 
   */
  public void upgrade() {
    
    int offset = 0;
    int limit = 100;
    int totalSize = size(this.owner);
    
    limit = Math.min(limit, totalSize);
    int loaded = upgradeRange(offset, limit);
    
    if (limit != totalSize) {
      while (loaded == 100) {
        offset += limit;
        
        //prevent to over totalSize
        if (offset + limit > totalSize) {
          limit = totalSize - offset;
        }
        
        //
        loaded = upgradeRange(offset, limit);
      }
    }

  }
  
  /**
   * Fetches all of activities with offset and limit and migrate
   *  
   * @param offset
   * @param limit
   */
  public void upgrade(int offset, int limit) {
    if (limit == -1) {
      upgrade();
    } else {
      upgradeRange(offset, limit);
      upgradeStreamSize();
    }
    
  }
  
  protected IdentityStorage getIdentityStorage() {
    if (this.identityStorage == null) {
       this.identityStorage = (IdentityStorage) PortalContainer.getInstance().getComponentInstanceOfType(IdentityStorage.class);
    }
    
    return identityStorage;
  }
  
  protected ActivityStorage getActivityStorage() {
    if (this.activityStorage == null) {
       this.activityStorage = (ActivityStorage) PortalContainer.getInstance().getComponentInstanceOfType(ActivityStorage.class);
    }
    
    return activityStorage;
  }
  
  protected ActivityStreamStorage getStreamStorage() {
    if (this.streamStorage == null) {
       this.streamStorage = (ActivityStreamStorage) PortalContainer.getInstance().getComponentInstanceOfType(ActivityStreamStorage.class);
    }
    
    return streamStorage;
  }
  
  
  public static StreamUpgradeProcessor feed(Identity owner) {
    return new StreamUpgradeProcessor(owner, ActivityRefType.FEED) {

      @Override
      List<ExoSocialActivity> load(Identity owner, int offset, int limit) {
        return getActivityStorage().getActivityFeedForUpgrade(owner, offset, limit);
      }

      @Override
      int size(Identity owner) {
        return getActivityStorage().getNumberOfActivitesOnActivityFeedForUpgrade(owner);
      }
      
    };
  }
  
  public static StreamUpgradeProcessor connection(Identity owner) {
    return new StreamUpgradeProcessor(owner, ActivityRefType.CONNECTION) {

      @Override
      List<ExoSocialActivity> load(Identity owner, int offset, int limit) {
        return getActivityStorage().getActivityFeedForUpgrade(owner, offset, limit);
      }
      
      @Override
      int size(Identity owner) {
        return getActivityStorage().getNumberOfActivitiesOfConnectionsForUpgrade(owner);
      }
      
    };
  }
  
  public static StreamUpgradeProcessor user(Identity owner) {
    return new StreamUpgradeProcessor(owner, ActivityRefType.MY_ACTIVITIES) {

      @Override
      List<ExoSocialActivity> load(Identity owner, int offset, int limit) {
        return getActivityStorage().getUserActivitiesForUpgrade(owner, offset, limit);
      }
      
      @Override
      int size(Identity owner) {
        return getActivityStorage().getNumberOfUserActivitiesForUpgrade(owner);
      }
      
    };
  }
  
  public static StreamUpgradeProcessor myspaces(Identity owner) {
    return new StreamUpgradeProcessor(owner, ActivityRefType.MY_SPACES) {

      @Override
      List<ExoSocialActivity> load(Identity owner, int offset, int limit) {
        return getActivityStorage().getUserSpacesActivitiesForUpgrade(owner, offset, limit);
      }
      
      @Override
      int size(Identity owner) {
        return getActivityStorage().getNumberOfUserSpacesActivitiesForUpgrade(owner);
      }
      
    };
  }
  
  public static StreamUpgradeProcessor space(Identity owner) {
    return new StreamUpgradeProcessor(owner, ActivityRefType.SPACE_STREAM) {

      @Override
      List<ExoSocialActivity> load(Identity owner, int offset, int limit) {
        return getActivityStorage().getSpaceActivitiesForUpgrade(owner, offset, limit);
      }
      
      @Override
      int size(Identity owner) {
        return getActivityStorage().getNumberOfSpaceActivitiesForUpgrade(owner);
      }
      
    };
  }
  
  private IdentityStorage identityStorage = null;
  private ActivityStorage activityStorage = null;
  private ActivityStreamStorage streamStorage = null;
  
  private final Identity owner;
  private final ActivityRefType type;
  
  public StreamUpgradeProcessor(Identity owner, ActivityRefType type) {
    this.owner = owner;
    this.type = type;
  }
  
  private int upgradeRange(int  offset, int limit) {
    List<ExoSocialActivity> got = load(this.owner, offset, limit);
    if (got.size() > 0) {
      getStreamStorage().createActivityRef(owner, got, type);
    }
    
    
    //
    return got.size();
  }
  
  private int upgradeStreamSize() {
    int got = size(this.owner);
    if (got > 0) {
      getStreamStorage().migrateStreamSize(owner, got, this.type);
    }
    //
    return got;
  }
}