ElasticIndexingClient.java

  1. /*
  2.  * Copyright (C) 2003-2015 eXo Platform SAS.
  3.  *
  4.  * This program is free software: you can redistribute it and/or modify
  5.  * it under the terms of the GNU Lesser General Public License as published by
  6.  * the Free Software Foundation, either version 3 of the License, or
  7.  * (at your option) any later version.
  8.  *
  9.  * This program is distributed in the hope that it will be useful,
  10.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  * GNU Lesser General Public License for more details.
  13.  *
  14.  * You should have received a copy of the GNU Lesser General Public License
  15.  * along with this program. If not, see http://www.gnu.org/licenses/ .
  16.  */
  17. package org.exoplatform.commons.search.es.client;

  18. import java.util.Collections;
  19. import java.util.Map;
  20. import java.util.Set;

  21. import org.apache.commons.httpclient.HttpStatus;
  22. import org.apache.commons.lang.StringUtils;
  23. import org.apache.http.conn.HttpClientConnectionManager;
  24. import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
  25. import org.json.simple.JSONArray;
  26. import org.json.simple.JSONObject;
  27. import org.json.simple.JSONValue;
  28. import org.json.simple.parser.JSONParser;
  29. import org.json.simple.parser.ParseException;

  30. import org.exoplatform.commons.utils.PropertyManager;
  31. import org.exoplatform.services.log.ExoLogger;
  32. import org.exoplatform.services.log.Log;

  33. /**
  34.  * Created by The eXo Platform SAS Author : Thibault Clement
  35.  * tclement@exoplatform.com 9/1/15
  36.  */
  37. public class ElasticIndexingClient extends ElasticClient {
  38.   public static final String        EMPTY_JSON                        = "{}";
  39.   private static final Log          LOG                               = ExoLogger.getExoLogger(ElasticIndexingClient.class);
  40.   private static final String       ES_INDEX_CLIENT_PROPERTY_NAME     = "exo.es.index.server.url";
  41.   private static final String       ES_INDEX_CLIENT_PROPERTY_USERNAME = "exo.es.index.server.username";
  42.   private static final String       ES_INDEX_CLIENT_PROPERTY_PASSWORD = "exo.es.index.server.password";
  43.   private ElasticIndexingAuditTrail auditTrail;

  44.   public ElasticIndexingClient(ElasticIndexingAuditTrail auditTrail) {
  45.     super(auditTrail);
  46.     if (auditTrail == null) {
  47.       throw new IllegalArgumentException("AuditTrail is null");
  48.     }
  49.     this.auditTrail = auditTrail;
  50.     // Get url client from exo global properties
  51.     if (StringUtils.isNotBlank(PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_NAME))) {
  52.       this.urlClient = PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_NAME);
  53.       LOG.info("Using {} as Indexing URL", this.urlClient);
  54.     } else {
  55.       LOG.info("Using default as Indexing URL");
  56.     }
  57.   }

  58.   /**
  59.    * Send request to ES to create a new index
  60.    */
  61.   public boolean sendCreateIndexRequest(String index, String settings) {
  62.     String indexURL = urlClient + "/" + index;
  63.     if (sendIsIndexExistsRequest(index)) {
  64.       LOG.info("Index {} already exists. Index creation requests will not be sent.", index);
  65.       return false;
  66.     } else {
  67.       LOG.info("Index {} doesn't exist. Index creation requests will be sent.", index);

  68.       long startTime = System.currentTimeMillis();
  69.       ElasticResponse responseCreate = sendHttpPutRequest(indexURL, settings);
  70.       auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX,
  71.                        null,
  72.                        index,
  73.                        null,
  74.                        responseCreate.getStatusCode(),
  75.                        responseCreate.getMessage(),
  76.                        (System.currentTimeMillis() - startTime));
  77.       return true;
  78.     }
  79.   }

  80.   /**
  81.    * Send request to ES to create a new type
  82.    */
  83.   public void sendCreateTypeRequest(String index, String type, String mappings) {
  84.     String url = urlClient + "/" + index + "/_mapping/" + type;
  85.     if (!sendIsTypeExistsRequest(index, type)) {
  86.       LOG.info("Mapping doesn't exist for type {}. Mapping creation requests will be sent.", type);
  87.       long startTime = System.currentTimeMillis();
  88.       ElasticResponse response = sendHttpPutRequest(url, mappings);
  89.       auditTrail.audit(ElasticIndexingAuditTrail.CREATE_TYPE,
  90.                        null,
  91.                        index,
  92.                        type,
  93.                        response.getStatusCode(),
  94.                        response.getMessage(),
  95.                        (System.currentTimeMillis() - startTime));
  96.     } else {
  97.       LOG.info("Mapping already exists for type {}. Mapping creation requests will not be sent.", type);
  98.     }
  99.   }

  100.   /**
  101.    * Send request to ES to delete all documents of the given type
  102.    */
  103.   public void sendDeleteAllDocsOfTypeRequest(String index, String type) {
  104.     long startTime = System.currentTimeMillis();
  105.     String request = getDeleteAllDocumentsRequestContent();
  106.     ElasticResponse response = sendHttpPostRequest(urlClient + "/" + index + "/" + type + "/_delete_by_query?conflicts=proceed&wait_for_completion=true",  request);
  107.     auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
  108.                      null,
  109.                      index,
  110.                      type,
  111.                      response.getStatusCode(),
  112.                      response.getMessage(),
  113.                      (System.currentTimeMillis() - startTime));
  114.   }

  115.   /**
  116.    * Send request to ES to perform a C-reate, U-pdate or D-elete operation on a
  117.    * ES document
  118.    *
  119.    * @param bulkRequest JSON containing C-reate, U-pdate or D-elete operation
  120.    */
  121.   public void sendCUDRequest(String bulkRequest) {
  122.     long startTime = System.currentTimeMillis();
  123.     ElasticResponse response = sendHttpPostRequest(urlClient + "/_bulk", bulkRequest);
  124.     logBulkResponse(response.getMessage(), (System.currentTimeMillis() - startTime));
  125.   }

  126.   /**
  127.    * Send request to ES to create a new Ingest pipeline for attachment
  128.    *
  129.    * @param index
  130.    * @param type
  131.    * @param pipelineName
  132.    * @param processorMappings
  133.    */
  134.   public void sendCreateAttachmentPipelineRequest(String index, String type, String pipelineName, String processorMappings) {
  135.     String url = urlClient + "/_ingest/pipeline/" +  pipelineName;
  136.     ElasticResponse responseExists = sendHttpGetRequest(url);
  137.     if (responseExists.getStatusCode() == HttpStatus.SC_OK || responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
  138.         || responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
  139.       if (EMPTY_JSON.equals(responseExists.getMessage())) {
  140.         LOG.info("Pipeline doesn't exist for type {}. Mapping creation requests will be sent.", type);

  141.         long startTime = System.currentTimeMillis();
  142.         ElasticResponse response = sendHttpPutRequest(url, processorMappings);
  143.         auditTrail.audit(ElasticIndexingAuditTrail.CREATE_PIPELINE,
  144.                          null,
  145.                          index,
  146.                          type,
  147.                          response.getStatusCode(),
  148.                          response.getMessage(),
  149.                          (System.currentTimeMillis() - startTime));
  150.       } else {
  151.         LOG.info("Pipeline already exists for type {}. Pipeline creation requests will not be sent.", type);
  152.       }
  153.     } else {
  154.       LOG.error("Error while creating pipeline: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
  155.     }
  156.   }

  157.   /**
  158.    * Send request to ES to create a new Ingest pipeline for attachment
  159.    *
  160.    * @param index
  161.    * @param type
  162.    * @param id
  163.    * @param pipelineName
  164.    * @param pipelineRequestOperation
  165.    */
  166.   public void sendCreateDocOnPipeline(String index, String type, String id, String pipelineName, String pipelineRequestOperation) {
  167.     refreshIndex(index);
  168.     String pipelineURL = urlClient + "/_ingest/pipeline/" +  pipelineName;
  169.     ElasticResponse responseExists = sendHttpGetRequest(pipelineURL);
  170.     if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
  171.       long startTime = System.currentTimeMillis();
  172.       String url = urlClient + "/" + index + "/" + type + "/" + id + "?pipeline=" + pipelineName;
  173.       ElasticResponse response = sendHttpPutRequest(url, pipelineRequestOperation);
  174.       auditTrail.audit(ElasticIndexingAuditTrail.CREATE_DOC_PIPELINE,
  175.                        null,
  176.                        index,
  177.                        type,
  178.                        response.getStatusCode(),
  179.                        response.getMessage(),
  180.                        (System.currentTimeMillis() - startTime));
  181.     } else {
  182.       LOG.error("Error while creating attachment on pipeline '{}': Unsupported HttpStatusCode {}. url={}", pipelineName, responseExists.getStatusCode(), pipelineURL);
  183.     }
  184.   }

  185.   /**
  186.    * Send request to ES to create a new index alias for new ES Index and remove it from old index if exists
  187.    */
  188.   public void sendCreateIndexAliasRequest(String index, String oldIndex, String indexAlias) {
  189.     if(oldIndex == null) {
  190.       LOG.info("Index alias '{}' will be created to refer the index '{}'", indexAlias, index);
  191.     } else {
  192.       LOG.info("Index alias '{}' will be created to refer the index {} instead of old index '{}'", indexAlias, index, oldIndex);
  193.     }
  194.     long startTime = System.currentTimeMillis();
  195.     String aliasesURL = urlClient + "/_aliases";
  196.     ElasticResponse responseUpdateIndex = sendHttpPostRequest(aliasesURL, getCreateAliasRequestContent(index, oldIndex, indexAlias));
  197.     if(responseUpdateIndex.getStatusCode() == HttpStatus.SC_OK) {
  198.       auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX_ALIAS,
  199.                        null,
  200.                        index,
  201.                        null,
  202.                        responseUpdateIndex.getStatusCode(),
  203.                        responseUpdateIndex.getMessage(),
  204.                        (System.currentTimeMillis() - startTime));
  205.     } else {
  206.       auditTrail.audit(ElasticIndexingAuditTrail.CREATE_INDEX_ALIAS,
  207.                        null,
  208.                        index,
  209.                        null,
  210.                        responseUpdateIndex.getStatusCode(),
  211.                        responseUpdateIndex.getMessage(),
  212.                        (System.currentTimeMillis() - startTime));
  213.       throw new ElasticClientException("Index alias " + indexAlias + " update from old index " + oldIndex + " to new index "
  214.           + index + " error, http code = '" + responseUpdateIndex.getStatusCode() + "', message = '"
  215.           + responseUpdateIndex.getMessage() + "'");
  216.     }
  217.   }

  218.   /**
  219.    * Send request to ES to get type existence information
  220.    */
  221.   public boolean sendIsTypeExistsRequest(String index, String type) {
  222.     String url = urlClient + "/" + index + "/_mapping/" + type;
  223.     ElasticResponse responseExists = sendHttpHeadRequest(url);
  224.     if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
  225.       return true;
  226.     } else if(responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
  227.       return false;
  228.     } else {
  229.       LOG.error("Error while checking Type existence: Unsupported HttpStatusCode {}. url={}", responseExists.getStatusCode(), url);
  230.       throw new ElasticClientException("Can't request ES to get index/type " + index + "/" + type + " existence status");
  231.     }
  232.   }

  233.   /**
  234.    * Send request to ES to get index aliases
  235.    */
  236.   public Set<String> sendGetIndexAliasesRequest(String index) {
  237.     String indexAliasURL = urlClient + "/" +  index + "/_aliases/";
  238.     ElasticResponse responseExists = sendHttpGetRequest(indexAliasURL);
  239.     // Test if he alias already exists
  240.     if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
  241.       // Get all aliases information
  242.       String aliasesURL = urlClient + "/_aliases";
  243.       ElasticResponse responseAliases = sendHttpGetRequest(indexAliasURL);
  244.       // An ES communication can happen, so throw an exception
  245.       if (responseAliases.getStatusCode() != HttpStatus.SC_OK) {
  246.         throw new ElasticClientException("Can't get aliases from URL " + aliasesURL);
  247.       }
  248.       String jsonResponse = responseAliases.getMessage();
  249.       // Parse aliases mappings
  250.       JSONParser parser = new JSONParser();
  251.       Map<?, ?> json;
  252.       try {
  253.         json = (Map<?, ?>)parser.parse(jsonResponse);
  254.       } catch (ParseException e) {
  255.         throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
  256.       }

  257.       // if alias exists and old index doesn't exist
  258.       // this means the alias is made on new index name
  259.       // So nothing to change
  260.       if(!json.containsKey(index)) {
  261.         return Collections.emptySet();
  262.       }
  263.       JSONObject indexAliases = (JSONObject)(((Map<?, ?>) json.get(index)).get("aliases"));
  264.       return indexAliases.keySet();
  265.     } else {
  266.       throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + responseExists.getStatusCode()
  267.           + ", \\n\\t\\t message: " + responseExists.getMessage());
  268.     }
  269.   }

  270.   /**
  271.    * Send request to ES to count all documents found in index
  272.    */
  273.   public long sendCountIndexObjectsRequest(String index) {
  274.     refreshIndex(index);
  275.     String indexCountObjectsURL = urlClient + "/" +  index + "/_count?q=*";
  276.     ElasticResponse mappingsResponse = sendHttpGetRequest(indexCountObjectsURL);
  277.     if (mappingsResponse.getStatusCode() == HttpStatus.SC_OK) {
  278.       String jsonResponse = mappingsResponse.getMessage();
  279.       // Parse mappings
  280.       JSONParser parser = new JSONParser();
  281.       Map<?, ?> json;
  282.       try {
  283.         json = (Map<?, ?>)parser.parse(jsonResponse);
  284.       } catch (ParseException e) {
  285.         throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
  286.       }

  287.       if(!json.containsKey("count")) {
  288.         throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
  289.       }
  290.       return (Long)json.get("count");
  291.     } else {
  292.       throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + mappingsResponse.getStatusCode()
  293.           + ", \\n\\t\\t message: " + mappingsResponse.getMessage());
  294.     }
  295.   }

  296.   /**
  297.    * Send request to ES to get version
  298.    */
  299.   public String sendGetESVersion() {
  300.     ElasticResponse mappingsResponse = sendHttpGetRequest(urlClient);
  301.     if (mappingsResponse.getStatusCode() == HttpStatus.SC_OK) {
  302.       String jsonResponse = mappingsResponse.getMessage();
  303.       // Parse mappings
  304.       JSONParser parser = new JSONParser();
  305.       Map<?, ?> json;
  306.       try {
  307.         json = (Map<?, ?>)parser.parse(jsonResponse);
  308.       } catch (ParseException e) {
  309.         throw new ElasticClientException("Unable to parse JSON response: " + jsonResponse, e);
  310.       }

  311.       if(!json.containsKey("version")) {
  312.         throw new ElasticClientException("Unexpected content in JSON response from ES: " + jsonResponse);
  313.       }
  314.       return (String) ((JSONObject)json.get("version")).get("number");
  315.     } else {
  316.       throw new ElasticClientException("Uknow response code was sent by ES: \\n\\t\\t code = " + mappingsResponse.getStatusCode()
  317.           + ", \\n\\t\\t message: " + mappingsResponse.getMessage());
  318.     }
  319.   }

  320.   /**
  321.    * Deletes an index from ES
  322.    *
  323.    * @param index index name to delete
  324.    */
  325.   public void sendDeleteIndexRequest(String index) {
  326.     long startTime = System.currentTimeMillis();
  327.     ElasticResponse response = sendHttpDeleteRequest(urlClient + "/" + index);
  328.     auditTrail.audit(ElasticIndexingAuditTrail.DELETE_TYPE,
  329.                      null,
  330.                      index,
  331.                      null,
  332.                      response.getStatusCode(),
  333.                      response.getMessage(),
  334.                      (System.currentTimeMillis() - startTime));
  335.     if(response.getStatusCode() != HttpStatus.SC_OK) {
  336.       throw new ElasticClientException("Can't delete index " + index + ", reqponse code = " + response.getStatusCode()
  337.           + ", message = " + response.getMessage());
  338.     }
  339.   }

  340.   /**
  341.    * This operation reindex the documents from old index/type to new index/type mapping.
  342.    * A pipeline could be used when reindexing in case Ingest Attachment plugin is used
  343.    * by a target type.
  344.    *
  345.    * @param index target index name
  346.    * @param oldIndex source index name
  347.    * @param type source type name
  348.    * @param pipeline target pipeline name (optional)
  349.    */
  350.   public void sendReindexTypeRequest(String index, String oldIndex, String type, String pipeline) {
  351.     long startTime = System.currentTimeMillis();
  352.     String request = getReindexRequestContent(index, oldIndex, type, pipeline);
  353.     ElasticResponse response = sendHttpPostRequest(urlClient + "/_reindex", request);
  354.     auditTrail.audit(ElasticIndexingAuditTrail.REINDEX_TYPE,
  355.                      null,
  356.                      index,
  357.                      type,
  358.                      response.getStatusCode(),
  359.                      response.getMessage(),
  360.                      (System.currentTimeMillis() - startTime));
  361.     if(response.getStatusCode() != HttpStatus.SC_OK) {
  362.       throw new ElasticClientException("Can't reindex index " + index + ", type = " + type + ", reqponse code = " + response.getStatusCode()
  363.           + ", message = " + response.getMessage());
  364.     }
  365.   }

  366.   /**
  367.    * Send request to ES to test if index exists
  368.    *
  369.    * @param index ES index
  370.    * @return true if index exists in ES
  371.    */
  372.   public boolean sendIsIndexExistsRequest(String index) {
  373.     String indexURL = urlClient + "/" + index;
  374.     ElasticResponse responseExists = sendHttpGetRequest(indexURL);
  375.     if (responseExists.getStatusCode() == HttpStatus.SC_OK) {
  376.       return true;
  377.     } else if (responseExists.getStatusCode() == HttpStatus.SC_NOT_FOUND
  378.         || responseExists.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
  379.       return false;
  380.     } else {
  381.       throw new ElasticClientException("Can't get index '"+ index +"' status");
  382.     }
  383.   }

  384.   private void refreshIndex(String index) {
  385.     String indexRefreshURL = urlClient + "/" +  index + "/_refresh";
  386.     sendHttpPostRequest(indexRefreshURL, null);
  387.   }

  388.   private void logBulkResponse(String response, long executionTime) {
  389.     try {
  390.       Object parsedResponse = JSONValue.parseWithException(response);
  391.       if (!(parsedResponse instanceof JSONObject)) {
  392.         LOG.error("Unable to parse Bulk response: response is not a JSON. response={}", response);
  393.         throw new ElasticClientException("Unable to parse Bulk response: response is not a JSON.");
  394.       }
  395.       // process items
  396.       Object items = ((JSONObject) parsedResponse).get("items");
  397.       if (items != null) {
  398.         if (!(items instanceof JSONArray)) {
  399.           LOG.error("Unable to parse Bulk response: items is not a JSONArray. items={}", items);
  400.           throw new ElasticClientException("Unable to parse Bulk response: items is not a JSONArray.");
  401.         }
  402.         // Looping over all the items is required because
  403.         // in case of error, ES send a response with Status 200 and a flag
  404.         // errors:true
  405.         // in the JSON message
  406.         for (Object item : ((JSONArray) items).toArray()) {
  407.           if (!(item instanceof JSONObject)) {
  408.             LOG.error("Unable to parse Bulk response: item is not a JSONObject. item={}", item);
  409.             throw new ElasticClientException("Unable to parse Bulk response: item is not a JSONObject.");
  410.           }
  411.           logBulkResponseItem((JSONObject) item, executionTime);
  412.         }
  413.       }
  414.     } catch (ParseException e) {
  415.       throw new ElasticClientException("Unable to parse Bulk response", e);
  416.     }
  417.   }

  418.   private void logBulkResponseItem(JSONObject item, long executionTime) {
  419.     for (Map.Entry operation : (Set<Map.Entry>) item.entrySet()) {
  420.       String operationName = operation.getKey() == null ? null : (String) operation.getKey();
  421.       if (operation.getValue() != null) {
  422.         JSONObject operationDetails = (JSONObject) operation.getValue();
  423.         String index = operationDetails.get("_index") == null ? null : (String) operationDetails.get("_index");
  424.         String type = operationDetails.get("_type") == null ? null : (String) operationDetails.get("_type");
  425.         String id = operationDetails.get("_id") == null ? null : (String) operationDetails.get("_id");
  426.         Long status = operationDetails.get("status") == null ? null : (Long) operationDetails.get("status");
  427.         String error = operationDetails.get("error") == null ? null : (String) ((JSONObject) operationDetails.get("error")).get("reason");
  428.         Integer httpStatusCode = status == null ? null : status.intValue();
  429.         if (ElasticIndexingAuditTrail.isError(httpStatusCode)) {
  430.           auditTrail.logRejectedDocumentBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
  431.         } else {
  432.           if (auditTrail.isFullLogEnabled()) {
  433.             auditTrail.logAcceptedBulkOperation(operationName, id, index, type, httpStatusCode, error, executionTime);
  434.           }
  435.         }
  436.       }
  437.     }
  438.   }

  439.   private String getDeleteAllDocumentsRequestContent() {

  440.     JSONObject deleteAllRequest = new JSONObject();
  441.     JSONObject deleteQueryRequest = new JSONObject();
  442.     deleteQueryRequest.put("match_all", new JSONObject());
  443.     deleteAllRequest.put("query", deleteQueryRequest);

  444.     String request = deleteAllRequest.toJSONString();

  445.     LOG.debug("Delete All request to ES: \n {}", request);
  446.     return request;
  447.   }

  448.   private String getReindexRequestContent(String index, String oldIndex, String type, String pipeline) {
  449.     JSONObject reindexRequest = new JSONObject();

  450.     JSONObject reindexSourceRequest = new JSONObject();
  451.     reindexRequest.put("source", reindexSourceRequest);
  452.     reindexSourceRequest.put("index", oldIndex);
  453.     reindexSourceRequest.put("type", type);

  454.     JSONObject reindexDestRequest = new JSONObject();
  455.     reindexRequest.put("dest", reindexDestRequest);
  456.     reindexDestRequest.put("index", index);
  457.     if(pipeline != null) {
  458.       reindexDestRequest.put("pipeline", pipeline);
  459.     }

  460.     String request = reindexRequest.toJSONString();

  461.     LOG.debug("Reindex Request from old index {} type {} to new index : \n {}", oldIndex, type, index, request);
  462.     return request;
  463.   }

  464.   private String getCreateAliasRequestContent(String index, String oldIndex, String alias) {
  465.     JSONObject updateAliasRequest = new JSONObject();
  466.     JSONArray updateAliasActionsRequest = new JSONArray();
  467.     updateAliasRequest.put("actions", updateAliasActionsRequest);
  468.     if(oldIndex != null) {
  469.       JSONObject updateAliasActionRemoveRequest = new JSONObject();
  470.       JSONObject updateAliasActionRemoveOptionsRequest = new JSONObject();
  471.       updateAliasActionRemoveRequest.put("remove", updateAliasActionRemoveOptionsRequest);
  472.       updateAliasActionRemoveOptionsRequest.put("alias", alias);
  473.       updateAliasActionRemoveOptionsRequest.put("index", oldIndex);
  474.       updateAliasActionsRequest.add(updateAliasActionRemoveRequest);
  475.     }
  476.     JSONObject updateAliasActionAddRequest = new JSONObject();
  477.     JSONObject updateAliasActionAddOptionsRequest = new JSONObject();
  478.     updateAliasActionAddRequest.put("add", updateAliasActionAddOptionsRequest);
  479.     updateAliasActionAddOptionsRequest.put("alias", alias);
  480.     updateAliasActionAddOptionsRequest.put("index", index);
  481.     updateAliasActionsRequest.add(updateAliasActionAddRequest);

  482.     String request = updateAliasRequest.toJSONString();

  483.     LOG.debug("Create Index alias ES: \n {}", request);
  484.     return request;
  485.   }

  486.   @Override
  487.   protected String getEsUsernameProperty() {
  488.     return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_USERNAME);
  489.   }

  490.   @Override
  491.   protected String getEsPasswordProperty() {
  492.     return PropertyManager.getProperty(ES_INDEX_CLIENT_PROPERTY_PASSWORD);
  493.   }

  494.   @Override
  495.   protected HttpClientConnectionManager getClientConnectionManager() {
  496.     return new PoolingHttpClientConnectionManager();
  497.   }

  498. }