View Javadoc
1   package org.exoplatform.services.wcm.search;
2   
3   import org.exoplatform.commons.api.persistence.ExoTransactional;
4   import org.exoplatform.commons.api.settings.SettingService;
5   import org.exoplatform.commons.api.settings.SettingValue;
6   import org.exoplatform.commons.api.settings.data.Context;
7   import org.exoplatform.commons.api.settings.data.Scope;
8   import org.exoplatform.commons.search.index.IndexingOperationProcessor;
9   import org.exoplatform.commons.search.index.IndexingService;
10  import org.exoplatform.commons.upgrade.UpgradeProductPlugin;
11  import org.exoplatform.container.xml.InitParams;
12  import org.exoplatform.services.jcr.RepositoryService;
13  import org.exoplatform.services.jcr.impl.core.query.SearchManager;
14  import org.exoplatform.services.jcr.impl.core.query.SystemSearchManager;
15  import org.exoplatform.services.log.ExoLogger;
16  import org.exoplatform.services.log.Log;
17  import org.exoplatform.services.scheduler.JobSchedulerService;
18  import org.exoplatform.services.wcm.core.NodetypeConstant;
19  import org.exoplatform.services.wcm.search.connector.FileindexingConnector;
20  import org.exoplatform.services.wcm.utils.WCMCoreUtils;
21  import org.quartz.InterruptableJob;
22  import org.quartz.UnableToInterruptJobException;
23  
24  import javax.jcr.RepositoryException;
25  import javax.jcr.Session;
26  import javax.jcr.query.Query;
27  import javax.jcr.query.QueryManager;
28  import javax.jcr.query.QueryResult;
29  import java.util.concurrent.CompletableFuture;
30  
31  /**
32   * Upgrade plugin to index all files in Elasticsearch
33   */
34  public class FileESMigration extends UpgradeProductPlugin {
35  
36    private static final Log LOG = ExoLogger.getLogger(FileESMigration.class);
37  
38    public static final String FILE_ES_INDEXATION_KEY = "FILE_ES_INDEXATION";
39  
40    public static final String FILE_ES_INDEXATION_DONE_KEY = "FILE_ES_INDEXATION_DONE";
41  
42    public static final String FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY = "FILE_JCR_COLLABORATION_REINDEXATION_DONE";
43  
44    public static final String FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY = "FILE_JCR_SYSTEM_REINDEXATION_DONE";
45  
46    private IndexingService indexingService;
47  
48    private SettingService settingService;
49  
50    private RepositoryService repositoryService;
51  
52    private IndexingOperationProcessor indexingOperationProcessor;
53  
54    private JobSchedulerService jobSchedulerService;
55  
56    public FileESMigration(InitParams initParams,
57                           IndexingService indexingService,
58                           SettingService settingService,
59                           RepositoryService repositoryService,
60                           IndexingOperationProcessor indexingOperationProcessor,
61                           JobSchedulerService jobSchedulerService) {
62      super(initParams);
63      this.indexingService = indexingService;
64      this.settingService = settingService;
65      this.repositoryService = repositoryService;
66      this.indexingOperationProcessor = indexingOperationProcessor;
67      this.jobSchedulerService = jobSchedulerService;
68    }
69  
70    @Override
71    public void processUpgrade(String oldVersion, String newVersion) {
72      try {
73        if (!isIndexationInESDone()) {
74          printNumberOfFileToIndex();
75          indexInES();
76        }
77  
78        if (!isJCRReindexationDone()) {
79          reindexJCR();
80        }
81      } catch(Exception e) {
82        throw new RuntimeException("Error while Files indexing in ES migration", e);
83      }
84    }
85  
86    @Override
87    public boolean shouldProceedToUpgrade(String newVersion, String previousVersion) {
88      return !(isIndexationInESDone() && isJCRReindexationDone());
89    }
90  
91    public void reindexJCR() throws Exception {
92      try {
93        SearchManager searchManager = (SearchManager) repositoryService.getCurrentRepository().getWorkspaceContainer("collaboration").getComponent(SearchManager.class);
94  
95        if(!isJCRCollaboraionReindexationDone()) {
96          LOG.info("== Files ES migration - Starting reindexation of JCR collaboration workspace");
97          CompletableFuture<Boolean> reindexCollaborationWSResult = searchManager.reindexWorkspace(false, 0);
98          reindexCollaborationWSResult.thenAccept(successful -> {
99            if (successful) {
100             LOG.info("== Files ES migration - Reindexation of JCR collaboration workspace done");
101             settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY, SettingValue.create(true));
102           } else {
103             LOG.error("== Files ES migration - Reindexation of JCR collaboration workspace failed. " +
104                     "Check logs to fix the issue, then reindex it by restarting the server");
105           }
106         });
107       }
108 
109       if(!isJCRSystemReindexationDone()) {
110         LOG.info("== Files ES migration - Starting reindexation of JCR system workspace");
111         SystemSearchManager systemSearchManager = (SystemSearchManager) repositoryService.getCurrentRepository().getWorkspaceContainer("system").getComponent(SystemSearchManager.class);
112         CompletableFuture<Boolean> reindexSystemWSResult = systemSearchManager.reindexWorkspace(false, 0);
113         reindexSystemWSResult.thenAccept(successful -> {
114           if (successful) {
115             LOG.info("== Files ES migration - Reindexation of JCR system workspace done");
116             settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY, SettingValue.create(true));
117           } else {
118             LOG.error("== Files ES migration - Reindexation of JCR system workspace failed. " +
119                     "Check logs to fix the issue, then reindex it by restarting the server");
120           }
121         });
122       }
123     } catch (RepositoryException e) {
124       throw new Exception("Error while reindexing JCR collaboration and system workspaces", e);
125     }
126   }
127 
128   /**
129    * Run the reindex all operation to fill the indexing queue with all files to index.
130    * ESBulkIndexer job is disabled and indexing queue processing (process()) is done
131    * synchronously to make sure the flag is set only when it is really done (allow to redo it in case
132    * of interruption), and also to make sur JCR reindexing is not launch during this operation (which will
133    * impact it since the reindex all operation use the JCR query manager to get all files).
134    */
135   public void indexInES() throws Exception {
136     // Pause ESBulkIndexer job to avoid concurrent executions in the process() operation
137     LOG.info("== Files ES migration - Pause ESBulkIndexer job");
138     jobSchedulerService.pauseJob("ESBulkIndexer", "ElasticSearch");
139 
140     try {
141       LOG.info("== Files ES migration - Stopping executing jobs ESBulkIndexer instances...");
142       jobSchedulerService.getAllExcutingJobs().stream()
143               .filter(jobExecutionContext -> jobExecutionContext.getJobDetail().getKey().getName().equals("ESBulkIndexer"))
144               .forEach(indexingJob -> {
145                 try {
146                   LOG.info("== Files ES migration - Interrupting executing job ESBulkIndexer instance " + indexingJob.getFireInstanceId());
147                   ((InterruptableJob) indexingJob.getJobInstance()).interrupt();
148                 } catch (UnableToInterruptJobException e) {
149                   LOG.error("Error while interrupting ES indexing queue job to run the migration", e);
150                 }
151               });
152 
153       LOG.info("== Files ES migration - Clear indexing queue for files");
154       indexingService.clearQueue(FileindexingConnector.TYPE);
155 
156       LOG.info("== Files ES migration - Starting pushing all files in indexation queue");
157       indexingService.reindexAll(FileindexingConnector.TYPE);
158 
159       // process the reindexAll operation synchronously to make sure it is done before the JCR workspace reindexation (otherwise JCR queries will not retrieve nodes)
160       processIndexation();
161       settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_ES_INDEXATION_DONE_KEY, SettingValue.create(true));
162       LOG.info("== Files ES migration - Push of all files in indexation queue done");
163     } catch(Exception e) {
164       throw new Exception("Error while indexing files in ES", e);
165     } finally {
166       LOG.info("== Files ES migration - Resume ESBulkIndexer job");
167       jobSchedulerService.resumeJob("ESBulkIndexer", "ElasticSearch");
168     }
169   }
170 
171   @ExoTransactional
172   public void processIndexation() {
173     LOG.info("== Files ES migration - Process files");
174     indexingOperationProcessor.process();
175   }
176 
177   private boolean isIndexationInESDone() {
178     SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_ES_INDEXATION_DONE_KEY);
179     return done != null && done.getValue().equals("true");
180   }
181 
182   private boolean isJCRReindexationDone() {
183     return isJCRCollaboraionReindexationDone() && isJCRSystemReindexationDone();
184   }
185 
186   private boolean isJCRCollaboraionReindexationDone() {
187     SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY);
188     return done != null && done.getValue().equals("true");
189   }
190 
191   private boolean isJCRSystemReindexationDone() {
192     SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY);
193     return done != null && done.getValue().equals("true");
194   }
195 
196   private void printNumberOfFileToIndex() {
197     try {
198       Session session = WCMCoreUtils.getSystemSessionProvider().getSession("collaboration", repositoryService.getCurrentRepository());
199       QueryManager queryManager = session.getWorkspace().getQueryManager();
200       Query query = queryManager.createQuery("select * from " + NodetypeConstant.NT_FILE, Query.SQL);
201       QueryResult result = query.execute();
202       LOG.info("== Files ES migration - Number of files to index : " + result.getNodes().getSize());
203     } catch (RepositoryException e) {
204       LOG.error("== Files ES migration - Error while counting all nt:file to index", e);
205     }
206   }
207 }