/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;

/**
 * {@link HsSpillingStrategy} implementation that, once triggered, spills every buffer that has
 * not been spilled yet and releases a pool-size-proportional number of already spilled buffers.
 *
 * <p>All three tuning values are read once from the {@link HybridShuffleConfiguration} passed to
 * the constructor and are interpreted as fractions of the current buffer pool size.
 */
public class HsFullSpillingStrategy
implements HsSpillingStrategy {
    /** Spilling is considered once the unspilled buffer count reaches this fraction of the pool. */
    private final float numBuffersTriggerSpillingRatio;
    /** Fraction of the pool size used to compute how many buffers a release round targets. */
    private final float releaseBufferRatio;
    /** Releasing is considered once the requested buffer count reaches this fraction of the pool. */
    private final float releaseThreshold;

    /**
     * Creates the strategy from the "full strategy" settings of the given configuration.
     *
     * @param hybridShuffleConfiguration source of the spilling ratio, release threshold and
     *     release ratio
     */
    public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.numBuffersTriggerSpillingRatio = hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpillingRatio();
        this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold();
        this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio();
    }

    /**
     * Fast-path check after a buffer is finished.
     *
     * @param numTotalUnSpillBuffers number of buffers that have not been spilled
     * @param currentPoolSize current size of the buffer pool
     * @return {@code NO_ACTION} while the unspilled count is below the spilling trigger; otherwise
     *     an empty {@link Optional} (presumably asking the caller to fall back to
     *     {@link #decideActionWithGlobalInfo} - confirm against the interface contract)
     */
    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
        if (this.isBelowSpillTrigger(numTotalUnSpillBuffers, currentPoolSize)) {
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }
        return Optional.empty();
    }

    /**
     * Consuming a buffer never triggers any action for this strategy.
     *
     * @param consumedBuffer the buffer that was consumed (unused)
     * @return always {@code NO_ACTION}
     */
    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer) {
        return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
    }

    /**
     * Fast-path check after the memory usage changed.
     *
     * @param numTotalRequestedBuffers number of buffers currently requested
     * @param currentPoolSize current size of the buffer pool
     * @return {@code NO_ACTION} while the requested count is below the release threshold;
     *     otherwise an empty {@link Optional} (presumably asking the caller to fall back to
     *     {@link #decideActionWithGlobalInfo} - confirm against the interface contract)
     */
    @Override
    public Optional<HsSpillingStrategy.Decision> onMemoryUsageChanged(int numTotalRequestedBuffers, int currentPoolSize) {
        if (this.isBelowReleaseThreshold(numTotalRequestedBuffers, currentPoolSize)) {
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }
        return Optional.empty();
    }

    /**
     * Builds a decision from the provider's global view: first the spill part, then the release
     * part. The pool size is read once so both parts work on the same value.
     *
     * @param spillingInfoProvider source of buffer and pool information
     * @return the combined spill/release decision
     */
    @Override
    public HsSpillingStrategy.Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        int poolSize = spillingInfoProvider.getPoolSize();
        this.checkSpill(spillingInfoProvider, poolSize, builder);
        this.checkRelease(spillingInfoProvider, poolSize, builder);
        return builder.build();
    }

    /**
     * Builds the decision used when the result partition is closed: for every subpartition, all
     * {@code NOT_SPILL} buffers are marked for spilling and all buffers (any spill or consume
     * status) are marked for release.
     *
     * @param spillingInfoProvider source of buffer information
     * @return the resulting decision
     */
    @Override
    public HsSpillingStrategy.Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); ++subpartitionId) {
            builder.addBufferToSpill(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL))
                    .addBufferToRelease(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.ALL, HsSpillingInfoProvider.ConsumeStatus.ALL));
        }
        return builder.build();
    }

    /** Returns whether the unspilled buffer count is still below the spilling trigger. */
    private boolean isBelowSpillTrigger(int numUnSpillBuffers, int poolSize) {
        return numUnSpillBuffers < this.numBuffersTriggerSpillingRatio * poolSize;
    }

    /** Returns whether the requested buffer count is still below the release threshold. */
    private boolean isBelowReleaseThreshold(int numRequestedBuffers, int poolSize) {
        return numRequestedBuffers < poolSize * this.releaseThreshold;
    }

    /**
     * Adds every {@code NOT_SPILL} buffer of every subpartition to the spill part of the decision,
     * provided the spilling trigger is (still) reached.
     */
    private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, int poolSize, HsSpillingStrategy.Decision.Builder builder) {
        // Re-check with the provider's current numbers; nothing to do if the trigger is not reached.
        if (this.isBelowSpillTrigger(spillingInfoProvider.getNumTotalUnSpillBuffers(), poolSize)) {
            return;
        }
        for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); ++subpartitionId) {
            builder.addBufferToSpill(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL));
        }
    }

    /**
     * Adds buffers to the release part of the decision, provided the release threshold is (still)
     * reached.
     *
     * <p>All spilled and consumed buffers are always released. If their number is smaller than
     * {@code poolSize * releaseBufferRatio}, the remainder is filled with spilled but not yet
     * consumed buffers selected by
     * {@link HsSpillingStrategyUtils#getBuffersByConsumptionPriorityInOrder}.
     */
    private void checkRelease(HsSpillingInfoProvider spillingInfoProvider, int poolSize, HsSpillingStrategy.Decision.Builder builder) {
        // Re-check with the provider's current numbers; nothing to do if the threshold is not reached.
        if (this.isBelowReleaseThreshold(spillingInfoProvider.getNumTotalRequestedBuffers(), poolSize)) {
            return;
        }
        // Use the same pool-size snapshot as the threshold check above so that both halves of
        // the decision are based on one consistent value.
        int releaseNum = (int)(poolSize * this.releaseBufferRatio);
        int numSubpartitions = spillingInfoProvider.getNumSubpartitions();

        // Step 1: every spilled and already consumed buffer is released.
        TreeMap<Integer, Deque<BufferIndexAndChannel>> consumedBuffersToRelease = new TreeMap<>();
        int numConsumedBuffers = 0;
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            Deque<BufferIndexAndChannel> consumedSpilled = spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatus.CONSUMED);
            numConsumedBuffers += consumedSpilled.size();
            consumedBuffersToRelease.put(subpartitionId, consumedSpilled);
        }

        // Step 2: if that is not enough, top up with spilled but unconsumed buffers.
        TreeMap<Integer, List<BufferIndexAndChannel>> unconsumedBufferToRelease = new TreeMap<>();
        if (releaseNum > numConsumedBuffers) {
            TreeMap<Integer, Deque<BufferIndexAndChannel>> unconsumedBuffers = new TreeMap<>();
            for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
                unconsumedBuffers.put(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED));
            }
            unconsumedBufferToRelease.putAll(HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(spillingInfoProvider.getNextBufferIndexToConsume(), unconsumedBuffers, releaseNum - numConsumedBuffers));
        }

        // Step 3: per subpartition, hand over consumed buffers first, then the unconsumed ones.
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            List<BufferIndexAndChannel> toRelease = new ArrayList<>();
            Deque<BufferIndexAndChannel> consumed = consumedBuffersToRelease.get(subpartitionId);
            if (consumed != null) {
                toRelease.addAll(consumed);
            }
            // The helper may not return an entry for every subpartition.
            List<BufferIndexAndChannel> unconsumed = unconsumedBufferToRelease.get(subpartitionId);
            if (unconsumed != null) {
                toRelease.addAll(unconsumed);
            }
            builder.addBufferToRelease(subpartitionId, toRelease);
        }
    }
}

