/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kafka.AvroSchemaCache;
import org.apache.pulsar.io.kafka.ByteBufferSchemaWrapper;
import org.apache.pulsar.io.kafka.BytesWithKafkaSchema;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="kafka", type=IOType.SOURCE, help="Transfer data from Kafka to Pulsar.", configClass=KafkaSourceConfig.class)
public class KafkaBytesSource
extends KafkaAbstractSource<ByteBuffer> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBytesSource.class);
    private AvroSchemaCache schemaCache;
    private Schema<ByteBuffer> keySchema;
    private Schema<ByteBuffer> valueSchema;
    private boolean produceKeyValue;

    @Override
    protected Properties beforeCreateConsumer(Properties props) {
        boolean needsSchemaCache;
        props.putIfAbsent("key.deserializer", StringDeserializer.class.getName());
        props.putIfAbsent("value.deserializer", ByteArrayDeserializer.class.getName());
        log.info("Created kafka consumer config : {}", (Object)props);
        this.keySchema = KafkaBytesSource.getSchemaFromDeserializerAndAdaptConfiguration("key.deserializer", props, true);
        this.valueSchema = KafkaBytesSource.getSchemaFromDeserializerAndAdaptConfiguration("value.deserializer", props, false);
        boolean bl = needsSchemaCache = this.keySchema == DeferredSchemaPlaceholder.INSTANCE || this.valueSchema == DeferredSchemaPlaceholder.INSTANCE;
        if (needsSchemaCache) {
            this.initSchemaCache(props);
        }
        if (this.keySchema.getSchemaInfo().getType() != SchemaType.STRING) {
            this.produceKeyValue = true;
        }
        return props;
    }

    private void initSchemaCache(Properties props) {
        KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig((Map)props);
        List urls = config.getSchemaRegistryUrls();
        int maxSchemaObject = config.getMaxSchemasPerSubject();
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
        log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", (Object)urls, (Object)maxSchemaObject);
        this.schemaCache = new AvroSchemaCache((SchemaRegistryClient)schemaRegistryClient);
    }

    @Override
    public KafkaAbstractSource.KafkaRecord<ByteBuffer> buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
        if (this.produceKeyValue) {
            ByteBuffer key = KafkaBytesSource.extractSimpleValue(consumerRecord.key());
            ByteBuffer value = KafkaBytesSource.extractSimpleValue(consumerRecord.value());
            Schema<ByteBuffer> currentKeySchema = this.getSchemaFromObject(consumerRecord.key(), this.keySchema);
            Schema<ByteBuffer> currentValueSchema = this.getSchemaFromObject(consumerRecord.value(), this.valueSchema);
            return new KafkaAbstractSource.KeyValueKafkaRecord<ByteBuffer, ByteBuffer>(consumerRecord, new KeyValue((Object)key, (Object)value), currentKeySchema, currentValueSchema, this.copyKafkaHeaders(consumerRecord));
        }
        Object value = consumerRecord.value();
        return new KafkaAbstractSource.KafkaRecord<ByteBuffer>(consumerRecord, KafkaBytesSource.extractSimpleValue(value), this.getSchemaFromObject(value, this.valueSchema), this.copyKafkaHeaders(consumerRecord));
    }

    private static ByteBuffer extractSimpleValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof BytesWithKafkaSchema) {
            return ((BytesWithKafkaSchema)value).getValue();
        }
        if (value instanceof ByteBuffer) {
            return (ByteBuffer)value;
        }
        throw new IllegalArgumentException("Unexpected type from Kafka: " + String.valueOf(value.getClass()));
    }

    private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema<ByteBuffer> fallback) {
        if (value instanceof BytesWithKafkaSchema) {
            return this.schemaCache.get(((BytesWithKafkaSchema)value).getSchemaId());
        }
        return fallback;
    }

    private static Schema<ByteBuffer> getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) {
        Schema result;
        String kafkaDeserializerClass = props.getProperty(key);
        Objects.requireNonNull(kafkaDeserializerClass);
        props.put(key, ByteBufferDeserializer.class.getCanonicalName());
        if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass) || ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass) || BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.BYTEBUFFER;
        } else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            if (isKey) {
                props.put(key, kafkaDeserializerClass);
            }
            result = Schema.STRING;
        } else if (DoubleDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.DOUBLE;
        } else if (FloatDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.FLOAT;
        } else if (IntegerDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.INT32;
        } else if (LongDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.INT64;
        } else if (ShortDeserializer.class.getName().equals(kafkaDeserializerClass)) {
            result = Schema.INT16;
        } else {
            if (KafkaAvroDeserializer.class.getName().equals(kafkaDeserializerClass)) {
                props.put(key, ExtractKafkaAvroSchemaDeserializer.class.getName());
                return DeferredSchemaPlaceholder.INSTANCE;
            }
            throw new IllegalArgumentException("Unsupported deserializer " + kafkaDeserializerClass);
        }
        return new ByteBufferSchemaWrapper(result);
    }

    Schema<ByteBuffer> getKeySchema() {
        return this.keySchema;
    }

    Schema<ByteBuffer> getValueSchema() {
        return this.valueSchema;
    }

    boolean isProduceKeyValue() {
        return this.produceKeyValue;
    }

    static final class DeferredSchemaPlaceholder
    extends ByteBufferSchemaWrapper {
        static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder();

        DeferredSchemaPlaceholder() {
            super((SchemaInfo)SchemaInfoImpl.builder().type(SchemaType.AVRO).properties(Collections.emptyMap()).schema(new byte[0]).build());
        }
    }

    public static class ExtractKafkaAvroSchemaDeserializer
    implements Deserializer<BytesWithKafkaSchema> {
        public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
            if (payload == null) {
                return null;
            }
            try {
                ByteBuffer buffer = ByteBuffer.wrap(payload);
                buffer.get();
                int id = buffer.getInt();
                return new BytesWithKafkaSchema(buffer, id);
            }
            catch (Exception err) {
                throw new SerializationException("Error deserializing Avro message", (Throwable)err);
            }
        }
    }
}

