/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.util.OptionalInt;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.SavepointWriterOperatorFactory;
import org.apache.flink.state.api.functions.Timestamper;
import org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner;
import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
import org.apache.flink.state.api.output.partitioner.HashSelector;
import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
 * Describes how the contents of a {@link DataSet} are fed through a stream operator (created by
 * a {@link SavepointWriterOperatorFactory}) to produce the {@link OperatorState} of one operator.
 *
 * <p>Two flavours exist, chosen by the constructor used:
 *
 * <ul>
 *   <li>non-keyed: no key selector / key type is set, and the input is consumed as-is;
 *   <li>keyed: the input is first re-partitioned with a {@link KeyGroupRangePartitioner} over the
 *       hashed key, and the key serializer / key selector are registered in the
 *       {@link StreamConfig}.
 * </ul>
 *
 * <p>The class is annotated {@code @Deprecated}; the intended replacement is not visible from
 * this file.
 *
 * @param <T> element type of the bootstrap data set
 */
@Deprecated
@PublicEvolving
public class BootstrapTransformation<T> {
    /** Input data that is run through the bootstrap operator. */
    private final DataSet<T> dataSet;

    /** Creates the stream operator that consumes {@link #dataSet}. */
    private final SavepointWriterOperatorFactory factory;

    /** User key selector; {@code null} for the non-keyed flavour. */
    @Nullable
    private final KeySelector<T, ?> originalKeySelector;

    /** Wrapper around {@link #originalKeySelector} used for partitioning; {@code null} when non-keyed. */
    @Nullable
    private final HashSelector<T> hashKeySelector;

    /** Type information of the key; {@code null} for the non-keyed flavour. */
    @Nullable
    private final TypeInformation<?> keyType;

    /** Operator-specific max parallelism; when empty the caller-supplied global value is used. */
    private final OptionalInt operatorMaxParallelism;

    /** Optional timestamper handed to the task runner; may be {@code null}. */
    @Nullable
    private final Timestamper<T> timestamper;

    /**
     * Creates a non-keyed transformation: key selector, hash selector and key type are left
     * {@code null}.
     *
     * @param dataSet input data
     * @param operatorMaxParallelism operator-specific max parallelism, or empty to use the global one
     * @param timestamper optional timestamper, may be {@code null}
     * @param factory factory for the bootstrap operator
     */
    BootstrapTransformation(DataSet<T> dataSet, OptionalInt operatorMaxParallelism, @Nullable Timestamper<T> timestamper, SavepointWriterOperatorFactory factory) {
        this.dataSet = dataSet;
        this.operatorMaxParallelism = operatorMaxParallelism;
        this.factory = factory;
        this.timestamper = timestamper;
        this.originalKeySelector = null;
        this.hashKeySelector = null;
        this.keyType = null;
    }

    /**
     * Creates a keyed transformation.
     *
     * @param dataSet input data
     * @param operatorMaxParallelism operator-specific max parallelism, or empty to use the global one
     * @param timestamper optional timestamper, may be {@code null}
     * @param factory factory for the bootstrap operator
     * @param keySelector extracts the key from each element; also wrapped in a {@link HashSelector}
     * @param keyType type information of the key
     * @param <K> key type
     */
    <K> BootstrapTransformation(DataSet<T> dataSet, OptionalInt operatorMaxParallelism, @Nullable Timestamper<T> timestamper, SavepointWriterOperatorFactory factory, @Nonnull KeySelector<T, K> keySelector, @Nonnull TypeInformation<K> keyType) {
        this.dataSet = dataSet;
        this.operatorMaxParallelism = operatorMaxParallelism;
        this.factory = factory;
        this.timestamper = timestamper;
        this.originalKeySelector = keySelector;
        this.hashKeySelector = new HashSelector<>(keySelector);
        this.keyType = keyType;
    }

    /**
     * Resolves the max parallelism to use for this operator.
     *
     * @param globalMaxParallelism fallback used when no operator-specific value was configured
     * @return the operator-specific max parallelism if present, otherwise {@code globalMaxParallelism}
     */
    int getMaxParallelism(int globalMaxParallelism) {
        return this.operatorMaxParallelism.orElse(globalMaxParallelism);
    }

