// FileindexingConnector.java
package org.exoplatform.services.wcm.search.connector;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.exoplatform.commons.search.domain.Document;
import org.exoplatform.commons.search.index.impl.ElasticIndexingServiceConnector;
import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.cms.documents.TrashService;
import org.exoplatform.services.jcr.RepositoryService;
import org.exoplatform.services.jcr.access.AccessControlList;
import org.exoplatform.services.jcr.core.ExtendedNode;
import org.exoplatform.services.jcr.core.ExtendedSession;
import org.exoplatform.services.jcr.core.ManageableRepository;
import org.exoplatform.services.jcr.impl.core.NodeImpl;
import org.exoplatform.services.jcr.impl.core.query.QueryImpl;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.wcm.core.NodetypeConstant;
import org.exoplatform.services.wcm.utils.WCMCoreUtils;
import javax.jcr.*;
import javax.jcr.nodetype.NodeTypeManager;
import javax.jcr.nodetype.PropertyDefinition;
import javax.jcr.query.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Indexing connector that turns JCR {@code nt:file} nodes into Elasticsearch documents.
 * <p>
 * Nodes are read from the {@code collaboration} workspace of the current repository through
 * the system session provider. The binary content of each file is sent Base64-encoded in the
 * {@code file} field; the ingest pipeline declared by {@link #getAttachmentProcessor()}
 * extracts its text and then removes that field.
 */
public class FileindexingConnector extends ElasticIndexingServiceConnector {

  private static final Log LOGGER = ExoLogger.getExoLogger(FileindexingConnector.class);

  /** Type given to every document produced by this connector. */
  public static final String TYPE = "file";

  /** Name of the JCR workspace the files are read from. */
  private static final String WORKSPACE = "collaboration";

  private RepositoryService repositoryService;

  private TrashService trashService;

  /**
   * Builds the connector and looks up the services it depends on.
   *
   * @param initParams connector configuration, forwarded to the parent constructor
   */
  public FileindexingConnector(InitParams initParams) {
    super(initParams);
    this.repositoryService = CommonsUtils.getService(RepositoryService.class);
    this.trashService = CommonsUtils.getService(TrashService.class);
  }

  /**
   * @return always {@code true}: documents of this connector go through an ingest pipeline
   */
  @Override
  public boolean isNeedIngestPipeline() {
    return true;
  }

  /**
   * @return the name of the ingest pipeline, {@code "file"}
   */
  @Override
  public String getPipelineName() {
    return "file";
  }

  /**
   * Returns the Elasticsearch mapping of the fields indexed for a file: technical fields
   * (repository, workspace, path, ...) plus the Dublin Core ({@code dc:*}) metadata.
   *
   * @return the mapping as a JSON string
   */
  @Override
  public String getMapping() {
    StringBuilder mapping = new StringBuilder()
        .append("{")
        .append(" \"properties\" : {\n")
        .append(" \"repository\" : {\"type\" : \"keyword\"},\n")
        .append(" \"workspace\" : {\"type\" : \"keyword\"},\n")
        .append(" \"path\" : {\"type\" : \"keyword\"},\n")
        .append(" \"author\" : {\"type\" : \"keyword\"},\n")
        .append(" \"permissions\" : {\"type\" : \"keyword\"},\n")
        .append(" \"createdDate\" : {\"type\" : \"date\", \"format\": \"epoch_millis\"},\n")
        .append(" \"lastUpdatedDate\" : {\"type\" : \"date\", \"format\": \"epoch_millis\"},\n")
        .append(" \"fileType\" : {\"type\" : \"keyword\"},\n")
        .append(" \"fileSize\" : {\"type\" : \"long\"},\n")
        .append(" \"name\" : {\"type\" : \"text\", \"analyzer\": \"letter_lowercase_asciifolding\"},\n")
        .append(" \"title\" : {\"type\" : \"text\", \"analyzer\": \"letter_lowercase_asciifolding\"},\n")
        .append(" \"dc:title\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:creator\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:subject\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:description\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:publisher\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:contributor\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:date\" : {\"type\" : \"date\", \"format\": \"epoch_millis\"},\n")
        .append(" \"dc:resourceType\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:format\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:identifier\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:source\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:language\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:relation\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:coverage\" : {\"type\" : \"text\"},\n")
        .append(" \"dc:rights\" : {\"type\" : \"text\"}\n")
        .append(" }\n")
        .append("}");
    return mapping.toString();
  }

  /**
   * Returns the definition of the ingest pipeline: an {@code attachment} processor reading
   * the {@code file} field without character limit and keeping only the extracted content,
   * followed by a {@code remove} processor dropping the {@code file} field.
   *
   * @return the pipeline definition as a JSON string
   */
  @Override
  public String getAttachmentProcessor() {
    StringBuilder processors = new StringBuilder()
        .append("{")
        .append(" \"description\" : \"File processor\",\n")
        .append(" \"processors\" : [{\n")
        .append(" \"attachment\" : {\n")
        .append(" \"field\" : \"file\",\n")
        .append(" \"indexed_chars\" : -1,\n")
        .append(" \"properties\" : [\"content\"]\n")
        .append(" }\n")
        .append(" },{\n")
        .append(" \"remove\" : {\n")
        .append(" \"field\" : \"file\"\n")
        .append(" }\n")
        .append(" }]\n")
        .append("}");
    return processors.toString();
  }

  /**
   * Builds the document to index for the file node having the given identifier.
   *
   * @param id identifier of the JCR node
   * @return the document, or {@code null} when the id is empty, when the node is not an
   *         {@code nt:file}, is in the trash or is part of a content folder, or when reading
   *         the node failed (the error is logged)
   */
  @Override
  public Document create(String id) {
    if (StringUtils.isEmpty(id)) {
      return null;
    }
    try {
      ExtendedSession session = (ExtendedSession) WCMCoreUtils.getSystemSessionProvider()
                                                              .getSession(WORKSPACE, repositoryService.getCurrentRepository());
      Node node = session.getNodeByIdentifier(id);
      if (node == null
          || !node.isNodeType(NodetypeConstant.NT_FILE)
          || trashService.isInTrash(node)
          || isInContentFolder(node)) {
        return null;
      }

      Map<String, String> fields = new HashMap<>();
      fields.put("name", node.getName());
      fields.put("repository", ((ManageableRepository) session.getRepository()).getConfiguration().getName());
      fields.put("workspace", session.getWorkspace().getName());
      fields.put("path", node.getPath());
      // Fall back on the node name when no explicit title is set
      if (node.hasProperty(NodetypeConstant.EXO_TITLE)) {
        fields.put("title", node.getProperty(NodetypeConstant.EXO_TITLE).getString());
      } else {
        fields.put("title", node.getName());
      }
      if (node.hasProperty(NodetypeConstant.EXO_OWNER)) {
        fields.put("author", node.getProperty(NodetypeConstant.EXO_OWNER).getString());
      }
      if (node.hasProperty("jcr:created")) {
        fields.put("createdDate", String.valueOf(node.getProperty("jcr:created").getDate().getTimeInMillis()));
      }

      Node contentNode = node.getNode(NodetypeConstant.JCR_CONTENT);
      if (contentNode != null) {
        addContentFields(contentNode, fields);
      }

      LOGGER.info("ES document generated for file with id={} path=\"{}\"", id, node.getPath());
      return new Document(TYPE, id, null, new Date(), computePermissions(node), fields);
    } catch (RepositoryException | IOException e) {
      LOGGER.error("Error while indexing file " + id, e);
    }
    return null;
  }

  /**
   * Adds to {@code fields} the data read from the content node of a file: mime type,
   * Base64-encoded binary, size in bytes and Dublin Core metadata.
   */
  private void addContentFields(Node contentNode, Map<String, String> fields) throws RepositoryException, IOException {
    if (contentNode.hasProperty(NodetypeConstant.JCR_MIMETYPE)) {
      fields.put("fileType", contentNode.getProperty(NodetypeConstant.JCR_MIMETYPE).getString());
    }

    // The caller owns the stream returned by getStream(): close it once the bytes are read
    byte[] fileBytes;
    try (InputStream fileStream = contentNode.getProperty(NodetypeConstant.JCR_DATA).getStream()) {
      fileBytes = IOUtils.toByteArray(fileStream);
    }
    fields.put("file", Base64.getEncoder().encodeToString(fileBytes));
    fields.put("fileSize", String.valueOf(fileBytes.length));

    // Dublin Core metadata
    Map<String, String> dublinCoreMetadata = extractDublinCoreMetadata(contentNode);
    if (dublinCoreMetadata != null) {
      fields.putAll(dublinCoreMetadata);
    }
  }

  /**
   * Tells whether the file is a resource of a content rather than a standalone file: an
   * {@code exo:htmlFile} whose parent, an {@code exo:cssFile} or {@code exo:jsFile} whose
   * grand-parent, or an {@code nt:file} with a path containing {@code /medias/images},
   * {@code /medias/videos} or {@code /medias/audio} whose great-grand-parent is reported as a
   * document by {@code org.exoplatform.services.cms.impl.Utils.isDocument}.
   *
   * @param node the file node to check
   * @return {@code true} if one of the cases above applies, {@code false} otherwise or when
   *         the check failed with an exception
   */
  protected boolean isInContentFolder(Node node) {
    try {
      return (node.isNodeType("exo:htmlFile")
                 && org.exoplatform.services.cms.impl.Utils.isDocument(node.getParent()))
          || (node.isNodeType("exo:cssFile")
                 && org.exoplatform.services.cms.impl.Utils.isDocument(node.getParent().getParent()))
          || (node.isNodeType("exo:jsFile")
                 && org.exoplatform.services.cms.impl.Utils.isDocument(node.getParent().getParent()))
          || (node.isNodeType("nt:file")
                 && (node.getPath().contains("/medias/images")
                     || node.getPath().contains("/medias/videos")
                     || node.getPath().contains("/medias/audio"))
                 && org.exoplatform.services.cms.impl.Utils.isDocument(node.getParent().getParent().getParent()));
    } catch (Exception e) {
      // Best effort: a failing check must not prevent the file from being indexed
      LOGGER.debug("Cannot check whether the file is in a content folder, considering it is not", e);
      return false;
    }
  }

  /**
   * Builds the document of an updated file; same as {@link #create(String)}.
   *
   * @param id identifier of the JCR node
   * @return the document, or {@code null} in the cases described in {@link #create(String)}
   */
  @Override
  public Document update(String id) {
    return create(id);
  }

  /**
   * Fetches a page of identifiers of the {@code nt:file} nodes of the workspace.
   *
   * @param offset index of the first result to return
   * @param limit maximum number of results to return
   * @return the internal identifiers of the nodes found
   * @throws RuntimeException if the query fails or if the current thread has been interrupted
   */
  @Override
  public List<String> getAllIds(int offset, int limit) {
    List<String> allIds = new ArrayList<>();
    try {
      Session session = WCMCoreUtils.getSystemSessionProvider()
                                    .getSession(WORKSPACE, repositoryService.getCurrentRepository());
      QueryManager queryManager = session.getWorkspace().getQueryManager();
      Query query = queryManager.createQuery("select * from " + NodetypeConstant.NT_FILE, Query.SQL);
      QueryImpl queryImpl = (QueryImpl) query;
      queryImpl.setOffset(offset);
      queryImpl.setLimit(limit);
      QueryResult result = queryImpl.execute();
      NodeIterator nodeIterator = result.getNodes();
      while (nodeIterator.hasNext()) {
        NodeImpl node = (NodeImpl) nodeIterator.nextNode();
        // use node internal identifier to be sure to have an id for all nodes
        allIds.add(node.getInternalIdentifier());
      }
    } catch (RepositoryException e) {
      throw new RuntimeException("Error while fetching all nt:file nodes", e);
    }
    if (Thread.currentThread().isInterrupted()) {
      throw new RuntimeException("Indexing queue processing interrupted");
    }
    LOGGER.info("Fetched {} files to push in indexing queue (offset={}, limit={})", allIds.size(), offset, limit);
    return allIds;
  }

  /**
   * Reads the Dublin Core properties set on the content node. Date properties are converted
   * to epoch milliseconds, other properties to their string value. For a property defined as
   * multi-valued, only the first value is kept.
   *
   * @param contentNode the content node of the file
   * @return the metadata keyed by property name, or {@code null} if the node does not have
   *         the Dublin Core node type
   * @throws RepositoryException if the node or the node type definitions cannot be read
   */
  protected Map<String, String> extractDublinCoreMetadata(Node contentNode) throws RepositoryException {
    if (!contentNode.isNodeType(NodetypeConstant.DC_ELEMENT_SET)) {
      return null;
    }
    Map<String, String> dcFields = new HashMap<>();
    NodeTypeManager nodeTypeManager = repositoryService.getCurrentRepository().getNodeTypeManager();
    PropertyDefinition[] dcPropertyDefinitions = nodeTypeManager.getNodeType(NodetypeConstant.DC_ELEMENT_SET)
                                                                .getPropertyDefinitions();
    for (PropertyDefinition propertyDefinition : dcPropertyDefinitions) {
      String propertyName = propertyDefinition.getName();
      if (!contentNode.hasProperty(propertyName)) {
        continue;
      }
      Property property = contentNode.getProperty(propertyName);
      if (property == null) {
        continue;
      }
      String strValue = null;
      if (propertyDefinition.isMultiple()) {
        Value[] values = property.getValues();
        if (values != null && values.length > 0) {
          // Only the first value is indexed
          Value value = values[0];
          if (property.getType() == PropertyType.DATE) {
            strValue = String.valueOf(value.getDate().toInstant().toEpochMilli());
          } else {
            strValue = value.getString();
          }
        }
      } else if (property.getType() == PropertyType.DATE) {
        strValue = String.valueOf(property.getDate().toInstant().toEpochMilli());
      } else {
        strValue = property.getString();
      }
      if (strValue != null) {
        dcFields.put(propertyName, strValue);
      }
    }
    return dcFields;
  }

  /**
   * Collects the identities allowed on the node: the owner of its access control list (when
   * there is one) and the identity of each of its permission entries.
   */
  private Set<String> computePermissions(Node node) throws RepositoryException {
    Set<String> permissions = new HashSet<>();
    AccessControlList acl = ((ExtendedNode) node).getACL();
    //Add the owner
    String owner = acl.getOwner();
    if (owner != null) {
      permissions.add(owner);
    }
    //Add permissions
    if (acl.getPermissionEntries() != null) {
      permissions.addAll(acl.getPermissionEntries()
                            .stream()
                            .map(permission -> permission.getIdentity())
                            .collect(Collectors.toSet()));
    }
    return permissions;
  }
}