ElasticIndexingOperationProcessor.java
/*
* Copyright (C) 2003-2015 eXo Platform SAS.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see http://www.gnu.org/licenses/ .
*/
package org.exoplatform.commons.search.index.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.picocontainer.Startable;
import org.exoplatform.commons.api.persistence.DataInitializer;
import org.exoplatform.commons.api.persistence.ExoTransactional;
import org.exoplatform.commons.persistence.impl.EntityManagerService;
import org.exoplatform.commons.search.dao.IndexingOperationDAO;
import org.exoplatform.commons.search.domain.IndexingOperation;
import org.exoplatform.commons.search.domain.OperationType;
import org.exoplatform.commons.search.es.client.ElasticContentRequestBuilder;
import org.exoplatform.commons.search.es.client.ElasticIndexingAuditTrail;
import org.exoplatform.commons.search.es.client.ElasticIndexingClient;
import org.exoplatform.commons.search.index.IndexingOperationProcessor;
import org.exoplatform.commons.search.index.IndexingServiceConnector;
import org.exoplatform.commons.utils.PropertyManager;
import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
/**
 * Created by The eXo Platform SAS.
 *
 * @author Thibault Clement (tclement@exoplatform.com), 10/12/15
 */
public class ElasticIndexingOperationProcessor extends IndexingOperationProcessor implements Startable {
private static final Log LOG = ExoLogger.getExoLogger(ElasticIndexingOperationProcessor.class);
private static final String BATCH_NUMBER_PROPERTY_NAME = "exo.es.indexing.batch.number";
private static final Integer BATCH_NUMBER_DEFAULT = 1000;
private static final String REQUEST_SIZE_LIMIT_PROPERTY_NAME = "exo.es.indexing.request.size.limit";
  /** In bytes, default = 10MB */
private static final Integer REQUEST_SIZE_LIMIT_DEFAULT = 10485760;
private static final String REINDEXING_BATCH_SIZE_PROPERTY_NAME = "exo.es.reindex.batch.size";
private static final int REINDEXING_BATCH_SIZE_DEFAULT_VALUE = 100;
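  // Illustrative configuration sketch (property names and defaults are taken
  // from the constants above; the values shown simply repeat the defaults as
  // an example of what could be set in exo.properties):
  //
  //   exo.es.indexing.batch.number=1000
  //   exo.es.indexing.request.size.limit=10485760
  //   exo.es.reindex.batch.size=100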
// Service
private final IndexingOperationDAO indexingOperationDAO;
private final ElasticIndexingClient elasticIndexingClient;
private final ElasticContentRequestBuilder elasticContentRequestBuilder;
private final ElasticIndexingAuditTrail auditTrail;
private final EntityManagerService entityManagerService;
private Integer batchNumber = BATCH_NUMBER_DEFAULT;
private Integer requestSizeLimit = REQUEST_SIZE_LIMIT_DEFAULT;
private int reindexBatchSize = REINDEXING_BATCH_SIZE_DEFAULT_VALUE;
private Map<String, Set<String>> typesOrIndexUpgrading = new HashMap<>();
private ExecutorService executors = Executors.newCachedThreadPool();
private String esVersion;
private boolean interrupted = false;
public ElasticIndexingOperationProcessor(IndexingOperationDAO indexingOperationDAO,
ElasticIndexingClient elasticIndexingClient,
ElasticContentRequestBuilder elasticContentRequestBuilder,
ElasticIndexingAuditTrail auditTrail,
EntityManagerService entityManagerService,
DataInitializer dataInitializer,
InitParams initParams) {
this.indexingOperationDAO = indexingOperationDAO;
this.auditTrail = auditTrail;
this.entityManagerService = entityManagerService;
this.elasticIndexingClient = elasticIndexingClient;
this.elasticContentRequestBuilder = elasticContentRequestBuilder;
if (StringUtils.isNotBlank(PropertyManager.getProperty(BATCH_NUMBER_PROPERTY_NAME))) {
this.batchNumber = Integer.valueOf(PropertyManager.getProperty(BATCH_NUMBER_PROPERTY_NAME));
}
if (StringUtils.isNotBlank(PropertyManager.getProperty(REQUEST_SIZE_LIMIT_PROPERTY_NAME))) {
this.requestSizeLimit = Integer.valueOf(PropertyManager.getProperty(REQUEST_SIZE_LIMIT_PROPERTY_NAME));
}
if (StringUtils.isNotBlank(PropertyManager.getProperty(REINDEXING_BATCH_SIZE_PROPERTY_NAME))) {
this.reindexBatchSize = Integer.valueOf(PropertyManager.getProperty(REINDEXING_BATCH_SIZE_PROPERTY_NAME));
}
if (initParams == null || !initParams.containsKey("es.version")) {
throw new IllegalStateException("es.version parameter is mandatory");
}
this.esVersion = initParams.getValueParam("es.version").getValue();
LOG.info("Use ES Version {}", esVersion);
}
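  // NOTE (assumption, based on the usual eXo kernel injection pattern): the
  // DataInitializer constructor argument is not stored in a field; it appears
  // to be injected only so that the container initializes the database schema
  // before this service is constructed.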
@Override
public void addConnector(IndexingServiceConnector indexingServiceConnector) {
addConnector(indexingServiceConnector, false);
}
@Override
public void addConnector(IndexingServiceConnector indexingServiceConnector, Boolean override) {
    if (getConnectors().containsKey(indexingServiceConnector.getType()) && !override) {
      LOG.error("Cannot add connector {}: a connector with the same name has already been registered.",
                indexingServiceConnector.getType());
} else {
getConnectors().put(indexingServiceConnector.getType(), indexingServiceConnector);
LOG.info("An Indexing Connector has been added: {}", indexingServiceConnector.getType());
}
}
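  /*
   * Usage sketch (the connector instance and its type name are illustrative):
   *
   *   processor.addConnector(wikiConnector);        // logs an error if a "wiki" connector exists
   *   processor.addConnector(wikiConnector, true);  // replaces any existing "wiki" connector
   */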
  /**
   * Handles the indexing queue: fetches all pending operations from the
   * indexing queue, transforms them into ES requests and sends those requests
   * to ES. This method is ONLY called by the job scheduler. It is not
   * annotated with @ExoTransactional because we don't want it to be executed
   * in a single transaction.
   *
   * A request lifecycle is started and ended for all jobs by
   * org.exoplatform.services.scheduler.impl.JobEnvironmentConfigListener,
   * which means there is one entity manager per job execution. Because of
   * that, the persistence context must be cleaned regularly to avoid overly
   * large sessions and poor performance.
   *
   * This method is synchronized to make sure the queue is processed by only
   * one thread at a time, since the indexing queue does not support
   * multi-threaded processing for the moment.
   */
@Override
public synchronized void process() {
this.interrupted = false;
try {
      // Loop until the number of operations retrieved from the indexing queue
      // is less than BATCH_NUMBER (default = 1000)
int processedOperations;
do {
processedOperations = processBulk();
} while (processedOperations >= batchNumber);
} finally {
if (this.interrupted) {
LOG.info("Indexing queue processing interruption done");
}
}
}
  /**
   * Marks the indexing process as interrupted in order to terminate it as
   * soon as possible without finishing the whole process. Since indexing can
   * take a long time (for a reindexAll operation, for example), this allows
   * interrupting it gracefully (without killing the thread).
   */
@Override
public void interrupt() {
LOG.info("Indexing queue processing has been interrupted. Please wait until the service exists cleanly...");
this.interrupted = true;
}
private boolean isInterrupted() {
if(Thread.currentThread().isInterrupted()) {
LOG.info("Thread running indexing queue processing has been interrupted. Please wait until the service exists cleanly...");
this.interrupted = true;
}
return this.interrupted;
}
private boolean isUpgradeInProgress() {
return !typesOrIndexUpgrading.isEmpty();
}
private int processBulk() {
    // Suspend queue processing while an index upgrade/migration is in progress
    if (isUpgradeInProgress()) {
      LOG.info("Migration of indexes is in progress, indexing is suspended until the migration finishes");
      return 0;
    }
// Map<OperationType={Create,Delete,...}, Map<String=EntityType,
// List<IndexingOperation>>> indexingQueueSorted
Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted = new HashMap<>();
List<IndexingOperation> indexingOperations;
long maxIndexingOperationId = 0;
    // Get the first BATCH_NUMBER (default = 1000) indexing operations
indexingOperations = indexingOperationDAO.findAllFirst(batchNumber);
// Get all Indexing operations and order them per operation and type in map:
// <Operation, <Type, List<IndexingOperation>>>
for (IndexingOperation indexingOperation : indexingOperations) {
putIndexingOperationInMemoryQueue(indexingOperation, indexingQueueSorted);
      // Track the max ID of the indexing operations in this bulk
if (maxIndexingOperationId < indexingOperation.getId()) {
maxIndexingOperationId = indexingOperation.getId();
}
}
processInit(indexingQueueSorted);
processDeleteAll(indexingQueueSorted);
processReindexAll(indexingQueueSorted);
processCUD(indexingQueueSorted);
if(isInterrupted()) {
throw new RuntimeException("Indexing queue processing interrupted");
}
    // Remove the processed operations from the indexing queue table, i.e. all
    // rows with an ID less than or equal to the max ID of this bulk
indexingOperationDAO.deleteAllIndexingOperationsHavingIdLessThanOrEqual(maxIndexingOperationId);
// clear entity manager content after each bulk
entityManagerService.getEntityManager().clear();
return indexingOperations.size();
}
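  /*
   * Batch loop sketch: with batchNumber = 1000, process() keeps calling
   * processBulk() while it returns >= 1000 processed operations, i.e. until a
   * fetch returns a partial (or empty) batch, meaning the queue is drained.
   */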
  /**
   * Adds an indexing operation to the temporary in-memory indexing queue.
   *
   * @param indexingOperation the operation to add to the temporary in-memory
   *          indexing queue
   * @param indexingQueueSorted temporary in-memory indexing queue
   */
  private void putIndexingOperationInMemoryQueue(IndexingOperation indexingOperation,
                                                 Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
    // Group the operation by operation type, then by entity type, creating
    // the intermediate map and list on first use
    indexingQueueSorted.computeIfAbsent(indexingOperation.getOperation(), operation -> new HashMap<>())
                       .computeIfAbsent(indexingOperation.getEntityType(), entityType -> new ArrayList<>())
                       .add(indexingOperation);
  }
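  /*
   * Illustrative shape of indexingQueueSorted after a few operations have been
   * queued (entity types and IDs are invented for the example):
   *
   *   CREATE -> { "wiki-page" -> [ op(id=12, entityId="42"), op(id=15, entityId="43") ] }
   *   DELETE -> { "file"      -> [ op(id=14, entityId="7") ] }
   */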
/**
* Process all the Create / Update / Delete operations
*
   * @param indexingQueueSorted temporary in-memory indexing queue
*/
private void processCUD(Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
// Initialize bulk request for CUD operations
String bulkRequest = "";
// Process Delete document operation
if (indexingQueueSorted.containsKey(OperationType.DELETE)) {
Map<String, List<IndexingOperation>> deleteIndexingOperationsMap = indexingQueueSorted.get(OperationType.DELETE);
for (String entityType : deleteIndexingOperationsMap.keySet()) {
ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) getConnectors().get(entityType);
List<IndexingOperation> deleteIndexingOperationsList = deleteIndexingOperationsMap.get(entityType);
if (deleteIndexingOperationsList == null || deleteIndexingOperationsList.isEmpty()) {
continue;
}
Iterator<IndexingOperation> deleteIndexingOperationsIterator = deleteIndexingOperationsList.iterator();
while (deleteIndexingOperationsIterator.hasNext()) {
if(isInterrupted()) {
return;
}
IndexingOperation deleteIndexQueue = deleteIndexingOperationsIterator.next();
try {
String deleteDocumentRequestContent = elasticContentRequestBuilder.getDeleteDocumentRequestContent(connector,
deleteIndexQueue.getEntityId());
if(deleteDocumentRequestContent != null) {
bulkRequest += deleteDocumentRequestContent;
}
          } catch(Exception e) {
            LOG.warn("Error while *deleting* index entry of entity, type = " + entityType + ", id = " + (deleteIndexQueue == null ? null : deleteIndexQueue.getEntityId()) + ", cause:", e);
          } finally {
            // NOTE: this two-argument Map.remove(key, value) is effectively a
            // no-op here, since the map values are per-type operation maps,
            // never entity IDs; the operation is actually dropped by the
            // iterator.remove() below
            indexingQueueSorted.remove(OperationType.DELETE, deleteIndexQueue.getEntityId());
          }
          // Remove the indexing operation from the in-memory queue even if the request fails
          deleteIndexingOperationsIterator.remove();
          // Remove the entity from any create or update operations queued
          // before this delete operation (based on operation ID ordering)
deleteOperationsByEntityIdForTypesBefore(new OperationType[] { OperationType.CREATE },
indexingQueueSorted,
deleteIndexQueue);
deleteOperationsByEntityIdForTypes(new OperationType[] { OperationType.UPDATE }, indexingQueueSorted, deleteIndexQueue);
// Check if the bulk request limit size is already reached
bulkRequest = checkBulkRequestSizeReachedLimitation(bulkRequest);
}
}
}
// Process Create document operation
if (indexingQueueSorted.containsKey(OperationType.CREATE)) {
Map<String, List<IndexingOperation>> createIndexingOperationsMap = indexingQueueSorted.get(OperationType.CREATE);
for (String entityType : createIndexingOperationsMap.keySet()) {
ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) getConnectors().get(entityType);
List<IndexingOperation> createIndexingOperationsList = createIndexingOperationsMap.get(entityType);
if (createIndexingOperationsList == null || createIndexingOperationsList.isEmpty()) {
continue;
}
Iterator<IndexingOperation> createIndexingOperationsIterator = createIndexingOperationsList.iterator();
while (createIndexingOperationsIterator.hasNext()) {
if(isInterrupted()) {
return;
}
IndexingOperation createIndexQueue = createIndexingOperationsIterator.next();
try {
if(connector.isNeedIngestPipeline()) {
String singleRequestOperation = elasticContentRequestBuilder.getCreatePipelineDocumentRequestContent(connector, createIndexQueue.getEntityId());
if(singleRequestOperation != null) {
elasticIndexingClient.sendCreateDocOnPipeline(connector.getIndex(),
connector.getType(),
createIndexQueue.getEntityId(),
connector.getPipelineName(),
singleRequestOperation);
}
              // Drop this entity's pending CREATE/UPDATE operations since it
              // was sent individually via the ingest pipeline, not in the bulk.
              // NOTE: these two-argument removes are effectively no-ops (see
              // the note in the delete section above)
              indexingQueueSorted.remove(OperationType.CREATE, createIndexQueue.getEntityId());
              indexingQueueSorted.remove(OperationType.UPDATE, createIndexQueue.getEntityId());
} else {
String singleRequestOperation = elasticContentRequestBuilder.getCreateDocumentRequestContent(connector, createIndexQueue.getEntityId());
if(singleRequestOperation != null) {
bulkRequest += singleRequestOperation;
}
}
          } catch(Exception e) {
            LOG.warn("Error while *creating* index entry of entity, type = " + entityType + ", id = " + (createIndexQueue == null ? null : createIndexQueue.getEntityId()) + ", cause:", e);
          } finally {
            // Remove the indexing operation from the in-memory queue even if the request fails
            createIndexingOperationsIterator.remove();
            // Drop any pending update operations for this entityId
            deleteOperationsByEntityIdForTypes(new OperationType[] { OperationType.UPDATE }, indexingQueueSorted, createIndexQueue);
            // NOTE: effectively a no-op (see the note in the delete section above)
            indexingQueueSorted.remove(OperationType.CREATE, createIndexQueue.getEntityId());
          }
// Check if the bulk request limit size is already reached
bulkRequest = checkBulkRequestSizeReachedLimitation(bulkRequest);
}
}
}
// Process Update document operation
if (indexingQueueSorted.containsKey(OperationType.UPDATE)) {
Map<String, List<IndexingOperation>> updateIndexingOperationsMap = indexingQueueSorted.get(OperationType.UPDATE);
for (String entityType : updateIndexingOperationsMap.keySet()) {
ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) getConnectors().get(entityType);
List<IndexingOperation> updateIndexingOperationsList = updateIndexingOperationsMap.get(entityType);
if (updateIndexingOperationsList == null || updateIndexingOperationsList.isEmpty()) {
continue;
}
Iterator<IndexingOperation> updateIndexingOperationsIterator = updateIndexingOperationsList.iterator();
while (updateIndexingOperationsIterator.hasNext()) {
if(isInterrupted()) {
return;
}
IndexingOperation updateIndexQueue = updateIndexingOperationsIterator.next();
try {
if(connector.isNeedIngestPipeline()) {
String singleRequestOperation = elasticContentRequestBuilder.getCreatePipelineDocumentRequestContent(connector, updateIndexQueue.getEntityId());
if(singleRequestOperation != null) {
elasticIndexingClient.sendCreateDocOnPipeline(connector.getIndex(),
connector.getType(),
updateIndexQueue.getEntityId(),
connector.getPipelineName(),
singleRequestOperation);
}
} else {
String singleRequestOperation = elasticContentRequestBuilder.getUpdateDocumentRequestContent(connector, updateIndexQueue.getEntityId());
if(singleRequestOperation != null) {
bulkRequest += singleRequestOperation;
}
}
          } catch(Exception e) {
            LOG.warn("Error while *updating* index entry of entity, type = " + entityType + ", id = " + (updateIndexQueue == null ? null : updateIndexQueue.getEntityId()) + ", cause:", e);
          } finally {
            // NOTE: effectively a no-op (see the note in the delete section above)
            indexingQueueSorted.remove(OperationType.UPDATE, updateIndexQueue.getEntityId());
          }
          // Remove the indexing operation from the in-memory queue even if the request fails
          updateIndexingOperationsIterator.remove();
// Check if the bulk request limit size is already reached
bulkRequest = checkBulkRequestSizeReachedLimitation(bulkRequest);
}
}
}
if (StringUtils.isNotBlank(bulkRequest) && !isInterrupted()) {
elasticIndexingClient.sendCUDRequest(bulkRequest);
}
}
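  /*
   * Sketch of the bulk request body accumulated by processCUD (standard
   * Elasticsearch _bulk NDJSON format of that era, one action line per
   * operation; the exact content comes from ElasticContentRequestBuilder, and
   * the index, type and ID values below are invented):
   *
   *   { "delete" : { "_index" : "wiki_v2", "_type" : "wiki-page", "_id" : "7" } }
   *   { "index"  : { "_index" : "wiki_v2", "_type" : "wiki-page", "_id" : "42" } }
   *   { "title" : "My page", "content" : "..." }
   */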
  /**
   * Process all the requests for "init of the ES create mapping" (operation
   * type = I) in the indexing queue (if any)
   *
   * @param indexingQueueSorted temporary in-memory indexing queue
   */
private void processInit(Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
if (indexingQueueSorted.containsKey(OperationType.INIT)) {
for (String entityType : indexingQueueSorted.get(OperationType.INIT).keySet()) {
if(isInterrupted()) {
return;
}
sendInitRequests(getConnectors().get(entityType));
}
indexingQueueSorted.remove(OperationType.INIT);
}
}
  /**
   * Process all the requests for "remove all documents of type" (operation
   * type = X) in the indexing queue (if any) = delete type in ES
   *
   * @param indexingQueueSorted temporary in-memory indexing queue
   */
private void processDeleteAll(Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
if (indexingQueueSorted.containsKey(OperationType.DELETE_ALL)) {
for (String entityType : indexingQueueSorted.get(OperationType.DELETE_ALL).keySet()) {
if(isInterrupted()) {
return;
}
if (indexingQueueSorted.get(OperationType.DELETE_ALL).containsKey(entityType)) {
for (IndexingOperation indexingOperation : indexingQueueSorted.get(OperationType.DELETE_ALL).get(entityType)) {
processDeleteAll(indexingOperation, indexingQueueSorted);
}
}
}
indexingQueueSorted.remove(OperationType.DELETE_ALL);
}
}
  /**
   * @param indexingOperation the DELETE_ALL operation to process
   * @param indexingQueueSorted temporary in-memory indexing queue
   */
private void processDeleteAll(IndexingOperation indexingOperation,
Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
// Remove the type (= remove all documents of this type) and recreate it
ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) getConnectors().get(indexingOperation.getEntityType());
// log in Audit Trail
auditTrail.audit(ElasticIndexingAuditTrail.DELETE_ALL, null, null, connector.getType(), null, null, 0);
// Call ES
elasticIndexingClient.sendDeleteAllDocsOfTypeRequest(connector.getIndex(), connector.getType());
    // Remove all obsolete CUD operations that were planned before this delete-all
deleteOperationsForTypesBefore(new OperationType[] { OperationType.CREATE, OperationType.UPDATE, OperationType.DELETE },
indexingQueueSorted,
indexingOperation);
}
  /**
   * Process all the requests for "reindex all documents of type" (operation
   * type = R) in the indexing queue (if any)
   *
   * @param indexingQueueSorted temporary in-memory indexing queue
   */
private void processReindexAll(Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted) {
if (indexingQueueSorted.containsKey(OperationType.REINDEX_ALL)) {
for (String entityType : indexingQueueSorted.get(OperationType.REINDEX_ALL).keySet()) {
if (indexingQueueSorted.get(OperationType.REINDEX_ALL).containsKey(entityType)) {
for (IndexingOperation indexingOperation : indexingQueueSorted.get(OperationType.REINDEX_ALL).get(entityType)) {
if(isInterrupted()) {
return;
}
reindexAllByEntityType(indexingOperation.getEntityType());
// clear entity manager content
entityManagerService.getEntityManager().clear();
}
}
}
indexingQueueSorted.remove(OperationType.REINDEX_ALL);
}
}
/**
* Reindex all the entities of the given entity type.
*
* @param entityType Entity type of the entities to reindex
*/
@ExoTransactional
private void reindexAllByEntityType(String entityType) {
long startTime = System.currentTimeMillis();
// 1- Delete all documents in ES (and purge the indexing queue)
indexingOperationDAO.create(new IndexingOperation(null, entityType, OperationType.DELETE_ALL));
// 2- Get all the documents ID
IndexingServiceConnector connector = getConnectors().get(entityType);
// 3- Inject as a CUD operation
int offset = 0;
int numberIndexed;
do {
if(isInterrupted()) {
return;
}
List<String> ids = connector.getAllIds(offset, reindexBatchSize);
if (ids == null) {
numberIndexed = 0;
} else {
List<IndexingOperation> operations = new ArrayList<>(ids.size());
for (String id : ids) {
operations.add(new IndexingOperation(id, entityType, OperationType.CREATE));
}
indexingOperationDAO.createAll(operations);
numberIndexed = ids.size();
offset += reindexBatchSize;
}
} while (numberIndexed == reindexBatchSize);
// 4- log in Audit Trail
auditTrail.audit(ElasticIndexingAuditTrail.REINDEX_ALL, null, null, entityType, null, null, (System.currentTimeMillis()-startTime));
}
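  /*
   * Pagination sketch for reindexAllByEntityType with reindexBatchSize = 100
   * (counts invented for the example): getAllIds(0, 100) -> 100 ids,
   * getAllIds(100, 100) -> 100 ids, getAllIds(200, 100) -> 37 ids, so the
   * loop stops (37 != 100) after queueing 237 CREATE operations.
   */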
  private void deleteOperationsForTypesBefore(OperationType[] operations,
                                              Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted,
                                              IndexingOperation refIndexOperation) {
    for (OperationType operation : operations) {
      if (indexingQueueSorted.containsKey(operation)) {
        if (indexingQueueSorted.get(operation).containsKey(refIndexOperation.getEntityType())) {
          for (Iterator<IndexingOperation> iterator = indexingQueueSorted.get(operation)
                                                                         .get(refIndexOperation.getEntityType())
                                                                         .iterator(); iterator.hasNext();) {
            IndexingOperation indexingOperation = iterator.next();
            // Remove the operation if it was queued before the reference
            // operation (i.e. its ID is lower than the reference operation's ID)
            if (refIndexOperation.getId() > indexingOperation.getId()) {
              iterator.remove();
            }
          }
        }
      }
    }
  }
private void deleteOperationsByEntityIdForTypesBefore(OperationType[] operations,
Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted,
IndexingOperation indexQueue) {
for (OperationType operation : operations) {
if (indexingQueueSorted.containsKey(operation)) {
if (indexingQueueSorted.get(operation).containsKey(indexQueue.getEntityType())) {
for (Iterator<IndexingOperation> iterator = indexingQueueSorted.get(operation)
.get(indexQueue.getEntityType())
.iterator(); iterator.hasNext();) {
IndexingOperation indexingOperation = iterator.next();
            // Remove the operation if it targets the same entityId and was
            // queued before the reference operation (lower ID)
if ((indexQueue.getId() > indexingOperation.getId())
&& indexingOperation.getEntityId().equals(indexQueue.getEntityId())) {
iterator.remove();
}
}
}
}
}
}
private void deleteOperationsByEntityIdForTypes(OperationType[] operations,
Map<OperationType, Map<String, List<IndexingOperation>>> indexingQueueSorted,
IndexingOperation indexQueue) {
for (OperationType operation : operations) {
if (indexingQueueSorted.containsKey(operation)) {
if (indexingQueueSorted.get(operation).containsKey(indexQueue.getEntityType())) {
for (Iterator<IndexingOperation> iterator = indexingQueueSorted.get(operation)
.get(indexQueue.getEntityType())
.iterator(); iterator.hasNext();) {
IndexingOperation indexingOperation = iterator.next();
if (indexingOperation.getEntityId().equals(indexQueue.getEntityId())) {
iterator.remove();
}
}
}
}
}
}
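  /*
   * Pruning sketch (IDs and entity IDs invented for the example): given a
   * DELETE of entityId "42" queued with id = 20, a CREATE of "42" queued
   * earlier (id = 12) is dropped by deleteOperationsByEntityIdForTypesBefore,
   * while an UPDATE of "42" is dropped by deleteOperationsByEntityIdForTypes
   * whatever its id.
   */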
  private void sendInitRequests(IndexingServiceConnector indexingServiceConnector) {
    ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) indexingServiceConnector;
String indexAlias = connector.getIndex();
String previousIndex = connector.getPreviousIndex();
String index = connector.getCurrentIndex();
String type = connector.getType();
if (typesOrIndexUpgrading.containsKey(indexAlias) && typesOrIndexUpgrading.get(indexAlias).contains(type)) {
boolean newIndexExists = elasticIndexingClient.sendIsIndexExistsRequest(index);
      // If the upgrade is incomplete, the alias should still point to the previous index
boolean aliasExistsOnPreviousIndex = elasticIndexingClient.sendGetIndexAliasesRequest(previousIndex).contains(indexAlias);
if(!aliasExistsOnPreviousIndex) {
boolean aliasExistsOnCurrentIndex = newIndexExists && elasticIndexingClient.sendGetIndexAliasesRequest(index).contains(indexAlias);
// If the alias points to the new index, point it again to the previous one, else add new alias to previous
elasticIndexingClient.sendCreateIndexAliasRequest(previousIndex, aliasExistsOnCurrentIndex ? index : null, indexAlias);
}
if (newIndexExists) {
boolean newTypeExists = elasticIndexingClient.sendIsTypeExistsRequest(index, type);
if (newTypeExists) {
// Upgrade was interrupted, so remove it and upgrade again
LOG.warn("ES index upgrade '{}' was interrupted, the new index/type {}/{} will be recreated", previousIndex, index, type);
elasticIndexingClient.sendDeleteAllDocsOfTypeRequest(index, type);
}
// Send request to create type
elasticIndexingClient.sendCreateTypeRequest(index, connector.getType(), connector.getMapping());
} else {
elasticIndexingClient.sendCreateIndexRequest(index,
elasticContentRequestBuilder.getCreateIndexRequestContent(connector));
// Send request to create type
elasticIndexingClient.sendCreateTypeRequest(index, connector.getType(), connector.getMapping());
if(connector.isNeedIngestPipeline()) {
elasticIndexingClient.sendCreateAttachmentPipelineRequest(index, connector.getType(), connector.getPipelineName(), connector.getAttachmentProcessor());
}
}
      // Start the reindex. Once it finishes, the index alias will be switched to the new index
executors.submit(new ReindexESType(ExoContainerContext.getCurrentContainer(), connector));
} else {
boolean useAlias = true;
if (index == null) {
index = indexAlias;
useAlias = false;
}
// Send request to create index
boolean newlyCreated = elasticIndexingClient.sendCreateIndexRequest(index,
elasticContentRequestBuilder.getCreateIndexRequestContent(connector));
if (newlyCreated && useAlias) {
elasticIndexingClient.sendCreateIndexAliasRequest(index, null, indexAlias);
}
// Send request to create type
elasticIndexingClient.sendCreateTypeRequest(index, connector.getType(), connector.getMapping());
if(connector.isNeedIngestPipeline()) {
elasticIndexingClient.sendCreateAttachmentPipelineRequest(index, connector.getType(), connector.getPipelineName(), connector.getAttachmentProcessor());
}
}
// Make sure that the migration is not executed twice on the same connector
connector.setPreviousIndex(null);
}
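  /*
   * Upgrade flow sketch implemented by sendInitRequests, initConnectors and
   * ReindexESType (index names are illustrative):
   *
   *   alias "wiki" -> previousIndex "wiki_v1"            (before the upgrade)
   *   1. create the new index "wiki_v2" with its type mapping
   *   2. reindex wiki_v1 -> wiki_v2 asynchronously (ReindexESType)
   *   3. switch the alias "wiki" to wiki_v2, then delete wiki_v1
   */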
  /**
   * If the bulk request has already reached the size limit, it needs to be
   * sent immediately
   *
   * @param bulkRequest the accumulated bulk request to analyze
   * @return an empty string if the request was sent, otherwise the unchanged
   *         bulk request
   */
private String checkBulkRequestSizeReachedLimitation(String bulkRequest) {
if (bulkRequest.getBytes().length >= requestSizeLimit) {
elasticIndexingClient.sendCUDRequest(bulkRequest);
// return an empty bulk request
return "";
} else {
return bulkRequest;
}
}
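  /*
   * Example: with the default 10MB limit, an accumulated 12MB bulk body is
   * flushed to ES immediately and the caller continues with an empty buffer,
   * while a 2MB body is returned unchanged to keep accumulating.
   */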
public Integer getBatchNumber() {
return batchNumber;
}
public void setBatchNumber(Integer batchNumber) {
this.batchNumber = batchNumber;
}
public Integer getRequestSizeLimit() {
return requestSizeLimit;
}
public void setRequestSizeLimit(Integer requestSizeLimit) {
this.requestSizeLimit = requestSizeLimit;
}
public int getReindexBatchSize() {
return reindexBatchSize;
}
public void setReindexBatchSize(int reindexBatchSize) {
this.reindexBatchSize = reindexBatchSize;
}
@Override
public void start() {
    String serverEsVersion = elasticIndexingClient.sendGetESVersion();
    if (serverEsVersion == null || !serverEsVersion.startsWith(this.esVersion + ".")) {
      LOG.error("The expected ES version is " + this.esVersion + " but the server reports " + serverEsVersion
          + ". If this is a compatible version, you can configure 'exo.es.version.minor' to remove this error message.");
    }
// ES index and type need to be created for all registered connectors
initConnectors();
}
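  /*
   * Version check sketch: with the init param es.version = "2", a server
   * reporting "2.3.5" passes ("2.3.5".startsWith("2.")) while "5.6.3" logs
   * the error above (version numbers are illustrative).
   */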
@Override
public void stop() {
executors.shutdownNow();
}
private void initConnectors() {
for (Map.Entry<String, IndexingServiceConnector> entry: getConnectors().entrySet()) {
ElasticIndexingServiceConnector connector = (ElasticIndexingServiceConnector) entry.getValue();
String previousIndex = connector.getPreviousIndex();
String index = connector.getCurrentIndex();
String indexAlias = connector.getIndex();
boolean needsUpgrade = false;
if (previousIndex != null) {
        // Check the upgrade status (incomplete / not run == the new index
        // doesn't exist or the indexAlias is not added to the new index)
needsUpgrade = elasticIndexingClient.sendIsIndexExistsRequest(previousIndex)
&& (!elasticIndexingClient.sendIsIndexExistsRequest(index)
|| !elasticIndexingClient.sendGetIndexAliasesRequest(index).contains(indexAlias));
}
if(needsUpgrade) {
if(!typesOrIndexUpgrading.containsKey(indexAlias)) {
typesOrIndexUpgrading.put(indexAlias, new HashSet<>());
}
typesOrIndexUpgrading.get(indexAlias).add(entry.getKey());
}
}
for (Map.Entry<String, IndexingServiceConnector> entry: getConnectors().entrySet()) {
sendInitRequests(entry.getValue());
}
}
public class ReindexESType implements Runnable {
private String index;
private String previousIndex;
private String indexAlias;
private String type;
private String pipeline;
private boolean reindexFromDB;
private ExoContainer exoContainer;
public ReindexESType(ExoContainer exoContainer, ElasticIndexingServiceConnector connector) {
this.exoContainer = exoContainer;
this.index = connector.getCurrentIndex();
this.previousIndex = connector.getPreviousIndex();
this.indexAlias = connector.getIndex();
this.type = connector.getType();
this.pipeline = connector.getPipelineName();
this.reindexFromDB = connector.isReindexOnUpgrade();
}
@Override
public void run() {
try {
if (reindexFromDB) {
ExoContainerContext.setCurrentContainer(exoContainer);
reindexAllByEntityType(type);
} else {
LOG.info("Reindexing index alias {} from old index {} to new index {}, for type {}",
indexAlias,
previousIndex,
index,
type);
try {
elasticIndexingClient.sendReindexTypeRequest(index, previousIndex, type, pipeline);
LOG.info("Reindexation finished for index alias {} from old index {} to new index {}, for type {}",
indexAlias,
previousIndex,
index,
type);
} catch (Exception e) {
LOG.warn("Reindexation using pipeline error for index alias {} from old index {} to new index {}, for type {}. The reindexation will proceed from eXo DB",
indexAlias,
previousIndex,
index,
type);
ExoContainerContext.setCurrentContainer(exoContainer);
reindexAllByEntityType(type);
}
}
        // Synchronize so that concurrent upgrade threads cannot corrupt typesOrIndexUpgrading
synchronized (typesOrIndexUpgrading) {
boolean indexMigrationInProgress = typesOrIndexUpgrading.get(indexAlias).size() > 1;
if (indexMigrationInProgress) {
LOG.info("The index {} has some types not completely migrated yet, the old index will be deleted after migration is finished",
previousIndex);
typesOrIndexUpgrading.get(indexAlias).remove(type);
} else {
LOG.info("Switching index alias {} from old index {} to new index {}", indexAlias, previousIndex, index);
elasticIndexingClient.sendCreateIndexAliasRequest(index, previousIndex, indexAlias);
typesOrIndexUpgrading.remove(indexAlias);
LOG.info("Remove old index {}", previousIndex);
elasticIndexingClient.sendDeleteIndexRequest(previousIndex);
if(typesOrIndexUpgrading.isEmpty()) {
LOG.info("ES indexes migration finished (except indexes that will be reindexed from DB)");
}
}
}
} catch (Exception e) {
LOG.error("An error occurred while upgrading index " + previousIndex + " type " + type, e);
} finally {
typesOrIndexUpgrading.remove(indexAlias);
}
}
}
}