    /**
     * Builds the data flow that writes the per-subtask states and group-reduces them with an
     * {@link OperatorSubtaskStateReducer} into {@link OperatorState}.
     *
     * @param operatorID id of the operator the state belongs to
     * @param stateBackend state backend set on the operator's {@link StreamConfig}, may be {@code null}
     * @param config additional configuration merged on top of the environment configuration
     * @param globalMaxParallelism fallback max parallelism, see {@link #getMaxParallelism(int)}
     * @param savepointPath path handed to the operator factory
     * @return the reduced data set, named {@code reduce(OperatorSubtaskState)}
     */
    DataSet<OperatorState> writeOperatorState(OperatorID operatorID, @Nullable StateBackend stateBackend, Configuration config, int globalMaxParallelism, Path savepointPath) {
        int localMaxParallelism = this.getMaxParallelism(globalMaxParallelism);
        return this.writeOperatorSubtaskStates(operatorID, stateBackend, config, savepointPath, localMaxParallelism)
                .reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism))
                .name("reduce(OperatorSubtaskState)");
    }

    /**
     * Test-visible overload that uses an empty {@link Configuration} as the additional
     * configuration.
     *
     * @param operatorID id of the operator the state belongs to
     * @param stateBackend state backend set on the operator's {@link StreamConfig}, may be {@code null}
     * @param savepointPath path handed to the operator factory
     * @param localMaxParallelism max parallelism for this operator
     * @return the map-partition operator emitting the tagged subtask states
     */
    @VisibleForTesting
    MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(OperatorID operatorID, @Nullable StateBackend stateBackend, Path savepointPath, int localMaxParallelism) {
        return this.writeOperatorSubtaskStates(operatorID, stateBackend, new Configuration(), savepointPath, localMaxParallelism);
    }

    /**
     * Wires the input through a {@link BoundedOneInputStreamTaskRunner} that runs the bootstrap
     * operator, and adjusts the resulting operator's parallelism.
     */
    private MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(OperatorID operatorID, @Nullable StateBackend stateBackend, Configuration additionalConfig, Path savepointPath, int localMaxParallelism) {
        // Declared as DataSet<T>: it holds either the raw data set or its custom-partitioned
        // variant, so it cannot be typed as the narrower PartitionOperator.
        DataSet<T> input = this.dataSet;
        if (this.originalKeySelector != null) {
            // Keyed flavour: route elements by key group using the hashed key.
            input = this.dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), this.hashKeySelector);
        }

        StreamOperator<TaggedOperatorSubtaskState> operator = this.factory.createOperator(System.currentTimeMillis(), savepointPath);
        operator = this.dataSet.clean(operator);

        StreamConfig config = this.getConfig(operatorID, stateBackend, additionalConfig, operator);
        BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(config, localMaxParallelism, this.timestamper);

        MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates = input.mapPartition(operatorRunner).name(operatorID.toHexString());

        if (operator instanceof BroadcastStateBootstrapOperator) {
            // Broadcast-state bootstrap always runs with a single subtask.
            subtaskStates = subtaskStates.setParallelism(1);
        } else {
            // Cap the parallelism at the operator's max parallelism.
            // NOTE(review): presumably because state is only written for key groups up to the
            // max parallelism - confirm against the operator implementation.
            int currentParallelism = BootstrapTransformation.getParallelism(subtaskStates);
            if (currentParallelism > localMaxParallelism) {
                subtaskStates.setParallelism(localMaxParallelism);
            }
        }
        return subtaskStates;
    }

    /**
     * Builds the serialized {@link StreamConfig} for the bootstrap operator.
     *
     * <p>The environment's configuration is copied into a fresh {@link Configuration} before
     * {@code additionalConfig} is merged in, so the merge is applied to the copy rather than to
     * the instance returned by the execution environment.
     *
     * @param operatorID id of the operator; its hex string is also used as the operator name
     * @param stateBackend state backend to set, may be {@code null}
     * @param additionalConfig entries added on top of the environment configuration
     * @param operator the operator to register in the config
     * @return the populated config, after {@code serializeAllConfigs()} has been called
     */
    @VisibleForTesting
    StreamConfig getConfig(OperatorID operatorID, @Nullable StateBackend stateBackend, Configuration additionalConfig, StreamOperator<TaggedOperatorSubtaskState> operator) {
        Configuration deepCopy = new Configuration(this.dataSet.getExecutionEnvironment().getConfiguration());
        deepCopy.addAll(additionalConfig);

        StreamConfig config = new StreamConfig(deepCopy);
        config.setChainStart();
        config.setCheckpointingEnabled(true);
        config.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);

        if (this.keyType != null) {
            // Keyed flavour only: register key serializer and the user's key selector.
            TypeSerializer<?> keySerializer = this.keyType.createSerializer(this.dataSet.getExecutionEnvironment().getConfig());
            config.setStateKeySerializer(keySerializer);
            config.setStatePartitioner(0, this.originalKeySelector);
        }

        config.setStreamOperator(operator);
        config.setOperatorName(operatorID.toHexString());
        config.setOperatorID(operatorID);
        config.setStateBackend(stateBackend);
        // The whole managed-memory fraction of this operator is assigned to the state backend.
        config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
        config.serializeAllConfigs();
        return config;
    }

    /**
     * Returns the operator's parallelism, falling back to the execution environment's
     * parallelism when the operator reports {@code -1}.
     */
    private static <T> int getParallelism(MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
        int parallelism = subtaskStates.getParallelism();
        if (parallelism == -1) {
            // -1 is treated as "not set on the operator"; use the environment value instead.
            parallelism = subtaskStates.getExecutionEnvironment().getParallelism();
        }
        return parallelism;
    }
}

