/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kstream;

import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class KStreamBoundElementFactory
extends AbstractBindingTargetFactory<KStream> {
    private final StreamsBuilder kStreamBuilder;
    private final BindingServiceProperties bindingServiceProperties;

    public KStreamBoundElementFactory(StreamsBuilder kStreamBuilder, BindingServiceProperties bindingServiceProperties) {
        super(KStream.class);
        this.bindingServiceProperties = bindingServiceProperties;
        this.kStreamBuilder = kStreamBuilder;
    }

    public KStream createInput(String name) {
        KStream stream = this.kStreamBuilder.stream(this.bindingServiceProperties.getBindingDestination(name));
        stream = stream.map((key, value) -> {
            KeyValue keyValue;
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(name);
            String contentType = bindingProperties.getContentType();
            if (!StringUtils.isEmpty((Object)contentType) && !bindingProperties.getConsumer().isUseNativeDecoding()) {
                Message message = MessageBuilder.withPayload((Object)value).setHeader("contentType", (Object)contentType).build();
                keyValue = new KeyValue(key, (Object)message);
            } else {
                keyValue = new KeyValue(key, value);
            }
            return keyValue;
        });
        return stream;
    }

    public KStream createOutput(String name) {
        KStreamWrapperHandler wrapper = new KStreamWrapperHandler();
        ProxyFactory proxyFactory = new ProxyFactory(new Class[]{KStreamWrapper.class, KStream.class});
        proxyFactory.addAdvice((Advice)wrapper);
        return (KStream)proxyFactory.getProxy();
    }

    private static class KStreamWrapperHandler
    implements KStreamWrapper,
    MethodInterceptor {
        private KStream<Object, Object> delegate;

        private KStreamWrapperHandler() {
        }

        @Override
        public void wrap(KStream<Object, Object> delegate) {
            Assert.notNull(delegate, (String)"delegate cannot be null");
            Assert.isNull(this.delegate, (String)("delegate already set to " + this.delegate));
            this.delegate = delegate;
        }

        public Object invoke(MethodInvocation methodInvocation) throws Throwable {
            if (methodInvocation.getMethod().getDeclaringClass().equals(KStream.class)) {
                Assert.notNull(this.delegate, (String)("Trying to invoke " + methodInvocation.getMethod() + "  but no delegate has been set."));
                return methodInvocation.getMethod().invoke(this.delegate, methodInvocation.getArguments());
            }
            if (methodInvocation.getMethod().getDeclaringClass().equals(KStreamWrapper.class)) {
                return methodInvocation.getMethod().invoke((Object)this, methodInvocation.getArguments());
            }
            throw new IllegalStateException("Only KStream method invocations are permitted");
        }
    }

    public static interface KStreamWrapper {
        public void wrap(KStream<Object, Object> var1);
    }
}

