/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.util.Preconditions;

/**
 * Translates a {@link SinkTransformation} by expanding its {@link Sink} into a sequence of
 * sub-transformations (optional pre-write topology, writer and, for a {@link
 * TwoPhaseCommittingSink}, committer plus optional pre-/post-commit topologies) and handing each
 * generated transformation to the {@link TransformationTranslator.Context}.
 *
 * @param <Input> element type consumed by the sink
 * @param <Output> output type of the sink transformation
 */
@Internal
public class SinkTransformationTranslator<Input, Output>
        implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {

    private static final String COMMITTER_NAME = "Committer";
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";

    /** Uid pattern applied to the transformation named {@link #COMMITTER_NAME}. */
    private static final String COMMITTER_UID_FORMAT = "Sink Committer: %s";

    /** Uid pattern applied to the transformation named {@link #GLOBAL_COMMITTER_NAME}. */
    private static final String GLOBAL_COMMITTER_UID_FORMAT = "Sink %s Global Committer";

    /** Expands the sink with the batch flag set; see {@link #translateInternal}. */
    @Override
    public Collection<Integer> translateForBatch(
            SinkTransformation<Input, Output> transformation,
            TransformationTranslator.Context context) {
        return translateInternal(transformation, context, true);
    }

    /** Expands the sink with the batch flag cleared; see {@link #translateInternal}. */
    @Override
    public Collection<Integer> translateForStreaming(
            SinkTransformation<Input, Output> transformation,
            TransformationTranslator.Context context) {
        return translateInternal(transformation, context, false);
    }

    /**
     * Builds a {@link SinkExpander} for the transformation and runs it.
     *
     * @param transformation the sink transformation to expand
     * @param context translator context that receives the generated sub-transformations
     * @param batch flag forwarded to the committer operator factory
     * @return always an empty list; the sub-transformations are translated through {@code
     *     context} rather than reported as ids from here
     */
    private Collection<Integer> translateInternal(
            SinkTransformation<Input, Output> transformation,
            TransformationTranslator.Context context,
            boolean batch) {
        SinkExpander<Input> expander =
                new SinkExpander<>(
                        transformation.getInputStream(),
                        transformation.getSink(),
                        transformation,
                        context,
                        batch);
        expander.expand();
        return Collections.emptyList();
    }

    /**
     * Expands one sink into its sub-topology. Each stage is created through {@link
     * #adjustTransformations} so that properties of the enclosing {@link SinkTransformation}
     * (uid, name, description, slot sharing group, parallelism, chaining strategy, ...) are
     * propagated to every transformation the stage adds to the environment.
     *
     * @param <T> element type consumed by the sink
     */
    private static class SinkExpander<T> {
        private final SinkTransformation<T, ?> transformation;
        private final Sink<T> sink;
        private final TransformationTranslator.Context context;
        private final DataStream<T> inputStream;
        private final StreamExecutionEnvironment executionEnvironment;
        /** Value of {@link CoreOptions#DEFAULT_PARALLELISM} captured at construction time. */
        private final Optional<Integer> environmentParallelism;
        private final boolean isBatchMode;
        private final boolean isCheckpointingEnabled;

        /**
         * Captures the environment of {@code inputStream} together with its currently configured
         * default parallelism and checkpointing flag.
         */
        public SinkExpander(
                DataStream<T> inputStream,
                Sink<T> sink,
                SinkTransformation<T, ?> transformation,
                TransformationTranslator.Context context,
                boolean isBatchMode) {
            this.inputStream = inputStream;
            this.executionEnvironment = inputStream.getExecutionEnvironment();
            this.environmentParallelism =
                    this.executionEnvironment
                            .getConfig()
                            .toConfiguration()
                            .getOptional(CoreOptions.DEFAULT_PARALLELISM);
            this.isCheckpointingEnabled =
                    this.executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
            this.transformation = transformation;
            this.sink = sink;
            this.context = context;
            this.isBatchMode = isBatchMode;
        }

        /**
         * Builds the sink sub-topology, passes every transformation it added to the environment
         * to {@code context.transform}, and finally removes those transformations from the
         * environment again.
         *
         * <p>The removal runs in a {@code finally} block: if building or translating the
         * sub-topology throws, the environment's transformation list is still trimmed back to
         * its previous size instead of keeping a partially built sink topology.
         */
        private void expand() {
            final int sizeBefore = executionEnvironment.getTransformations().size();
            try {
                DataStream<T> prewritten = inputStream;
                if (sink instanceof WithPreWriteTopology) {
                    prewritten =
                            adjustTransformations(
                                    prewritten,
                                    ((WithPreWriteTopology<T>) sink)::addPreWriteTopology,
                                    true,
                                    sink instanceof SupportsConcurrentExecutionAttempts);
                }

                if (sink instanceof TwoPhaseCommittingSink) {
                    addCommittingTopology(sink, prewritten);
                } else {
                    // Writer-only sink: a single writer operator whose type info is noOutput().
                    adjustTransformations(
                            prewritten,
                            input ->
                                    input.transform(
                                            WRITER_NAME,
                                            CommittableMessageTypeInfo.noOutput(),
                                            new SinkWriterOperatorFactory<>(sink)),
                            false,
                            sink instanceof SupportsConcurrentExecutionAttempts);
                }

                List<Transformation<?>> sinkTransformations =
                        executionEnvironment
                                .getTransformations()
                                .subList(
                                        sizeBefore,
                                        executionEnvironment.getTransformations().size());
                sinkTransformations.forEach(context::transform);
            } finally {
                // Drop everything this expansion appended, newest first.
                while (executionEnvironment.getTransformations().size() > sizeBefore) {
                    executionEnvironment
                            .getTransformations()
                            .remove(executionEnvironment.getTransformations().size() - 1);
                }
            }
        }

        /**
         * Adds writer, optional pre-commit topology, committer and optional post-commit topology
         * for a {@link TwoPhaseCommittingSink}. A forward/BATCH partition (see {@link
         * #addFailOverRegion}) is inserted after the writer and, when a post-commit topology
         * exists, after the committer.
         *
         * <p>NOTE(review): the raw {@code DataStream} / {@code TypeInformation} / {@code
         * TwoPhaseCommittingSink} types and the otherwise unused {@code CommT} type parameter look
         * like decompiler residue. The statements are intentionally kept as they were; restore the
         * type arguments against the original sources before relying on this method compiling
         * without raw-type problems.
         */
        private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
            TwoPhaseCommittingSink committingSink = (TwoPhaseCommittingSink)sink;
            TypeInformation typeInformation = CommittableMessageTypeInfo.of(() -> ((TwoPhaseCommittingSink)committingSink).getCommittableSerializer());
            DataStream written = this.adjustTransformations(inputStream, input -> input.transform(SinkTransformationTranslator.WRITER_NAME, typeInformation, new SinkWriterOperatorFactory(sink)), false, sink instanceof SupportsConcurrentExecutionAttempts);
            DataStream precommitted = this.addFailOverRegion(written);
            if (sink instanceof WithPreCommitTopology) {
                precommitted = this.adjustTransformations(precommitted, ((WithPreCommitTopology)sink)::addPreCommitTopology, true, false);
            }
            DataStream committed = this.adjustTransformations(precommitted, pc -> pc.transform(SinkTransformationTranslator.COMMITTER_NAME, typeInformation, new CommitterOperatorFactory(committingSink, this.isBatchMode, this.isCheckpointingEnabled)), false, false);
            if (sink instanceof WithPostCommitTopology) {
                DataStream postcommitted = this.addFailOverRegion(committed);
                this.adjustTransformations(postcommitted, pc -> {
                    ((WithPostCommitTopology)sink).addPostCommitTopology(pc);
                    return null;
                }, true, false);
            }
        }

        /**
         * Wraps {@code input} in a {@link PartitionTransformation} that uses a {@link
         * ForwardPartitioner} with {@link StreamExchangeMode#BATCH}.
         */
        private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
            return new DataStream<I>(
                    this.executionEnvironment,
                    new PartitionTransformation<I>(
                            input.getTransformation(),
                            new ForwardPartitioner(),
                            StreamExchangeMode.BATCH));
        }

        /**
         * Runs {@code action} on {@code inputStream} and post-processes every transformation the
         * action appended to the environment (see {@link #applySinkSettings}).
         *
         * <p>The environment parallelism is set to -1 while the action runs, so a sub-transformation
         * that did not get an explicit parallelism can be recognised afterwards. The previous
         * setting is restored in a {@code finally} block, so a throwing action or a failed uid
         * check cannot leave the shared environment with the temporary value.
         *
         * @param inputStream stream handed to {@code action}
         * @param action creates the next part of the sink topology
         * @param isExpandedTopology whether the action is a sink-provided custom topology; enables
         *     the uid consistency check
         * @param supportsConcurrentExecutionAttempts value written to every physical
         *     sub-transformation
         * @return whatever {@code action} returned
         * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) when a
         *     custom topology set a uid on an operator but the sink itself has none
         */
        private <I, R> R adjustTransformations(
                DataStream<I> inputStream,
                Function<DataStream<I>, R> action,
                boolean isExpandedTopology,
                boolean supportsConcurrentExecutionAttempts) {
            // -1 is the same sentinel applySinkSettings compares getParallelism() against.
            executionEnvironment.setParallelism(-1);
            try {
                int numTransformsBefore = executionEnvironment.getTransformations().size();
                R result = action.apply(inputStream);

                List<Transformation<?>> transformations =
                        executionEnvironment.getTransformations();
                List<Transformation<?>> expandedTransformations =
                        transformations.subList(numTransformsBefore, transformations.size());
                CustomSinkOperatorUidHashes operatorsUidHashes =
                        transformation.getSinkOperatorsUidHashes();

                for (Transformation<?> subTransformation : expandedTransformations) {
                    applySinkSettings(
                            subTransformation,
                            operatorsUidHashes,
                            isExpandedTopology,
                            supportsConcurrentExecutionAttempts);
                }
                return result;
            } finally {
                restoreEnvironmentParallelism();
            }
        }

        /**
         * Propagates the sink transformation's settings to one generated sub-transformation.
         *
         * <p>Statement order matters: uid hashes and the uid are derived from the
         * sub-transformation's original name, so they must be handled before the name itself is
         * prefixed.
         */
        private void applySinkSettings(
                Transformation<?> subTransformation,
                CustomSinkOperatorUidHashes operatorsUidHashes,
                boolean isExpandedTopology,
                boolean supportsConcurrentExecutionAttempts) {
            String subUid = subTransformation.getUid();
            if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
                Preconditions.checkState(
                        transformation.getUid() != null && !transformation.getUid().isEmpty(),
                        "Sink "
                                + transformation.getName()
                                + " requires to set a uid since its customized topology has set uid for some operators.");
            }

            setOperatorUidHashIfPossible(
                    subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
            setOperatorUidHashIfPossible(
                    subTransformation, COMMITTER_NAME, operatorsUidHashes.getCommitterUidHash());
            setOperatorUidHashIfPossible(
                    subTransformation,
                    GLOBAL_COMMITTER_NAME,
                    operatorsUidHashes.getGlobalCommitterUidHash());

            concatUid(
                    subTransformation,
                    Transformation::getUid,
                    Transformation::setUid,
                    subTransformation.getName());
            concatProperty(
                    subTransformation,
                    Transformation::getCoLocationGroupKey,
                    Transformation::setCoLocationGroupKey);
            concatProperty(subTransformation, Transformation::getName, Transformation::setName);
            concatProperty(
                    subTransformation,
                    Transformation::getDescription,
                    Transformation::setDescription);

            // Only inherit the slot sharing group when the sub-transformation has none of its own.
            Optional<SlotSharingGroup> slotSharingGroup = transformation.getSlotSharingGroup();
            if (slotSharingGroup.isPresent()
                    && !subTransformation.getSlotSharingGroup().isPresent()) {
                subTransformation.setSlotSharingGroup(slotSharingGroup.get());
            }

            // -1 here means the action did not set a parallelism explicitly.
            if (subTransformation.getParallelism() == -1) {
                subTransformation.setParallelism(transformation.getParallelism());
            }

            if (subTransformation.getMaxParallelism() < 0
                    && transformation.getMaxParallelism() > 0) {
                subTransformation.setMaxParallelism(transformation.getMaxParallelism());
            }

            if (subTransformation instanceof PhysicalTransformation) {
                PhysicalTransformation<?> physicalSubTransformation =
                        (PhysicalTransformation<?>) subTransformation;
                if (transformation.getChainingStrategy() != null) {
                    physicalSubTransformation.setChainingStrategy(
                            transformation.getChainingStrategy());
                }
                // Always overwritten, regardless of what the sub-transformation declared itself.
                physicalSubTransformation.setSupportsConcurrentExecutionAttempts(
                        supportsConcurrentExecutionAttempts);
            }
        }

        /**
         * Puts the environment parallelism back to the value captured in the constructor, or
         * resets it when no value was configured at that time.
         */
        private void restoreEnvironmentParallelism() {
            if (environmentParallelism.isPresent()) {
                executionEnvironment.getConfig().setParallelism(environmentParallelism.get());
            } else {
                executionEnvironment.getConfig().resetParallelism();
            }
        }

        /**
         * Sets {@code operatorUidHash} on {@code transformation} when a hash is given and the
         * transformation's name equals {@code operatorName}; otherwise does nothing.
         */
        private void setOperatorUidHashIfPossible(
                Transformation<?> transformation,
                String operatorName,
                @Nullable String operatorUidHash) {
            if (operatorUidHash == null || !transformation.getName().equals(operatorName)) {
                return;
            }
            transformation.setUidHash(operatorUidHash);
        }

        /**
         * Derives the uid of a sub-transformation from the sink transformation's uid.
         *
         * <p>When the sink has a uid and {@code transformationName} is one of the well-known
         * operator names, a fixed pattern is used: the writer takes the sink uid unchanged, the
         * committer and global committer use their format strings. In every other case the
         * generic {@link #concatProperty} rule applies.
         *
         * @param subTransformation transformation whose uid is set
         * @param getter reads the property (the uid) from a transformation
         * @param setter writes the property on a transformation
         * @param transformationName name of the sub-transformation, may be {@code null}
         */
        private void concatUid(
                Transformation<?> subTransformation,
                Function<Transformation<?>, String> getter,
                BiConsumer<Transformation<?>, String> setter,
                @Nullable String transformationName) {
            if (transformationName != null && getter.apply(transformation) != null) {
                if (transformationName.equals(COMMITTER_NAME)) {
                    setter.accept(
                            subTransformation,
                            String.format(COMMITTER_UID_FORMAT, getter.apply(transformation)));
                    return;
                }
                if (transformationName.equals(WRITER_NAME)) {
                    setter.accept(subTransformation, getter.apply(transformation));
                    return;
                }
                if (transformationName.equals(GLOBAL_COMMITTER_NAME)) {
                    setter.accept(
                            subTransformation,
                            String.format(
                                    GLOBAL_COMMITTER_UID_FORMAT, getter.apply(transformation)));
                    return;
                }
            }
            concatProperty(subTransformation, getter, setter);
        }

        /**
         * Prefixes a property of {@code subTransformation} with the sink transformation's value
         * of the same property, as {@code "<sink value>: <sub value>"}. Nothing happens unless
         * both values are non-null.
         */
        private void concatProperty(
                Transformation<?> subTransformation,
                Function<Transformation<?>, String> getter,
                BiConsumer<Transformation<?>, String> setter) {
            if (getter.apply(transformation) != null && getter.apply(subTransformation) != null) {
                setter.accept(
                        subTransformation,
                        getter.apply(transformation) + ": " + getter.apply(subTransformation));
            }
        }
    }
}

