/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;

public class HsFullSpillingStrategy
implements HsSpillingStrategy {
    private final float numBuffersTriggerSpillingRatio;
    private final float releaseBufferRatio;
    private final float releaseThreshold;

    public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.numBuffersTriggerSpillingRatio = hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpillingRatio();
        this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold();
        this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio();
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferFinished(int numTotalUnSpillBuffers, int currentPoolSize) {
        return (float)numTotalUnSpillBuffers < this.numBuffersTriggerSpillingRatio * (float)currentPoolSize ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer) {
        return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Override
    public Optional<HsSpillingStrategy.Decision> onMemoryUsageChanged(int numTotalRequestedBuffers, int currentPoolSize) {
        return (float)numTotalRequestedBuffers < (float)currentPoolSize * this.releaseThreshold ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override
    public HsSpillingStrategy.Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        int poolSize = spillingInfoProvider.getPoolSize();
        this.checkSpill(spillingInfoProvider, poolSize, builder);
        this.checkRelease(spillingInfoProvider, poolSize, builder);
        return builder.build();
    }

    @Override
    public HsSpillingStrategy.Decision onResultPartitionClosed(HsSpillingInfoProvider spillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); ++subpartitionId) {
            builder.addBufferToSpill(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY)).addBufferToRelease(subpartitionId, spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.ALL, HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY));
        }
        return builder.build();
    }

    private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, int poolSize, HsSpillingStrategy.Decision.Builder builder) {
        if ((float)spillingInfoProvider.getNumTotalUnSpillBuffers() < this.numBuffersTriggerSpillingRatio * (float)poolSize) {
            return;
        }
        for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); ++i) {
            builder.addBufferToSpill(i, spillingInfoProvider.getBuffersInOrder(i, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY));
        }
    }

    private void checkRelease(HsSpillingInfoProvider spillingInfoProvider, int poolSize, HsSpillingStrategy.Decision.Builder builder) {
        if ((float)spillingInfoProvider.getNumTotalRequestedBuffers() < (float)poolSize * this.releaseThreshold) {
            return;
        }
        int releaseNum = (int)((float)poolSize * this.releaseBufferRatio);
        int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
        int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
        TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<Integer, Deque<BufferIndexAndChannel>>();
        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
            Deque<BufferIndexAndChannel> buffersInOrder = spillingInfoProvider.getBuffersInOrder(subpartitionId, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY);
            int subpartitionReleaseNum = Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum);
            int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum;
            while (subpartitionSurvivedNum-- != 0) {
                buffersInOrder.pollLast();
            }
            bufferToRelease.put(subpartitionId, buffersInOrder);
        }
        for (int i = 0; i < numSubpartitions; ++i) {
            Deque bufferIndexAndChannels = (Deque)bufferToRelease.get(i);
            if (bufferIndexAndChannels == null || bufferIndexAndChannels.isEmpty()) continue;
            builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque()));
        }
    }
}

