/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test.utils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.util.Assert;

public final class KafkaTestUtils {
    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class));
    private static Properties defaults;

    private KafkaTestUtils() {
    }

    public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka) {
        return KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
    }

    public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) {
        return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
    }

    public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.commit.interval.ms", "10");
        props.put("session.timeout.ms", "60000");
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        return props;
    }

    public static Map<String, Object> senderProps(String brokers) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("retries", 0);
        props.put("batch.size", "16384");
        props.put("linger.ms", 1);
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        return props;
    }

    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
        return KafkaTestUtils.getSingleRecord(consumer, topic, 60000L);
    }

    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
        ConsumerRecords<K, V> received = KafkaTestUtils.getRecords(consumer, timeout);
        Iterator iterator = received.records(topic).iterator();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)iterator.hasNext()).as("No records found for topic", new Object[0])).isTrue();
        iterator.next();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)iterator.hasNext()).as("More than one record for topic found", new Object[0])).isFalse();
        if (received.count() > 1) {
            HashMap<TopicPartition, Long> reset = new HashMap<TopicPartition, Long>();
            received.forEach(rec -> {
                if (!rec.topic().equals(topic)) {
                    reset.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), tp -> rec.offset());
                }
            });
            reset.forEach((tp, off) -> consumer.seek(tp, off.longValue()));
        }
        return (ConsumerRecord)received.records(topic).iterator().next();
    }

    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
        return KafkaTestUtils.getRecords(consumer, 60000L);
    }

    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
        logger.debug((CharSequence)"Polling...");
        ConsumerRecords received = consumer.poll(Duration.ofMillis(timeout));
        logger.debug(() -> "Received: " + received.count() + ", " + received.partitions().stream().flatMap(p -> received.records(p).stream()).map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));
        ((IterableAssert)Assertions.assertThat((Iterable)received).as("null received from consumer.poll()", new Object[0])).isNotNull();
        return received;
    }

    public static Object getPropertyValue(Object root, String propertyPath) {
        Object value = null;
        DirectFieldAccessor accessor = new DirectFieldAccessor(root);
        String[] tokens = propertyPath.split("\\.");
        for (int i = 0; i < tokens.length; ++i) {
            value = accessor.getPropertyValue(tokens[i]);
            if (value == null) {
                if (i == tokens.length - 1) {
                    return null;
                }
                throw new IllegalArgumentException("intermediate property '" + tokens[i] + "' is null");
            }
            accessor = new DirectFieldAccessor(value);
        }
        return value;
    }

    public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
        Object value = KafkaTestUtils.getPropertyValue(root, propertyPath);
        if (value != null) {
            Assert.isAssignable(type, value.getClass());
        }
        return (T)value;
    }

    public static Properties defaultPropertyOverrides() {
        if (defaults == null) {
            Properties props = new Properties();
            props.setProperty("enable.auto.commit", "false");
            defaults = props;
        }
        return defaults;
    }
}

