// FileESMigration.java
package org.exoplatform.services.wcm.search;
import org.exoplatform.commons.api.persistence.ExoTransactional;
import org.exoplatform.commons.api.settings.SettingService;
import org.exoplatform.commons.api.settings.SettingValue;
import org.exoplatform.commons.api.settings.data.Context;
import org.exoplatform.commons.api.settings.data.Scope;
import org.exoplatform.commons.search.index.IndexingOperationProcessor;
import org.exoplatform.commons.search.index.IndexingService;
import org.exoplatform.commons.upgrade.UpgradeProductPlugin;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.jcr.RepositoryService;
import org.exoplatform.services.jcr.impl.core.query.SearchManager;
import org.exoplatform.services.jcr.impl.core.query.SystemSearchManager;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.scheduler.JobSchedulerService;
import org.exoplatform.services.wcm.core.NodetypeConstant;
import org.exoplatform.services.wcm.search.connector.FileindexingConnector;
import org.exoplatform.services.wcm.utils.WCMCoreUtils;
import org.quartz.InterruptableJob;
import org.quartz.UnableToInterruptJobException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;
import javax.jcr.query.QueryResult;
import java.util.concurrent.CompletableFuture;
/**
 * Upgrade plugin to index all files in Elasticsearch.
 * <p>
 * The upgrade runs in two steps, each guarded by a flag stored through the
 * {@link SettingService} so that a step already completed is skipped on the next run:
 * <ol>
 *   <li>all files are pushed in the ES indexing queue and the queue is processed
 *       ({@link #indexInES()});</li>
 *   <li>the JCR "collaboration" and "system" workspaces are reindexed
 *       ({@link #reindexJCR()}).</li>
 * </ol>
 */
public class FileESMigration extends UpgradeProductPlugin {

  private static final Log LOG = ExoLogger.getLogger(FileESMigration.class);

  public static final String FILE_ES_INDEXATION_KEY = "FILE_ES_INDEXATION";

  public static final String FILE_ES_INDEXATION_DONE_KEY = "FILE_ES_INDEXATION_DONE";

  public static final String FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY = "FILE_JCR_COLLABORATION_REINDEXATION_DONE";

  public static final String FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY = "FILE_JCR_SYSTEM_REINDEXATION_DONE";

  /** Name of the scheduler job that is paused while the migration processes the indexing queue. */
  private static final String ES_BULK_INDEXER_JOB_NAME = "ESBulkIndexer";

  /** Group of the scheduler job that is paused while the migration processes the indexing queue. */
  private static final String ES_BULK_INDEXER_JOB_GROUP = "ElasticSearch";

  private static final String COLLABORATION_WORKSPACE = "collaboration";

  private static final String SYSTEM_WORKSPACE = "system";

  private final IndexingService indexingService;

  private final SettingService settingService;

  private final RepositoryService repositoryService;

  private final IndexingOperationProcessor indexingOperationProcessor;

  private final JobSchedulerService jobSchedulerService;

  /**
   * Creates the plugin with the services it relies on.
   *
   * @param initParams plugin init params, forwarded to {@link UpgradeProductPlugin}
   * @param indexingService used to clear the files indexing queue and to request a full reindexation
   * @param settingService used to read and write the migration progress flags
   * @param repositoryService used to access the JCR workspaces
   * @param indexingOperationProcessor used to process the indexing queue
   * @param jobSchedulerService used to pause, interrupt and resume the ESBulkIndexer job
   */
  public FileESMigration(InitParams initParams,
                         IndexingService indexingService,
                         SettingService settingService,
                         RepositoryService repositoryService,
                         IndexingOperationProcessor indexingOperationProcessor,
                         JobSchedulerService jobSchedulerService) {
    super(initParams);
    this.indexingService = indexingService;
    this.settingService = settingService;
    this.repositoryService = repositoryService;
    this.indexingOperationProcessor = indexingOperationProcessor;
    this.jobSchedulerService = jobSchedulerService;
  }

