/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.source;

import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.StreamPatternSubscriber;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder that collects the configuration for a {@link DynamicKafkaSource} and creates it
 * via {@link #build()}.
 *
 * <p>A stream subscriber (exactly one of {@link #setStreamIds(Set)}, {@link
 * #setStreamPattern(Pattern)} or {@link #setKafkaStreamSubscriber(KafkaStreamSubscriber)}), a
 * {@link KafkaMetadataService} and a {@link KafkaRecordDeserializationSchema} must be provided
 * before calling {@link #build()}. Unless changed, the source starts from the earliest offsets and
 * is continuous/unbounded.
 *
 * @param <T> the type parameter of the deserialization schema and of the built source
 */
@Experimental
public class DynamicKafkaSourceBuilder<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceBuilder.class);

    // Consumer property keys that this builder reads or writes.
    private static final String GROUP_ID_CONFIG = "group.id";
    private static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
    private static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
    private static final String KEY_DESERIALIZER_CONFIG = "key.deserializer";
    private static final String VALUE_DESERIALIZER_CONFIG = "value.deserializer";

    // Prefix of the generated client id prefix used when no group id is configured.
    private static final String DEFAULT_CLIENT_ID_PREFIX = "DynamicKafkaSource-";
    private static final int RANDOM_CLIENT_ID_SUFFIX_LENGTH = 8;

    private KafkaStreamSubscriber kafkaStreamSubscriber;
    private KafkaMetadataService kafkaMetadataService;
    private KafkaRecordDeserializationSchema<T> deserializationSchema;
    private OffsetsInitializer startingOffsetsInitializer = OffsetsInitializer.earliest();
    private OffsetsInitializer stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private final Properties props = new Properties();

    DynamicKafkaSourceBuilder() {
    }

    /**
     * Subscribes to a fixed set of stream ids.
     *
     * @param streamIds the stream ids to consume; must not be null
     * @return this builder
     * @throws NullPointerException if {@code streamIds} is null
     * @throws IllegalStateException if a subscriber has already been set
     */
    public DynamicKafkaSourceBuilder<T> setStreamIds(Set<String> streamIds) {
        Preconditions.checkNotNull(streamIds);
        ensureSubscriberIsNull("streamIds");
        this.kafkaStreamSubscriber = new KafkaStreamSetSubscriber(streamIds);
        return this;
    }

    /**
     * Subscribes using a stream pattern.
     *
     * @param streamPattern the pattern handed to the {@link StreamPatternSubscriber}; must not be
     *     null
     * @return this builder
     * @throws NullPointerException if {@code streamPattern} is null
     * @throws IllegalStateException if a subscriber has already been set
     */
    public DynamicKafkaSourceBuilder<T> setStreamPattern(Pattern streamPattern) {
        Preconditions.checkNotNull(streamPattern);
        ensureSubscriberIsNull("stream pattern");
        this.kafkaStreamSubscriber = new StreamPatternSubscriber(streamPattern);
        return this;
    }

    /**
     * Subscribes using a caller-supplied subscriber.
     *
     * @param kafkaStreamSubscriber the subscriber to use; must not be null
     * @return this builder
     * @throws NullPointerException if {@code kafkaStreamSubscriber} is null
     * @throws IllegalStateException if a subscriber has already been set
     */
    public DynamicKafkaSourceBuilder<T> setKafkaStreamSubscriber(KafkaStreamSubscriber kafkaStreamSubscriber) {
        Preconditions.checkNotNull(kafkaStreamSubscriber);
        ensureSubscriberIsNull("custom");
        this.kafkaStreamSubscriber = kafkaStreamSubscriber;
        return this;
    }

    /**
     * Marks the source as {@link Boundedness#BOUNDED} and sets the stopping offsets.
     *
     * @param stoppingOffsetsInitializer the initializer for the stopping offsets; a null value is
     *     rejected when {@link #build()} is called
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
        this.boundedness = Boundedness.BOUNDED;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        return this;
    }

    /**
     * Sets the metadata service. Required; its presence is verified in {@link #build()}.
     *
     * @param kafkaMetadataService the metadata service
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setKafkaMetadataService(KafkaMetadataService kafkaMetadataService) {
        this.kafkaMetadataService = kafkaMetadataService;
        return this;
    }

    /**
     * Sets the record deserialization schema. Required; its presence is verified in {@link
     * #build()}.
     *
     * @param recordDeserializer the deserialization schema
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setDeserializer(KafkaRecordDeserializationSchema<T> recordDeserializer) {
        this.deserializationSchema = recordDeserializer;
        return this;
    }

    /**
     * Sets the starting offsets initializer, replacing the default of {@link
     * OffsetsInitializer#earliest()}.
     *
     * @param startingOffsetsInitializer the initializer for the starting offsets; a null value is
     *     rejected when {@link #build()} is called
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setStartingOffsets(OffsetsInitializer startingOffsetsInitializer) {
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        return this;
    }

    /**
     * Copies all entries of the given properties into this builder's properties. Entries with the
     * same key that were set earlier are replaced.
     *
     * @param properties the properties to add; must not be null
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setProperties(Properties properties) {
        this.props.putAll(properties);
        return this;
    }

    /**
     * Sets a single property.
     *
     * @param key the property key
     * @param value the property value
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setProperty(String key, String value) {
        this.props.setProperty(key, value);
        return this;
    }

    /**
     * Sets the {@code group.id} property.
     *
     * @param groupId the consumer group id
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setGroupId(String groupId) {
        return setProperty(GROUP_ID_CONFIG, groupId);
    }

    /**
     * Sets the client id prefix property ({@link KafkaSourceOptions#CLIENT_ID_PREFIX}).
     *
     * @param prefix the client id prefix
     * @return this builder
     */
    public DynamicKafkaSourceBuilder<T> setClientIdPrefix(String prefix) {
        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
    }

    /**
     * Validates the configuration, fills in required and default consumer properties, and creates
     * the source.
     *
     * @return the configured {@link DynamicKafkaSource}
     * @throws NullPointerException if a required component has not been provided
     * @throws IllegalStateException if offset committing is explicitly enabled without a {@code
     *     group.id}
     */
    public DynamicKafkaSource<T> build() {
        logger.info("Building the DynamicKafkaSource");
        sanityCheck();
        setRequiredConsumerProperties();
        return new DynamicKafkaSource<T>(
                this.kafkaStreamSubscriber,
                this.kafkaMetadataService,
                this.deserializationSchema,
                this.startingOffsetsInitializer,
                this.stoppingOffsetsInitializer,
                this.props,
                this.boundedness);
    }

    /**
     * Applies the consumer properties the source depends on. Properties passed with {@code
     * override == true} replace user values; the others only act as defaults.
     */
    private void setRequiredConsumerProperties() {
        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        maybeOverride(KEY_DESERIALIZER_CONFIG, byteArrayDeserializer, true);
        maybeOverride(VALUE_DESERIALIZER_CONFIG, byteArrayDeserializer, true);

        final boolean hasGroupId = this.props.containsKey(GROUP_ID_CONFIG);
        if (!hasGroupId) {
            logger.warn("Offset commit on checkpoint is disabled because {} is not specified", GROUP_ID_CONFIG);
            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
        }
        maybeOverride(ENABLE_AUTO_COMMIT_CONFIG, "false", false);

        // Locale.ROOT keeps the lower-casing locale independent: with a Turkish default locale
        // "EARLIEST".toLowerCase() would produce a dotless 'ı' and an invalid config value.
        final String autoOffsetReset =
                this.startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(Locale.ROOT);
        maybeOverride(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset, true);

        // For a bounded source the discovery settings below are forced; otherwise they are defaults.
        final boolean bounded = this.boundedness == Boundedness.BOUNDED;
        maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", bounded);
        maybeOverride(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "-1", bounded);
        maybeOverride(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD.key(), "0", bounded);

        // Default the client id prefix to the group id, or to a random name if there is none.
        final String defaultClientIdPrefix = hasGroupId
                ? this.props.getProperty(GROUP_ID_CONFIG)
                : DEFAULT_CLIENT_ID_PREFIX + RandomStringUtils.randomAlphabetic(RANDOM_CLIENT_ID_SUFFIX_LENGTH);
        maybeOverride(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), defaultClientIdPrefix, false);
    }

    /**
     * Sets {@code key} to {@code value} if it is unset, or if it is set and {@code override} is
     * true.
     *
     * @param key the property key
     * @param value the value to apply
     * @param override whether an existing user value should be replaced
     * @return true only if an existing user value was replaced
     */
    private boolean maybeOverride(String key, String value, boolean override) {
        final String userValue = this.props.getProperty(key);
        if (userValue == null) {
            this.props.setProperty(key, value);
            return false;
        }
        if (!override) {
            return false;
        }
        logger.warn("Property {} is provided but will be overridden from {} to {}", key, userValue, value);
        this.props.setProperty(key, value);
        return true;
    }

    /** Verifies that all required components are present and the offset commit setup is valid. */
    private void sanityCheck() {
        Preconditions.checkNotNull(this.kafkaStreamSubscriber, "Kafka stream subscriber is required but not provided");
        Preconditions.checkNotNull(this.kafkaMetadataService, "Kafka Metadata Service is required but not provided");
        Preconditions.checkNotNull(this.deserializationSchema, "Deserialization schema is required but not provided.");
        // Fail here with a clear message instead of a bare NPE later on.
        Preconditions.checkNotNull(this.startingOffsetsInitializer, "Starting offsets initializer must not be null.");
        Preconditions.checkNotNull(this.stoppingOffsetsInitializer, "Stopping offsets initializer must not be null.");
        Preconditions.checkState(
                this.props.containsKey(GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
                (Object) String.format("Property %s is required when offset commit is enabled", GROUP_ID_CONFIG));
    }

    /**
     * Returns whether the user explicitly enabled auto commit or commit-on-checkpoint in the
     * properties.
     */
    private boolean offsetCommitEnabledManually() {
        // Boolean.parseBoolean(null) is false, so an absent key needs no separate check.
        final boolean autoCommit = Boolean.parseBoolean(this.props.getProperty(ENABLE_AUTO_COMMIT_CONFIG));
        final boolean commitOnCheckpoint =
                Boolean.parseBoolean(this.props.getProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
        return autoCommit || commitOnCheckpoint;
    }

    /**
     * Guards against configuring more than one subscriber.
     *
     * @param attemptingSubscribeMode name of the subscribe mode being set, used in the error message
     * @throws IllegalStateException if a subscriber has already been set
     */
    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
        if (this.kafkaStreamSubscriber == null) {
            return;
        }
        throw new IllegalStateException(String.format(
                "Cannot use %s for consumption because a %s is already set for consumption.",
                attemptingSubscribeMode,
                this.kafkaStreamSubscriber.getClass().getSimpleName()));
    }
}

