Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions src/dstack/_internal/core/backends/kubernetes/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ def __init__(self, config: KubernetesConfig):
def get_offers_by_requirements(
self, requirements: Requirements
) -> list[InstanceOfferWithAvailability]:
gpu_request = 0
if (gpu_spec := requirements.resources.gpu) is not None:
gpu_request = _get_gpu_request_from_gpu_spec(gpu_spec)
instance_offers: list[InstanceOfferWithAvailability] = []
for node in self.api.list_node().items:
if (instance_offer := _get_instance_offer_from_node(node)) is not None:
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
instance_offers.extend(
filter_offers_by_requirements([instance_offer], requirements)
)
Expand Down Expand Up @@ -188,15 +191,15 @@ def run_job(
if (cpu_max := resources_spec.cpu.count.max) is not None:
resources_limits["cpu"] = str(cpu_max)
if (gpu_spec := resources_spec.gpu) is not None:
gpu_min = gpu_spec.count.min
if gpu_min is not None and gpu_min > 0:
if (gpu_request := _get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
self.api, gpu_spec
)
logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_request)
resources_requests[gpu_resource] = str(gpu_request)
# Limit must be set (GPU resources cannot be overcommitted)
# and must be equal to request.
resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
resources_limits[gpu_resource] = str(gpu_request)
# It should be NoSchedule, but we also add NoExecute toleration just in case.
for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
tolerations.append(
Expand Down Expand Up @@ -335,7 +338,10 @@ def update_provisioning_data(
provisioning_data.hostname = get_or_error(service_spec.cluster_ip)
pod_spec = get_or_error(pod.spec)
node = self.api.read_node(name=get_or_error(pod_spec.node_name))
if (instance_offer := _get_instance_offer_from_node(node)) is not None:
# The original offer has a list of GPUs already sliced according to pod spec's GPU resource
# request, which is inferred from dstack's GPUSpec, see _get_gpu_request_from_gpu_spec
gpu_request = len(provisioning_data.instance_type.resources.gpus)
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
provisioning_data.instance_type = instance_offer.instance
provisioning_data.region = instance_offer.region
provisioning_data.price = instance_offer.price
Expand Down Expand Up @@ -475,7 +481,13 @@ def terminate_gateway(
)


def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOfferWithAvailability]:
def _get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
    """Return the number of GPUs to request for ``gpu_spec``.

    The request is the lower bound of the spec's count range
    (``gpu_spec.count.min``). When that bound is unset or zero, ``0`` is returned.
    """
    min_count = gpu_spec.count.min
    if not min_count:
        return 0
    return min_count


def _get_instance_offer_from_node(
node: client.V1Node, gpu_request: int
) -> Optional[InstanceOfferWithAvailability]:
try:
node_name = get_or_error(get_or_error(node.metadata).name)
node_status = get_or_error(node.status)
Expand All @@ -499,7 +511,7 @@ def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOffer
cpus=cpus,
cpu_arch=cpu_arch,
memory_mib=memory_mib,
gpus=gpus,
gpus=gpus[:gpu_request],
spot=False,
disk=Disk(size_mib=disk_size_mib),
),
Expand Down