1 package org.exoplatform.services.wcm.search;
2
3 import org.exoplatform.commons.api.persistence.ExoTransactional;
4 import org.exoplatform.commons.api.settings.SettingService;
5 import org.exoplatform.commons.api.settings.SettingValue;
6 import org.exoplatform.commons.api.settings.data.Context;
7 import org.exoplatform.commons.api.settings.data.Scope;
8 import org.exoplatform.commons.search.index.IndexingOperationProcessor;
9 import org.exoplatform.commons.search.index.IndexingService;
10 import org.exoplatform.commons.upgrade.UpgradeProductPlugin;
11 import org.exoplatform.container.xml.InitParams;
12 import org.exoplatform.services.jcr.RepositoryService;
13 import org.exoplatform.services.jcr.impl.core.query.SearchManager;
14 import org.exoplatform.services.jcr.impl.core.query.SystemSearchManager;
15 import org.exoplatform.services.log.ExoLogger;
16 import org.exoplatform.services.log.Log;
17 import org.exoplatform.services.scheduler.JobSchedulerService;
18 import org.exoplatform.services.wcm.core.NodetypeConstant;
19 import org.exoplatform.services.wcm.search.connector.FileindexingConnector;
20 import org.exoplatform.services.wcm.utils.WCMCoreUtils;
21 import org.quartz.InterruptableJob;
22 import org.quartz.UnableToInterruptJobException;
23
24 import javax.jcr.RepositoryException;
25 import javax.jcr.Session;
26 import javax.jcr.query.Query;
27 import javax.jcr.query.QueryManager;
28 import javax.jcr.query.QueryResult;
29 import java.util.concurrent.CompletableFuture;
30
31
32
33
34 public class FileESMigration extends UpgradeProductPlugin {
35
36 private static final Log LOG = ExoLogger.getLogger(FileESMigration.class);
37
38 public static final String FILE_ES_INDEXATION_KEY = "FILE_ES_INDEXATION";
39
40 public static final String FILE_ES_INDEXATION_DONE_KEY = "FILE_ES_INDEXATION_DONE";
41
42 public static final String FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY = "FILE_JCR_COLLABORATION_REINDEXATION_DONE";
43
44 public static final String FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY = "FILE_JCR_SYSTEM_REINDEXATION_DONE";
45
46 private IndexingService indexingService;
47
48 private SettingService settingService;
49
50 private RepositoryService repositoryService;
51
52 private IndexingOperationProcessor indexingOperationProcessor;
53
54 private JobSchedulerService jobSchedulerService;
55
56 public FileESMigration(InitParams initParams,
57 IndexingService indexingService,
58 SettingService settingService,
59 RepositoryService repositoryService,
60 IndexingOperationProcessor indexingOperationProcessor,
61 JobSchedulerService jobSchedulerService) {
62 super(initParams);
63 this.indexingService = indexingService;
64 this.settingService = settingService;
65 this.repositoryService = repositoryService;
66 this.indexingOperationProcessor = indexingOperationProcessor;
67 this.jobSchedulerService = jobSchedulerService;
68 }
69
70 @Override
71 public void processUpgrade(String oldVersion, String newVersion) {
72 try {
73 if (!isIndexationInESDone()) {
74 printNumberOfFileToIndex();
75 indexInES();
76 }
77
78 if (!isJCRReindexationDone()) {
79 reindexJCR();
80 }
81 } catch(Exception e) {
82 throw new RuntimeException("Error while Files indexing in ES migration", e);
83 }
84 }
85
86 @Override
87 public boolean shouldProceedToUpgrade(String newVersion, String previousVersion) {
88 return !(isIndexationInESDone() && isJCRReindexationDone());
89 }
90
91 public void reindexJCR() throws Exception {
92 try {
93 SearchManager searchManager = (SearchManager) repositoryService.getCurrentRepository().getWorkspaceContainer("collaboration").getComponent(SearchManager.class);
94
95 if(!isJCRCollaboraionReindexationDone()) {
96 LOG.info("== Files ES migration - Starting reindexation of JCR collaboration workspace");
97 CompletableFuture<Boolean> reindexCollaborationWSResult = searchManager.reindexWorkspace(false, 0);
98 reindexCollaborationWSResult.thenAccept(successful -> {
99 if (successful) {
100 LOG.info("== Files ES migration - Reindexation of JCR collaboration workspace done");
101 settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY, SettingValue.create(true));
102 } else {
103 LOG.error("== Files ES migration - Reindexation of JCR collaboration workspace failed. " +
104 "Check logs to fix the issue, then reindex it by restarting the server");
105 }
106 });
107 }
108
109 if(!isJCRSystemReindexationDone()) {
110 LOG.info("== Files ES migration - Starting reindexation of JCR system workspace");
111 SystemSearchManager systemSearchManager = (SystemSearchManager) repositoryService.getCurrentRepository().getWorkspaceContainer("system").getComponent(SystemSearchManager.class);
112 CompletableFuture<Boolean> reindexSystemWSResult = systemSearchManager.reindexWorkspace(false, 0);
113 reindexSystemWSResult.thenAccept(successful -> {
114 if (successful) {
115 LOG.info("== Files ES migration - Reindexation of JCR system workspace done");
116 settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY, SettingValue.create(true));
117 } else {
118 LOG.error("== Files ES migration - Reindexation of JCR system workspace failed. " +
119 "Check logs to fix the issue, then reindex it by restarting the server");
120 }
121 });
122 }
123 } catch (RepositoryException e) {
124 throw new Exception("Error while reindexing JCR collaboration and system workspaces", e);
125 }
126 }
127
128
129
130
131
132
133
134
135 public void indexInES() throws Exception {
136
137 LOG.info("== Files ES migration - Pause ESBulkIndexer job");
138 jobSchedulerService.pauseJob("ESBulkIndexer", "ElasticSearch");
139
140 try {
141 LOG.info("== Files ES migration - Stopping executing jobs ESBulkIndexer instances...");
142 jobSchedulerService.getAllExcutingJobs().stream()
143 .filter(jobExecutionContext -> jobExecutionContext.getJobDetail().getKey().getName().equals("ESBulkIndexer"))
144 .forEach(indexingJob -> {
145 try {
146 LOG.info("== Files ES migration - Interrupting executing job ESBulkIndexer instance " + indexingJob.getFireInstanceId());
147 ((InterruptableJob) indexingJob.getJobInstance()).interrupt();
148 } catch (UnableToInterruptJobException e) {
149 LOG.error("Error while interrupting ES indexing queue job to run the migration", e);
150 }
151 });
152
153 LOG.info("== Files ES migration - Clear indexing queue for files");
154 indexingService.clearQueue(FileindexingConnector.TYPE);
155
156 LOG.info("== Files ES migration - Starting pushing all files in indexation queue");
157 indexingService.reindexAll(FileindexingConnector.TYPE);
158
159
160 processIndexation();
161 settingService.set(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_ES_INDEXATION_DONE_KEY, SettingValue.create(true));
162 LOG.info("== Files ES migration - Push of all files in indexation queue done");
163 } catch(Exception e) {
164 throw new Exception("Error while indexing files in ES", e);
165 } finally {
166 LOG.info("== Files ES migration - Resume ESBulkIndexer job");
167 jobSchedulerService.resumeJob("ESBulkIndexer", "ElasticSearch");
168 }
169 }
170
171 @ExoTransactional
172 public void processIndexation() {
173 LOG.info("== Files ES migration - Process files");
174 indexingOperationProcessor.process();
175 }
176
177 private boolean isIndexationInESDone() {
178 SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_ES_INDEXATION_DONE_KEY);
179 return done != null && done.getValue().equals("true");
180 }
181
182 private boolean isJCRReindexationDone() {
183 return isJCRCollaboraionReindexationDone() && isJCRSystemReindexationDone();
184 }
185
186 private boolean isJCRCollaboraionReindexationDone() {
187 SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_COLLABORATION_REINDEXATION_DONE_KEY);
188 return done != null && done.getValue().equals("true");
189 }
190
191 private boolean isJCRSystemReindexationDone() {
192 SettingValue<?> done = settingService.get(Context.GLOBAL, Scope.GLOBAL.id(FILE_ES_INDEXATION_KEY), FILE_JCR_SYSTEM_REINDEXATION_DONE_KEY);
193 return done != null && done.getValue().equals("true");
194 }
195
196 private void printNumberOfFileToIndex() {
197 try {
198 Session session = WCMCoreUtils.getSystemSessionProvider().getSession("collaboration", repositoryService.getCurrentRepository());
199 QueryManager queryManager = session.getWorkspace().getQueryManager();
200 Query query = queryManager.createQuery("select * from " + NodetypeConstant.NT_FILE, Query.SQL);
201 QueryResult result = query.execute();
202 LOG.info("== Files ES migration - Number of files to index : " + result.getNodes().getSize());
203 } catch (RepositoryException e) {
204 LOG.error("== Files ES migration - Error while counting all nt:file to index", e);
205 }
206 }
207 }