/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.ShareCompletedFetch;
import org.apache.kafka.clients.consumer.internals.ShareCompletedFetchTest;
import org.apache.kafka.clients.consumer.internals.ShareFetch;
import org.apache.kafka.clients.consumer.internals.ShareFetchBuffer;
import org.apache.kafka.clients.consumer.internals.ShareFetchCollector;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Unit tests for {@link ShareFetchCollector#collect}.
 *
 * <p>Each test builds a fresh set of collaborators via {@link #buildDependencies()}, subscribes to a
 * single partition, places a {@link ShareCompletedFetch} in a {@link ShareFetchBuffer} and then
 * checks what {@code collect} returns or throws.
 */
public class ShareFetchCollectorTest {
    /** Number of records a built fetch carries unless a test overrides it. */
    private static final int DEFAULT_RECORD_COUNT = 10;
    /** Value configured for {@code max.poll.records} in {@link #buildDependencies()}. */
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;

    private final TopicIdPartition topicAPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
    private LogContext logContext;
    private SubscriptionState subscriptions;
    private FetchConfig fetchConfig;
    private ConsumerMetadata metadata;
    private ShareFetchBuffer fetchBuffer;
    private Deserializers<String, String> deserializers;
    private ShareFetchCollector<String, String> fetchCollector;
    private ShareCompletedFetchBuilder completedFetchBuilder;
    private ShareFetchMetricsAggregator shareFetchMetricsAggregator;

    /**
     * Collects a fetch holding {@code DEFAULT_MAX_POLL_RECORDS} records and checks the state of the
     * buffer and of the completed fetch before the first collect, after it, and after a second collect.
     */
    @Test
    public void testFetchNormal() {
        int recordCount = DEFAULT_MAX_POLL_RECORDS;
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.recordCount(recordCount).build();

        // The buffer only becomes non-empty once the fetch has been added.
        Assertions.assertTrue(fetchBuffer.isEmpty());
        fetchBuffer.add(completedFetch);
        Assertions.assertFalse(fetchBuffer.isEmpty());

        // Adding the fetch to the buffer must not initialize it.
        Assertions.assertFalse(completedFetch.isInitialized());

        // First collect: every record comes back.
        ShareFetch fetch = fetchCollector.collect(fetchBuffer);
        Assertions.assertFalse(fetch.isEmpty());
        Assertions.assertEquals(recordCount, fetch.numRecords());

        // Collecting initialized the fetch, but it is not yet reported as consumed.
        Assertions.assertTrue(completedFetch.isInitialized());
        Assertions.assertFalse(completedFetch.isConsumed());

        // The buffer reports empty and has nothing to peek or poll ...
        Assertions.assertTrue(fetchBuffer.isEmpty());
        Assertions.assertNull(fetchBuffer.peek());
        Assertions.assertNull(fetchBuffer.poll());

        // ... yet it still exposes a next-in-line fetch.
        Assertions.assertNotNull(fetchBuffer.nextInLineFetch());

        // Second collect: nothing more to return, and the fetch is now reported as consumed.
        fetch = fetchCollector.collect(fetchBuffer);
        Assertions.assertEquals(0, fetch.numRecords());
        Assertions.assertTrue(fetch.isEmpty());
        Assertions.assertTrue(completedFetch.isConsumed());
    }

    /**
     * Replaces the collector with a subclass whose {@code initialize} always throws, and checks that
     * {@code collect} surfaces an exception of the same class.
     *
     * @param expectedException the exception the overridden {@code initialize} throws
     */
    @ParameterizedTest
    @MethodSource("testErrorInInitializeSource")
    public void testErrorInInitialize(final RuntimeException expectedException) {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        fetchCollector = new ShareFetchCollector<String, String>(logContext, metadata, subscriptions, fetchConfig, deserializers) {
            @Override
            protected ShareCompletedFetch initialize(ShareCompletedFetch completedFetch) {
                throw expectedException;
            }
        };

        ShareCompletedFetch completedFetch = completedFetchBuilder.recordCount(DEFAULT_RECORD_COUNT).build();
        fetchBuffer.add(completedFetch);
        Assertions.assertFalse(fetchBuffer.isEmpty());

        Assertions.assertThrows(expectedException.getClass(), () -> fetchCollector.collect(fetchBuffer));
    }

    /** A fetch carrying {@code TOPIC_AUTHORIZATION_FAILED} makes {@code collect} throw {@link TopicAuthorizationException}. */
    @Test
    public void testFetchWithTopicAuthorizationFailed() {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.error(Errors.TOPIC_AUTHORIZATION_FAILED).build();
        fetchBuffer.add(completedFetch);

        Assertions.assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collect(fetchBuffer));
    }

    /** A fetch carrying {@code UNKNOWN_LEADER_EPOCH} yields an empty {@link ShareFetch} instead of an exception. */
    @Test
    public void testFetchWithUnknownLeaderEpoch() {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.error(Errors.UNKNOWN_LEADER_EPOCH).build();
        fetchBuffer.add(completedFetch);

        ShareFetch fetch = fetchCollector.collect(fetchBuffer);
        Assertions.assertTrue(fetch.isEmpty());
    }

    /** A fetch carrying {@code UNKNOWN_SERVER_ERROR} yields an empty {@link ShareFetch} instead of an exception. */
    @Test
    public void testFetchWithUnknownServerError() {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.error(Errors.UNKNOWN_SERVER_ERROR).build();
        fetchBuffer.add(completedFetch);

        ShareFetch fetch = fetchCollector.collect(fetchBuffer);
        Assertions.assertTrue(fetch.isEmpty());
    }

    /** A fetch carrying {@code CORRUPT_MESSAGE} makes {@code collect} throw {@link KafkaException}. */
    @Test
    public void testFetchWithCorruptMessage() {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.error(Errors.CORRUPT_MESSAGE).build();
        fetchBuffer.add(completedFetch);

        Assertions.assertThrows(KafkaException.class, () -> fetchCollector.collect(fetchBuffer));
    }

    /**
     * Every error supplied by {@link #testFetchWithOtherErrorsSource()} is expected to make
     * {@code collect} throw {@link IllegalStateException}.
     *
     * @param error the error code placed on the fetched partition data
     */
    @ParameterizedTest
    @MethodSource("testFetchWithOtherErrorsSource")
    public void testFetchWithOtherErrors(Errors error) {
        buildDependencies();
        subscribeAndAssign(topicAPartition0);

        ShareCompletedFetch completedFetch = completedFetchBuilder.error(error).build();
        fetchBuffer.add(completedFetch);

        Assertions.assertThrows(IllegalStateException.class, () -> fetchCollector.collect(fetchBuffer));
    }

    /**
     * Creates a fresh consumer configuration and every collaborator the tests use (metrics,
     * deserializers, subscription state, metadata, collector, buffer and fetch builder).
     * Each test calls this explicitly as its first step.
     */
    private void buildDependencies() {
        logContext = new LogContext();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // These keys name deserializers, so they must point at a Deserializer implementation.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_RECORDS));
        ConsumerConfig config = new ConsumerConfig(props);

        Metrics metrics = ConsumerUtils.createMetrics(config, Time.SYSTEM);
        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics);

        ShareFetchMetricsManager shareFetchMetricsManager = ConsumerUtils.createShareFetchMetricsManager(metrics);
        Set<TopicPartition> partitionSet = new HashSet<>();
        partitionSet.add(topicAPartition0.topicPartition());
        shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(shareFetchMetricsManager, partitionSet);

        subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
        fetchConfig = new FetchConfig(config);
        metadata = new ConsumerMetadata(0L, 1000L, 10000L, false, false, subscriptions, logContext, new ClusterResourceListeners());
        fetchCollector = new ShareFetchCollector<>(logContext, metadata, subscriptions, fetchConfig, deserializers);
        fetchBuffer = new ShareFetchBuffer(logContext);
        completedFetchBuilder = new ShareCompletedFetchBuilder();
    }

    /**
     * Subscribes to the topic of {@code tp} and assigns its partition from that subscription.
     *
     * @param tp the topic-id partition to subscribe to and assign
     */
    private void subscribeAndAssign(TopicIdPartition tp) {
        subscriptions.subscribe(Collections.singleton(tp.topic()), Optional.empty());
        subscriptions.assignFromSubscribed(Collections.singleton(tp.topicPartition()));
    }

    /**
     * Supplies every {@link Errors} value except the ones removed below.
     *
     * <p>{@code TOPIC_AUTHORIZATION_FAILED}, {@code UNKNOWN_LEADER_EPOCH}, {@code UNKNOWN_SERVER_ERROR}
     * and {@code CORRUPT_MESSAGE} are covered by dedicated tests in this class. The remaining removed
     * values are presumably handled by the collector without an {@link IllegalStateException};
     * confirm against {@code ShareFetchCollector}.
     *
     * @return one argument set per remaining error
     */
    private static Stream<Arguments> testFetchWithOtherErrorsSource() {
        List<Errors> errors = new ArrayList<>(Arrays.asList(Errors.values()));
        errors.removeAll(Arrays.asList(
                Errors.NONE,
                Errors.NOT_LEADER_OR_FOLLOWER,
                Errors.REPLICA_NOT_AVAILABLE,
                Errors.KAFKA_STORAGE_ERROR,
                Errors.FENCED_LEADER_EPOCH,
                Errors.OFFSET_NOT_AVAILABLE,
                Errors.UNKNOWN_TOPIC_OR_PARTITION,
                Errors.UNKNOWN_TOPIC_ID,
                Errors.INCONSISTENT_TOPIC_ID,
                Errors.OFFSET_OUT_OF_RANGE,
                Errors.TOPIC_AUTHORIZATION_FAILED,
                Errors.UNKNOWN_LEADER_EPOCH,
                Errors.UNKNOWN_SERVER_ERROR,
                Errors.CORRUPT_MESSAGE));
        return errors.stream().map(error -> Arguments.of(error));
    }

    /**
     * Supplies the exceptions thrown by the overridden {@code initialize} in
     * {@link #testErrorInInitialize(RuntimeException)}.
     *
     * @return one argument set for a plain {@link RuntimeException} and one for a {@link KafkaException}
     */
    private static Stream<Arguments> testErrorInInitializeSource() {
        return Stream.of(
                Arguments.of(new RuntimeException()),
                Arguments.of(new KafkaException()));
    }

    /**
     * Fluent builder for {@link ShareCompletedFetch} test fixtures. It is an inner (non-static) class
     * because {@link #build()} reads the enclosing test's partition, log context and metrics aggregator.
     */
    private class ShareCompletedFetchBuilder {
        private int recordCount = DEFAULT_RECORD_COUNT;
        /** Error to put on the partition data; {@code null} leaves the error code untouched. */
        private Errors error = null;

        private ShareCompletedFetchBuilder() {
        }

        /**
         * @param recordCount number of records to append and to mark as acquired
         * @return this builder
         */
        private ShareCompletedFetchBuilder recordCount(int recordCount) {
            this.recordCount = recordCount;
            return this;
        }

        /**
         * @param error error whose code is set on the partition data
         * @return this builder
         */
        private ShareCompletedFetchBuilder error(Errors error) {
            this.error = error;
            return this;
        }

        /**
         * Builds uncompressed in-memory records ({@code "key"} / {@code "value-<i>"}), wraps them in
         * partition data with acquired records starting at offset 0, and returns the completed fetch.
         *
         * @return a completed fetch for the enclosing test's partition
         */
        private ShareCompletedFetch build() {
            MemoryRecords records;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L)) {
                for (int i = 0; i < recordCount; i++) {
                    // Explicit charset keeps the payload bytes independent of the platform default.
                    builder.append(0L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + i).getBytes(StandardCharsets.UTF_8));
                }
                records = builder.build();
            }

            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                    .setPartitionIndex(topicAPartition0.partition())
                    .setRecords(records)
                    .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(0L, recordCount));

            if (error != null) {
                partitionData.setErrorCode(error.code());
            }

            return new ShareCompletedFetch(
                    logContext,
                    BufferSupplier.create(),
                    0,
                    topicAPartition0,
                    partitionData,
                    shareFetchMetricsAggregator,
                    ApiKeys.SHARE_FETCH.latestVersion());
        }
    }
}

