/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals.events;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ApplicationEventProcessorTest {
    // Mock clock; used by tests that compute an event deadline from the current time.
    private final Time time = new MockTime();

    // Mocked request managers that the setup helpers wire into RequestManagers.
    private final CommitRequestManager commitRequestManager = Mockito.mock(CommitRequestManager.class);
    private final ConsumerHeartbeatRequestManager heartbeatRequestManager = Mockito.mock(ConsumerHeartbeatRequestManager.class);
    private final ConsumerMembershipManager membershipManager = Mockito.mock(ConsumerMembershipManager.class);
    private final OffsetsRequestManager offsetsRequestManager = Mockito.mock(OffsetsRequestManager.class);

    // Not final: some tests swap the mock for a real SubscriptionState before building the processor.
    private SubscriptionState subscriptionState = Mockito.mock(SubscriptionState.class);
    private final ConsumerMetadata metadata = Mockito.mock(ConsumerMetadata.class);

    // Streams-specific mocks, only wired in by setupStreamProcessor.
    private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = Mockito.mock(StreamsGroupHeartbeatRequestManager.class);
    private final StreamsMembershipManager streamsMembershipManager = Mockito.mock(StreamsMembershipManager.class);

    // Processor under test; assigned by setupProcessor / setupStreamProcessor.
    private ApplicationEventProcessor processor;

    /**
     * Creates the processor under test with the consumer request managers.
     *
     * @param withGroupId when {@code true} the coordinator, commit, heartbeat and membership
     *                    managers are supplied; when {@code false} they are all {@code Optional.empty()}.
     *                    The two streams-related managers are always empty here.
     */
    private void setupProcessor(boolean withGroupId) {
        RequestManagers requestManagers = new RequestManagers(
            new LogContext(),
            this.offsetsRequestManager,
            Mockito.mock(TopicMetadataRequestManager.class),
            Mockito.mock(FetchRequestManager.class),
            withGroupId ? Optional.of(Mockito.mock(CoordinatorRequestManager.class)) : Optional.empty(),
            withGroupId ? Optional.of(this.commitRequestManager) : Optional.empty(),
            withGroupId ? Optional.of(this.heartbeatRequestManager) : Optional.empty(),
            withGroupId ? Optional.of(this.membershipManager) : Optional.empty(),
            Optional.empty(),
            Optional.empty());
        this.processor = new ApplicationEventProcessor(
            new LogContext(),
            requestManagers,
            this.metadata,
            this.subscriptionState);
    }

    /**
     * Creates the processor under test with the streams request managers.
     *
     * @param withGroupId when {@code true} the coordinator, commit, heartbeat, streams heartbeat and
     *                    streams membership managers are supplied; when {@code false} they are all
     *                    {@code Optional.empty()}. The consumer membership manager is always empty here.
     */
    private void setupStreamProcessor(boolean withGroupId) {
        RequestManagers requestManagers = new RequestManagers(
            new LogContext(),
            this.offsetsRequestManager,
            Mockito.mock(TopicMetadataRequestManager.class),
            Mockito.mock(FetchRequestManager.class),
            withGroupId ? Optional.of(Mockito.mock(CoordinatorRequestManager.class)) : Optional.empty(),
            withGroupId ? Optional.of(this.commitRequestManager) : Optional.empty(),
            withGroupId ? Optional.of(this.heartbeatRequestManager) : Optional.empty(),
            Optional.empty(),
            withGroupId ? Optional.of(this.streamsGroupHeartbeatRequestManager) : Optional.empty(),
            withGroupId ? Optional.of(this.streamsMembershipManager) : Optional.empty());
        this.processor = new ApplicationEventProcessor(
            new LogContext(),
            requestManagers,
            this.metadata,
            this.subscriptionState);
    }

    /**
     * Processing a {@link CommitOnCloseEvent} must call {@code signalClose()} on the commit request manager.
     */
    @Test
    public void testPrepClosingCommitEvents() {
        this.setupProcessor(true);
        List<NetworkClientDelegate.UnsentRequest> unsentCommits = this.mockCommitResults();
        NetworkClientDelegate.PollResult closingPoll = new NetworkClientDelegate.PollResult(100L, unsentCommits);
        Mockito.doReturn(closingPoll).when(this.commitRequestManager).pollOnClose(ArgumentMatchers.anyLong());

        this.processor.process((ApplicationEvent) new CommitOnCloseEvent());

        Mockito.verify(this.commitRequestManager).signalClose();
    }

    /**
     * With the group managers present, an {@link UnsubscribeEvent} must trigger {@code leaveGroup()}
     * on the membership manager.
     */
    @Test
    public void testProcessUnsubscribeEventWithGroupId() {
        this.setupProcessor(true);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        Mockito.when(this.membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));

        UnsubscribeEvent unsubscribe = new UnsubscribeEvent(0L);
        this.processor.process((ApplicationEvent) unsubscribe);

        Mockito.verify(this.membershipManager).leaveGroup();
    }

    /**
     * Without the group managers, an {@link UnsubscribeEvent} must still call
     * {@code unsubscribe()} on the subscription state.
     */
    @Test
    public void testProcessUnsubscribeEventWithoutGroupId() {
        this.setupProcessor(false);

        UnsubscribeEvent unsubscribe = new UnsubscribeEvent(0L);
        this.processor.process((ApplicationEvent) unsubscribe);

        Mockito.verify(this.subscriptionState).unsubscribe();
    }

    /**
     * Feeds each event from {@code applicationEvents()} to a mocked processor and checks that
     * {@code process} was invoked with an argument of the event's class.
     *
     * <p>Note: the processor here is itself a Mockito mock, so this only verifies that the call
     * was recorded; no real processing logic runs.
     *
     * @param e the event supplied by the method source
     */
    @ParameterizedTest
    @MethodSource("applicationEvents")
    public void testApplicationEventIsProcessed(ApplicationEvent e) {
        ApplicationEventProcessor mockedProcessor = Mockito.mock(ApplicationEventProcessor.class);

        mockedProcessor.process(e);

        Mockito.verify(mockedProcessor).process((ApplicationEvent) ArgumentMatchers.any(e.getClass()));
    }

    /**
     * Method source for {@code testApplicationEventIsProcessed}: one argument set per event type.
     *
     * @return a stream of single-element argument sets, each holding one application event
     */
    private static Stream<Arguments> applicationEvents() {
        return Stream.of(
            Arguments.of(new PollEvent(100L)),
            Arguments.of(new CreateFetchRequestsEvent(CompletableEvent.calculateDeadlineMs(12345L, 100L))),
            Arguments.of(new CheckAndUpdatePositionsEvent(500L)),
            Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
            Arguments.of(new AssignmentChangeEvent(12345L, 12345L, Collections.emptyList())));
    }

    /**
     * Passes a {@link ListOffsetsEvent} to a mocked processor and checks that {@code process}
     * was invoked with a {@code ListOffsetsEvent}. The processor is a mock, so only the call is verified.
     *
     * @param requireTimestamp forwarded to the {@code ListOffsetsEvent} constructor
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
        ApplicationEventProcessor mockedProcessor = Mockito.mock(ApplicationEventProcessor.class);
        TopicPartition partition = new TopicPartition("topic1", 1);
        Map<TopicPartition, Long> timestamps = Collections.singletonMap(partition, 5L);
        long deadlineMs = CompletableEvent.calculateDeadlineMs(this.time, 100L);
        ListOffsetsEvent listOffsets = new ListOffsetsEvent(timestamps, deadlineMs, requireTimestamp);

        mockedProcessor.process((ApplicationEvent) listOffsets);

        Mockito.verify(mockedProcessor).process((ApplicationEvent) ArgumentMatchers.any(ListOffsetsEvent.class));
    }

    /**
     * Processes an {@link AssignmentChangeEvent} and checks the resulting interactions.
     *
     * <p>With the group managers present the commit request manager must be asked to
     * {@code updateTimerAndMaybeCommit}; without them it must not be. In both cases the metadata
     * must be asked for an update for new topics, the subscription state must receive the
     * user assignment, and the event future must complete without throwing.
     *
     * @param withGroupId whether the processor is built with the group-related request managers
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testAssignmentChangeEvent(boolean withGroupId) {
        // Single source of truth for the time value; previously declared but the literal was repeated.
        final long currentTimeMs = 12345L;
        TopicPartition tp = new TopicPartition("topic", 0);
        // NOTE(review): first argument is presumably the current time (it is the value later expected by
        // updateTimerAndMaybeCommit); the second 12345L is kept as a literal - confirm its meaning against the event class.
        AssignmentChangeEvent event = new AssignmentChangeEvent(currentTimeMs, 12345L, Collections.singleton(tp));

        this.setupProcessor(withGroupId);
        Mockito.doReturn(true).when(this.subscriptionState).assignFromUser(Collections.singleton(tp));

        this.processor.process((ApplicationEvent) event);

        if (withGroupId) {
            Mockito.verify(this.commitRequestManager).updateTimerAndMaybeCommit(currentTimeMs);
        } else {
            Mockito.verify(this.commitRequestManager, Mockito.never()).updateTimerAndMaybeCommit(currentTimeMs);
        }
        Mockito.verify(this.metadata).requestUpdateForNewTopics();
        Mockito.verify(this.subscriptionState).assignFromUser(Collections.singleton(tp));
        Assertions.assertDoesNotThrow(() -> (Void) event.future().get());
    }

    /**
     * When {@code assignFromUser} throws, the {@link AssignmentChangeEvent} future must fail with
     * an {@link ExecutionException} whose cause is the {@link IllegalStateException}.
     */
    @Test
    public void testAssignmentChangeEventWithException() {
        AssignmentChangeEvent event = new AssignmentChangeEvent(12345L, 12345L, Collections.emptyList());
        this.setupProcessor(false);
        Mockito.doThrow(new IllegalStateException())
            .when(this.subscriptionState)
            .assignFromUser(ArgumentMatchers.any());

        this.processor.process((ApplicationEvent) event);

        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> event.future().get());
        Assertions.assertInstanceOf(IllegalStateException.class, failure.getCause());
    }

    /**
     * A {@link ResetOffsetEvent} must be forwarded to {@code requestOffsetReset} on the
     * subscription state with the event's partitions and reset strategy.
     */
    @Test
    public void testResetOffsetEvent() {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic", 0));
        ResetOffsetEvent resetEvent = new ResetOffsetEvent(partitions, AutoOffsetResetStrategy.LATEST, 12345L);
        this.setupProcessor(false);

        this.processor.process((ApplicationEvent) resetEvent);

        Mockito.verify(this.subscriptionState)
            .requestOffsetReset(resetEvent.topicPartitions(), resetEvent.offsetResetStrategy());
    }

    /**
     * A {@link SeekUnvalidatedEvent} carrying an offset epoch must update the last seen epoch in
     * the metadata, look up the current leader, and call {@code seekUnvalidated} with the expected
     * fetch position; the event future must complete without throwing.
     */
    @Test
    public void testSeekUnvalidatedEvent() {
        int epoch = 1;
        TopicPartition tp = new TopicPartition("topic", 0);
        Optional<Integer> offsetEpoch = Optional.of(epoch);
        SubscriptionState.FetchPosition expectedPosition =
            new SubscriptionState.FetchPosition(0L, offsetEpoch, Metadata.LeaderAndEpoch.noLeaderOrEpoch());
        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345L, tp, 0L, offsetEpoch);

        this.setupProcessor(false);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(tp);
        Mockito.doNothing()
            .when(this.subscriptionState)
            .seekUnvalidated(ArgumentMatchers.eq(tp), ArgumentMatchers.any());

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.metadata).updateLastSeenEpochIfNewer(tp, epoch);
        Mockito.verify(this.metadata).currentLeader(tp);
        Mockito.verify(this.subscriptionState).seekUnvalidated(tp, expectedPosition);
        Assertions.assertDoesNotThrow(() -> (Void) event.future().get());
    }

    /**
     * When {@code seekUnvalidated} throws, the {@link SeekUnvalidatedEvent} future must fail with
     * an {@link ExecutionException} whose cause is the {@link IllegalStateException}.
     */
    @Test
    public void testSeekUnvalidatedEventWithException() {
        TopicPartition tp = new TopicPartition("topic", 0);
        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345L, tp, 0L, Optional.empty());

        this.setupProcessor(false);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(tp);
        Mockito.doThrow(new IllegalStateException())
            .when(this.subscriptionState)
            .seekUnvalidated(ArgumentMatchers.eq(tp), ArgumentMatchers.any());

        this.processor.process((ApplicationEvent) event);

        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> event.future().get());
        Assertions.assertInstanceOf(IllegalStateException.class, failure.getCause());
    }

    /**
     * Processing a {@link PollEvent} must complete its {@code reconcileAndAutoCommit} future,
     * update the commit timer, notify the membership manager of the poll, and reset the
     * heartbeat manager's poll timer with the event's time value.
     */
    @Test
    public void testPollEvent() {
        long pollTimeMs = 12345L;
        PollEvent pollEvent = new PollEvent(pollTimeMs);
        this.setupProcessor(true);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);

        this.processor.process((ApplicationEvent) pollEvent);

        Assertions.assertTrue(pollEvent.reconcileAndAutoCommit().isDone());
        Mockito.verify(this.commitRequestManager).updateTimerAndMaybeCommit(pollTimeMs);
        Mockito.verify(this.membershipManager).onConsumerPoll();
        Mockito.verify(this.heartbeatRequestManager).resetPollTimer(pollTimeMs);
    }

    /**
     * A {@link TopicSubscriptionChangeEvent} must subscribe to the topics, request a metadata
     * update for new topics, record the returned metadata version, and notify the membership
     * manager of the subscription update without treating it as a consumer poll.
     */
    @Test
    public void testTopicSubscriptionChangeEvent() {
        Set<String> topics = Set.of("topic1", "topic2");
        Optional<MockRebalanceListener> listener = Optional.of(new MockRebalanceListener());
        TopicSubscriptionChangeEvent event = new TopicSubscriptionChangeEvent(topics, listener, 12345L);

        this.setupProcessor(true);
        Mockito.when(this.subscriptionState.subscribe(topics, listener)).thenReturn(true);
        Mockito.when(this.metadata.requestUpdateForNewTopics()).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.subscriptionState).subscribe(topics, listener);
        Mockito.verify(this.metadata).requestUpdateForNewTopics();
        int versionSnapshot = this.processor.metadataVersionSnapshot();
        Assertions.assertEquals(1, versionSnapshot);
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();
        Mockito.verify(this.membershipManager, Mockito.never()).onConsumerPoll();
        Assertions.assertDoesNotThrow(() -> (Void) event.future().get());
    }

    /**
     * A {@link FetchCommittedOffsetsEvent} must delegate to {@code fetchOffsets} on the commit
     * request manager and complete its future with the offsets that call returned.
     */
    @Test
    public void testFetchCommittedOffsetsEvent() {
        long fetchTimeArg = 12345L;
        TopicPartition tp0 = new TopicPartition("topic", 0);
        TopicPartition tp1 = new TopicPartition("topic", 1);
        TopicPartition tp2 = new TopicPartition("topic", 2);
        Set<TopicPartition> partitions = Set.of(tp0, tp1, tp2);
        Map<TopicPartition, OffsetAndMetadata> committed = Map.of(
            tp0, new OffsetAndMetadata(10L, Optional.of(2), ""),
            tp1, new OffsetAndMetadata(15L, Optional.empty(), ""),
            tp2, new OffsetAndMetadata(20L, Optional.of(3), ""));
        FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, fetchTimeArg);

        this.setupProcessor(true);
        Mockito.when(this.commitRequestManager.fetchOffsets(partitions, fetchTimeArg))
            .thenReturn(CompletableFuture.completedFuture(committed));

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.commitRequestManager).fetchOffsets(partitions, fetchTimeArg);
        Map<?, ?> fetched = Assertions.assertDoesNotThrow(() -> (Map<?, ?>) event.future().get());
        Assertions.assertEquals(committed, fetched);
    }

    /**
     * Uses a real {@link SubscriptionState} that already holds a pattern subscription; a
     * subsequent {@link TopicSubscriptionChangeEvent} must fail its future with an
     * {@link IllegalStateException} carrying the "mutually exclusive" message.
     */
    @Test
    public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
        // Replace the mock with a real instance so the mixed-subscription check actually runs.
        this.subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
        Optional<MockRebalanceListener> listener = Optional.of(new MockRebalanceListener());
        TopicSubscriptionChangeEvent event =
            new TopicSubscriptionChangeEvent(Set.of("topic1", "topic2"), listener, 12345L);
        this.subscriptionState.subscribe(Pattern.compile("topic.*"), listener);

        this.setupProcessor(true);
        Mockito.when(this.metadata.requestUpdateForNewTopics()).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);

        this.processor.process((ApplicationEvent) event);

        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> event.future().get());
        Throwable cause = failure.getCause();
        Assertions.assertInstanceOf(IllegalStateException.class, cause);
        Assertions.assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", cause.getMessage());
    }

    /**
     * A {@link TopicPatternSubscriptionChangeEvent} must subscribe with the pattern, resolve the
     * matching topics from the cluster metadata via {@code subscribeFromPattern}, request a
     * metadata update twice, record the metadata version, and notify the membership manager of
     * the subscription update without treating it as a consumer poll.
     */
    @Test
    public void testTopicPatternSubscriptionChangeEvent() {
        Pattern pattern = Pattern.compile("topic.*");
        Set<String> topics = Set.of("topic.1", "topic.2");
        Optional<MockRebalanceListener> listener = Optional.of(new MockRebalanceListener());
        TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent(pattern, listener, 12345L);

        this.setupProcessor(true);
        Cluster cluster = Mockito.mock(Cluster.class);
        Mockito.when(this.metadata.fetch()).thenReturn(cluster);
        Mockito.when(cluster.topics()).thenReturn(topics);
        Mockito.when(this.subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
        Mockito.when(this.subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
        Mockito.when(this.subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
        Mockito.when(this.metadata.requestUpdateForNewTopics()).thenReturn(1);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.subscriptionState).subscribe(pattern, listener);
        Mockito.verify(this.subscriptionState).subscribeFromPattern(topics);
        Mockito.verify(this.metadata, Mockito.times(2)).requestUpdateForNewTopics();
        int versionSnapshot = this.processor.metadataVersionSnapshot();
        Assertions.assertEquals(1, versionSnapshot);
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();
        Mockito.verify(this.membershipManager, Mockito.never()).onConsumerPoll();
        Assertions.assertDoesNotThrow(() -> (Void) event.future().get());
    }

    /**
     * The membership manager must be told about the subscription update both when
     * {@code subscribeFromPattern} reports no change ({@code false}) and when it reports a
     * change ({@code true}).
     */
    @Test
    public void testTopicPatternSubscriptionTriggersJoin() {
        Pattern pattern = Pattern.compile("topic.*");
        TopicPatternSubscriptionChangeEvent event =
            new TopicPatternSubscriptionChangeEvent(pattern, Optional.of(new MockRebalanceListener()), 12345L);

        this.setupProcessor(true);
        Cluster cluster = Mockito.mock(Cluster.class);
        Mockito.when(this.metadata.fetch()).thenReturn(cluster);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);

        // First pass: subscribeFromPattern reports false.
        Mockito.when(this.subscriptionState.subscribeFromPattern(ArgumentMatchers.any())).thenReturn(false);
        this.processor.process((ApplicationEvent) event);
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();

        // Forget the first notification so the second verification counts from zero.
        Mockito.clearInvocations(this.membershipManager);

        // Second pass: subscribeFromPattern reports true.
        Mockito.when(this.subscriptionState.subscribeFromPattern(ArgumentMatchers.any())).thenReturn(true);
        this.processor.process((ApplicationEvent) event);
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();
    }

    /**
     * Uses a real {@link SubscriptionState} that already holds a topic subscription; a
     * subsequent {@link TopicPatternSubscriptionChangeEvent} must fail its future with an
     * {@link IllegalStateException} carrying the "mutually exclusive" message.
     */
    @Test
    public void testTopicPatternSubscriptionChangeEventWithIllegalSubscriptionState() {
        // Replace the mock with a real instance so the mixed-subscription check actually runs.
        this.subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
        Optional<MockRebalanceListener> listener = Optional.of(new MockRebalanceListener());
        Pattern pattern = Pattern.compile("topic.*");
        TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent(pattern, listener, 12345L);

        this.setupProcessor(true);
        this.subscriptionState.subscribe(Set.of("topic.1", "topic.2"), listener);

        this.processor.process((ApplicationEvent) event);

        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> event.future().get());
        Throwable cause = failure.getCause();
        Assertions.assertInstanceOf(IllegalStateException.class, cause);
        Assertions.assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", cause.getMessage());
    }

    /**
     * Processes two {@link UpdatePatternSubscriptionEvent}s: one while the metadata update version
     * is stubbed to 0 and one after it is stubbed to 1. Across both, {@code requestUpdateForNewTopics},
     * {@code subscribeFromPattern} and {@code onSubscriptionUpdated} must each have been invoked
     * exactly once, and both event futures must complete without throwing.
     */
    @Test
    public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewVersion() {
        // Phase 1: metadata update version stubbed to 0.
        UpdatePatternSubscriptionEvent firstEvent = new UpdatePatternSubscriptionEvent(12345L);
        this.setupProcessor(true);
        Mockito.when(this.subscriptionState.hasPatternSubscription()).thenReturn(true);
        Mockito.when(this.metadata.updateVersion()).thenReturn(0);
        this.processor.process((ApplicationEvent) firstEvent);
        Assertions.assertDoesNotThrow(() -> (Void) firstEvent.future().get());

        // Phase 2: metadata update version stubbed to 1, with two topics matching the pattern.
        Cluster cluster = Mockito.mock(Cluster.class);
        Set<String> topics = Set.of("topic.1", "topic.2");
        Mockito.when(this.metadata.updateVersion()).thenReturn(1);
        Mockito.when(this.subscriptionState.hasPatternSubscription()).thenReturn(true);
        Mockito.when(this.metadata.fetch()).thenReturn(cluster);
        Mockito.when(this.heartbeatRequestManager.membershipManager()).thenReturn(this.membershipManager);
        Mockito.when(cluster.topics()).thenReturn(topics);
        Mockito.when(this.subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
        Mockito.when(this.subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
        Mockito.when(this.subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
        Mockito.when(this.metadata.requestUpdateForNewTopics()).thenReturn(1);

        UpdatePatternSubscriptionEvent secondEvent = new UpdatePatternSubscriptionEvent(12345L);
        this.processor.process((ApplicationEvent) secondEvent);

        // Default verification mode is "exactly once", which also covers phase 1.
        Mockito.verify(this.metadata).requestUpdateForNewTopics();
        Mockito.verify(this.subscriptionState).subscribeFromPattern(topics);
        int versionSnapshot = this.processor.metadataVersionSnapshot();
        Assertions.assertEquals(1, versionSnapshot);
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();
        Assertions.assertDoesNotThrow(() -> (Void) secondEvent.future().get());
    }

    /**
     * A {@link TopicRe2JPatternSubscriptionChangeEvent} must subscribe with the
     * {@link SubscriptionPattern}, never call {@code subscribeFromPattern}, notify the
     * membership manager, and complete its future without throwing.
     */
    @Test
    public void testR2JPatternSubscriptionEventSuccess() {
        SubscriptionPattern pattern = new SubscriptionPattern("t*");
        ConsumerRebalanceListener mockListener = Mockito.mock(ConsumerRebalanceListener.class);
        Optional<ConsumerRebalanceListener> listener = Optional.of(mockListener);
        TopicRe2JPatternSubscriptionChangeEvent event =
            new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345L);

        this.setupProcessor(true);
        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.subscriptionState).subscribe(pattern, listener);
        Mockito.verify(this.subscriptionState, Mockito.never()).subscribeFromPattern(ArgumentMatchers.any());
        Mockito.verify(this.membershipManager).onSubscriptionUpdated();
        Assertions.assertDoesNotThrow(() -> (Void) event.future().get());
    }

    /**
     * When subscribing with a {@link SubscriptionPattern} throws, the event future must fail
     * with that very same {@link IllegalStateException} instance.
     */
    @Test
    public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
        SubscriptionPattern pattern = new SubscriptionPattern("t*");
        ConsumerRebalanceListener mockListener = Mockito.mock(ConsumerRebalanceListener.class);
        Optional<ConsumerRebalanceListener> listener = Optional.of(mockListener);
        TopicRe2JPatternSubscriptionChangeEvent event =
            new TopicRe2JPatternSubscriptionChangeEvent(pattern, listener, 12345L);
        IllegalStateException mixedSubscriptionError =
            new IllegalStateException("Subscription to topics, partitions and pattern are mutually exclusive");
        Mockito.doThrow(mixedSubscriptionError).when(this.subscriptionState).subscribe(pattern, listener);

        this.setupProcessor(true);
        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.subscriptionState).subscribe(pattern, listener);
        Exception thrown = TestUtils.assertFutureThrows(IllegalStateException.class, event.future());
        Assertions.assertEquals(mixedSubscriptionError, thrown);
    }

    /**
     * A {@link SyncCommitEvent} created without explicit offsets must commit whatever
     * {@code allConsumed()} returns and complete its future with the committed offsets.
     */
    @Test
    public void testSyncCommitEventWithEmptyOffsets() {
        long commitTimeArg = 12345L;
        TopicPartition partition = new TopicPartition("topic", 0);
        Map<TopicPartition, OffsetAndMetadata> allConsumed =
            Map.of(partition, new OffsetAndMetadata(10L, Optional.of(1), ""));
        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), commitTimeArg);

        this.setupProcessor(true);
        Mockito.doReturn(allConsumed).when(this.subscriptionState).allConsumed();
        Mockito.doReturn(CompletableFuture.completedFuture(allConsumed))
            .when(this.commitRequestManager)
            .commitSync(allConsumed, commitTimeArg);

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.commitRequestManager).commitSync(allConsumed, commitTimeArg);
        Assertions.assertTrue(event.offsetsReady.isDone());
        Map<?, ?> committedOffsets = Assertions.assertDoesNotThrow(() -> (Map<?, ?>) event.future().get());
        Assertions.assertEquals(allConsumed, committedOffsets);
    }

    /**
     * A {@link SyncCommitEvent} carrying explicit offsets must pass them to {@code commitSync}
     * and complete its future with the committed offsets.
     */
    @Test
    public void testSyncCommitEvent() {
        long commitTimeArg = 12345L;
        TopicPartition partition = new TopicPartition("topic", 0);
        Map<TopicPartition, OffsetAndMetadata> offsets =
            Map.of(partition, new OffsetAndMetadata(10L, Optional.of(1), ""));
        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), commitTimeArg);

        this.setupProcessor(true);
        Mockito.doReturn(CompletableFuture.completedFuture(offsets))
            .when(this.commitRequestManager)
            .commitSync(offsets, commitTimeArg);

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.commitRequestManager).commitSync(offsets, commitTimeArg);
        Assertions.assertTrue(event.offsetsReady.isDone());
        Map<?, ?> committedOffsets = Assertions.assertDoesNotThrow(() -> (Map<?, ?>) event.future().get());
        Assertions.assertEquals(offsets, committedOffsets);
    }

    /**
     * When the processor is built without a commit request manager, a {@link SyncCommitEvent}
     * must fail its future with a {@link KafkaException}.
     */
    @Test
    public void testSyncCommitEventWithoutCommitRequestManager() {
        SyncCommitEvent syncCommit = new SyncCommitEvent(Optional.empty(), 12345L);
        this.setupProcessor(false);

        this.processor.process((ApplicationEvent) syncCommit);

        TestUtils.assertFutureThrows(KafkaException.class, syncCommit.future());
    }

    /**
     * When {@code commitSync} returns an exceptionally completed future, the
     * {@link SyncCommitEvent} future must fail with the same exception type, while the
     * event's {@code offsetsReady} future must still be completed.
     *
     * <p>{@code allConsumed()} is not stubbed here; the verification below expects the
     * commit to be attempted with an empty map.
     */
    @Test
    public void testSyncCommitEventWithException() {
        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345L);
        this.setupProcessor(true);

        // Parameterized (previously a raw CompletableFuture) to match the offsets map type used by commitSync.
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> failedCommit = new CompletableFuture<>();
        failedCommit.completeExceptionally(new IllegalStateException());
        Mockito.doReturn(failedCommit)
            .when(this.commitRequestManager)
            .commitSync(ArgumentMatchers.any(), ArgumentMatchers.anyLong());

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.commitRequestManager).commitSync(Collections.emptyMap(), 12345L);
        Assertions.assertTrue(event.offsetsReady.isDone());
        TestUtils.assertFutureThrows(IllegalStateException.class, event.future());
    }

    /**
     * An {@link AsyncCommitEvent} created without explicit offsets must commit whatever
     * {@code allConsumed()} returns and complete its future with the committed offsets.
     */
    @Test
    public void testAsyncCommitEventWithEmptyOffsets() {
        TopicPartition partition = new TopicPartition("topic", 0);
        Map<TopicPartition, OffsetAndMetadata> allConsumed =
            Map.of(partition, new OffsetAndMetadata(10L, Optional.of(1), ""));
        AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());

        this.setupProcessor(true);
        Mockito.doReturn(CompletableFuture.completedFuture(allConsumed))
            .when(this.commitRequestManager)
            .commitAsync(allConsumed);
        Mockito.doReturn(allConsumed).when(this.subscriptionState).allConsumed();

        this.processor.process((ApplicationEvent) event);

        Mockito.verify(this.commitRequestManager).commitAsync(allConsumed);
        Assertions.assertTrue(event.offsetsReady.isDone());
        Map<?, ?> committedOffsets = Assertions.assertDoesNotThrow(() -> (Map<?, ?>) event.future().get());
        Assertions.assertEquals(allConsumed, committedOffsets);
    }

    /**
     * Processes an {@link AsyncCommitEvent} that carries explicit offsets. Verifies that
     * {@code commitAsync} is called with exactly those offsets, that {@code offsetsReady} is
     * done, and that the event's future yields the offsets returned by the stubbed commit.
     */
    @Test
    public void testAsyncCommitEvent() {
        TopicPartition partition = new TopicPartition("topic", 0);
        OffsetAndMetadata position = new OffsetAndMetadata(10L, Optional.of(1), "");
        Map<TopicPartition, OffsetAndMetadata> requestedOffsets = Map.of(partition, position);
        AsyncCommitEvent asyncCommit = new AsyncCommitEvent(Optional.of(requestedOffsets));
        setupProcessor(true);

        Mockito.doReturn(CompletableFuture.completedFuture(requestedOffsets))
            .when(commitRequestManager)
            .commitAsync(requestedOffsets);

        processor.process(asyncCommit);

        Mockito.verify(commitRequestManager).commitAsync(requestedOffsets);
        Assertions.assertTrue(asyncCommit.offsetsReady.isDone());
        Map<?, ?> committed = Assertions.assertDoesNotThrow(() -> asyncCommit.future().get());
        Assertions.assertEquals(requestedOffsets, committed);
    }

    /**
     * Processes an {@link AsyncCommitEvent} with no explicit offsets after the processor has
     * been prepared with {@code setupProcessor(false)}, and expects the event's future to fail
     * with a {@link KafkaException}.
     */
    @Test
    public void testAsyncCommitEventWithoutCommitRequestManager() {
        setupProcessor(false);
        AsyncCommitEvent asyncCommit = new AsyncCommitEvent(Optional.empty());

        processor.process(asyncCommit);

        TestUtils.assertFutureThrows(KafkaException.class, asyncCommit.future());
    }

    /**
     * Stubs {@code subscriptionState.allConsumed()} to return an empty map and
     * {@code commitRequestManager.commitAsync} to return an exceptionally completed future.
     * After processing an {@link AsyncCommitEvent} with no explicit offsets, verifies that
     * {@code commitAsync} received an empty map, that {@code offsetsReady} is done, and that
     * the event's future fails with {@link IllegalStateException}.
     */
    @Test
    public void testAsyncCommitEventWithException() {
        AsyncCommitEvent asyncCommit = new AsyncCommitEvent(Optional.empty());
        setupProcessor(true);

        Mockito.doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
        // A future that is already failed, so the stubbed commit reports the error immediately.
        CompletableFuture<Object> failedCommit = CompletableFuture.failedFuture(new IllegalStateException());
        Mockito.doReturn(failedCommit)
            .when(commitRequestManager)
            .commitAsync(ArgumentMatchers.any());

        processor.process(asyncCommit);

        Mockito.verify(commitRequestManager).commitAsync(Collections.emptyMap());
        Assertions.assertTrue(asyncCommit.offsetsReady.isDone());
        TestUtils.assertFutureThrows(IllegalStateException.class, asyncCommit.future());
    }

    /**
     * After {@code setupStreamProcessor(true)}, processes a
     * {@link StreamsOnTasksRevokedCallbackCompletedEvent} and verifies that the same event
     * instance is passed to {@code streamsMembershipManager.onTasksRevokedCallbackCompleted}.
     */
    @Test
    public void testStreamsOnTasksRevokedCallbackCompletedEvent() {
        setupStreamProcessor(true);
        StreamsOnTasksRevokedCallbackCompletedEvent callbackCompleted =
            new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());

        processor.process(callbackCompleted);

        Mockito.verify(streamsMembershipManager).onTasksRevokedCallbackCompleted(callbackCompleted);
    }

    /**
     * After {@code setupStreamProcessor(false)}, processes a
     * {@link StreamsOnTasksRevokedCallbackCompletedEvent} while capturing WARN-level logs of
     * {@link ApplicationEventProcessor}. Expects the "membership manager was not present"
     * message for the onTasksRevoked callback to be logged, and expects no call to
     * {@code streamsMembershipManager.onTasksRevokedCallbackCompleted}.
     */
    @Test
    public void testStreamsOnTasksRevokedCallbackCompletedEventWithoutStreamsMembershipManager() {
        setupStreamProcessor(false);
        StreamsOnTasksRevokedCallbackCompletedEvent callbackCompleted =
            new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
        final String expectedWarning = "An internal error occurred; the Streams membership manager was not present, so the notification of the onTasksRevoked callback execution could not be sent";

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            appender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);

            processor.process(callbackCompleted);

            boolean warningLogged = appender.getMessages().stream()
                .anyMatch(message -> message.contains(expectedWarning));
            Assertions.assertTrue(warningLogged);
            Mockito.verify(streamsMembershipManager, Mockito.never())
                .onTasksRevokedCallbackCompleted(callbackCompleted);
        }
    }

    /**
     * After {@code setupStreamProcessor(true)}, processes a
     * {@link StreamsOnTasksAssignedCallbackCompletedEvent} and verifies that the same event
     * instance is passed to {@code streamsMembershipManager.onTasksAssignedCallbackCompleted}.
     */
    @Test
    public void testStreamsOnTasksAssignedCallbackCompletedEvent() {
        setupStreamProcessor(true);
        StreamsOnTasksAssignedCallbackCompletedEvent callbackCompleted =
            new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());

        processor.process(callbackCompleted);

        Mockito.verify(streamsMembershipManager).onTasksAssignedCallbackCompleted(callbackCompleted);
    }

    /**
     * After {@code setupStreamProcessor(false)}, processes a
     * {@link StreamsOnTasksAssignedCallbackCompletedEvent} while capturing WARN-level logs of
     * {@link ApplicationEventProcessor}. Expects the "membership manager was not present"
     * message for the onTasksAssigned callback to be logged, and expects no call to
     * {@code streamsMembershipManager.onTasksAssignedCallbackCompleted}.
     */
    @Test
    public void testStreamsOnTasksAssignedCallbackCompletedEventWithoutStreamsMembershipManager() {
        setupStreamProcessor(false);
        StreamsOnTasksAssignedCallbackCompletedEvent callbackCompleted =
            new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
        final String expectedWarning = "An internal error occurred; the Streams membership manager was not present, so the notification of the onTasksAssigned callback execution could not be sent";

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            appender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);

            processor.process(callbackCompleted);

            boolean warningLogged = appender.getMessages().stream()
                .anyMatch(message -> message.contains(expectedWarning));
            Assertions.assertTrue(warningLogged);
            Mockito.verify(streamsMembershipManager, Mockito.never())
                .onTasksAssignedCallbackCompleted(callbackCompleted);
        }
    }

    /**
     * After {@code setupStreamProcessor(true)}, processes a
     * {@link StreamsOnAllTasksLostCallbackCompletedEvent} and verifies that the same event
     * instance is passed to {@code streamsMembershipManager.onAllTasksLostCallbackCompleted}.
     */
    @Test
    public void testStreamsOnAllTasksLostCallbackCompletedEvent() {
        setupStreamProcessor(true);
        StreamsOnAllTasksLostCallbackCompletedEvent callbackCompleted =
            new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());

        processor.process(callbackCompleted);

        Mockito.verify(streamsMembershipManager).onAllTasksLostCallbackCompleted(callbackCompleted);
    }

    /**
     * After {@code setupStreamProcessor(false)}, processes a
     * {@link StreamsOnAllTasksLostCallbackCompletedEvent} while capturing WARN-level logs of
     * {@link ApplicationEventProcessor}. Expects the "membership manager was not present"
     * message for the onAllTasksLost callback to be logged, and expects no call to
     * {@code streamsMembershipManager.onAllTasksLostCallbackCompleted}.
     */
    @Test
    public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembershipManager() {
        setupStreamProcessor(false);
        StreamsOnAllTasksLostCallbackCompletedEvent callbackCompleted =
            new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
        final String expectedWarning = "An internal error occurred; the Streams membership manager was not present, so the notification of the onAllTasksLost callback execution could not be sent";

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            appender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);

            processor.process(callbackCompleted);

            boolean warningLogged = appender.getMessages().stream()
                .anyMatch(message -> message.contains(expectedWarning));
            Assertions.assertTrue(warningLogged);
            Mockito.verify(streamsMembershipManager, Mockito.never())
                .onAllTasksLostCallbackCompleted(callbackCompleted);
        }
    }

    /**
     * Builds a one-element list holding a Mockito mock of
     * {@link NetworkClientDelegate.UnsentRequest}.
     *
     * @return a singleton list containing the mocked unsent request
     */
    private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
        NetworkClientDelegate.UnsentRequest mockedRequest =
            Mockito.mock(NetworkClientDelegate.UnsentRequest.class);
        return Collections.singletonList(mockedRequest);
    }
}

