ElasticClient.java

/* 
 * Copyright (C) 2003-2015 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/ .
 */
package org.exoplatform.commons.search.es.client;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.*;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;

import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;

/**
 * Created by The eXo Platform SAS Author : Thibault Clement
 * tclement@exoplatform.com 10/16/15
 */
public abstract class ElasticClient {

  private static final String ES_INDEX_CLIENT_DEFAULT = "http://127.0.0.1:9200";

  private static final Log    LOG                     = ExoLogger.getExoLogger(ElasticClient.class);

  protected String            urlClient;
  protected HttpClient        client;
  protected ElasticIndexingAuditTrail auditTrail;

  public ElasticClient(ElasticIndexingAuditTrail auditTrail) {
    this.client = getHttpClient();
    this.urlClient = ES_INDEX_CLIENT_DEFAULT;
    if (auditTrail==null) {
      throw new IllegalArgumentException("AuditTrail is null");
    }
    this.auditTrail = auditTrail;
  }

  protected ElasticResponse sendHttpPostRequest(String url, String content) {
    ElasticResponse response;

    try {
      HttpPost httpTypeRequest = new HttpPost(url);
      if(StringUtils.isNotBlank(content)) {
        httpTypeRequest.setEntity(new StringEntity(content, ContentType.APPLICATION_JSON));
      }
      response = handleHttpResponse(client.execute(httpTypeRequest));
      LOG.debug("Sent request to ES:\n Method = POST \nURI =  {} \nContent = {}", url, content);
      logResultDependingOnStatusCode(url, response);
    } catch (IOException e) {
      throw new ElasticClientException(e);
    }
    return response;
  }

  protected ElasticResponse sendHttpPutRequest(String url, String content) {
    ElasticResponse response;
    try {
      HttpPut httpTypeRequest = new HttpPut(url);
      if(StringUtils.isNotBlank(content)) {
        httpTypeRequest.setEntity(new StringEntity(content, ContentType.APPLICATION_JSON));
      }
      response = handleHttpResponse(client.execute(httpTypeRequest));
      LOG.debug("Sent request to ES:\n Method = PUT \nURI =  '{}' \nContent = '{}'", url, content);
      logResultDependingOnStatusCode(url, response);
    } catch (IOException e) {
      throw new ElasticClientException(e);
    }
    return response;
  }

  protected ElasticResponse sendHttpDeleteRequest(String url) {
    ElasticResponse response;

    try {
      HttpDelete httpDeleteRequest = new HttpDelete(url);
      response = handleHttpResponse(client.execute(httpDeleteRequest));
      LOG.debug("Sent request to ES:\n Method = DELETE \nURI =  {}", url);
      logResultDependingOnStatusCode(url, response);
    } catch (IOException e) {
      throw new ElasticClientException(e);
    }
    return response;
  }

  protected ElasticResponse sendHttpGetRequest(String url) {
    ElasticResponse response;

    try {
      HttpGet httpGetRequest = new HttpGet(url);
      response = handleHttpResponse(client.execute(httpGetRequest));
      LOG.debug("Sent request to ES:\n Method = GET \nURI =  {}", url);
    } catch (IOException e) {
      throw new ElasticClientException(e);
    }
    return response;
  }

  protected ElasticResponse sendHttpHeadRequest(String url) {
    ElasticResponse response;

    try {
      HttpHead httpHeadRequest = new HttpHead(url);
      response = handleHttpResponse(client.execute(httpHeadRequest));
      LOG.debug("Sent request to ES:\n Method = HEAD \nURI =  {}", url);
    } catch (IOException e) {
      throw new ElasticClientException(e);
    }
    return response;
  }

  /**
   * Handle Http response receive from ES Log an INFO if the return status code
   * is 2xx Log an ERROR if the return code is different from 2xx
   *
   * @param httpResponse The Http Response to handle
   */
  private ElasticResponse handleHttpResponse(HttpResponse httpResponse) throws IOException {
    String response = null;
    InputStream is = null;

    if (httpResponse.getEntity()!=null) {
      try {
        is = httpResponse.getEntity().getContent();
        response = IOUtils.toString(is, "UTF-8");
      } finally {
        if (is != null) {
          is.close();
        }
      }
    }

    int statusCode = httpResponse.getStatusLine().getStatusCode();
    if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
      throw new ElasticClientAuthenticationException();
    }
    return new ElasticResponse(response, statusCode);
  }

  private void logResultDependingOnStatusCode(String url, ElasticResponse response) {
    if (ElasticIndexingAuditTrail.isError(response.getStatusCode())) {
      LOG.error("Error when trying to send request to ES. Url: {}, StatusCode: {}, Message: {}",
          url,
          response.getStatusCode(),
          response.getMessage());
    } else {
      LOG.debug("Success request to ES. Url: {}, StatusCode: {}, Message: {}",
          url,
          response.getStatusCode(),
          response.getMessage());
    }
  }

  private HttpClient getHttpClient() {
    // Check if Basic Authentication need to be used
    HttpClientConnectionManager clientConnectionManager = getClientConnectionManager();
    HttpClientBuilder httpClientBuilder = HttpClients.custom()
        .setConnectionManager(clientConnectionManager)
        .setConnectionReuseStrategy(new DefaultConnectionReuseStrategy())
        .setMaxConnPerRoute(100);
    if (StringUtils.isNotBlank(getEsUsernameProperty())) {
      CredentialsProvider credsProvider = new BasicCredentialsProvider();
      credsProvider.setCredentials(
              new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT),
              new UsernamePasswordCredentials(getEsUsernameProperty(),
                                              getEsPasswordProperty()));

      HttpClient httpClient = httpClientBuilder
          .setDefaultCredentialsProvider(credsProvider)
          .build();
      LOG.debug("Basic authentication for ES activated with username = {}",
                getEsUsernameProperty());
      return httpClient;
    } else {
      LOG.debug("Basic authentication for ES not activated");
      return httpClientBuilder.build();
    }
  }

  protected abstract String getEsUsernameProperty();

  protected abstract String getEsPasswordProperty();

  protected abstract HttpClientConnectionManager getClientConnectionManager();

}