ElasticIndexingClient.java

/* 
 * Copyright (C) 2003-2015 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/ .
 */
package org.exoplatform.commons.search.es.client;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import org.exoplatform.commons.utils.PropertyManager;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;

/**
 * Client sending indexing-related HTTP requests (index, type, alias, ingest
 * pipeline and bulk operations) to the Elasticsearch server, and recording the
 * outcome of those operations in an {@link ElasticIndexingAuditTrail}.
 * <p>
 * The server URL is read from the {@code exo.es.index.server.url} property when
 * it is set; otherwise the URL inherited from {@link ElasticClient} is used.
 * <p>
 * Created by The eXo Platform SAS Author : Thibault Clement
 * tclement@exoplatform.com 9/1/15
 */
public class ElasticIndexingClient extends ElasticClient {
  public static final String              EMPTY_JSON                        = "{}";
  private static final Log                LOG                               = ExoLogger.getExoLogger(ElasticIndexingClient.class);
  private static final String             ES_INDEX_CLIENT_PROPERTY_NAME     = "exo.es.index.server.url";
  private static final String             ES_INDEX_CLIENT_PROPERTY_USERNAME = "exo.es.index.server.username";
  private static final String             ES_INDEX_CLIENT_PROPERTY_PASSWORD = "exo.es.index.server.password";
  private final ElasticIndexingAuditTrail auditTrail;

