/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link CheckpointCommittableManager} that tracks, for a single checkpoint id, one
 * {@link SubtaskCommittableManager} per subtask id.
 *
 * <p>The per-subtask managers are kept in a plain map that is mutated by {@link #addSummary} and
 * {@link #merge}; this class performs no synchronization of its own.
 *
 * @param <CommT> type of the committables handled by this manager
 */
class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableManager<CommT> {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);

    /** Per-subtask managers, keyed by the subtask id taken from the received summary. */
    private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;
    private final long checkpointId;
    private final int numberOfSubtasks;
    private final SinkCommitterMetricGroup metricGroup;

    /**
     * Creates a manager backed by the given map; the map is stored as-is (not copied).
     *
     * @param subtasksCommittableManagers per-subtask managers keyed by subtask id, must not be null
     * @param numberOfSubtasks number of subtasks reported via {@link #getNumberOfSubtasks()}
     * @param checkpointId checkpoint id this manager belongs to
     * @param metricGroup metric group handed to newly created subtask managers and updated on commit
     */
    @VisibleForTesting
    CheckpointCommittableManagerImpl(Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers, int numberOfSubtasks, long checkpointId, SinkCommitterMetricGroup metricGroup) {
        this.subtasksCommittableManagers = Preconditions.checkNotNull(subtasksCommittableManagers);
        this.numberOfSubtasks = numberOfSubtasks;
        this.checkpointId = checkpointId;
        this.metricGroup = metricGroup;
    }

    /**
     * Creates an empty manager whose checkpoint id and subtask count are taken from {@code summary}.
     * The summary itself is not registered; call {@link #addSummary} for that.
     *
     * @param summary summary providing the checkpoint id and number of subtasks
     * @param metricGroup metric group used by the new manager
     * @return a manager without any subtask managers
     */
    public static <CommT> CheckpointCommittableManagerImpl<CommT> forSummary(CommittableSummary<CommT> summary, SinkCommitterMetricGroup metricGroup) {
        return new CheckpointCommittableManagerImpl<>(new HashMap<>(), summary.getNumberOfSubtasks(), summary.getCheckpointIdOrEOI(), metricGroup);
    }

    @Override
    public long getCheckpointId() {
        return this.checkpointId;
    }

    @Override
    public int getNumberOfSubtasks() {
        return this.numberOfSubtasks;
    }

    /** Returns a live view of the per-subtask managers (backed by the internal map). */
    Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
        return this.subtasksCommittableManagers.values();
    }

    /**
     * Registers a new {@link SubtaskCommittableManager} for the subtask described by {@code summary}.
     *
     * <p>Summaries whose checkpoint id is {@code Long.MAX_VALUE} (logged as "EOI" summaries) are
     * merged into an already registered manager of the same subtask. For any other checkpoint id a
     * second summary for the same subtask is rejected.
     *
     * @param summary the summary received from an upstream subtask
     * @throws UnsupportedOperationException if a non-EOI summary was already registered for the subtask
     */
    void addSummary(CommittableSummary<CommT> summary) {
        // Named differently from the field on purpose: the log statement at the end of the
        // non-EOI branch reports the field value, not the summary's value.
        long summaryCheckpointId = summary.getCheckpointIdOrEOI();
        int subtaskId = summary.getSubtaskId();
        SubtaskCommittableManager<CommT> manager = new SubtaskCommittableManager<>(summary.getNumberOfCommittables(), subtaskId, summaryCheckpointId, this.metricGroup);
        if (summaryCheckpointId == Long.MAX_VALUE) {
            SubtaskCommittableManager<CommT> merged = this.subtasksCommittableManagers.merge(subtaskId, manager, SubtaskCommittableManager::merge);
            // Fixed format string: the previous "{}}" placeholders printed stray closing braces.
            LOG.debug("Adding EOI summary (new={}, merged={}).", manager, merged);
        } else {
            SubtaskCommittableManager<CommT> existing = this.subtasksCommittableManagers.putIfAbsent(subtaskId, manager);
            if (existing != null) {
                throw new UnsupportedOperationException(String.format("Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920", summaryCheckpointId, subtaskId, manager, existing));
            }
            LOG.debug("Setting the summary for checkpointId {} with {}", this.checkpointId, manager);
        }
    }

    /**
     * Forwards {@code committable} to the manager registered for its subtask.
     *
     * @throws NullPointerException if no summary was registered for the committable's subtask
     */
    void addCommittable(CommittableWithLineage<CommT> committable) {
        this.getSubtaskCommittableManager(committable.getSubtaskId()).add(committable);
    }

    /**
     * Looks up the manager registered for {@code subtaskId}.
     *
     * @throws NullPointerException if the subtask is unknown
     */
    SubtaskCommittableManager<CommT> getSubtaskCommittableManager(int subtaskId) {
        SubtaskCommittableManager<CommT> committables = this.subtasksCommittableManagers.get(subtaskId);
        return Preconditions.checkNotNull(committables, "Unknown subtask for %s", subtaskId);
    }

    /**
     * Builds a summary for this checkpoint whose counters are the sums of the total, pending and
     * failed counts reported by all registered subtask managers.
     *
     * @param emittingSubtaskId subtask id to put into the summary
     * @param emittingNumberOfSubtasks number of subtasks to put into the summary
     */
    @Override
    public CommittableSummary<CommT> getSummary(int emittingSubtaskId, int emittingNumberOfSubtasks) {
        int numCommittables = 0;
        int numPending = 0;
        int numFailed = 0;
        // Single pass instead of three separate streams over the same values.
        for (SubtaskCommittableManager<CommT> subtask : this.subtasksCommittableManagers.values()) {
            numCommittables += subtask.getNumCommittables();
            numPending += subtask.getNumPending();
            numFailed += subtask.getNumFailed();
        }
        return new CommittableSummary<>(emittingSubtaskId, emittingNumberOfSubtasks, this.checkpointId, numCommittables, numPending, numFailed);
    }

    /** Returns true if every registered subtask manager reports finished (vacuously true if none). */
    boolean isFinished() {
        return this.subtasksCommittableManagers.values().stream().allMatch(SubtaskCommittableManager::isFinished);
    }

    /**
     * Commits the pending requests of all subtasks that have received all their committables.
     *
     * <p>Each request is marked selected, handed to {@code committer} in one call, and afterwards
     * marked via {@code setCommittedIfNoError}. The committed committables are then drained from the
     * subtask managers, and the pending-committables gauge is re-registered to report the number of
     * pending requests across all subtasks.
     *
     * @param committer committer that receives the pending requests
     * @return the committables drained after the commit call
     */
    @Override
    public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer) throws IOException, InterruptedException {
        Collection<CommitRequestImpl<CommT>> requests = this.getPendingRequests(true);
        requests.forEach(CommitRequestImpl::setSelected);
        // Diamond lets the element type be inferred from the committer's parameter type; the
        // committer gets its own list rather than the collection iterated below.
        committer.commit(new ArrayList<>(requests));
        requests.forEach(CommitRequestImpl::setCommittedIfNoError);
        Collection<CommittableWithLineage<CommT>> committed = this.drainFinished();
        this.metricGroup.setCurrentPendingCommittablesGauge(() -> this.getPendingRequests(false).size());
        return committed;
    }

    /**
     * Collects the pending requests of the registered subtasks.
     *
     * @param onlyIfFullyReceived if true, subtasks for which {@code hasReceivedAll()} is false are skipped
     */
    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
        return this.subtasksCommittableManagers.values().stream()
                .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
                .flatMap(SubtaskCommittableManager::getPendingRequests)
                .collect(Collectors.toList());
    }

    /** Drains and returns the committed committables of all registered subtasks. */
    Collection<CommittableWithLineage<CommT>> drainFinished() {
        return this.subtasksCommittableManagers.values().stream()
                .flatMap(subtask -> subtask.drainCommitted().stream())
                .collect(Collectors.toList());
    }

    /**
     * Returns a new manager combining this manager with {@code other}; neither map is modified.
     *
     * <p>Subtasks present in both are combined via {@link SubtaskCommittableManager#merge}. Subtask
     * managers present only in {@code other} are placed into the result by reference, not copied.
     *
     * @param other manager for the same checkpoint id
     * @throws IllegalArgumentException if the checkpoint ids differ
     */
    CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) {
        Preconditions.checkArgument(other.checkpointId == this.checkpointId, "Cannot merge committable managers of different checkpoints (%s vs. %s).", this.checkpointId, other.checkpointId);
        CheckpointCommittableManagerImpl<CommT> merged = this.copy();
        for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry : other.subtasksCommittableManagers.entrySet()) {
            merged.subtasksCommittableManagers.merge(subtaskEntry.getKey(), subtaskEntry.getValue(), SubtaskCommittableManager::merge);
        }
        return merged;
    }

    /** Returns a manager with a new map holding a {@code copy()} of every subtask manager. */
    CheckpointCommittableManagerImpl<CommT> copy() {
        Map<Integer, SubtaskCommittableManager<CommT>> copiedSubtasks = this.subtasksCommittableManagers.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().copy()));
        return new CheckpointCommittableManagerImpl<>(copiedSubtasks, this.numberOfSubtasks, this.checkpointId, this.metricGroup);
    }

    /** Equality covers checkpoint id, subtask count and subtask managers; the metric group is ignored. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        CheckpointCommittableManagerImpl<?> that = (CheckpointCommittableManagerImpl<?>) o;
        return this.checkpointId == that.checkpointId && this.numberOfSubtasks == that.numberOfSubtasks && Objects.equals(this.subtasksCommittableManagers, that.subtasksCommittableManagers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.subtasksCommittableManagers, this.checkpointId, this.numberOfSubtasks);
    }

    @Override
    public String toString() {
        return "CheckpointCommittableManagerImpl{numberOfSubtasks=" + this.numberOfSubtasks + ", checkpointId=" + this.checkpointId + ", subtasksCommittableManagers=" + this.subtasksCommittableManagers + "}";
    }
}

