/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.connector.sink2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommittableWrapper;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Stream operator that gathers {@link CommittableMessage}s in a {@link CommittableCollector} and
 * commits them with the {@link Committer} produced by the supplied factory.
 *
 * <p>Commits are triggered either when a checkpoint completes or, when {@code commitOnInput} is
 * set, each time an element arrives. If the committer is a
 * {@link SinkV1Adapter.GlobalCommitterAdapter}, the wrapped SinkV1 {@link GlobalCommitter} is
 * additionally used to filter and commit global committables restored from state.
 *
 * @param <CommT> type of the committables received from upstream
 * @param <GlobalCommT> type of the SinkV1 global committables kept in {@code sinkV1State}
 */
@Internal
public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<CommittableMessage<CommT>, Void> {

    /** Descriptor of the raw byte-array list state that backs {@link #globalCommitterState}. */
    private static final ListStateDescriptor<byte[]> GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC =
            new ListStateDescriptor<>(
                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Creates the committer in {@link #setup}. */
    private final SerializableSupplier<Committer<CommT>> committerFactory;

    /** Creates the committable serializer in {@link #setup}. */
    private final SerializableSupplier<SimpleVersionedSerializer<CommT>>
            committableSerializerFactory;

    /**
     * When {@code true}, commits are triggered from {@link #processElement}; otherwise from
     * {@link #notifyCheckpointComplete}.
     */
    private final boolean commitOnInput;

    /** Versioned view over the raw operator list state; written on every snapshot. */
    private ListState<GlobalCommittableWrapper<CommT, GlobalCommT>> globalCommitterState;

    private Committer<CommT> committer;
    private CommittableCollector<CommT> committableCollector;

    /** Highest checkpoint id (or end-of-input marker) passed to {@link #commit} so far. */
    private long lastCompletedCheckpointId = -1L;

    private SimpleVersionedSerializer<CommT> committableSerializer;
    private SinkCommitterMetricGroup metricGroup;

    /** Retry budget handed to each checkpoint's commit call. */
    private int maxRetries;

    /** Only set when the committer is a {@link SinkV1Adapter.GlobalCommitterAdapter}. */
    @Nullable private GlobalCommitter<CommT, GlobalCommT> globalCommitter;

    /** Only set when the committer is a {@link SinkV1Adapter.GlobalCommitterAdapter}. */
    @Nullable private SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;

    /** SinkV1 global committables that were restored from state and are still pending. */
    private List<GlobalCommT> sinkV1State = new ArrayList<>();

    /**
     * Creates the operator.
     *
     * @param committerFactory supplier of the committer; must not be {@code null}
     * @param committableSerializerFactory supplier of the committable serializer; must not be
     *     {@code null}
     * @param commitOnInput whether to commit on every incoming element instead of on checkpoint
     *     completion
     */
    public GlobalCommitterOperator(
            SerializableSupplier<Committer<CommT>> committerFactory,
            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory,
            boolean commitOnInput) {
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.committableSerializerFactory =
                Preconditions.checkNotNull(committableSerializerFactory);
        this.commitOnInput = commitOnInput;
    }

    /**
     * Instantiates the committer, metric group, collector and serializers, and reads the retry
     * limit from {@link SinkOptions#COMMITTER_RETRIES}.
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Void>> output) {
        super.setup(containingTask, config, output);
        committer = committerFactory.get();
        metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics);
        committableCollector = CommittableCollector.of(metricGroup);
        committableSerializer = committableSerializerFactory.get();
        adoptSinkV1GlobalCommitter();
        maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
    }

    /**
     * Picks up the SinkV1 global committer and its serializer when the committer is a
     * {@link SinkV1Adapter.GlobalCommitterAdapter}; otherwise leaves both fields {@code null}.
     */
    // The adapter is accessed through its raw type, so the two assignments below are unchecked
    // conversions; the suppression is confined to this helper.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void adoptSinkV1GlobalCommitter() {
        if (committer instanceof SinkV1Adapter.GlobalCommitterAdapter) {
            SinkV1Adapter.GlobalCommitterAdapter adapter =
                    (SinkV1Adapter.GlobalCommitterAdapter) committer;
            globalCommitter = adapter.getGlobalCommitter();
            globalCommittableSerializer = adapter.getGlobalCommittableSerializer();
        }
    }

    /**
     * Replaces the operator state with a single wrapper holding a copy of the collector and a
     * copy of the pending SinkV1 committables.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        GlobalCommittableWrapper<CommT, GlobalCommT> snapshot =
                new GlobalCommittableWrapper<CommT, GlobalCommT>(
                        committableCollector.copy(), new ArrayList<>(sinkV1State));
        globalCommitterState.update(Collections.singletonList(snapshot));
    }

    /**
     * Binds the operator state and, on restore, merges every restored wrapper into the in-memory
     * state, lets the SinkV1 global committer (if any) filter the recovered committables, and
     * commits up to the restored checkpoint id when one is available.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        globalCommitterState =
                new SimpleVersionedListState<>(
                        context.getOperatorStateStore()
                                .getListState(GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC),
                        getCommitterStateSerializer());
        if (!context.isRestored()) {
            return;
        }
        // Iterate with the element type intact: going through a raw Iterable would type the
        // element as Object and the wrapper accessors below would not resolve.
        for (GlobalCommittableWrapper<CommT, GlobalCommT> restored : globalCommitterState.get()) {
            sinkV1State.addAll(restored.getGlobalCommittables());
            committableCollector.merge(restored.getCommittableCollector());
        }
        if (globalCommitter != null) {
            sinkV1State = globalCommitter.filterRecoveredCommittables(sinkV1State);
        }
        if (context.getRestoredCheckpointId().isPresent()) {
            commit(context.getRestoredCheckpointId().getAsLong());
        }
    }

    /**
     * Builds the serializer for the operator state from the committable serializer, this
     * subtask's index, the maximum number of parallel subtasks, and the optional global
     * committable serializer.
     */
    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
            getCommitterStateSerializer() {
        CommittableCollectorSerializer<CommT> collectorSerializer =
                new CommittableCollectorSerializer<>(
                        committableSerializer,
                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        metricGroup);
        return new GlobalCommitterSerializer<>(
                collectorSerializer, globalCommittableSerializer, metricGroup);
    }

    /**
     * Verifies that pending SinkV1 state can be committed and, unless committing on input,
     * commits everything up to {@code checkpointId}.
     *
     * @throws IllegalStateException if SinkV1 committables are pending but no global committer is
     *     available
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        Preconditions.checkState(
                globalCommitter != null || sinkV1State.isEmpty(),
                "GlobalCommitter is required to commit SinkV1 state.");
        if (!commitOnInput) {
            commit(checkpointId);
        }
    }

    /**
     * Commits pending SinkV1 committables first, then every collected checkpoint up to the
     * highest id seen so far, stopping at the first checkpoint that has not yet globally received
     * all of its committables.
     *
     * @param checkpointIdOrEOI checkpoint id (or end-of-input marker) that triggered the commit
     */
    private void commit(long checkpointIdOrEOI) throws IOException, InterruptedException {
        if (globalCommitter != null && !sinkV1State.isEmpty()) {
            // The global committer's return value becomes the new pending SinkV1 state.
            sinkV1State = globalCommitter.commit(sinkV1State);
        }
        // Never move backwards: an older id must not shrink the range that gets committed.
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointIdOrEOI);
        for (CheckpointCommittableManager<CommT> checkpointManager :
                committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
            if (!checkpointManager.hasGloballyReceivedAll()) {
                // Stop here; this and all remaining checkpoints stay in the collector.
                return;
            }
            checkpointManager.commit(committer, maxRetries);
            committableCollector.remove(checkpointManager);
        }
    }

    /**
     * Adds the message to the collector and, when committing on input, triggers a commit for the
     * message's checkpoint id or end-of-input marker.
     */
    @Override
    public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
        CommittableMessage<CommT> message = element.getValue();
        committableCollector.addMessage(message);
        if (commitOnInput) {
            commit(message.getCheckpointIdOrEOI());
        }
    }
}

