/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.http;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;

public class HttpSink
implements Sink<GenericObject> {
    HttpSinkConfig httpSinkConfig;
    private HttpClient httpClient;
    private ObjectMapper mapper;
    private URI uri;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.httpSinkConfig = HttpSinkConfig.load(config);
        this.uri = new URI(this.httpSinkConfig.getUrl());
        this.httpClient = HttpClient.newHttpClient();
        this.mapper = new ObjectMapper().registerModule((Module)new JavaTimeModule());
    }

    public void write(Record<GenericObject> record) throws Exception {
        Object json = HttpSink.toJsonSerializable(record.getSchema(), ((GenericObject)record.getValue()).getNativeObject());
        byte[] bytes = this.mapper.writeValueAsBytes(json);
        HttpRequest.Builder builder = HttpRequest.newBuilder().uri(this.uri).POST(HttpRequest.BodyPublishers.ofByteArray(bytes));
        this.httpSinkConfig.getHeaders().forEach(builder::header);
        record.getProperties().forEach((k, v) -> builder.header("PulsarProperties-" + k, (String)v));
        record.getTopicName().ifPresent(topic -> builder.header("PulsarTopic", (String)topic));
        record.getEventTime().ifPresent(eventTime -> builder.header("PulsarEventTime", eventTime.toString()));
        record.getKey().ifPresent(key -> builder.header("PulsarKey", (String)key));
        record.getMessage().ifPresent(message -> {
            if (message.getMessageId() != null) {
                String messageId = Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
                builder.header("PulsarMessageId", messageId);
            }
            if (message.getPublishTime() != 0L) {
                builder.header("PulsarPublishTime", String.valueOf(message.getPublishTime()));
            }
        });
        builder.header("Content-Type", "application/json");
        HttpResponse<String> response = this.httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() < 200 || response.statusCode() >= 300) {
            throw new IOException(String.format("HTTP call to %s failed with status code %s", this.uri, response.statusCode()));
        }
    }

    private static Object toJsonSerializable(Schema<?> schema, Object val) {
        if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
            return val;
        }
        switch (schema.getSchemaInfo().getType()) {
            case KEY_VALUE: {
                KeyValueSchema keyValueSchema = (KeyValueSchema)schema;
                KeyValue keyValue = (KeyValue)val;
                HashMap<String, Object> jsonKeyValue = new HashMap<String, Object>();
                Object key = keyValue.getKey();
                Object value = keyValue.getValue();
                jsonKeyValue.put("key", HttpSink.toJsonSerializable(keyValueSchema.getKeySchema(), key instanceof GenericObject ? ((GenericObject)key).getNativeObject() : key));
                jsonKeyValue.put("value", HttpSink.toJsonSerializable(keyValueSchema.getValueSchema(), value instanceof GenericObject ? ((GenericObject)value).getNativeObject() : value));
                return jsonKeyValue;
            }
            case AVRO: {
                return JsonConverter.toJson((GenericRecord)val);
            }
            case JSON: {
                return val;
            }
        }
        throw new UnsupportedOperationException("Unsupported schema type =" + schema.getSchemaInfo().getType());
    }

    public void close() {
    }
}