  /**
   * Builds the client and resolves the indexing server URL.
   *
   * @param auditTrail audit trail receiving the result of each operation, must
   *          not be null
   * @throws IllegalArgumentException if {@code auditTrail} is null
   */
  public ElasticIndexingClient(ElasticIndexingAuditTrail auditTrail) {
    super(auditTrail);
    if (auditTrail == null) {
      throw new IllegalArgumentException("AuditTrail is null");
    }
    this.auditTrail = auditTrail;
    // Get url client from exo global properties (read once, then reused)
    String configuredUrl = PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_NAME);
    if (StringUtils.isNotBlank(configuredUrl)) {
      this.urlClient = configuredUrl;
      LOG.info("Using {} as Indexing URL", this.urlClient);
    } else {
      LOG.info("Using default as Indexing URL");
    }
  }

  /**
   * Send request to ES to create a new index, unless it already exists.
   *
   * @param index name of the index to create
   * @param settings body of the creation request
   * @return {@code true} if a creation request was sent (its outcome is only
   *         recorded in the audit trail), {@code false} if the index already
   *         exists
   * @throws ElasticClientException if the index existence can't be determined
   */
  public boolean sendCreateIndexRequest(String index, String settings) {
    if (sendIsIndexExistsRequest(index)) {
      LOG.info("Index {} already exists. Index creation requests will not be sent.", index);
      return false;
    }
    LOG.info("Index {} doesn't exist. Index creation requests will be sent.", index);

    long startTime = System.currentTimeMillis();
    ElasticResponse responseCreate = sendHttpPutRequest(urlClient + "/" + index, settings);
    auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX,
                     null,
                     index,
                     null,
                     responseCreate.getStatusCode(),
                     responseCreate.getMessage(),
                     (System.currentTimeMillis() - startTime));
    return true;
  }

  /**
   * Send request to ES to create a new type (mapping), unless it already
   * exists. The outcome of the creation request is recorded in the audit trail.
   *
   * @param index name of the index holding the type
   * @param type name of the type to create
   * @param mappings body of the mapping creation request
   * @throws ElasticClientException if the type existence can't be determined
   */
  public void sendCreateTypeRequest(String index, String type, String mappings) {
    if (sendIsTypeExistsRequest(index, type)) {
      LOG.info("Mapping already exists for type {}. Mapping creation requests will not be sent.", type);
      return;
    }
    LOG.info("Mapping doesn't exist for type {}. Mapping creation requests will be sent.", type);
    long startTime = System.currentTimeMillis();
    ElasticResponse response = sendHttpPutRequest(urlClient + "/" + index + "/_mapping/" + type, mappings);
    auditTrail.audit(ElasticIndexingAuditTrail.CREATE_TYPE,
                     null,
                     index,
                     type,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
  }

  /**
   * Send request to ES to delete all documents of the given type. The outcome
   * is recorded in the audit trail.
   *
   * @param index name of the index holding the type
   * @param type name of the type whose documents are deleted
   */
  public void sendDeleteAllDocsOfTypeRequest(String index, String type) {
    long startTime = System.currentTimeMillis();
    String request = getDeleteAllDocumentsRequestContent();
    String url = urlClient + "/" + index + "/" + type + "/_delete_by_query?conflicts=proceed&wait_for_completion=true";
    ElasticResponse response = sendHttpPostRequest(url, request);
    auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
                     null,
                     index,
                     type,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
  }

  /**
   * Send request to ES to perform a C-reate, U-pdate or D-elete operation on a
   * ES document. Each item of the bulk response is then reported to the audit
   * trail.
   *
   * @param bulkRequest JSON containing C-reate, U-pdate or D-elete operation
   * @throws ElasticClientException if the bulk response can't be interpreted
   */
  public void sendCUDRequest(String bulkRequest) {
    long startTime = System.currentTimeMillis();
    ElasticResponse response = sendHttpPostRequest(urlClient + "/_bulk", bulkRequest);
    logBulkResponse(response.getMessage(), (System.currentTimeMillis() - startTime));
  }

  /**
   * Send request to ES to create a new Ingest pipeline for attachment, unless
   * it already exists. The pipeline is considered missing when the lookup
   * answers with an empty JSON object.
   *
   * @param index index name, used for logging and auditing only
   * @param type type name, used for logging and auditing only
   * @param pipelineName name of the pipeline to create
   * @param processorMappings body of the pipeline creation request
   */
  public void sendCreateAttachmentPipelineRequest(String index, String type, String pipelineName, String processorMappings) {
    String url = urlClient + "/_ingest/pipeline/" + pipelineName;
    ElasticResponse responseExists = sendHttpGetRequest(url);
    boolean supportedStatus = responseExists.getStatusCode() == HttpStatus.SC_OK
        || responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
        || responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST;
    if (!supportedStatus) {
      LOG.error("Error while creating pipeline: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
      return;
    }
    if (!EMPTY_JSON.equals(responseExists.getMessage())) {
      LOG.info("Pipeline already exists for type {}. Pipeline creation requests will not be sent.", type);
      return;
    }
    LOG.info("Pipeline doesn't exist for type {}. Mapping creation requests will be sent.", type);

    long startTime = System.currentTimeMillis();
    ElasticResponse response = sendHttpPutRequest(url, processorMappings);
    auditTrail.audit(ElasticIndexingAuditTrail.CREATE_PIPELINE,
                     null,
                     index,
                     type,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
  }

  /**
   * Send request to ES to index a document through an existing Ingest
   * pipeline. The index is refreshed first; the document is sent only if the
   * pipeline lookup answers with HTTP 200, otherwise an error is logged and
   * nothing is sent.
   *
   * @param index name of the target index
   * @param type name of the target type
   * @param id identifier of the document
   * @param pipelineName name of the pipeline to apply
   * @param pipelineRequestOperation body of the document request
   */
  public void sendCreateDocOnPipeline(String index, String type, String id, String pipelineName, String pipelineRequestOperation) {
    refreshIndex(index);
    String pipelineURL = urlClient + "/_ingest/pipeline/" + pipelineName;
    ElasticResponse responseExists = sendHttpGetRequest(pipelineURL);
    if (responseExists.getStatusCode() != HttpStatus.SC_OK) {
      LOG.error("Error while creating attachment on pipeline '{}': Unsupported HttpStatusCode {}. url={}", pipelineName, responseExists.getStatusCode(), pipelineURL);
      return;
    }
    long startTime = System.currentTimeMillis();
    String url = urlClient + "/" + index + "/" + type + "/" + id + "?pipeline=" + pipelineName;
    ElasticResponse response = sendHttpPutRequest(url, pipelineRequestOperation);
    auditTrail.audit(ElasticIndexingAuditTrail.CREATE_DOC_PIPELINE,
                     null,
                     index,
                     type,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
  }

  /**
   * Send request to ES to create a new index alias for new ES Index and remove
   * it from old index if exists. The outcome is always recorded in the audit
   * trail, including on failure.
   *
   * @param index name of the index the alias must refer to
   * @param oldIndex name of the index the alias is removed from, or null when
   *          there is none
   * @param indexAlias name of the alias
   * @throws ElasticClientException if ES doesn't answer with HTTP 200
   */
  public void sendCreateIndexAliasRequest(String index, String oldIndex, String indexAlias) {
    if (oldIndex == null) {
      LOG.info("Index alias '{}' will be created to refer the index '{}'", indexAlias, index);
    } else {
      LOG.info("Index alias '{}' will be created to refer the index {} instead of old index '{}'", indexAlias, index, oldIndex);
    }
    long startTime = System.currentTimeMillis();
    String aliasesURL = urlClient + "/_aliases";
    ElasticResponse responseUpdateIndex = sendHttpPostRequest(aliasesURL, getCreateAliasRequestContent(index, oldIndex, indexAlias));
    // Audit whatever the outcome is, then fail on anything but a success
    auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX_ALIAS,
                     null,
                     index,
                     null,
                     responseUpdateIndex.getStatusCode(),
                     responseUpdateIndex.getMessage(),
                     (System.currentTimeMillis() - startTime));
    if (responseUpdateIndex.getStatusCode() != HttpStatus.SC_OK) {
      throw new ElasticClientException("Index alias " + indexAlias + " update from old index " + oldIndex + " to new index "
          + index + " error, http code = '" + responseUpdateIndex.getStatusCode() + "', message = '"
          + responseUpdateIndex.getMessage() + "'");
    }
  }

  /**
   * Send request to ES to get type existence information
   *
   * @param index name of the index holding the type
   * @param type name of the type
   * @return {@code true} on HTTP 200, {@code false} on HTTP 404
   * @throws ElasticClientException on any other HTTP status
   */
  public boolean sendIsTypeExistsRequest(String index, String type) {
    String url = urlClient + "/" + index + "/_mapping/" + type;
    ElasticResponse responseExists = sendHttpHeadRequest(url);
    if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
      return true;
    }
    if (responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
      return false;
    }
    LOG.error("Error while checking Type existence: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
    throw new ElasticClientException("Can't request ES to get index/type " + index + "/" + type + " existence status");
  }

  /**
   * Send request to ES to get index aliases
   *
   * @param index name used to query the aliases
   * @return names of the aliases listed under the {@code index} entry of the
   *         response, or an empty set when the response has no such entry
   *         (or the entry lists no aliases)
   * @throws ElasticClientException if ES doesn't answer with HTTP 200 or if the
   *           response is not valid JSON
   */
  @SuppressWarnings("unchecked") // alias names are JSON object keys, hence strings
  public Set<String> sendGetIndexAliasesRequest(String index) {
    String indexAliasURL = urlClient + "/" + index + "/_aliases/";
    // A single request is enough: it both proves the name is known and
    // carries the aliases information.
    ElasticResponse response = sendHttpGetRequest(indexAliasURL);
    if (response.getStatusCode() != HttpStatus.SC_OK) {
      throw unexpectedStatus(response);
    }
    Map<?, ?> json = parseJsonObject(response.getMessage());

    // if alias exists and old index doesn't exist
    // this means the alias is made on new index name
    // So nothing to change
    Object indexDetails = json.get(index);
    if (!(indexDetails instanceof Map)) {
      return Collections.emptySet();
    }
    Object indexAliases = ((Map<?, ?>) indexDetails).get("aliases");
    if (!(indexAliases instanceof Map)) {
      return Collections.emptySet();
    }
    return ((Map<String, ?>) indexAliases).keySet();
  }

  /**
   * Send request to ES to count all documents found in index. The index is
   * refreshed before counting.
   *
   * @param index name of the index
   * @return value of the {@code count} field of the response
   * @throws ElasticClientException if ES doesn't answer with HTTP 200, or if
   *           the response is not valid JSON or has no numeric {@code count}
   */
  public long sendCountIndexObjectsRequest(String index) {
    refreshIndex(index);
    ElasticResponse countResponse = sendHttpGetRequest(urlClient + "/" + index + "/_count?q=*");
    if (countResponse.getStatusCode() != HttpStatus.SC_OK) {
      throw unexpectedStatus(countResponse);
    }
    String jsonResponse = countResponse.getMessage();
    Object count = parseJsonObject(jsonResponse).get("count");
    // Covers a missing key as well as a null or non-numeric value
    if (!(count instanceof Number)) {
      throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
    }
    return ((Number) count).longValue();
  }

  /**
   * Send request to ES to get version
   *
   * @return value of the {@code version.number} field of the response
   * @throws ElasticClientException if ES doesn't answer with HTTP 200, or if
   *           the response is not valid JSON or has no {@code version} object
   */
  public String sendGetESVersion() {
    ElasticResponse versionResponse = sendHttpGetRequest(urlClient);
    if (versionResponse.getStatusCode() != HttpStatus.SC_OK) {
      throw unexpectedStatus(versionResponse);
    }
    String jsonResponse = versionResponse.getMessage();
    Object version = parseJsonObject(jsonResponse).get("version");
    // Covers a missing key as well as a null or non-object value
    if (!(version instanceof Map)) {
      throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
    }
    return (String) ((Map<?, ?>) version).get("number");
  }

  /**
   * Deletes an index from ES. The outcome is always recorded in the audit
   * trail, including on failure.
   *
   * @param index index name to delete
   * @throws ElasticClientException if ES doesn't answer with HTTP 200
   */
  public void sendDeleteIndexRequest(String index) {
    long startTime = System.currentTimeMillis();
    ElasticResponse response = sendHttpDeleteRequest(urlClient + "/" + index);
    // NOTE(review): an index deletion is audited under DELETE_TYPE - confirm
    // whether the audit trail offers a dedicated action for it.
    auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
                     null,
                     index,
                     null,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
    if (response.getStatusCode() != HttpStatus.SC_OK) {
      throw new ElasticClientException("Can't delete index " + index + ", reqponse code = " + response.getStatusCode()
          + ", message = " + response.getMessage());
    }
  }

  /**
   * This operation reindex the documents from old index/type to new index/type mapping.
   * A pipeline could be used when reindexing in case Ingest Attachment plugin is used
   * by a target type. The outcome is always recorded in the audit trail,
   * including on failure.
   *
   * @param index target index name
   * @param oldIndex source index name
   * @param type source type name
   * @param pipeline target pipeline name (optional)
   * @throws ElasticClientException if ES doesn't answer with HTTP 200
   */
  public void sendReindexTypeRequest(String index, String oldIndex, String type, String pipeline) {
    long startTime = System.currentTimeMillis();
    String request = getReindexRequestContent(index, oldIndex, type, pipeline);
    ElasticResponse response = sendHttpPostRequest(urlClient + "/_reindex", request);
    auditTrail.audit(ElasticIndexingAuditTrail.REINDEX_TYPE,
                     null,
                     index,
                     type,
                     response.getStatusCode(),
                     response.getMessage(),
                     (System.currentTimeMillis() - startTime));
    if (response.getStatusCode() != HttpStatus.SC_OK) {
      throw new ElasticClientException("Can't reindex index " + index + ", type = " + type + ", reqponse code = " + response.getStatusCode()
          + ", message = " + response.getMessage());
    }
  }

  /**
   * Send request to ES to test if index exists
   *
   * @param index ES index
   * @return true if index exists in ES (HTTP 200), false on HTTP 404 or 400
   * @throws ElasticClientException on any other HTTP status
   */
  public boolean sendIsIndexExistsRequest(String index) {
    ElasticResponse responseExists = sendHttpGetRequest(urlClient + "/" + index);
    if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
      return true;
    }
    if (responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
        || responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
      return false;
    }
    throw new ElasticClientException("Can't get index '"+ index +"' status");
  }

  /**
   * Asks ES to refresh the given index. The response is not inspected.
   */
  private void refreshIndex(String index) {
    String indexRefreshURL = urlClient + "/" + index + "/_refresh";
    sendHttpPostRequest(indexRefreshURL, null);
  }

  /**
   * Parses an ES response expected to be a JSON object.
   *
   * @param jsonResponse raw response body
   * @return the parsed JSON object
   * @throws ElasticClientException if the body is not valid JSON
   */
  private static Map<?, ?> parseJsonObject(String jsonResponse) {
    try {
      return (Map<?, ?>) new JSONParser().parse(jsonResponse);
    } catch (ParseException e) {
      throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
    }
  }

  /**
   * Builds the exception reporting a response whose HTTP status is not the
   * expected one.
   */
  private static ElasticClientException unexpectedStatus(ElasticResponse response) {
    return new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + response.getStatusCode()
        + ", \\n\\t\\t message: " + response.getMessage());
  }

  /**
   * Parses a bulk response and reports each of its items to the audit trail.
   *
   * @param response raw body of the bulk response
   * @param executionTime duration of the bulk request, in milliseconds
   * @throws ElasticClientException if the body doesn't have the expected
   *           structure
   */
  private void logBulkResponse(String response, long executionTime) {
    Object parsedResponse;
    try {
      parsedResponse = JSONValue.parseWithException(response);
    } catch (ParseException e) {
      throw new ElasticClientException("Unable to parse Bulk response", e);
    }
    if (!(parsedResponse instanceof JSONObject)) {
      LOG.error("Unable to parse Bulk response: response is not a JSON. response={}", response);
      throw new ElasticClientException("Unable to parse Bulk response: response is not a JSON.");
    }
    // process items
    Object items = ((JSONObject) parsedResponse).get("items");
    if (items == null) {
      return;
    }
    if (!(items instanceof JSONArray)) {
      LOG.error("Unable to parse Bulk response: items is not a JSONArray. items={}", items);
      throw new ElasticClientException("Unable to parse Bulk response: items is not a JSONArray.");
    }
    // Looping over all the items is required because
    // in case of error, ES send a response with Status 200 and a flag
    // errors:true
    // in the JSON message
    for (Object item : (JSONArray) items) {
      if (!(item instanceof JSONObject)) {
        LOG.error("Unable to parse Bulk response: item is not a JSONObject. item={}", item);
        throw new ElasticClientException("Unable to parse Bulk response: item is not a JSONObject.");
      }
      logBulkResponseItem((JSONObject) item, executionTime);
    }
  }

  /**
   * Reports one item of a bulk response to the audit trail: rejected
   * operations are always logged, accepted ones only when full logging is
   * enabled.
   *
   * @param item bulk response item, mapping an operation name to its details
   * @param executionTime duration of the whole bulk request, in milliseconds
   */
  private void logBulkResponseItem(JSONObject item, long executionTime) {
    for (Object rawOperation : item.entrySet()) {
      Map.Entry<?, ?> operation = (Map.Entry<?, ?>) rawOperation;
      if (operation.getValue() == null) {
        continue;
      }
      String operationName = (String) operation.getKey();
      JSONObject operationDetails = (JSONObject) operation.getValue();
      String index = (String) operationDetails.get("_index");
      String type = (String) operationDetails.get("_type");
      String id = (String) operationDetails.get("_id");
      Object status = operationDetails.get("status");
      Integer httpStatusCode = status == null ? null : Integer.valueOf(((Number) status).intValue());
      String error = extractErrorReason(operationDetails.get("error"));
      if (ElasticIndexingAuditTrail.isError(httpStatusCode)) {
        auditTrail.logRejectedDocumentBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
      } else if (auditTrail.isFullLogEnabled()) {
        auditTrail.logAcceptedBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
      }
    }
  }

  /**
   * Extracts the error text of a bulk response item.
   *
   * @param error value of the {@code error} field, may be null
   * @return the {@code reason} of a JSON object error, the textual form of any
   *         other non-null value, or null when there is no error
   */
  private static String extractErrorReason(Object error) {
    if (error == null) {
      return null;
    }
    if (error instanceof JSONObject) {
      return (String) ((JSONObject) error).get("reason");
    }
    // Tolerate an error given as a plain value instead of an object
    return error.toString();
  }

  /**
   * Builds the JSON body of a "delete by query" request matching all
   * documents.
   */
  @SuppressWarnings("unchecked") // JSONObject is a raw Map
  private String getDeleteAllDocumentsRequestContent() {
    JSONObject deleteQueryRequest = new JSONObject();
    deleteQueryRequest.put("match_all", new JSONObject());
    JSONObject deleteAllRequest = new JSONObject();
    deleteAllRequest.put("query", deleteQueryRequest);

    String request = deleteAllRequest.toJSONString();

    LOG.debug("Delete All request to ES: \n {}", request);
    return request;
  }

  /**
   * Builds the JSON body of a reindex request.
   *
   * @param index target index name
   * @param oldIndex source index name
   * @param type source type name
   * @param pipeline target pipeline name, omitted from the request when null
   */
  @SuppressWarnings("unchecked") // JSONObject is a raw Map
  private String getReindexRequestContent(String index, String oldIndex, String type, String pipeline) {
    JSONObject reindexRequest = new JSONObject();

    JSONObject reindexSourceRequest = new JSONObject();
    reindexRequest.put("source", reindexSourceRequest);
    reindexSourceRequest.put("index", oldIndex);
    reindexSourceRequest.put("type", type);

    JSONObject reindexDestRequest = new JSONObject();
    reindexRequest.put("dest", reindexDestRequest);
    reindexDestRequest.put("index", index);
    if (pipeline != null) {
      reindexDestRequest.put("pipeline", pipeline);
    }

    String request = reindexRequest.toJSONString();

    // One placeholder per argument, so that the request body is actually logged
    LOG.debug("Reindex Request from old index {} type {} to new index {} : \n {}", oldIndex, type, index, request);
    return request;
  }

  /**
   * Builds the JSON body of an aliases update request adding the alias to
   * {@code index}, after removing it from {@code oldIndex} when one is given.
   *
   * @param index name of the index the alias is added to
   * @param oldIndex name of the index the alias is removed from, or null
   * @param alias name of the alias
   */
  @SuppressWarnings("unchecked") // JSONObject and JSONArray are raw collections
  private String getCreateAliasRequestContent(String index, String oldIndex, String alias) {
    JSONObject updateAliasRequest = new JSONObject();
    JSONArray updateAliasActionsRequest = new JSONArray();
    updateAliasRequest.put("actions", updateAliasActionsRequest);
    if (oldIndex != null) {
      updateAliasActionsRequest.add(buildAliasAction("remove", alias, oldIndex));
    }
    updateAliasActionsRequest.add(buildAliasAction("add", alias, index));

    String request = updateAliasRequest.toJSONString();

    LOG.debug("Create Index alias ES: \n {}", request);
    return request;
  }

  /**
   * Builds one action of an aliases update request.
   *
   * @param action action name, used as the key wrapping the options
   * @param alias name of the alias
   * @param index name of the index the action applies to
   */
  @SuppressWarnings("unchecked") // JSONObject is a raw Map
  private static JSONObject buildAliasAction(String action, String alias, String index) {
    JSONObject options = new JSONObject();
    options.put("alias", alias);
    options.put("index", index);
    JSONObject aliasAction = new JSONObject();
    aliasAction.put(action, options);
    return aliasAction;
  }

  @Override
  protected String getEsUsernameProperty() {
    return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_USERNAME);
  }

  @Override
  protected String getEsPasswordProperty() {
    return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_PASSWORD);
  }

  @Override
  protected HttpClientConnectionManager getClientConnectionManager() {
    return new PoolingHttpClientConnectionManager();
  }

}