/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.elasticsearch.ElasticSearchClient;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="elastic_search", type=IOType.SINK, help="A sink connector that sends pulsar messages to elastic search", configClass=ElasticSearchConfig.class)
public class ElasticSearchSink
implements Sink<GenericObject> {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchSink.class);
    private ElasticSearchConfig elasticSearchConfig;
    private ElasticSearchClient elasticsearchClient;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private ObjectMapper sortedObjectMapper;
    private List<String> primaryFields = null;
    private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
    private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
        this.elasticSearchConfig.validate();
        this.elasticsearchClient = new ElasticSearchClient(this.elasticSearchConfig, sinkContext);
        if (!Strings.isNullOrEmpty((String)this.elasticSearchConfig.getPrimaryFields())) {
            this.primaryFields = Arrays.asList(this.elasticSearchConfig.getPrimaryFields().split(","));
        }
        if (this.elasticSearchConfig.isCanonicalKeyFields()) {
            this.sortedObjectMapper = ((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)).nodeFactory(new JsonNodeFactory(){

                public ObjectNode objectNode() {
                    return new ObjectNode((JsonNodeFactory)this, new TreeMap());
                }
            })).build();
        }
    }

    public void close() {
        if (this.elasticsearchClient != null) {
            this.elasticsearchClient.close();
            this.elasticsearchClient = null;
        }
    }

    @VisibleForTesting
    void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
        this.elasticsearchClient = elasticsearchClient;
    }

    public void write(Record<GenericObject> record) throws Exception {
        Pair<String, String> idAndDoc = this.extractIdAndDocument(record);
        try {
            if (log.isDebugEnabled()) {
                log.debug("index doc {} {}", idAndDoc.getLeft(), idAndDoc.getRight());
            }
            if (idAndDoc.getRight() == null) {
                switch (this.elasticSearchConfig.getNullValueAction()) {
                    case DELETE: {
                        if (idAndDoc.getLeft() == null) break;
                        if (this.elasticSearchConfig.isBulkEnabled()) {
                            this.elasticsearchClient.bulkDelete(record, (String)idAndDoc.getLeft());
                            break;
                        }
                        this.elasticsearchClient.deleteDocument(record, (String)idAndDoc.getLeft());
                        break;
                    }
                    case IGNORE: {
                        break;
                    }
                    case FAIL: {
                        this.elasticsearchClient.failed((Exception)new PulsarClientException.InvalidMessageException("Unexpected null message value"));
                    }
                }
            } else if (this.elasticSearchConfig.isBulkEnabled()) {
                this.elasticsearchClient.bulkIndex(record, idAndDoc);
            } else {
                this.elasticsearchClient.indexDocument(record, idAndDoc);
            }
        }
        catch (JsonProcessingException jsonProcessingException) {
            switch (this.elasticSearchConfig.getMalformedDocAction()) {
                case IGNORE: {
                    break;
                }
                case WARN: {
                    log.warn("Ignoring malformed document messageId={}", record.getMessage().map(Message::getMessageId).orElse(null), (Object)jsonProcessingException);
                    this.elasticsearchClient.failed((Exception)((Object)jsonProcessingException));
                    break;
                }
                case FAIL: {
                    log.error("Malformed document messageId={}", record.getMessage().map(Message::getMessageId).orElse(null), (Object)jsonProcessingException);
                    this.elasticsearchClient.failed((Exception)((Object)jsonProcessingException));
                }
            }
        }
        catch (Exception e) {
            log.error("write error for {} {}:", new Object[]{idAndDoc.getLeft(), idAndDoc.getRight(), e});
            throw e;
        }
    }

    @VisibleForTesting
    ElasticSearchClient getElasticsearchClient() {
        return this.elasticsearchClient;
    }

    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) throws JsonProcessingException {
        String rawData;
        if (this.elasticSearchConfig.isSchemaEnable()) {
            Object key = null;
            GenericObject value = null;
            Schema keySchema = null;
            Schema valueSchema = null;
            if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
                KeyValueSchema keyValueSchema = (KeyValueSchema)record.getSchema();
                keySchema = keyValueSchema.getKeySchema();
                valueSchema = keyValueSchema.getValueSchema();
                KeyValue keyValue = (KeyValue)((GenericObject)record.getValue()).getNativeObject();
                key = keyValue.getKey();
                value = (GenericObject)keyValue.getValue();
            } else {
                key = record.getKey().orElse(null);
                valueSchema = record.getSchema();
                value = this.getGenericObjectFromRecord(record);
            }
            String id = null;
            if (!this.elasticSearchConfig.isKeyIgnore() && key != null) {
                id = keySchema != null ? this.stringifyKey(keySchema, key) : key.toString();
            }
            String doc = null;
            if (value != null) {
                if (valueSchema != null) {
                    if (this.elasticSearchConfig.isCopyKeyFields() && (keySchema.getSchemaInfo().getType().equals((Object)SchemaType.AVRO) || keySchema.getSchemaInfo().getType().equals((Object)SchemaType.JSON))) {
                        JsonNode keyNode = this.extractJsonNode(keySchema, key);
                        JsonNode valueNode = this.extractJsonNode(valueSchema, value);
                        doc = this.stringify(JsonConverter.topLevelMerge(keyNode, valueNode));
                    } else {
                        doc = this.stringifyValue(valueSchema, value);
                    }
                } else {
                    doc = value.getNativeObject() instanceof byte[] ? new String((byte[])value.getNativeObject(), StandardCharsets.UTF_8) : value.getNativeObject().toString();
                }
            }
            if (doc != null && this.primaryFields != null) {
                try {
                    JsonNode jsonNode = this.objectMapper.readTree(doc);
                    id = this.stringifyKey(jsonNode, this.primaryFields);
                }
                catch (JsonProcessingException e) {
                    log.error("Failed to read JSON", (Throwable)e);
                    throw e;
                }
            }
            ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm = this.elasticSearchConfig.getIdHashingAlgorithm();
            if (id != null && idHashingAlgorithm != null && idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
                boolean performHashing = true;
                if (this.elasticSearchConfig.isConditionalIdHashing() && idBytes.length <= 512) {
                    performHashing = false;
                }
                if (performHashing) {
                    Hasher hasher = switch (idHashingAlgorithm) {
                        case ElasticSearchConfig.IdHashingAlgorithm.SHA256 -> Hashing.sha256().newHasher();
                        case ElasticSearchConfig.IdHashingAlgorithm.SHA512 -> Hashing.sha512().newHasher();
                        default -> throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: " + idHashingAlgorithm);
                    };
                    hasher.putBytes(idBytes);
                    id = this.base64Encoder.encodeToString(hasher.hash().asBytes());
                }
            }
            if (log.isDebugEnabled()) {
                SchemaType schemaType = null;
                if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
                    schemaType = record.getSchema().getSchemaInfo().getType();
                }
                log.debug("recordType={} schemaType={} id={} doc={}", new Object[]{record.getClass().getName(), schemaType, id, doc});
            }
            doc = this.sanitizeValue(doc);
            return Pair.of((Object)id, (Object)doc);
        }
        Message message = record.getMessage().orElse(null);
        if (message != null) {
            rawData = new String(message.getData(), StandardCharsets.UTF_8);
        } else {
            GenericObject recordObject = this.getGenericObjectFromRecord(record);
            rawData = this.stringifyValue(record.getSchema(), recordObject);
        }
        if (rawData == null || rawData.length() == 0) {
            throw new IllegalArgumentException("Record does not carry message information.");
        }
        String key = this.elasticSearchConfig.isKeyIgnore() ? null : (String)record.getKey().map(Object::toString).orElse(null);
        return Pair.of((Object)key, (Object)this.sanitizeValue(rawData));
    }

    private GenericObject getGenericObjectFromRecord(final Record record) {
        if (record.getValue() == null) {
            return null;
        }
        if (record.getValue() instanceof GenericObject) {
            return (GenericObject)record.getValue();
        }
        return new GenericObject(){

            public SchemaType getSchemaType() {
                return record.getSchema().getSchemaInfo().getType();
            }

            public Object getNativeObject() {
                return record.getValue();
            }
        };
    }

    private String sanitizeValue(String value) {
        if (value == null || !this.elasticSearchConfig.isStripNonPrintableCharacters()) {
            return value;
        }
        return this.nonPrintableCharactersPattern.matcher(value).replaceAll("");
    }

    public String stringifyKey(Schema<?> schema, Object val) throws JsonProcessingException {
        switch (schema.getSchemaInfo().getType()) {
            case INT8: {
                return Byte.toString((Byte)val);
            }
            case INT16: {
                return Short.toString((Short)val);
            }
            case INT32: {
                return Integer.toString((Integer)val);
            }
            case INT64: {
                return Long.toString((Long)val);
            }
            case STRING: {
                return (String)val;
            }
            case JSON: 
            case AVRO: {
                return this.stringifyKey(this.extractJsonNode(schema, val));
            }
        }
        throw new UnsupportedOperationException("Unsupported key schemaType=" + schema.getSchemaInfo().getType());
    }

    public String stringifyKey(JsonNode jsonNode) throws JsonProcessingException {
        ArrayList<String> fields = new ArrayList<String>();
        jsonNode.fieldNames().forEachRemaining(fields::add);
        return this.stringifyKey(jsonNode, fields);
    }

    public String stringifyKey(JsonNode jsonNode, List<String> fields) throws JsonProcessingException {
        String serializedId;
        Object toConvert = fields.size() == 1 ? jsonNode.get(fields.get(0)) : JsonConverter.toJsonArray(jsonNode, fields);
        if (this.elasticSearchConfig.isCanonicalKeyFields()) {
            Object obj = this.sortedObjectMapper.treeToValue((TreeNode)toConvert, Object.class);
            serializedId = this.sortedObjectMapper.writeValueAsString(obj);
        } else {
            serializedId = this.objectMapper.writeValueAsString(toConvert);
        }
        return serializedId.startsWith("\"") && serializedId.endsWith("\"") ? serializedId.substring(1, serializedId.length() - 1) : serializedId;
    }

    public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
        JsonNode jsonNode = this.extractJsonNode(schema, val);
        return this.stringify(jsonNode);
    }

    public String stringify(JsonNode jsonNode) throws JsonProcessingException {
        return this.elasticSearchConfig.isStripNulls() ? this.objectMapper.writeValueAsString((Object)ElasticSearchSink.stripNullNodes(jsonNode)) : this.objectMapper.writeValueAsString((Object)jsonNode);
    }

    public static JsonNode stripNullNodes(JsonNode node) {
        Iterator it = node.iterator();
        while (it.hasNext()) {
            JsonNode child = (JsonNode)it.next();
            if (child.isNull()) {
                it.remove();
                continue;
            }
            ElasticSearchSink.stripNullNodes(child);
        }
        return node;
    }

    public JsonNode extractJsonNode(Schema<?> schema, Object val) throws JsonProcessingException {
        if (val == null) {
            return null;
        }
        switch (schema.getSchemaInfo().getType()) {
            case JSON: {
                Object nativeObject = ((org.apache.pulsar.client.api.schema.GenericRecord)val).getNativeObject();
                if (nativeObject instanceof String) {
                    try {
                        return this.objectMapper.readTree((String)nativeObject);
                    }
                    catch (JsonProcessingException e) {
                        log.error("Failed to read JSON string: {}", nativeObject, (Object)e);
                        throw e;
                    }
                }
                return (JsonNode)nativeObject;
            }
            case AVRO: {
                GenericRecord node = (GenericRecord)((org.apache.pulsar.client.api.schema.GenericRecord)val).getNativeObject();
                return JsonConverter.toJson(node);
            }
            case STRING: {
                try {
                    return this.objectMapper.readTree((String)((GenericObject)val).getNativeObject());
                }
                catch (JsonProcessingException e) {
                    throw new RuntimeException("Error parsing string as JSON.", e);
                }
            }
            case BYTES: {
                try {
                    return this.objectMapper.readTree((byte[])((GenericObject)val).getNativeObject());
                }
                catch (IOException e) {
                    throw new RuntimeException("Error parsing byte[] as JSON.", e);
                }
            }
        }
        throw new UnsupportedOperationException("Unsupported value schemaType=" + schema.getSchemaInfo().getType());
    }
}

