diff --git a/src/dstack/_internal/core/backends/kubernetes/compute.py b/src/dstack/_internal/core/backends/kubernetes/compute.py
index 4f6379b17..7f8ef9123 100644
--- a/src/dstack/_internal/core/backends/kubernetes/compute.py
+++ b/src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -117,9 +117,12 @@ def __init__(self, config: KubernetesConfig):
     def get_offers_by_requirements(
         self, requirements: Requirements
     ) -> list[InstanceOfferWithAvailability]:
+        gpu_request = 0
+        if (gpu_spec := requirements.resources.gpu) is not None:
+            gpu_request = _get_gpu_request_from_gpu_spec(gpu_spec)
         instance_offers: list[InstanceOfferWithAvailability] = []
         for node in self.api.list_node().items:
-            if (instance_offer := _get_instance_offer_from_node(node)) is not None:
+            if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
                 instance_offers.extend(
                     filter_offers_by_requirements([instance_offer], requirements)
                 )
@@ -188,15 +191,15 @@ def run_job(
         if (cpu_max := resources_spec.cpu.count.max) is not None:
             resources_limits["cpu"] = str(cpu_max)
         if (gpu_spec := resources_spec.gpu) is not None:
-            gpu_min = gpu_spec.count.min
-            if gpu_min is not None and gpu_min > 0:
+            if (gpu_request := _get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
                 gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
                     self.api, gpu_spec
                 )
-                logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
+                logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_request)
+                resources_requests[gpu_resource] = str(gpu_request)
                 # Limit must be set (GPU resources cannot be overcommitted)
                 # and must be equal to request.
-                resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
+                resources_limits[gpu_resource] = str(gpu_request)
                 # It should be NoSchedule, but we also add NoExecute toleration just in case.
                 for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
                     tolerations.append(
@@ -335,7 +338,10 @@ def update_provisioning_data(
         provisioning_data.hostname = get_or_error(service_spec.cluster_ip)
         pod_spec = get_or_error(pod.spec)
         node = self.api.read_node(name=get_or_error(pod_spec.node_name))
-        if (instance_offer := _get_instance_offer_from_node(node)) is not None:
+        # The original offer has a list of GPUs already sliced according to pod spec's GPU resource
+        # request, which is inferred from dstack's GPUSpec, see _get_gpu_request_from_gpu_spec
+        gpu_request = len(provisioning_data.instance_type.resources.gpus)
+        if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
             provisioning_data.instance_type = instance_offer.instance
             provisioning_data.region = instance_offer.region
             provisioning_data.price = instance_offer.price
@@ -475,7 +481,13 @@ def terminate_gateway(
         )
 
 
-def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOfferWithAvailability]:
+def _get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
+    return gpu_spec.count.min or 0
+
+
+def _get_instance_offer_from_node(
+    node: client.V1Node, gpu_request: int
+) -> Optional[InstanceOfferWithAvailability]:
     try:
         node_name = get_or_error(get_or_error(node.metadata).name)
         node_status = get_or_error(node.status)
@@ -499,7 +511,7 @@ def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOffer
             cpus=cpus,
             cpu_arch=cpu_arch,
             memory_mib=memory_mib,
-            gpus=gpus,
+            gpus=gpus[:gpu_request],
             spot=False,
             disk=Disk(size_mib=disk_size_mib),
         ),
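
For reviewers, below is a minimal, self-contained sketch of the GPU-slicing behavior this patch introduces. `FakeRange` and `FakeGPUSpec` are hypothetical stand-ins for dstack's real `Range` and `GPUSpec` models (not part of the patch); only the `count.min` handling and the `gpus[:gpu_request]` slice mirror the actual change.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeRange:
    """Stand-in for dstack's Range[int] (assumption, not the real model)."""

    min: Optional[int] = None
    max: Optional[int] = None


@dataclass
class FakeGPUSpec:
    """Stand-in for dstack's GPUSpec (assumption, not the real model)."""

    count: FakeRange


def _get_gpu_request_from_gpu_spec(gpu_spec: FakeGPUSpec) -> int:
    # Same logic as the helper added in the patch: a missing/None minimum
    # means no GPUs are requested.
    return gpu_spec.count.min or 0


node_gpus = ["H100", "H100", "H100", "H100"]  # GPUs detected on a node

# Job asks for at least 2 GPUs -> the offer advertises exactly 2 of them.
spec = FakeGPUSpec(count=FakeRange(min=2, max=4))
gpu_request = _get_gpu_request_from_gpu_spec(spec)
assert node_gpus[:gpu_request] == ["H100", "H100"]

# Job does not ask for GPUs -> the offer advertises none.
spec = FakeGPUSpec(count=FakeRange(min=None))
assert node_gpus[:_get_gpu_request_from_gpu_spec(spec)] == []
```

Because the offer's GPU list is cut to exactly the requested count, `update_provisioning_data` can later recover the request as `len(provisioning_data.instance_type.resources.gpus)`, which is what the new comment in that method relies on.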