QueueIndexingService.java

package org.exoplatform.commons.search.index.impl;

import org.apache.commons.lang.StringUtils;
import org.exoplatform.commons.search.dao.IndexingOperationDAO;
import org.exoplatform.commons.search.domain.IndexingOperation;
import org.exoplatform.commons.search.domain.OperationType;
import org.exoplatform.commons.search.index.IndexingService;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;

import java.util.List;

/**
 * Created by The eXo Platform SAS
 * Author : Thibault Clement
 * tclement@exoplatform.com
 * 7/22/15
 */
public class QueueIndexingService implements IndexingService {

  private static final Log LOG = ExoLogger.getExoLogger(QueueIndexingService.class);

  private final IndexingOperationDAO indexingOperationDAO;

  public QueueIndexingService(IndexingOperationDAO indexingOperationDAO) {
    this.indexingOperationDAO = indexingOperationDAO;
  }

  @Override
  public void init(String connectorName) {
    addToIndexingQueue(connectorName, null, OperationType.INIT);
  }

  @Override
  public void index(String connectorName, String id) {
    if (StringUtils.isBlank(id)) {
      throw new IllegalArgumentException("Id is null");
    }
    addToIndexingQueue(connectorName, id, OperationType.CREATE);
  }

  @Override
  public void reindex(String connectorName, String id) {
    if (StringUtils.isBlank(id)) {
      throw new IllegalArgumentException("Id is null");
    }
    addToIndexingQueue(connectorName, id, OperationType.UPDATE);
  }

  @Override
  public void unindex(String connectorName, String id) {
    if (StringUtils.isBlank(id)) {
      throw new IllegalArgumentException("Id is null");
    }
    addToIndexingQueue(connectorName, id, OperationType.DELETE);
  }

  @Override
  public void reindexAll(String connectorName) {
    addToIndexingQueue(connectorName, null, OperationType.REINDEX_ALL);
  }

  @Override
  public void unindexAll(String connectorName) {
    addToIndexingQueue(connectorName, null, OperationType.DELETE_ALL);
  }

  @Override
  public void clearQueue() {
    indexingOperationDAO.deleteAll();
  }

  @Override
  public void clearQueue(String entityType) {
    indexingOperationDAO.deleteAllByEntityType(entityType);
  }

  /**
   * Add a new operation to the create queue
   * @param connectorName Name of the connector
   * @param entityId id of the document
   * @param operation operation to the create {create, update, delete, init}
   * @LevelAPI Experimental
   */
  private void addToIndexingQueue(String connectorName, String entityId, OperationType operation) {
    if (operation==null) {
      throw new IllegalArgumentException("Operation cannot be null");
    }
    /*
    if (!getConnectors().containsKey(connectorName)) {
      throw new IllegalStateException("Connector ["+connectorName+"] has not been registered.");
    }*/
    switch (operation) {
      //A new type of document need to be initialise
      case INIT: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.INIT, entityId));
        break;
      //A new entity need to be indexed
      case CREATE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.CREATE, entityId));
        break;
      //An existing entity need to be updated in the create
      case UPDATE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.UPDATE, entityId));
        break;
      //An existing entity need to be deleted from the create
      case DELETE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.DELETE, entityId));
        break;
      //All entities of a specific type need to be deleted
      case DELETE_ALL: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.DELETE_ALL, entityId));
        break;
      //All entities of a specific type need to be reindexed
      case REINDEX_ALL: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.REINDEX_ALL, entityId));
        break;
      default:
        throw new IllegalArgumentException(operation+" is not an accepted operation for the Indexing Queue");
    }
  }

  private IndexingOperation getIndexingOperation (String connector, OperationType operation, String entityId) {
    IndexingOperation indexingOperation = new IndexingOperation();
    indexingOperation.setEntityType(connector);
    indexingOperation.setOperation(operation);
    if (entityId != null) indexingOperation.setEntityId(entityId);
    return indexingOperation;
  }

  /**
   * Clear the indexQueue
   * @LevelAPI Experimental
   */
  public void clearIndexingQueue() {
    indexingOperationDAO.deleteAll();
  }

  /**
   * get the number of operations in indexQueue
   * @LevelAPI Experimental
   */
  public Long getNumberOperations() {
    return indexingOperationDAO.count();
  }

  public List<IndexingOperation> getOperations(int offset, int limit) {
    return indexingOperationDAO.findAll(offset, limit);
  }

  public IndexingOperation getOperation(String operationId) {
    return indexingOperationDAO.find(Long.getLong(operationId));
  }

  public void deleteAllOperations() {
    indexingOperationDAO.deleteAll();
  }

  public void deleteOperation(IndexingOperation indexingOperation) {
    indexingOperationDAO.delete(indexingOperation);
  }
}