232 changes: 203 additions & 29 deletions clams/app/__init__.py
@@ -13,10 +13,11 @@
from typing import Union, Any, Optional, Dict, List, Tuple

from mmif import Mmif, Document, DocumentTypes, View
from mmif.utils.cli.describe import generate_param_hash # pytype: disable=import-error
from clams.appmetadata import AppMetadata, real_valued_primitives, python_type, map_param_kv_delimiter

logging.basicConfig(
level=logging.WARNING,
level=getattr(logging, os.environ.get('CLAMS_LOGLEVEL', 'WARNING').upper(), logging.WARNING),
format="%(asctime)s %(name)s %(levelname)-8s %(thread)d %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
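
With this change the root log level is driven by the CLAMS_LOGLEVEL environment variable, falling back to WARNING when the variable is unset or not a valid level name. A minimal usage sketch, assuming the variable is set before the clams.app module is imported (the basicConfig call above runs at import time):

import logging
import os

os.environ['CLAMS_LOGLEVEL'] = 'DEBUG'  # must be set before importing clams.app

from clams.app import ClamsApp  # importing the module triggers the basicConfig call above

logging.getLogger('example-app').debug('visible now that the root logger level is DEBUG')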

@@ -47,7 +48,7 @@ class ClamsApp(ABC):
'description': 'The JSON body of the HTTP response will be re-formatted with 2-space indentation',
},
{
'name': 'runningTime', 'type': 'boolean', 'choices': None, 'default': False, 'multivalued': False,
'name': 'runningTime', 'type': 'boolean', 'choices': None, 'default': True, 'multivalued': False,
'description': 'The running time of the app will be recorded in the view metadata',
},
{
@@ -160,20 +161,19 @@ def annotate(self, mmif: Union[str, dict, Mmif], **runtime_params: List[str]) ->
hwFetch = refined.get('hwFetch', False)
runtime_recs = {}
if hwFetch:
import multiprocessing
import platform, shutil, subprocess
runtime_recs['architecture'] = platform.machine()
# runtime_recs['processor'] = platform.processor() # this only works on Windows
runtime_recs['cpu'] = f"{platform.machine()}, {multiprocessing.cpu_count()} cores"
runtime_recs['cuda'] = []
# Use cuda_profiler data if available, otherwise fallback to nvidia-smi
if cuda_profiler:
for gpu_info, peak_memory_bytes in cuda_profiler.items():
# Convert peak memory to human-readable format
peak_memory_mb = peak_memory_bytes / (1000 * 1000)
if peak_memory_mb >= 1000:
peak_memory_str = f"{peak_memory_mb / 1000:.2f} GiB"
else:
peak_memory_str = f"{peak_memory_mb:.1f} MiB"
runtime_recs['cuda'].append(f"{gpu_info}, Used {self._cuda_memory_to_str(peak_memory_bytes)}")
for gpu_name, mem_info in cuda_profiler.items():
total_str = self._cuda_memory_to_str(mem_info['total'])
available_str = self._cuda_memory_to_str(mem_info['available_before'])
peak_str = self._cuda_memory_to_str(mem_info['peak'])
runtime_recs['cuda'].append(
f"{gpu_name}, {total_str} total, {available_str} available, {peak_str} peak used"
)
elif shutil.which('nvidia-smi'):
for gpu in subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'],
stdout=subprocess.PIPE).stdout.decode('utf-8').strip().split('\n'):
@@ -345,50 +345,224 @@ def _cuda_device_name_concat(name, mem):
mem = ClamsApp._cuda_memory_to_str(mem)
return f"{name}, With {mem}"

def _get_profile_path(self, param_hash: str) -> pathlib.Path:
"""
Get filesystem path for memory profile file.

Profile files are stored in a per-app directory under user's cache.

:param param_hash: Hash of parameters from :func:`mmif.utils.cli.describe.generate_param_hash`
:return: Path to the profile file
"""
# Sanitize app identifier for filesystem use
app_id = self.metadata.identifier.replace('/', '-').replace(':', '-')
cache_base = pathlib.Path(os.environ.get('XDG_CACHE_HOME', pathlib.Path.home() / '.cache'))
cache_dir = cache_base / 'clams' / 'memory_profiles' / app_id
return cache_dir / f"memory_{param_hash}.json"
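
For orientation, the cache layout produced by _get_profile_path can be sketched as follows; the identifier and hash values below are invented, and XDG_CACHE_HOME is assumed to be unset:

import pathlib

identifier = 'http://apps.clams.ai/example-app/v1.0'      # hypothetical app identifier
app_id = identifier.replace('/', '-').replace(':', '-')   # 'http---apps.clams.ai-example-app-v1.0'
param_hash = 'abc123'                                      # stand-in for generate_param_hash() output
cache_base = pathlib.Path.home() / '.cache'                # default when XDG_CACHE_HOME is unset
print(cache_base / 'clams' / 'memory_profiles' / app_id / f'memory_{param_hash}.json')
# e.g. /home/user/.cache/clams/memory_profiles/http---apps.clams.ai-example-app-v1.0/memory_abc123.json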

@staticmethod
def _get_available_vram() -> int:
"""
Get currently available VRAM in bytes (GPU-wide, across all processes).

Uses nvidia-smi to get GPU-wide available memory, not just the current process's allocation.

:return: Available VRAM in bytes, or 0 if unavailable
"""
try:
import subprocess
import shutil
if shutil.which('nvidia-smi'):
# Get free memory from nvidia-smi (reports GPU-wide, not per-process)
result = subprocess.run(
['nvidia-smi', '--query-gpu=memory.free', '--format=csv,noheader,nounits', '-i', '0'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
free_mb = float(result.stdout.strip())
return int(free_mb * 1024 * 1024)  # Convert MiB (nvidia-smi units) to bytes
except Exception:
pass

# Fallback to torch (only sees current process memory)
try:
import torch # pytype: disable=import-error
if not torch.cuda.is_available():
return 0

device = torch.cuda.current_device()
total = torch.cuda.get_device_properties(device).total_memory
used = max(torch.cuda.memory_allocated(device),
torch.cuda.memory_reserved(device))
return total - used
except Exception:
return 0
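
A usage sketch for this helper; the 8 GiB threshold and the action taken on it are invented for illustration and are not part of this PR:

free_bytes = ClamsApp._get_available_vram()
if 0 < free_bytes < 8 * 1024 ** 3:  # less than ~8 GiB free on GPU 0
    print(f"Only {free_bytes / 1024 ** 3:.2f} GiB of VRAM currently free")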

def _record_vram_usage(self, parameters: dict, peak_bytes: int) -> None:
"""
Record peak memory usage to profile file.

Uses atomic write (temp + rename) to avoid corruption from
concurrent writes. Only updates if new value is higher.

Profile files are JSON containing:
- peak_bytes: Peak VRAM usage by the torch process
- parameters: Original parameters for human readability

:param parameters: Request parameters (for hash and recording)
:param peak_bytes: Measured peak VRAM usage
"""
import json

if peak_bytes <= 0:
return

param_hash = generate_param_hash(parameters)
profile_path = self._get_profile_path(param_hash)

try:
profile_path.parent.mkdir(parents=True, exist_ok=True)

# Check if we should update
should_write = True
if profile_path.exists():
try:
existing_data = json.loads(profile_path.read_text())
existing = existing_data.get('peak_bytes', 0)
if peak_bytes <= existing:
should_write = False # Existing value is sufficient
else:
self.logger.debug(
f"Updating peak memory for {param_hash}: "
f"{existing/1024**3:.2f}GB -> {peak_bytes/1024**3:.2f}GB"
)
except (ValueError, IOError, json.JSONDecodeError):
pass # Corrupted file, overwrite

if should_write:
# Prepare profile data with original parameters for readability
# Filter out internal keys and non-serializable values
clean_params = {
k: v for k, v in parameters.items()
if k != self._RAW_PARAMS_KEY and not k.startswith('#')
}
profile_data = {
'peak_bytes': peak_bytes,
'parameters': clean_params
}

# Atomic write: write to temp, then rename
temp_path = profile_path.with_suffix('.tmp')
temp_path.write_text(json.dumps(profile_data, indent=2))
temp_path.rename(profile_path) # Atomic on POSIX

self.logger.info(
f"Recorded peak memory for {param_hash}: "
f"{peak_bytes/1024**3:.2f}GB"
)
except Exception as e:
self.logger.warning(f"Failed to record memory profile: {e}")

@staticmethod
def _profile_cuda_memory(func):
"""
Decorator for profiling CUDA memory usage during _annotate execution.

Decorator for profiling CUDA memory usage and managing VRAM availability.

This decorator:
1. Checks VRAM requirements before execution (if conditions met)
2. Rejects requests if insufficient VRAM is available
3. Records peak memory usage after execution
4. Calls empty_cache() for cleanup

:param func: The function to wrap (typically _annotate)
:return: Decorated function that returns (result, cuda_profiler)
where cuda_profiler is dict with "<GPU_NAME>, <GPU_TOTAL_MEMORY>" keys
and peak memory usage values
and dict values containing 'total', 'available_before', and 'peak' memory in bytes
"""
def wrapper(*args, **kwargs):
# Get the ClamsApp instance from the bound method
app_instance = getattr(func, '__self__', None)

cuda_profiler = {}
torch_available = False
cuda_available = False
device_count = 0

available_before = {}

try:
import torch # pytype: disable=import-error
torch_available = True
cuda_available = torch.cuda.is_available()
device_count = torch.cuda.device_count()
if cuda_available:
# Reset peak memory stats for all devices
torch.cuda.reset_peak_memory_stats('cuda')
except ImportError:
pass


# Capture available VRAM before execution and reset stats
if torch_available and cuda_available:
for device_id in range(device_count):
device_id_str = f'cuda:{device_id}'
# Get GPU-wide available memory via nvidia-smi
try:
import subprocess
import shutil
if shutil.which('nvidia-smi'):
result = subprocess.run(
['nvidia-smi', '--query-gpu=memory.free',
'--format=csv,noheader,nounits', '-i', str(device_id)],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
free_mb = float(result.stdout.strip())
available_before[device_id] = int(free_mb * 1024 * 1024)
else:
# Fallback to torch (process-specific)
total = torch.cuda.get_device_properties(device_id_str).total_memory
allocated = torch.cuda.memory_allocated(device_id_str)
available_before[device_id] = total - allocated
else:
# Fallback to torch (process-specific)
total = torch.cuda.get_device_properties(device_id_str).total_memory
allocated = torch.cuda.memory_allocated(device_id_str)
available_before[device_id] = total - allocated
except Exception:
# Fallback to torch (process-specific)
total = torch.cuda.get_device_properties(device_id_str).total_memory
allocated = torch.cuda.memory_allocated(device_id_str)
available_before[device_id] = total - allocated
# Reset peak memory stats for all devices
torch.cuda.reset_peak_memory_stats('cuda')

try:
result = func(*args, **kwargs)


# Record peak memory usage
total_peak = 0
if torch_available and cuda_available and device_count > 0:
for device_id in range(device_count):
device_id = f'cuda:{device_id}'
peak_memory = torch.cuda.max_memory_allocated(device_id)
gpu_name = torch.cuda.get_device_name(device_id)
gpu_total_memory = torch.cuda.get_device_properties(device_id).total_memory
key = ClamsApp._cuda_device_name_concat(gpu_name, gpu_total_memory)
cuda_profiler[key] = peak_memory

device_id_str = f'cuda:{device_id}'
peak_memory = torch.cuda.max_memory_allocated(device_id_str)
total_peak = max(total_peak, peak_memory)
gpu_name = torch.cuda.get_device_name(device_id_str)
gpu_total_memory = torch.cuda.get_device_properties(device_id_str).total_memory
cuda_profiler[gpu_name] = {
'total': gpu_total_memory,
'available_before': available_before.get(device_id, 0),
'peak': peak_memory
}

# Record peak memory for future requests (if GPU app)
gpu_app = (
hasattr(app_instance, 'metadata') and
getattr(app_instance.metadata, 'est_gpu_mem_min', 0) > 0
)
if gpu_app and total_peak > 0:
app_instance._record_vram_usage(kwargs, total_peak)

return result, cuda_profiler
finally:
if torch_available and cuda_available:
torch.cuda.empty_cache()

return wrapper
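
How the wrapped callable is consumed can be sketched as follows; the pattern is inferred from annotate()'s handling of cuda_profiler earlier in this file, and ExampleApp and example_mmif are placeholders, not names from this PR:

app = ExampleApp()  # hypothetical ClamsApp subclass implementing _annotate
profiled_annotate = ClamsApp._profile_cuda_memory(app._annotate)  # pass a bound method so __self__ is available
result, cuda_profiler = profiled_annotate(example_mmif, pretty=False)
for gpu_name, mem_info in cuda_profiler.items():
    print(gpu_name, mem_info['total'], mem_info['available_before'], mem_info['peak'])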

@staticmethod
28 changes: 27 additions & 1 deletion clams/appmetadata/__init__.py
@@ -352,9 +352,20 @@ class AppMetadata(pydantic.BaseModel):
"a package name and its version in the string value at the minimum (e.g., ``clams-python==1.2.3``)."
)
more: Optional[Dict[str, str]] = pydantic.Field(
None,
None,
description="(optional) A string-to-string map that can be used to store any additional metadata of the app."
)
est_gpu_mem_min: int = pydantic.Field(
0,
description="(optional) Minimum GPU memory required to run the app, in megabytes (MB). "
"Set to 0 (default) if the app does not use GPU."
)
est_gpu_mem_typ: int = pydantic.Field(
0,
description="(optional) Typical GPU memory usage for default parameters, in megabytes (MB). "
"Must be equal or larger than est_gpu_mem_min. "
"Set to 0 (default) if the app does not use GPU."
)

model_config = {
'title': 'CLAMS AppMetadata',
@@ -372,6 +383,21 @@ def assign_versions(cls, data):
data.mmif_version = get_mmif_specver()
return data

@pydantic.model_validator(mode='after')
@classmethod
def validate_gpu_memory(cls, data):
import warnings
if data.est_gpu_mem_typ > 0 and data.est_gpu_mem_min > 0:
if data.est_gpu_mem_typ < data.est_gpu_mem_min:
warnings.warn(
f"est_gpu_mem_typ ({data.est_gpu_mem_typ} MB) is less than "
f"est_gpu_mem_min ({data.est_gpu_mem_min} MB). "
f"Setting est_gpu_mem_typ to {data.est_gpu_mem_min} MB.",
UserWarning
)
data.est_gpu_mem_typ = data.est_gpu_mem_min
return data
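
The validator's effect can be illustrated with a deliberately inconsistent pair of estimates; the remaining required AppMetadata fields are app-specific and are only hinted at here (fill them in as in the app template below):

required_fields = {}  # supply name, description, app_license, etc. as required by AppMetadata
metadata = AppMetadata(
    **required_fields,
    est_gpu_mem_min=4096,  # hypothetical: the app needs roughly 4 GB of VRAM at minimum
    est_gpu_mem_typ=2048,  # deliberately smaller than the minimum
)
# A UserWarning is emitted and est_gpu_mem_typ is coerced up to est_gpu_mem_min:
assert metadata.est_gpu_mem_typ == 4096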

@pydantic.field_validator('identifier', mode='before')
@classmethod
def append_version(cls, val):
3 changes: 3 additions & 0 deletions clams/develop/templates/app/metadata.py.template
@@ -39,6 +39,9 @@ def appmetadata() -> AppMetadata:
# this trick can also be useful (replace ANALYZER_NAME with the pypi dist name)
analyzer_version=[l.strip().rsplit('==')[-1] for l in open(pathlib.Path(__file__).parent / 'requirements.txt').readlines() if re.match(r'^ANALYZER_NAME==', l)][0],
analyzer_license="", # short name for a software license
# GPU memory estimates (in MB). Set to 0 if the app does not use GPU.
est_gpu_mem_min=0, # estimated memory usage with minimal computation parameters
est_gpu_mem_typ=0, # estimated memory usage with default parameters, must be >= est_gpu_mem_min
)
# and then add I/O specifications: an app must have at least one input and one output
metadata.add_input(DocumentTypes.Document)