/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.internals.StoreFactory;

/**
 * Wires an aggregation over a grouped stream into the topology graph held by an
 * {@link InternalStreamsBuilder}.
 *
 * <p>Each call to {@code build} optionally inserts a repartition node below the configured parent
 * graph node, then attaches a processor node wrapping the supplied aggregate processor, and
 * finally returns a {@link KTableImpl} describing the aggregation result.
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 */
class GroupedStreamAggregateBuilder<K, V> {
    private final InternalStreamsBuilder builder;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final boolean repartitionRequired;
    /** Name taken from the {@link GroupedInternal}; {@code null} is treated as "no name supplied". */
    private final String userProvidedRepartitionTopicName;
    private final Set<String> subTopologySourceNodes;
    private final String name;
    private final GraphNode graphNode;
    /** Repartition node produced by a previous {@code build} call, if any; see {@code build} for the reuse rule. */
    private GraphNode repartitionNode;

    /** Initializer that starts a count at zero. */
    final Initializer<Long> countInitializer = () -> 0L;
    /** Aggregator that adds one to the running count for every record, ignoring key and value. */
    final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1L;
    /** Initializer that yields {@code null} as the initial value. */
    final Initializer<V> reduceInitializer = () -> null;

    /**
     * Creates a builder bound to one grouped stream.
     *
     * @param builder                the streams builder whose graph receives the new nodes
     * @param groupedInternal        supplies the key serde, the value serde and the optional
     *                               repartition topic name
     * @param repartitionRequired    whether {@code build} must insert a repartition node
     * @param subTopologySourceNodes source node names handed to the resulting table when the
     *                               source name is not replaced by a repartition source
     * @param name                   source name used when no repartition source is created
     * @param graphNode              graph node under which new nodes are added
     */
    GroupedStreamAggregateBuilder(InternalStreamsBuilder builder, GroupedInternal<K, V> groupedInternal, boolean repartitionRequired, Set<String> subTopologySourceNodes, String name, GraphNode graphNode) {
        this.builder = builder;
        this.keySerde = groupedInternal.keySerde();
        this.valueSerde = groupedInternal.valueSerde();
        this.repartitionRequired = repartitionRequired;
        this.subTopologySourceNodes = subTopologySourceNodes;
        this.name = name;
        this.graphNode = graphNode;
        this.userProvidedRepartitionTopicName = groupedInternal.name();
    }

    /**
     * Adds the graph nodes for one aggregation and returns the table that represents its result.
     *
     * @param functionName       provides the name used for the processor node and the table
     * @param storeFactory       its store name is the repartition topic prefix when no name was
     *                           supplied through the grouped configuration
     * @param aggregateSupplier  processor supplier performing the aggregation
     * @param queryableStoreName queryable store name passed to the table; when non-null it is
     *                           asserted to equal {@code storeFactory.storeName()}
     * @param keySerde           key serde of the resulting table
     * @param valueSerde         value serde of the resulting table
     * @param isOutputVersioned  flag forwarded to the processor node via {@code setOutputVersioned}
     * @param <KR>               key type of the resulting table
     * @param <VR>               value type of the resulting table
     * @return a new {@link KTableImpl} backed by the processor node that was added
     */
    <KR, VR> KTable<KR, VR> build(NamedInternal functionName, StoreFactory storeFactory, KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, String queryableStoreName, Serde<KR> keySerde, Serde<VR> valueSerde, boolean isOutputVersioned) {
        assert (queryableStoreName == null || queryableStoreName.equals(storeFactory.storeName()));
        String aggFunctionName = functionName.name();
        String sourceName = this.name;
        GraphNode parentNode = this.graphNode;
        if (this.repartitionRequired) {
            // Parameterized (not raw) so the builder's type matches what createRepartitionSource declares.
            OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            String repartitionTopicPrefix = this.userProvidedRepartitionTopicName != null ? this.userProvidedRepartitionTopicName : storeFactory.storeName();
            sourceName = this.createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder);
            // A new repartition node is built on the first call, and on every call when no
            // repartition topic name was supplied. With a supplied name, the node built by an
            // earlier call is reused.
            if (this.repartitionNode == null || this.userProvidedRepartitionTopicName == null) {
                this.repartitionNode = repartitionNodeBuilder.build();
            }
            this.builder.addGraphNode(parentNode, this.repartitionNode);
            parentNode = this.repartitionNode;
        }
        // Diamond lets the compiler infer the ProcessorParameters type arguments from the
        // supplier instead of pinning them by hand.
        ProcessorGraphNode<K, V> statefulProcessorNode = new ProcessorGraphNode<>(aggFunctionName, new ProcessorParameters<>(aggregateSupplier, aggFunctionName));
        statefulProcessorNode.setOutputVersioned(isOutputVersioned);
        this.builder.addGraphNode(parentNode, statefulProcessorNode);
        // If the source name is still this builder's own name, keep the configured source nodes;
        // otherwise the (repartition) source name is the only source node.
        Set<String> sourceNodes = sourceName.equals(this.name) ? this.subTopologySourceNodes : Collections.singleton(sourceName);
        return new KTableImpl<>(aggFunctionName, keySerde, valueSerde, sourceNodes, queryableStoreName, aggregateSupplier, statefulProcessorNode, this.builder);
    }

    /**
     * Delegates to {@code KStreamImpl.createRepartitionedSource} using this builder's streams
     * builder and serdes, passing {@code null} as the fifth argument.
     *
     * @param repartitionTopicNamePrefix        prefix forwarded to the delegate
     * @param optimizableRepartitionNodeBuilder node builder forwarded to the delegate
     * @return the string returned by the delegate, which {@code build} uses as the source name
     */
    private String createRepartitionSource(String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
        return KStreamImpl.createRepartitionedSource(this.builder, this.keySerde, this.valueSerde, repartitionTopicNamePrefix, null, optimizableRepartitionNodeBuilder);
    }
}

