/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class SimpleExecutionSlotAllocator
implements ExecutionSlotAllocator {
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
    private final DualKeyLinkedMap<ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>> requestedPhysicalSlots;

    SimpleExecutionSlotAllocator(PhysicalSlotProvider slotProvider, Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever, boolean slotWillBeOccupiedIndefinitely) {
        this.slotProvider = (PhysicalSlotProvider)Preconditions.checkNotNull((Object)slotProvider);
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.resourceProfileRetriever = (Function)Preconditions.checkNotNull(resourceProfileRetriever);
        this.requestedPhysicalSlots = new DualKeyLinkedMap();
    }

    @Override
    public List<ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds) {
        return executionAttemptIds.stream().map(id -> new ExecutionSlotAssignment((ExecutionAttemptID)id, this.allocateSlotFor((ExecutionAttemptID)id))).collect(Collectors.toList());
    }

    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) {
        if (this.requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
            return this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
        }
        SlotRequestId slotRequestId = new SlotRequestId();
        ResourceProfile resourceProfile = this.resourceProfileRetriever.apply(executionAttemptId);
        SlotProfile slotProfile = SlotProfile.priorAllocation(resourceProfile, resourceProfile, Collections.emptyList(), Collections.emptyList(), Collections.emptySet());
        PhysicalSlotRequest request = new PhysicalSlotRequest(slotRequestId, slotProfile, this.slotWillBeOccupiedIndefinitely);
        CompletionStage slotFuture = this.slotProvider.allocatePhysicalSlot(request).thenApply(physicalSlotRequest -> this.allocateLogicalSlotFromPhysicalSlot(slotRequestId, physicalSlotRequest.getPhysicalSlot(), this.slotWillBeOccupiedIndefinitely));
        ((CompletableFuture)slotFuture).exceptionally(throwable -> {
            this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
            this.slotProvider.cancelSlotRequest(slotRequestId, (Throwable)throwable);
            return null;
        });
        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, (CompletableFuture<LogicalSlot>)slotFuture);
        return slotFuture;
    }

    @Override
    public void cancel(ExecutionAttemptID executionAttemptId) {
        CompletableFuture<LogicalSlot> slotFuture = this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
        if (slotFuture != null) {
            slotFuture.cancel(false);
        }
    }

    private void returnLogicalSlot(LogicalSlot slot) {
        this.releaseSlot(slot, (Throwable)new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
    }

    private void releaseSlot(LogicalSlot slot, Throwable cause) {
        this.requestedPhysicalSlots.removeKeyB(slot.getSlotRequestId());
        this.slotProvider.cancelSlotRequest(slot.getSlotRequestId(), cause);
    }

    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(SlotRequestId slotRequestId, PhysicalSlot physicalSlot, boolean slotWillBeOccupiedIndefinitely) {
        SingleLogicalSlot singleLogicalSlot = new SingleLogicalSlot(slotRequestId, physicalSlot, Locality.UNKNOWN, this::returnLogicalSlot, slotWillBeOccupiedIndefinitely);
        LogicalSlotHolder logicalSlotHolder = new LogicalSlotHolder(singleLogicalSlot);
        if (physicalSlot.tryAssignPayload(logicalSlotHolder)) {
            return singleLogicalSlot;
        }
        throw new IllegalStateException("BUG: Unexpected physical slot payload assignment failure!");
    }

    public static class Factory
    implements ExecutionSlotAllocatorFactory {
        private final PhysicalSlotProvider slotProvider;
        private final boolean slotWillBeOccupiedIndefinitely;

        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
            this.slotProvider = slotProvider;
            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        }

        @Override
        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
            return new SimpleExecutionSlotAllocator(this.slotProvider, id -> context.getResourceProfile(id.getExecutionVertexId()), this.slotWillBeOccupiedIndefinitely);
        }
    }

    private class LogicalSlotHolder
    implements PhysicalSlot.Payload {
        private final SingleLogicalSlot logicalSlot;

        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
            this.logicalSlot = (SingleLogicalSlot)Preconditions.checkNotNull((Object)logicalSlot);
        }

        @Override
        public void release(Throwable cause) {
            this.logicalSlot.release(cause);
            SimpleExecutionSlotAllocator.this.releaseSlot(this.logicalSlot, (Throwable)new FlinkException("Physical slot releases its payload."));
        }

        @Override
        public boolean willOccupySlotIndefinitely() {
            return this.logicalSlot.willOccupySlotIndefinitely();
        }
    }
}

