ElasticIndexingClient.java
/*
* Copyright (C) 2003-2015 eXo Platform SAS.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see http://www.gnu.org/licenses/ .
*/
package org.exoplatform.commons.search.es.client;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.exoplatform.commons.utils.PropertyManager;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
/**
* Created by The eXo Platform SAS Author : Thibault Clement
* tclement@exoplatform.com 9/1/15
*/
public class ElasticIndexingClient extends ElasticClient {
public static final String EMPTY_JSON = "{}";
private static final Log LOG = ExoLogger.getExoLogger(ElasticIndexingClient.class);
private static final String ES_INDEX_CLIENT_PROPERTY_NAME = "exo.es.index.server.url";
private static final String ES_INDEX_CLIENT_PROPERTY_USERNAME = "exo.es.index.server.username";
private static final String ES_INDEX_CLIENT_PROPERTY_PASSWORD = "exo.es.index.server.password";
private ElasticIndexingAuditTrail auditTrail;
public ElasticIndexingClient(ElasticIndexingAuditTrail auditTrail) {
super(auditTrail);
if (auditTrail == null) {
throw new IllegalArgumentException("AuditTrail is null");
}
this.auditTrail = auditTrail;
// Get url client from exo global properties
if (StringUtils.isNotBlank(PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_NAME))) {
this.urlClient = PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_NAME);
LOG.info("Using {} as Indexing URL", this.urlClient);
} else {
LOG.info("Using default as Indexing URL");
}
}
/**
* Send request to ES to create a new index
*/
public boolean sendCreateIndexRequest(String index, String settings) {
String indexURL = urlClient + "/" + index;
if (sendIsIndexExistsRequest(index)) {
LOG.info("Index {} already exists. Index creation requests will not be sent.", index);
return false;
} else {
LOG.info("Index {} doesn't exist. Index creation requests will be sent.", index);
long startTime = System.currentTimeMillis();
ElasticResponse responseCreate = sendHttpPutRequest(indexURL, settings);
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX,
null,
index,
null,
responseCreate.getStatusCode(),
responseCreate.getMessage(),
(System.currentTimeMillis() - startTime));
return true;
}
}
/**
* Send request to ES to create a new type
*/
public void sendCreateTypeRequest(String index, String type, String mappings) {
String url = urlClient + "/" + index + "/_mapping/" + type;
if (!sendIsTypeExistsRequest(index, type)) {
LOG.info("Mapping doesn't exist for type {}. Mapping creation requests will be sent.", type);
long startTime = System.currentTimeMillis();
ElasticResponse response = sendHttpPutRequest(url, mappings);
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_TYPE,
null,
index,
type,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
} else {
LOG.info("Mapping already exists for type {}. Mapping creation requests will not be sent.", type);
}
}
/**
* Send request to ES to delete all documents of the given type
*/
public void sendDeleteAllDocsOfTypeRequest(String index, String type) {
long startTime = System.currentTimeMillis();
String request = getDeleteAllDocumentsRequestContent();
ElasticResponse response = sendHttpPostRequest(urlClient + "/" + index + "/" + type + "/_delete_by_query?conflicts=proceed&wait_for_completion=true", request);
auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
null,
index,
type,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
}
/**
* Send request to ES to perform a C-reate, U-pdate or D-elete operation on a
* ES document
*
* @param bulkRequest JSON containing C-reate, U-pdate or D-elete operation
*/
public void sendCUDRequest(String bulkRequest) {
long startTime = System.currentTimeMillis();
ElasticResponse response = sendHttpPostRequest(urlClient + "/_bulk", bulkRequest);
logBulkResponse(response.getMessage(), (System.currentTimeMillis() - startTime));
}
/**
* Send request to ES to create a new Ingest pipeline for attachment
*
* @param index
* @param type
* @param pipelineName
* @param processorMappings
*/
public void sendCreateAttachmentPipelineRequest(String index, String type, String pipelineName, String processorMappings) {
String url = urlClient + "/_ingest/pipeline/" + pipelineName;
ElasticResponse responseExists = sendHttpGetRequest(url);
if (responseExists.getStatusCode() == HttpStatus.SC_OK || responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
|| responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
if (EMPTY_JSON.equals(responseExists.getMessage())) {
LOG.info("Pipeline doesn't exist for type {}. Mapping creation requests will be sent.", type);
long startTime = System.currentTimeMillis();
ElasticResponse response = sendHttpPutRequest(url, processorMappings);
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_PIPELINE,
null,
index,
type,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
} else {
LOG.info("Pipeline already exists for type {}. Pipeline creation requests will not be sent.", type);
}
} else {
LOG.error("Error while creating pipeline: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
}
}
/**
* Send request to ES to create a new Ingest pipeline for attachment
*
* @param index
* @param type
* @param id
* @param pipelineName
* @param pipelineRequestOperation
*/
public void sendCreateDocOnPipeline(String index, String type, String id, String pipelineName, String pipelineRequestOperation) {
refreshIndex(index);
String pipelineURL = urlClient + "/_ingest/pipeline/" + pipelineName;
ElasticResponse responseExists = sendHttpGetRequest(pipelineURL);
if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
long startTime = System.currentTimeMillis();
String url = urlClient + "/" + index + "/" + type + "/" + id + "?pipeline=" + pipelineName;
ElasticResponse response = sendHttpPutRequest(url, pipelineRequestOperation);
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_DOC_PIPELINE,
null,
index,
type,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
} else {
LOG.error("Error while creating attachment on pipeline '{}': Unsupported HttpStatusCode {}. url={}", pipelineName, responseExists.getStatusCode(), pipelineURL);
}
}
/**
* Send request to ES to create a new index alias for new ES Index and remove it from old index if exists
*/
public void sendCreateIndexAliasRequest(String index, String oldIndex, String indexAlias) {
if(oldIndex == null) {
LOG.info("Index alias '{}' will be created to refer the index '{}'", indexAlias, index);
} else {
LOG.info("Index alias '{}' will be created to refer the index {} instead of old index '{}'", indexAlias, index, oldIndex);
}
long startTime = System.currentTimeMillis();
String aliasesURL = urlClient + "/_aliases";
ElasticResponse responseUpdateIndex = sendHttpPostRequest(aliasesURL, getCreateAliasRequestContent(index, oldIndex, indexAlias));
if(responseUpdateIndex.getStatusCode() == HttpStatus.SC_OK) {
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX_ALIAS,
null,
index,
null,
responseUpdateIndex.getStatusCode(),
responseUpdateIndex.getMessage(),
(System.currentTimeMillis() - startTime));
} else {
auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX_ALIAS,
null,
index,
null,
responseUpdateIndex.getStatusCode(),
responseUpdateIndex.getMessage(),
(System.currentTimeMillis() - startTime));
throw new ElasticClientException("Index alias " + indexAlias + " update from old index " + oldIndex + " to new index "
+ index + " error, http code = '" + responseUpdateIndex.getStatusCode() + "', message = '"
+ responseUpdateIndex.getMessage() + "'");
}
}
/**
* Send request to ES to get type existence information
*/
public boolean sendIsTypeExistsRequest(String index, String type) {
String url = urlClient + "/" + index + "/_mapping/" + type;
ElasticResponse responseExists = sendHttpHeadRequest(url);
if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
return true;
} else if(responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
return false;
} else {
LOG.error("Error while checking Type existence: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
throw new ElasticClientException("Can't request ES to get index/type " + index + "/" + type + " existence status");
}
}
/**
* Send request to ES to get index aliases
*/
public Set<String> sendGetIndexAliasesRequest(String index) {
String indexAliasURL = urlClient + "/" + index + "/_aliases/";
ElasticResponse responseExists = sendHttpGetRequest(indexAliasURL);
// Test if he alias already exists
if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
// Get all aliases information
String aliasesURL = urlClient + "/_aliases";
ElasticResponse responseAliases = sendHttpGetRequest(indexAliasURL);
// An ES communication can happen, so throw an exception
if (responseAliases.getStatusCode() != HttpStatus.SC_OK) {
throw new ElasticClientException("Can't get aliases from URL " + aliasesURL);
}
String jsonResponse = responseAliases.getMessage();
// Parse aliases mappings
JSONParser parser = new JSONParser();
Map<?, ?> json;
try {
json = (Map<?, ?>)parser.parse(jsonResponse);
} catch (ParseException e) {
throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
}
// if alias exists and old index doesn't exist
// this means the alias is made on new index name
// So nothing to change
if(!json.containsKey(index)) {
return Collections.emptySet();
}
JSONObject indexAliases = (JSONObject)(((Map<?, ?>) json.get(index)).get("aliases"));
return indexAliases.keySet();
} else {
throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + responseExists.getStatusCode()
+ ", \\n\\t\\t message: " + responseExists.getMessage());
}
}
/**
* Send request to ES to count all documents found in index
*/
public long sendCountIndexObjectsRequest(String index) {
refreshIndex(index);
String indexCountObjectsURL = urlClient + "/" + index + "/_count?q=*";
ElasticResponse mappingsResponse = sendHttpGetRequest(indexCountObjectsURL);
if (mappingsResponse.getStatusCode() == HttpStatus.SC_OK) {
String jsonResponse = mappingsResponse.getMessage();
// Parse mappings
JSONParser parser = new JSONParser();
Map<?, ?> json;
try {
json = (Map<?, ?>)parser.parse(jsonResponse);
} catch (ParseException e) {
throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
}
if(!json.containsKey("count")) {
throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
}
return (Long)json.get("count");
} else {
throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + mappingsResponse.getStatusCode()
+ ", \\n\\t\\t message: " + mappingsResponse.getMessage());
}
}
/**
* Send request to ES to get version
*/
public String sendGetESVersion() {
ElasticResponse mappingsResponse = sendHttpGetRequest(urlClient);
if (mappingsResponse.getStatusCode() == HttpStatus.SC_OK) {
String jsonResponse = mappingsResponse.getMessage();
// Parse mappings
JSONParser parser = new JSONParser();
Map<?, ?> json;
try {
json = (Map<?, ?>)parser.parse(jsonResponse);
} catch (ParseException e) {
throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
}
if(!json.containsKey("version")) {
throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
}
return (String) ((JSONObject)json.get("version")).get("number");
} else {
throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + mappingsResponse.getStatusCode()
+ ", \\n\\t\\t message: " + mappingsResponse.getMessage());
}
}
/**
* Deletes an index from ES
*
* @param index index name to delete
*/
public void sendDeleteIndexRequest(String index) {
long startTime = System.currentTimeMillis();
ElasticResponse response = sendHttpDeleteRequest(urlClient + "/" + index);
auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
null,
index,
null,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
if(response.getStatusCode() != HttpStatus.SC_OK) {
throw new ElasticClientException("Can't delete index " + index + ", reqponse code = " + response.getStatusCode()
+ ", message = " + response.getMessage());
}
}
/**
* This operation reindex the documents from old index/type to new index/type mapping.
* A pipeline could be used when reindexing in case Ingest Attachment plugin is used
* by a target type.
*
* @param index target index name
* @param oldIndex source index name
* @param type source type name
* @param pipeline target pipeline name (optional)
*/
public void sendReindexTypeRequest(String index, String oldIndex, String type, String pipeline) {
long startTime = System.currentTimeMillis();
String request = getReindexRequestContent(index, oldIndex, type, pipeline);
ElasticResponse response = sendHttpPostRequest(urlClient + "/_reindex", request);
auditTrail.audit(ElasticIndexingAuditTrail.REINDEX_TYPE,
null,
index,
type,
response.getStatusCode(),
response.getMessage(),
(System.currentTimeMillis() - startTime));
if(response.getStatusCode() != HttpStatus.SC_OK) {
throw new ElasticClientException("Can't reindex index " + index + ", type = " + type + ", reqponse code = " + response.getStatusCode()
+ ", message = " + response.getMessage());
}
}
/**
* Send request to ES to test if index exists
*
* @param index ES index
* @return true if index exists in ES
*/
public boolean sendIsIndexExistsRequest(String index) {
String indexURL = urlClient + "/" + index;
ElasticResponse responseExists = sendHttpGetRequest(indexURL);
if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
return true;
} else if (responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
|| responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
return false;
} else {
throw new ElasticClientException("Can't get index '"+ index +"' status");
}
}
private void refreshIndex(String index) {
String indexRefreshURL = urlClient + "/" + index + "/_refresh";
sendHttpPostRequest(indexRefreshURL, null);
}
private void logBulkResponse(String response, long executionTime) {
try {
Object parsedResponse = JSONValue.parseWithException(response);
if (!(parsedResponse instanceof JSONObject)) {
LOG.error("Unable to parse Bulk response: response is not a JSON. response={}", response);
throw new ElasticClientException("Unable to parse Bulk response: response is not a JSON.");
}
// process items
Object items = ((JSONObject) parsedResponse).get("items");
if (items != null) {
if (!(items instanceof JSONArray)) {
LOG.error("Unable to parse Bulk response: items is not a JSONArray. items={}", items);
throw new ElasticClientException("Unable to parse Bulk response: items is not a JSONArray.");
}
// Looping over all the items is required because
// in case of error, ES send a response with Status 200 and a flag
// errors:true
// in the JSON message
for (Object item : ((JSONArray) items).toArray()) {
if (!(item instanceof JSONObject)) {
LOG.error("Unable to parse Bulk response: item is not a JSONObject. item={}", item);
throw new ElasticClientException("Unable to parse Bulk response: item is not a JSONObject.");
}
logBulkResponseItem((JSONObject) item, executionTime);
}
}
} catch (ParseException e) {
throw new ElasticClientException("Unable to parse Bulk response", e);
}
}
private void logBulkResponseItem(JSONObject item, long executionTime) {
for (Map.Entry operation : (Set<Map.Entry>) item.entrySet()) {
String operationName = operation.getKey() == null ? null : (String) operation.getKey();
if (operation.getValue() != null) {
JSONObject operationDetails = (JSONObject) operation.getValue();
String index = operationDetails.get("_index") == null ? null : (String) operationDetails.get("_index");
String type = operationDetails.get("_type") == null ? null : (String) operationDetails.get("_type");
String id = operationDetails.get("_id") == null ? null : (String) operationDetails.get("_id");
Long status = operationDetails.get("status") == null ? null : (Long) operationDetails.get("status");
String error = operationDetails.get("error") == null ? null : (String) ((JSONObject) operationDetails.get("error")).get("reason");
Integer httpStatusCode = status == null ? null : status.intValue();
if (ElasticIndexingAuditTrail.isError(httpStatusCode)) {
auditTrail.logRejectedDocumentBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
} else {
if (auditTrail.isFullLogEnabled()) {
auditTrail.logAcceptedBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
}
}
}
}
}
private String getDeleteAllDocumentsRequestContent() {
JSONObject deleteAllRequest = new JSONObject();
JSONObject deleteQueryRequest = new JSONObject();
deleteQueryRequest.put("match_all", new JSONObject());
deleteAllRequest.put("query", deleteQueryRequest);
String request = deleteAllRequest.toJSONString();
LOG.debug("Delete All request to ES: \n {}", request);
return request;
}
private String getReindexRequestContent(String index, String oldIndex, String type, String pipeline) {
JSONObject reindexRequest = new JSONObject();
JSONObject reindexSourceRequest = new JSONObject();
reindexRequest.put("source", reindexSourceRequest);
reindexSourceRequest.put("index", oldIndex);
reindexSourceRequest.put("type", type);
JSONObject reindexDestRequest = new JSONObject();
reindexRequest.put("dest", reindexDestRequest);
reindexDestRequest.put("index", index);
if(pipeline != null) {
reindexDestRequest.put("pipeline", pipeline);
}
String request = reindexRequest.toJSONString();
LOG.debug("Reindex Request from old index {} type {} to new index : \n {}", oldIndex, type, index, request);
return request;
}
private String getCreateAliasRequestContent(String index, String oldIndex, String alias) {
JSONObject updateAliasRequest = new JSONObject();
JSONArray updateAliasActionsRequest = new JSONArray();
updateAliasRequest.put("actions", updateAliasActionsRequest);
if(oldIndex != null) {
JSONObject updateAliasActionRemoveRequest = new JSONObject();
JSONObject updateAliasActionRemoveOptionsRequest = new JSONObject();
updateAliasActionRemoveRequest.put("remove", updateAliasActionRemoveOptionsRequest);
updateAliasActionRemoveOptionsRequest.put("alias", alias);
updateAliasActionRemoveOptionsRequest.put("index", oldIndex);
updateAliasActionsRequest.add(updateAliasActionRemoveRequest);
}
JSONObject updateAliasActionAddRequest = new JSONObject();
JSONObject updateAliasActionAddOptionsRequest = new JSONObject();
updateAliasActionAddRequest.put("add", updateAliasActionAddOptionsRequest);
updateAliasActionAddOptionsRequest.put("alias", alias);
updateAliasActionAddOptionsRequest.put("index", index);
updateAliasActionsRequest.add(updateAliasActionAddRequest);
String request = updateAliasRequest.toJSONString();
LOG.debug("Create Index alias ES: \n {}", request);
return request;
}
@Override
protected String getEsUsernameProperty() {
return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_USERNAME);
}
@Override
protected String getEsPasswordProperty() {
return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_PASSWORD);
}
@Override
protected HttpClientConnectionManager getClientConnectionManager() {
return new PoolingHttpClientConnectionManager();
}
}