  /**
   * Runs the migration steps that are not flagged as done yet: first the indexation of files
   * in ES, then the reindexation of the JCR workspaces.
   *
   * @param oldVersion not used by this plugin
   * @param newVersion not used by this plugin
   * @throws RuntimeException wrapping any exception raised by one of the steps
   */
  @Override
  public void processUpgrade(String oldVersion, String newVersion) {
    try {
      if (!isIndexationInESDone()) {
        printNumberOfFileToIndex();
        indexInES();
      }
      if (!isJCRReindexationDone()) {
        reindexJCR();
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while Files indexing in ES migration", e);
    }
  }

  /**
   * Tells whether the upgrade still has work to do.
   *
   * @param newVersion not used by this plugin
   * @param previousVersion not used by this plugin
   * @return {@code false} only when both the ES indexation and the JCR reindexation are flagged
   *         as done, {@code true} otherwise
   */
  @Override
  public boolean shouldProceedToUpgrade(String newVersion, String previousVersion) {
    return !(isIndexationInESDone() && isJCRReindexationDone());
  }

  /**
   * Starts the reindexation of the JCR collaboration and system workspaces, for each of them
   * only if its flag is not set yet.
   * <p>
   * Each reindexation is represented by a {@link CompletableFuture}; this method registers a
   * completion callback on it and returns without waiting for the result. The "done" flag of a
   * workspace is stored by the callback, and only when the reindexation reports success.
   *
   * @throws Exception wrapping the {@link RepositoryException} raised while accessing the repository
   */
  public void reindexJCR() throws Exception {
    try {
      if (!isJCRCollaborationReindexationDone()) {
        LOG.info("== Files ES migration - Starting reindexation of JCR collaboration workspace");
        // The search manager is only looked up when it is actually needed
        SearchManager searchManager = (SearchManager) repositoryService.getCurrentRepository()
                                                                       .getWorkspaceContainer(COLLABORATION_WORKSPACE)
                                                                       .getComponent(SearchManager.class);
        CompletableFuture<Boolean> reindexCollaborationWSResult = searchManager.reindexWorkspace(false, 0);
        trackReindexation(reindexCollaborationWSResult, COLLABORATION_WORKSPACE, FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY);
      }
      if (!isJCRSystemReindexationDone()) {
        LOG.info("== Files ES migration - Starting reindexation of JCR system workspace");
        SystemSearchManager systemSearchManager = (SystemSearchManager) repositoryService.getCurrentRepository()
                                                                                         .getWorkspaceContainer(SYSTEM_WORKSPACE)
                                                                                         .getComponent(SystemSearchManager.class);
        CompletableFuture<Boolean> reindexSystemWSResult = systemSearchManager.reindexWorkspace(false, 0);
        trackReindexation(reindexSystemWSResult, SYSTEM_WORKSPACE, FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY);
      }
    } catch (RepositoryException e) {
      throw new Exception("Error while reindexing JCR collaboration and system workspaces", e);
    }
  }

  /**
   * Registers the completion callback of a workspace reindexation: on success the given flag is
   * stored, otherwise an error is logged and the flag is left unset so the reindexation is
   * attempted again on the next run.
   *
   * @param reindexation future result of the workspace reindexation
   * @param workspace workspace name, only used in log messages
   * @param doneKey key of the setting flag to store when the reindexation succeeds
   */
  private void trackReindexation(CompletableFuture<Boolean> reindexation, String workspace, String doneKey) {
    // whenComplete (rather than thenAccept) so that an exceptional completion is reported too
    reindexation.whenComplete((successful, error) -> {
      String failureMessage = "== Files ES migration - Reindexation of JCR " + workspace + " workspace failed. "
          + "Check logs to fix the issue, then reindex it by restarting the server";
      if (error != null) {
        LOG.error(failureMessage, error);
      } else if (Boolean.TRUE.equals(successful)) {
        // Boolean.TRUE.equals(...) treats a null result as a failure instead of throwing on unboxing
        LOG.info("== Files ES migration - Reindexation of JCR " + workspace + " workspace done");
        settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), doneKey, SettingValue.create(true));
      } else {
        LOG.error(failureMessage);
      }
    });
  }

  /**
   * Run the reindex all operation to fill the indexing queue with all files to index.
   * The ESBulkIndexer job is paused and the indexing queue processing (process()) is done
   * synchronously to make sure the flag is set only when it is really done (allowing to redo it
   * in case of interruption), and also to make sure the JCR reindexing is not launched during
   * this operation (which would impact it since the reindex all operation uses the JCR query
   * manager to get all files).
   * <p>
   * The ESBulkIndexer job is resumed in all cases once the operation ends.
   *
   * @throws Exception if pausing or resuming the job fails, or wrapping any exception raised
   *         while indexing
   */
  public void indexInES() throws Exception {
    // Pause ESBulkIndexer job to avoid concurrent executions in the process() operation
    LOG.info("== Files ES migration - Pause ESBulkIndexer job");
    jobSchedulerService.pauseJob(ES_BULK_INDEXER_JOB_NAME, ES_BULK_INDEXER_JOB_GROUP);
    try {
      LOG.info("== Files ES migration - Stopping executing jobs ESBulkIndexer instances...");
      jobSchedulerService.getAllExcutingJobs()
                         .stream()
                         .filter(jobExecutionContext -> ES_BULK_INDEXER_JOB_NAME.equals(jobExecutionContext.getJobDetail().getKey().getName()))
                         .forEach(indexingJob -> {
                           try {
                             LOG.info("== Files ES migration - Interrupting executing job ESBulkIndexer instance " + indexingJob.getFireInstanceId());
                             ((InterruptableJob) indexingJob.getJobInstance()).interrupt();
                           } catch (UnableToInterruptJobException e) {
                             // Best effort: a job that cannot be interrupted does not abort the migration
                             LOG.error("Error while interrupting ES indexing queue job to run the migration", e);
                           }
                         });

      LOG.info("== Files ES migration - Clear indexing queue for files");
      indexingService.clearQueue(FileindexingConnector.TYPE);

      LOG.info("== Files ES migration - Starting pushing all files in indexation queue");
      indexingService.reindexAll(FileindexingConnector.TYPE);

      // process the reindexAll operation synchronously to make sure it is done before the JCR workspace
      // reindexation (otherwise JCR queries will not retrieve nodes)
      processIndexation();

      // The flag is stored last, so an interrupted run is redone entirely
      settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_ES_INDEXATION_DONE_KEY, SettingValue.create(true));
      LOG.info("== Files ES migration - Push of all files in indexation queue done");
    } catch (Exception e) {
      throw new Exception("Error while indexing files in ES", e);
    } finally {
      LOG.info("== Files ES migration - Resume ESBulkIndexer job");
      jobSchedulerService.resumeJob(ES_BULK_INDEXER_JOB_NAME, ES_BULK_INDEXER_JOB_GROUP);
    }
  }

  /**
   * Processes the indexing queue by delegating to {@link IndexingOperationProcessor#process()}.
   */
  @ExoTransactional
  public void processIndexation() {
    LOG.info("== Files ES migration - Process files");
    indexingOperationProcessor.process();
  }

  /**
   * @return {@code true} if the indexation of files in ES is flagged as done
   */
  private boolean isIndexationInESDone() {
    return isFlagSet(FILE_ES_INDEXATION_DONE_KEY);
  }

  /**
   * @return {@code true} if both the collaboration and the system workspaces are flagged as reindexed
   */
  private boolean isJCRReindexationDone() {
    return isJCRCollaborationReindexationDone() && isJCRSystemReindexationDone();
  }

  /**
   * @return {@code true} if the JCR collaboration workspace is flagged as reindexed
   */
  private boolean isJCRCollaborationReindexationDone() {
    return isFlagSet(FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY);
  }

  /**
   * @return {@code true} if the JCR system workspace is flagged as reindexed
   */
  private boolean isJCRSystemReindexationDone() {
    return isFlagSet(FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY);
  }

  /**
   * Reads one of the migration flags stored in the global context, under the
   * {@link #FILE_ES_INDEXATION_KEY} scope.
   * <p>
   * The flags are written with {@code SettingValue.create(true)}; the value is converted through
   * its string form so that the flag is recognized whether it is read back as a {@code Boolean}
   * or as the string "true".
   *
   * @param flagKey key of the flag to read
   * @return {@code true} if the flag exists and holds a true value, {@code false} if it is
   *         missing, has no value or holds anything else
   */
  private boolean isFlagSet(String flagKey) {
    SettingValue<?> flag = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), flagKey);
    if (flag == null || flag.getValue() == null) {
      return false;
    }
    return Boolean.parseBoolean(String.valueOf(flag.getValue()));
  }

  /**
   * Logs the number of nt:file nodes found in the collaboration workspace. This is purely
   * informative: a failure of the query is logged and does not interrupt the migration.
   */
  private void printNumberOfFileToIndex() {
    try {
      Session session = WCMCoreUtils.getSystemSessionProvider().getSession(COLLABORATION_WORKSPACE, repositoryService.getCurrentRepository());
      QueryManager queryManager = session.getWorkspace().getQueryManager();
      Query query = queryManager.createQuery("select * from " + NodetypeConstant.NT_FILE, Query.SQL);
      QueryResult result = query.execute();
      LOG.info("== Files ES migration - Number of files to index : " + result.getNodes().getSize());
    } catch (RepositoryException e) {
      LOG.error("== Files ES migration - Error while counting all nt:file to index", e);
    }
  }
}