/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class KafkaStreamsMessageConversionDelegate {
    /** Class-wide logger. */
    private static final Log LOG = LogFactory.getLog(KafkaStreamsMessageConversionDelegate.class);

    /**
     * Hand-off slot between the inbound branch predicate, which stores the converted
     * key/value pair here, and the subsequent {@code mapValues} step in
     * {@code deserializeOnInbound}, which reads the value and then clears the slot.
     */
    private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();

    /** Supplies the composite converter used for both inbound and outbound conversion. */
    private final CompositeMessageConverterFactory compositeMessageConverterFactory;

    /** Receives records that failed inbound conversion when DLQ is enabled for the binding. */
    private final SendToDlqAndContinue sendToDlqAndContinue;

    /** Queried for the content type and the DLQ flag of a bound stream. */
    private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;

    /** Queried for the configured serde error policy when DLQ is not enabled. */
    private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;

    /**
     * Creates the delegate from its four collaborators. The references are stored as given;
     * no validation is performed here.
     *
     * @param compositeMessageConverterFactory source of the composite message converter
     * @param sendToDlqAndContinue sender used for records that fail inbound conversion
     * @param kstreamBindingInformationCatalogue per-binding lookup (content type, DLQ flag)
     * @param kstreamBinderConfigurationProperties binder-level settings (serde error policy)
     */
    KafkaStreamsMessageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue, KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties) {
        // Binder-level and per-binding configuration.
        this.kstreamBinderConfigurationProperties = kstreamBinderConfigurationProperties;
        this.kstreamBindingInformationCatalogue = kstreamBindingInformationCatalogue;
        // Conversion and error-routing collaborators.
        this.sendToDlqAndContinue = sendToDlqAndContinue;
        this.compositeMessageConverterFactory = compositeMessageConverterFactory;
    }

    /**
     * Returns a stream whose values are the payloads produced by running each outbound value
     * through the composite message converter.
     *
     * <p>The content type looked up for {@code outboundBindTarget} in the binding catalogue is
     * handed to the per-value conversion step together with the converter.
     *
     * @param outboundBindTarget the outbound stream to convert
     * @return the stream resulting from {@code mapValues} with the conversion applied
     */
    public KStream serializeOnOutbound(KStream<?, ?> outboundBindTarget) {
        final String outboundContentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
        final CompositeMessageConverter converter = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        return outboundBindTarget.mapValues(
                value -> KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(outboundContentType, converter, value));
    }

    /**
     * Converts the values of an inbound stream to {@code valueClass}.
     *
     * <p>The stream is split into two branches. The first holds the records for which the
     * conversion predicate returned {@code true}; the second catches every other record and
     * is handed to {@code processErrorFromDeserialization}. Values of the first branch are
     * replaced by the value the predicate stored in {@code keyValueThreadLocal}, which is
     * cleared right after it has been read.
     *
     * @param valueClass target type for the record values
     * @param bindingTarget the inbound stream
     * @return the first branch with its values replaced by the converted values
     */
    public KStream deserializeOnInbound(Class<?> valueClass, KStream<?, ?> bindingTarget) {
        final CompositeMessageConverter converter = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        final PerRecordContentTypeHolder contentTypeHolder = new PerRecordContentTypeHolder();
        // Registered before branching, exactly as in the original statement order.
        this.resolvePerRecordContentType(bindingTarget, contentTypeHolder);

        final Predicate convertible =
                (key, value) -> this.lambda$deserializeOnInbound$1(contentTypeHolder, valueClass, converter, key, value);
        final Predicate everythingElse = (key, value) -> true;
        final KStream[] branches = bindingTarget.branch(new Predicate[]{convertible, everythingElse});

        final KStream convertedBranch = branches[0];
        final KStream failedBranch = branches[1];
        this.processErrorFromDeserialization(bindingTarget, failedBranch);

        return convertedBranch.mapValues(ignoredOriginalValue -> {
            final Object convertedValue = KafkaStreamsMessageConversionDelegate.keyValueThreadLocal.get().value;
            keyValueThreadLocal.remove();
            return convertedValue;
        });
    }

    /**
     * Attaches a processor to the given stream that copies the first {@code contentType}
     * record header of every record into {@code perRecordContentTypeHolder}, with all
     * double-quote characters removed.
     *
     * <p>The header bytes are decoded as UTF-8. When a record carries no {@code contentType}
     * header, or the header has a {@code null} value, the holder is reset to {@code null} so
     * that the content type of an earlier record is never applied to a later one.
     *
     * @param outboundBindTarget the stream whose record headers are inspected
     * @param perRecordContentTypeHolder receives the content type of the current record
     */
    private void resolvePerRecordContentType(KStream<?, ?> outboundBindTarget, final PerRecordContentTypeHolder perRecordContentTypeHolder) {
        outboundBindTarget.process(() -> new Processor(){
            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object key, Object value) {
                String resolvedContentType = null;
                Headers headers = this.context.headers();
                Iterable<Header> contentTypes = headers.headers("contentType");
                if (contentTypes != null) {
                    for (Header header : contentTypes) {
                        byte[] rawContentType = header.value();
                        if (rawContentType != null) {
                            // Explicit charset: the platform default must not influence decoding.
                            String contentType = new String(rawContentType, StandardCharsets.UTF_8);
                            // Strip the quotes that surround a JSON-serialized header value.
                            resolvedContentType = StringUtils.replace(contentType, "\"", "");
                        }
                        // Only the first header with this key is considered.
                        break;
                    }
                }
                // Always overwrite, including with null, so no stale value survives this record.
                perRecordContentTypeHolder.setContentType(resolvedContentType);
            }

            @Override
            public void close() {
            }
        }, new String[0]);
    }

    /**
     * Obtains a value of type {@code valueClass} from {@code msg} and stores it, paired with
     * the key {@code o}, in {@code keyValueThreadLocal}.
     *
     * <p>If the payload's class is already assignable to {@code valueClass} the payload is used
     * directly; otherwise the message converter is asked to produce the value.
     *
     * @param o the record key
     * @param valueClass the required value type
     * @param messageConverter converter used when the payload is not already of the required type
     * @param msg the message wrapping the record value
     * @throws IllegalArgumentException if no value could be obtained (raised by {@code Assert.notNull})
     */
    private void convertAndSetMessage(Object o, Class<?> valueClass, MessageConverter messageConverter, Message<?> msg) {
        final Object converted;
        if (valueClass.isAssignableFrom(msg.getPayload().getClass())) {
            converted = msg.getPayload();
        } else {
            converted = messageConverter.fromMessage(msg, valueClass);
        }
        Assert.notNull(converted, "Failed to convert message " + msg);
        final KeyValue<Object, Object> pair = new KeyValue<Object, Object>(o, converted);
        keyValueThreadLocal.set(pair);
    }

    /**
     * Attaches a processor to {@code branch} that deals with records which were not accepted
     * by the inbound conversion predicate.
     *
     * <p>Records with a {@code null} value are ignored. For every other record:
     * <ul>
     *   <li>if DLQ is enabled for {@code bindingTarget}, the key and the value (the payload when
     *       the value is a {@link Message}) are cast to {@code byte[]} and passed to
     *       {@code sendToDlq} together with the topic and partition of the current record;</li>
     *   <li>otherwise, with the {@code logAndFail} policy an {@link IllegalStateException} is
     *       thrown, and with the {@code logAndContinue} policy an error is logged.</li>
     * </ul>
     *
     * @param bindingTarget the original inbound stream, used to look up the DLQ flag
     * @param branch the branch carrying the records that were not converted
     */
    private void processErrorFromDeserialization(final KStream<?, ?> bindingTarget, KStream<?, ?> branch) {
        branch.process(() -> new Processor(){
            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object o, Object o2) {
                if (o2 == null) {
                    return;
                }
                final KafkaStreamsMessageConversionDelegate outer = KafkaStreamsMessageConversionDelegate.this;
                if (!outer.kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
                    if (outer.kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
                        throw new IllegalStateException("Inbound deserialization failed. Stopping further processing of records.");
                    }
                    if (outer.kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
                        LOG.error("Inbound deserialization failed. Skipping this record and continuing.");
                    }
                    return;
                }
                final String destination = this.context.topic();
                if (o2 instanceof Message) {
                    final Message wrapped = (Message)o2;
                    outer.sendToDlqAndContinue.sendToDlq(destination, (byte[])o, (byte[])wrapped.getPayload(), this.context.partition());
                    return;
                }
                outer.sendToDlqAndContinue.sendToDlq(destination, (byte[])o, (byte[])o2, this.context.partition());
            }

            @Override
            public void close() {
            }
        }, new String[0]);
    }

    /**
     * Body of the first branch predicate used by {@code deserializeOnInbound}.
     *
     * <p>For a non-null value that is a {@link Message}, a {@code String} or a {@code byte[]},
     * the value is wrapped in (or kept as) a message, given the {@code contentType} header from
     * {@code perRecordContentTypeHolder} when one is present, and converted via
     * {@code convertAndSetMessage}. Any other non-null value is stored unchanged in
     * {@code keyValueThreadLocal}.
     *
     * <p>A failure during conversion is deliberately not propagated: the predicate returns
     * {@code false} so the record falls through to the next branch. The exception is logged
     * here with its stack trace because this is the only place where the cause is available.
     *
     * @param perRecordContentTypeHolder holder of the content type to apply, possibly {@code null}
     * @param valueClass the required value type
     * @param messageConverter converter used for the conversion
     * @param o the record key
     * @param o2 the record value
     * @return {@code true} if a key/value pair was stored for the record; {@code false} for a
     *         {@code null} value or when conversion failed
     */
    private /* synthetic */ boolean lambda$deserializeOnInbound$1(PerRecordContentTypeHolder perRecordContentTypeHolder, Class valueClass, MessageConverter messageConverter, Object o, Object o2) {
        if (o2 == null) {
            LOG.info("Received a tombstone record. This will be skipped from further processing.");
            return false;
        }
        try {
            if (o2 instanceof Message || o2 instanceof String || o2 instanceof byte[]) {
                String contentType = perRecordContentTypeHolder.contentType;
                Message<?> message;
                if (o2 instanceof Message) {
                    Message<?> inbound = (Message<?>)o2;
                    if (contentType != null) {
                        message = MessageBuilder.fromMessage(inbound).setHeader("contentType", contentType).build();
                    } else {
                        message = inbound;
                    }
                } else {
                    MessageBuilder<Object> builder = MessageBuilder.withPayload(o2);
                    if (contentType != null) {
                        builder.setHeader("contentType", contentType);
                    }
                    message = builder.build();
                }
                this.convertAndSetMessage(o, valueClass, messageConverter, message);
            } else {
                keyValueThreadLocal.set(new KeyValue<Object, Object>(o, o2));
            }
            return true;
        }
        catch (Exception ex) {
            // Best-effort by design: the record is routed to the error branch instead of
            // failing here, but the cause must not be lost.
            LOG.warn("Inbound deserialization failed. The record will be handed to the error-handling branch.", ex);
            return false;
        }
    }

    /**
     * Per-value conversion step used by {@code serializeOnOutbound}.
     *
     * <p>The value is treated as a {@link Message} (wrapped in one if it is not already). Its
     * headers are copied, the {@code contentType} header is overridden with
     * {@code contentType} when that is non-empty, and the payload is passed through the
     * message converter with those headers.
     *
     * @param contentType content type to force on the outbound message; ignored when empty
     * @param messageConverter converter producing the outbound message
     * @param v the outbound value, either a {@link Message} or a plain payload
     * @return the payload of the converted message
     * @throws IllegalArgumentException if the converter returns {@code null}, i.e. could not
     *         convert the payload
     */
    private static /* synthetic */ Object lambda$serializeOnOutbound$0(String contentType, MessageConverter messageConverter, Object v) {
        Message<?> message;
        if (v instanceof Message) {
            message = (Message<?>)v;
        } else {
            message = MessageBuilder.withPayload(v).build();
        }
        // MessageHeaders is a Map<String, Object>; the copy must keep that value type.
        Map<String, Object> headers = new HashMap<>(message.getHeaders());
        if (!StringUtils.isEmpty(contentType)) {
            headers.put("contentType", contentType);
        }
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<?> converted = messageConverter.toMessage(message.getPayload(), messageHeaders);
        // Fail with a descriptive message instead of a bare NullPointerException.
        Assert.notNull(converted, "Failed to convert outbound payload of type "
                + message.getPayload().getClass().getName()
                + " using contentType " + headers.get("contentType"));
        return converted.getPayload();
    }

    /**
     * Minimal mutable box for a single content-type string. The enclosing class writes it
     * through {@link #setContentType(String)} and reads the {@code contentType} field directly.
     */
    private static class PerRecordContentTypeHolder {
        /** The stored content type; {@code null} until a value has been assigned. */
        String contentType;

        /**
         * Replaces the stored content type.
         *
         * @param contentType the new value; stored as given
         */
        void setContentType(final String contentType) {
            this.contentType = contentType;
        }

        /** Private so that only the enclosing class can create instances. */
        private PerRecordContentTypeHolder() {
        }
    }
}

