View Javadoc
1   package org.exoplatform.commons.search.index.impl;
2   
3   import org.apache.commons.lang.StringUtils;
4   import org.exoplatform.commons.search.dao.IndexingOperationDAO;
5   import org.exoplatform.commons.search.domain.IndexingOperation;
6   import org.exoplatform.commons.search.domain.OperationType;
7   import org.exoplatform.commons.search.index.IndexingService;
8   import org.exoplatform.services.log.ExoLogger;
9   import org.exoplatform.services.log.Log;
10  
11  import java.util.List;
12  
13  /**
14   * Created by The eXo Platform SAS
15   * Author : Thibault Clement
16   * tclement@exoplatform.com
17   * 7/22/15
18   */
19  public class QueueIndexingService implements IndexingService {
20  
21    private static final Log LOG = ExoLogger.getExoLogger(QueueIndexingService.class);
22  
23    private final IndexingOperationDAO indexingOperationDAO;
24  
25    public QueueIndexingService(IndexingOperationDAO indexingOperationDAO) {
26      this.indexingOperationDAO = indexingOperationDAO;
27    }
28  
29    @Override
30    public void init(String connectorName) {
31      addToIndexingQueue(connectorName, null, OperationType.INIT);
32    }
33  
34    @Override
35    public void index(String connectorName, String id) {
36      if (StringUtils.isBlank(id)) {
37        throw new IllegalArgumentException("Id is null");
38      }
39      addToIndexingQueue(connectorName, id, OperationType.CREATE);
40    }
41  
42    @Override
43    public void reindex(String connectorName, String id) {
44      if (StringUtils.isBlank(id)) {
45        throw new IllegalArgumentException("Id is null");
46      }
47      addToIndexingQueue(connectorName, id, OperationType.UPDATE);
48    }
49  
50    @Override
51    public void unindex(String connectorName, String id) {
52      if (StringUtils.isBlank(id)) {
53        throw new IllegalArgumentException("Id is null");
54      }
55      addToIndexingQueue(connectorName, id, OperationType.DELETE);
56    }
57  
58    @Override
59    public void reindexAll(String connectorName) {
60      addToIndexingQueue(connectorName, null, OperationType.REINDEX_ALL);
61    }
62  
63    @Override
64    public void unindexAll(String connectorName) {
65      addToIndexingQueue(connectorName, null, OperationType.DELETE_ALL);
66    }
67  
68    @Override
69    public void clearQueue() {
70      indexingOperationDAO.deleteAll();
71    }
72  
73    @Override
74    public void clearQueue(String entityType) {
75      indexingOperationDAO.deleteAllByEntityType(entityType);
76    }
77  
78    /**
79     * Add a new operation to the create queue
80     * @param connectorName Name of the connector
81     * @param entityId id of the document
82     * @param operation operation to the create {create, update, delete, init}
83     * @LevelAPI Experimental
84     */
85    private void addToIndexingQueue(String connectorName, String entityId, OperationType operation) {
86      if (operation==null) {
87        throw new IllegalArgumentException("Operation cannot be null");
88      }
89      /*
90      if (!getConnectors().containsKey(connectorName)) {
91        throw new IllegalStateException("Connector ["+connectorName+"] has not been registered.");
92      }*/
93      switch (operation) {
94        //A new type of document need to be initialise
95        case INIT: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.INIT, entityId));
96          break;
97        //A new entity need to be indexed
98        case CREATE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.CREATE, entityId));
99          break;
100       //An existing entity need to be updated in the create
101       case UPDATE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.UPDATE, entityId));
102         break;
103       //An existing entity need to be deleted from the create
104       case DELETE: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.DELETE, entityId));
105         break;
106       //All entities of a specific type need to be deleted
107       case DELETE_ALL: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.DELETE_ALL, entityId));
108         break;
109       //All entities of a specific type need to be reindexed
110       case REINDEX_ALL: indexingOperationDAO.create(getIndexingOperation(connectorName, OperationType.REINDEX_ALL, entityId));
111         break;
112       default:
113         throw new IllegalArgumentException(operation+" is not an accepted operation for the Indexing Queue");
114     }
115   }
116 
117   private IndexingOperation getIndexingOperation (String connector, OperationType operation, String entityId) {
118     IndexingOperation indexingOperation = new IndexingOperation();
119     indexingOperation.setEntityType(connector);
120     indexingOperation.setOperation(operation);
121     if (entityId != null) indexingOperation.setEntityId(entityId);
122     return indexingOperation;
123   }
124 
125   /**
126    * Clear the indexQueue
127    * @LevelAPI Experimental
128    */
129   public void clearIndexingQueue() {
130     indexingOperationDAO.deleteAll();
131   }
132 
133   /**
134    * get the number of operations in indexQueue
135    * @LevelAPI Experimental
136    */
137   public Long getNumberOperations() {
138     return indexingOperationDAO.count();
139   }
140 
141   public List<IndexingOperation> getOperations(int offset, int limit) {
142     return indexingOperationDAO.findAll(offset, limit);
143   }
144 
145   public IndexingOperation getOperation(String operationId) {
146     return indexingOperationDAO.find(Long.getLong(operationId));
147   }
148 
149   public void deleteAllOperations() {
150     indexingOperationDAO.deleteAll();
151   }
152 
153   public void deleteOperation(IndexingOperation indexingOperation) {
154     indexingOperationDAO.delete(indexingOperation);
155   }
156 }