/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.kafka;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.BDDAssertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.cloud.sleuth.instrument.kafka.KafkaTestUtils;
import org.springframework.cloud.sleuth.instrument.kafka.ReactiveKafkaTracingPropagator;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaPropagatorGetter;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaReceiver;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.test.StepVerifier;
import reactor.util.context.ContextView;

/**
 * Base integration test for {@link TracingKafkaReceiver}, exercised against a Kafka
 * broker running in a Docker container (hence the {@code DockerRequired} tag).
 *
 * <p>The class is abstract: the tracer, propagator, span handler and current trace
 * context used by the tests are all obtained through {@code tracerTest()}
 * (presumably declared by {@link TestTracingAwareSupplier} and implemented by the
 * concrete, tracer-specific subclass).
 *
 * <p>Every test gets a fresh, randomly named topic and a fresh receiver whose record
 * stream is published as a shared, already-connected {@link Flux}. A background
 * subscriber counts the records seen on that stream in {@link #receivedCounter}.
 */
@Testcontainers
@ExtendWith(value={MockitoExtension.class})
@Tag(value="DockerRequired")
public abstract class KafkaReceiverTest
implements TestTracingAwareSupplier {
    /** Key under which the Reactor operator hooks are registered in {@link #should_pass_tracing_context_for_consumers()}. */
    static final String HOOK_KEY = "org.springframework.cloud.sleuth.autoconfig.instrument.reactor.TraceReactorAutoConfiguration.TraceReactorConfiguration";

    /** Topic used by the current test; regenerated in {@link #setup()}. */
    protected String testTopic;
    protected Tracer tracer = this.tracerTest().tracing().tracer();
    protected Propagator propagator = this.tracerTest().tracing().propagator();
    /** Collects the finished spans that the assertions inspect. */
    protected TestSpanHandler spans = this.tracerTest().handler();
    protected CurrentTraceContext currentTraceContext = this.tracerTest().tracing().currentTraceContext();
    protected Propagator.Getter<ConsumerRecord<?, ?>> extractor = new TracingKafkaPropagatorGetter();

    /** Subscription of the background counting consumer created in {@link #setup()}. */
    private Disposable consumerSubscription;
    /** Upstream connection of {@link #shareableReceiver}; captured when the shared flux connects. */
    private Disposable shareableReceiverDisposable;
    /** Shared stream of records received from {@link #testTopic}. */
    protected Flux<ReceiverRecord<String, String>> shareableReceiver;
    /** Number of records observed by the background consumer during the current test. */
    protected final AtomicInteger receivedCounter = new AtomicInteger(0);

    AnnotationConfigApplicationContext springContext = new AnnotationConfigApplicationContext();
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    BeanFactory beanFactory;

    /** Kafka broker shared by all tests of the class. */
    @Container
    protected static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1"))
            .withExposedPorts(9093)
            .waitingFor(Wait.forListeningPort());

    /** Starts the shared Kafka container before any test runs. */
    @BeforeAll
    static void setupAll() {
        kafkaContainer.start();
    }

    /** Stops the shared Kafka container once all tests have run. */
    @AfterAll
    static void destroyAll() {
        kafkaContainer.stop();
    }

    /**
     * Clears any Reactor hooks left over from a previous test, picks a fresh topic and
     * starts a traced receiver on it together with the background counting consumer.
     */
    @BeforeEach
    void setup() {
        Hooks.resetOnEachOperator();
        Hooks.resetOnLastOperator();
        Schedulers.resetOnScheduleHooks();

        this.testTopic = UUID.randomUUID().toString();
        // Reset before the counting subscriber is attached so no increment can be wiped out.
        this.receivedCounter.set(0);

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        consumerProperties.put("group.id", "test-consumer-group");
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        consumerProperties.put("auto.offset.reset", "earliest");

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProperties)
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .subscription(Collections.singletonList(this.testTopic));

        KafkaReceiver<String, String> kafkaReceiver = new TracingKafkaReceiver<>(
                new ReactiveKafkaTracingPropagator(this.tracer, this.propagator, this.extractor),
                KafkaReceiver.create(options));

        // autoConnect(0, ...) connects to the receiver immediately, without waiting for a
        // subscriber, and hands back the upstream connection so destroy() can dispose it.
        this.shareableReceiver = kafkaReceiver.receive()
                .publish()
                .autoConnect(0, disposable -> this.shareableReceiverDisposable = disposable);
        this.consumerSubscription = this.shareableReceiver
                .subscribeOn(Schedulers.single())
                .subscribe(record -> this.receivedCounter.incrementAndGet());
    }

    /**
     * Releases everything {@link #setup()} and the tests created. The hook resets and
     * disposals run even when closing the Spring context fails, and tolerate a
     * {@link #setup()} that did not get as far as creating the subscriptions.
     */
    @AfterEach
    void destroy() {
        try {
            this.springContext.close();
        }
        finally {
            Hooks.resetOnEachOperator();
            Hooks.resetOnLastOperator();
            Schedulers.resetOnScheduleHooks();
            if (this.consumerSubscription != null) {
                this.consumerSubscription.dispose();
            }
            if (this.shareableReceiverDisposable != null) {
                this.shareableReceiverDisposable.dispose();
            }
        }
    }

    /**
     * Sends one record and verifies that, once it was received, exactly one finished
     * {@code CONSUMER} span exists carrying the topic, offset and partition tags, and
     * that no span is left current on the test thread.
     */
    @Test
    public void should_create_and_finish_consumer_span() {
        // try-with-resources: the producer is closed instead of being leaked per test.
        try (KafkaProducer<String, String> kafkaProducer = KafkaTestUtils.buildTestKafkaProducer(kafkaContainer.getBootstrapServers())) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(this.testTopic, "test", "test");
            kafkaProducer.send(producerRecord);
            Awaitility.await().atMost(Duration.ofSeconds(15L)).until(() -> this.receivedCounter.intValue() == 1);
        }

        BDDAssertions.then(this.tracer.currentSpan()).isNull();
        BDDAssertions.then((Iterable<FinishedSpan>) this.spans).hasSize(1);
        FinishedSpan span = this.spans.get(0);
        BDDAssertions.then(span.getKind()).isEqualTo(Span.Kind.CONSUMER);
        BDDAssertions.then(span.getTags()).isNotEmpty();
        BDDAssertions.then(span.getTags().get("kafka.topic")).isEqualTo(this.testTopic);
        BDDAssertions.then(span.getTags().get("kafka.offset")).isEqualTo("0");
        BDDAssertions.then(span.getTags().get("kafka.partition")).isEqualTo("0");
    }

    /**
     * Sends one record carrying a {@code b3} header and verifies that the Reactor
     * context seen downstream of the receiver holds a {@link TraceContext} whose trace
     * id is the one from the header and whose parent id is the header's span id.
     */
    @Test
    public void should_pass_tracing_context_for_consumers() {
        this.springContext.registerBean(Tracer.class, () -> this.tracer);
        this.springContext.registerBean(CurrentTraceContext.class, () -> this.currentTraceContext);
        this.springContext.refresh();
        Hooks.onEachOperator(HOOK_KEY, ReactorSleuth.onEachOperatorForOnEachInstrumentation(this.springContext));
        Hooks.onLastOperator(HOOK_KEY, ReactorSleuth.onLastOperatorForOnEachInstrumentation(this.springContext));

        // The producer stays open until verification finishes, then is closed.
        try (KafkaProducer<String, String> kafkaProducer = KafkaTestUtils.buildTestKafkaProducer(kafkaContainer.getBootstrapServers())) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(this.testTopic, "test", "test-with-trace");
            // b3 single-header value: <traceId>-<spanId>-<sampled>; encoded explicitly as UTF-8.
            producerRecord.headers().add("b3", "80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1".getBytes(StandardCharsets.UTF_8));
            kafkaProducer.send(producerRecord);

            // Expose the Reactor context that is visible for each received record.
            Flux<ContextView> testFlux = this.shareableReceiver.flatMap(record -> Mono.deferContextual(Mono::just));
            StepVerifier.create(testFlux).assertNext(contextView -> {
                TraceContext traceContext = contextView.get(TraceContext.class);
                Assertions.assertThat(traceContext)
                        .returns("80f198ee56343ba864fe8b2a57d3eff7", TraceContext::traceId)
                        .returns("e457b5a2e4d86bd1", TraceContext::parentId);
            }).thenCancel().verify(Duration.ofSeconds(15L));
        }
    }

    /** Discards the spans collected so far. */
    @Override
    public void cleanUpTracing() {
        this.spans.clear();
    }
}

