diff --git a/clams/app/__init__.py b/clams/app/__init__.py index 6550327..b10af76 100644 --- a/clams/app/__init__.py +++ b/clams/app/__init__.py @@ -8,15 +8,21 @@ from datetime import datetime from urllib import parse as urlparser -__all__ = ['ClamsApp'] +__all__ = ['ClamsApp', 'InsufficientVRAMError'] + + +class InsufficientVRAMError(RuntimeError): + """Raised when insufficient GPU memory is available for processing.""" + pass from typing import Union, Any, Optional, Dict, List, Tuple from mmif import Mmif, Document, DocumentTypes, View +from mmif.utils.cli.describe import generate_param_hash # pytype: disable=import-error from clams.appmetadata import AppMetadata, real_valued_primitives, python_type, map_param_kv_delimiter logging.basicConfig( - level=logging.WARNING, + level=getattr(logging, os.environ.get('CLAMS_LOGLEVEL', 'WARNING').upper(), logging.WARNING), format="%(asctime)s %(name)s %(levelname)-8s %(thread)d %(message)s", datefmt="%Y-%m-%d %H:%M:%S") @@ -47,7 +53,7 @@ class ClamsApp(ABC): 'description': 'The JSON body of the HTTP response will be re-formatted with 2-space indentation', }, { - 'name': 'runningTime', 'type': 'boolean', 'choices': None, 'default': False, 'multivalued': False, + 'name': 'runningTime', 'type': 'boolean', 'choices': None, 'default': True, 'multivalued': False, 'description': 'The running time of the app will be recorded in the view metadata', }, { @@ -166,14 +172,12 @@ def annotate(self, mmif: Union[str, dict, Mmif], **runtime_params: List[str]) -> runtime_recs['cuda'] = [] # Use cuda_profiler data if available, otherwise fallback to nvidia-smi if cuda_profiler: - for gpu_info, peak_memory_bytes in cuda_profiler.items(): - # Convert peak memory to human-readable format - peak_memory_mb = peak_memory_bytes / (1000 * 1000) - if peak_memory_mb >= 1000: - peak_memory_str = f"{peak_memory_mb / 1000:.2f} GiB" - else: - peak_memory_str = f"{peak_memory_mb:.1f} MiB" - runtime_recs['cuda'].append(f"{gpu_info}, Used {self._cuda_memory_to_str(peak_memory_bytes)}") + for gpu_info, mem_info in cuda_profiler.items(): + available_str = self._cuda_memory_to_str(mem_info['available_before']) + peak_str = self._cuda_memory_to_str(mem_info['peak']) + runtime_recs['cuda'].append( + f"{gpu_info}, {available_str} available, {peak_str} peak used" + ) elif shutil.which('nvidia-smi'): for gpu in subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'], stdout=subprocess.PIPE).stdout.decode('utf-8').strip().split('\n'): @@ -345,50 +349,301 @@ def _cuda_device_name_concat(name, mem): mem = ClamsApp._cuda_memory_to_str(mem) return f"{name}, With {mem}" + def _get_profile_path(self, param_hash: str) -> pathlib.Path: + """ + Get filesystem path for memory profile file. + + Profile files are stored in a per-app directory under user's cache. + + :param param_hash: Hash of parameters from :func:`mmif.utils.cli.describe.generate_param_hash` + :return: Path to the profile file + """ + # Sanitize app identifier for filesystem use + app_id = self.metadata.identifier.replace('/', '-').replace(':', '-') + cache_base = pathlib.Path(os.environ.get('XDG_CACHE_HOME', pathlib.Path.home() / '.cache')) + cache_dir = cache_base / 'clams' / 'memory_profiles' / app_id + return cache_dir / f"memory_{param_hash}.txt" + + @staticmethod + def _check_vram_available(required_bytes: int, safety_margin: float = 0.1) -> bool: + """ + Check if sufficient VRAM is currently available (GPU-wide). 
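+
+        A minimal usage sketch (hypothetical caller; the 6 GiB figure is
+        illustrative, not a measured requirement)::
+
+            if not ClamsApp._check_vram_available(6 * 1024 ** 3):
+                raise InsufficientVRAMError("not enough free VRAM to load the model")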
+ + :param required_bytes: Bytes needed for model + :param safety_margin: Additional safety buffer as fraction of required (default 10%) + :return: True if sufficient VRAM available + """ + try: + available = ClamsApp._get_available_vram() + if available == 0: + # Can't determine available VRAM, fail open + return True + + # Apply safety margin to required bytes + required_with_margin = required_bytes * (1 + safety_margin) + + return available >= required_with_margin + + except Exception: + # If we can't check, fail open (allow the request) + return True + + @staticmethod + def _get_available_vram() -> int: + """ + Get currently available VRAM in bytes (GPU-wide, across all processes). + + Uses nvidia-smi to get actual available memory, not just current process. + + :return: Available VRAM in bytes, or 0 if unavailable + """ + try: + import subprocess + import shutil + if shutil.which('nvidia-smi'): + # Get free memory from nvidia-smi (reports GPU-wide, not per-process) + result = subprocess.run( + ['nvidia-smi', '--query-gpu=memory.free', '--format=csv,noheader,nounits', '-i', '0'], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + free_mb = float(result.stdout.strip()) + return int(free_mb * 1024 * 1024) # Convert MB to bytes + except Exception: + pass + + # Fallback to torch (only sees current process memory) + try: + import torch # pytype: disable=import-error + if not torch.cuda.is_available(): + return 0 + + device = torch.cuda.current_device() + total = torch.cuda.get_device_properties(device).total_memory + used = max(torch.cuda.memory_allocated(device), + torch.cuda.memory_reserved(device)) + return total - used + except Exception: + return 0 + + def _get_estimated_vram_usage(self, **parameters) -> Optional[Dict[str, Any]]: + """ + Get model memory requirements for VRAM checking. + + Default implementation uses conservative 80% for first request, + then historical measurements for subsequent requests. + + Apps can override this to provide explicit model sizes. + + :param parameters: Runtime parameters from the request + :return: Dict with 'size_bytes', 'name', and 'source', or None + """ + param_hash = generate_param_hash(parameters) + profile_path = self._get_profile_path(param_hash) + + # Priority 1: Historical measurement + if profile_path.exists(): + try: + measured = int(profile_path.read_text().strip()) + return { + 'size_bytes': int(measured * 1.2), # 20% safety buffer + 'name': f'params:{param_hash}', + 'source': 'historical' + } + except (ValueError, IOError) as e: + self.logger.warning(f"Failed to read profile {profile_path}: {e}") + + # Priority 2: Conservative first request (80% of total VRAM) + try: + import torch # pytype: disable=import-error + if torch.cuda.is_available(): + device = torch.cuda.current_device() + total_vram = torch.cuda.get_device_properties(device).total_memory + conservative_requirement = int(total_vram * 0.8) + + self.logger.info( + f"First request for params:{param_hash}: " + f"requesting 80% of VRAM ({conservative_requirement/1024**3:.2f}GB) " + f"until actual usage is measured" + ) + + return { + 'size_bytes': conservative_requirement, + 'name': f'params:{param_hash}', + 'source': 'conservative-first-request' + } + except ImportError: + pass + except Exception as e: + self.logger.warning(f"Failed to get CUDA info: {e}") + + return None + + def _record_vram_usage(self, parameters: dict, peak_bytes: int) -> None: + """ + Record peak memory usage to profile file. 
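+
+        The profile file is located via a hash of ``parameters`` (see
+        :meth:`_get_profile_path`), so each parameter combination keeps its own
+        record. A minimal sketch, assuming ``app`` is a :class:`ClamsApp`
+        instance and the 3 GiB peak value is illustrative::
+
+            app._record_vram_usage({'model': 'large'}, 3 * 1024 ** 3)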
+ + Uses atomic write (temp + rename) to avoid corruption from + concurrent writes. Only updates if new value is higher. + + :param parameters: Request parameters (for hash) + :param peak_bytes: Measured peak VRAM usage + """ + if peak_bytes <= 0: + return + + param_hash = generate_param_hash(parameters) + profile_path = self._get_profile_path(param_hash) + + try: + profile_path.parent.mkdir(parents=True, exist_ok=True) + + # Check if we should update + should_write = True + if profile_path.exists(): + try: + existing = int(profile_path.read_text().strip()) + if peak_bytes <= existing: + should_write = False # Existing value is sufficient + else: + self.logger.debug( + f"Updating peak memory for {param_hash}: " + f"{existing/1024**3:.2f}GB -> {peak_bytes/1024**3:.2f}GB" + ) + except (ValueError, IOError): + pass # Corrupted file, overwrite + + if should_write: + # Atomic write: write to temp, then rename + temp_path = profile_path.with_suffix('.tmp') + temp_path.write_text(str(peak_bytes)) + temp_path.rename(profile_path) # Atomic on POSIX + + self.logger.info( + f"Recorded peak memory for {param_hash}: " + f"{peak_bytes/1024**3:.2f}GB" + ) + except Exception as e: + self.logger.warning(f"Failed to record memory profile: {e}") + @staticmethod def _profile_cuda_memory(func): """ - Decorator for profiling CUDA memory usage during _annotate execution. - + Decorator for profiling CUDA memory usage and managing VRAM availability. + + This decorator: + 1. Checks VRAM requirements before execution (if conditions met) + 2. Rejects requests if insufficient VRAM + 3. Records peak memory usage after execution + 4. Calls empty_cache() for cleanup + :param func: The function to wrap (typically _annotate) :return: Decorated function that returns (result, cuda_profiler) where cuda_profiler is dict with ", " keys - and peak memory usage values + and dict values containing 'available_before' and 'peak' memory in bytes """ def wrapper(*args, **kwargs): + # Get the ClamsApp instance from the bound method + app_instance = getattr(func, '__self__', None) + cuda_profiler = {} torch_available = False cuda_available = False device_count = 0 - + available_before = {} + try: import torch # pytype: disable=import-error torch_available = True cuda_available = torch.cuda.is_available() device_count = torch.cuda.device_count() - if cuda_available: - # Reset peak memory stats for all devices - torch.cuda.reset_peak_memory_stats('cuda') except ImportError: pass - + + # VRAM checking: only when torch available, CUDA available, and app declares GPU usage + should_check_vram = ( + torch_available and + cuda_available and + hasattr(app_instance, 'metadata') and + getattr(app_instance.metadata, 'gpu_mem_min', 0) > 0 + ) + + if should_check_vram: + requirements = app_instance._get_estimated_vram_usage(**kwargs) + + if requirements: + required_bytes = requirements['size_bytes'] + model_name = requirements.get('name', 'model') + source = requirements.get('source', 'unknown') + + # Check if sufficient VRAM available RIGHT NOW + if not ClamsApp._check_vram_available(required_bytes): + available_gb = ClamsApp._get_available_vram() / 1024**3 + required_gb = required_bytes / 1024**3 + required_with_buffer_gb = required_gb * 1.1 # 10% safety margin + + error_msg = ( + f"Insufficient GPU memory for {model_name}. " + f"Tried to allocate {required_with_buffer_gb:.2f}GB " + f"(estimated {required_gb:.2f}GB + 10% buffer), " + f"available {available_gb:.2f}GB. 
" + ) + if source == 'conservative-first-request': + error_msg += ( + "This is a first request with this parameter set. " + "Conservative 80% VRAM requirement applied. " + ) + error_msg += ( + "GPU may be in use by other processes. " + "Please retry later." + ) + + app_instance.logger.error(error_msg) + raise InsufficientVRAMError(error_msg) + + app_instance.logger.info( + f"VRAM check passed for {model_name} ({source}): " + f"{required_bytes/1024**3:.2f}GB requested, " + f"{ClamsApp._get_available_vram()/1024**3:.2f}GB available" + ) + + # Capture available VRAM before execution and reset stats + if torch_available and cuda_available: + for device_id in range(device_count): + device_id_str = f'cuda:{device_id}' + total = torch.cuda.get_device_properties(device_id_str).total_memory + allocated = torch.cuda.memory_allocated(device_id_str) + available_before[device_id] = total - allocated + # Reset peak memory stats for all devices + torch.cuda.reset_peak_memory_stats('cuda') + try: result = func(*args, **kwargs) - + + # Record peak memory usage + total_peak = 0 if torch_available and cuda_available and device_count > 0: for device_id in range(device_count): - device_id = f'cuda:{device_id}' - peak_memory = torch.cuda.max_memory_allocated(device_id) - gpu_name = torch.cuda.get_device_name(device_id) - gpu_total_memory = torch.cuda.get_device_properties(device_id).total_memory + device_id_str = f'cuda:{device_id}' + peak_memory = torch.cuda.max_memory_allocated(device_id_str) + total_peak = max(total_peak, peak_memory) + gpu_name = torch.cuda.get_device_name(device_id_str) + gpu_total_memory = torch.cuda.get_device_properties(device_id_str).total_memory key = ClamsApp._cuda_device_name_concat(gpu_name, gpu_total_memory) - cuda_profiler[key] = peak_memory - + cuda_profiler[key] = { + 'available_before': available_before.get(device_id, 0), + 'peak': peak_memory + } + + # Record peak memory for future requests (if VRAM checking enabled) + if should_check_vram and total_peak > 0: + app_instance._record_vram_usage(kwargs, total_peak) + return result, cuda_profiler finally: if torch_available and cuda_available: torch.cuda.empty_cache() - + return wrapper @staticmethod diff --git a/clams/appmetadata/__init__.py b/clams/appmetadata/__init__.py index 4a7bd3c..cd8b2ca 100644 --- a/clams/appmetadata/__init__.py +++ b/clams/appmetadata/__init__.py @@ -352,9 +352,20 @@ class AppMetadata(pydantic.BaseModel): "a package name and its version in the string value at the minimum (e.g., ``clams-python==1.2.3``)." ) more: Optional[Dict[str, str]] = pydantic.Field( - None, + None, description="(optional) A string-to-string map that can be used to store any additional metadata of the app." ) + gpu_mem_min: int = pydantic.Field( + 0, + description="(optional) Minimum GPU memory required to run the app, in megabytes (MB). " + "Set to 0 (default) if the app does not use GPU." + ) + gpu_mem_typ: int = pydantic.Field( + 0, + description="(optional) Typical GPU memory usage for default parameters, in megabytes (MB). " + "Must be equal or larger than gpu_mem_min. " + "Set to 0 (default) if the app does not use GPU." 
+ ) model_config = { 'title': 'CLAMS AppMetadata', @@ -372,6 +383,21 @@ def assign_versions(cls, data): data.mmif_version = get_mmif_specver() return data + @pydantic.model_validator(mode='after') + @classmethod + def validate_gpu_memory(cls, data): + import warnings + if data.gpu_mem_typ > 0 and data.gpu_mem_min > 0: + if data.gpu_mem_typ < data.gpu_mem_min: + warnings.warn( + f"gpu_mem_typ ({data.gpu_mem_typ} MB) is less than " + f"gpu_mem_min ({data.gpu_mem_min} MB). " + f"Setting gpu_mem_typ to {data.gpu_mem_min} MB.", + UserWarning + ) + data.gpu_mem_typ = data.gpu_mem_min + return data + @pydantic.field_validator('identifier', mode='before') @classmethod def append_version(cls, val): diff --git a/clams/develop/templates/app/metadata.py.template b/clams/develop/templates/app/metadata.py.template index 8b1f8c7..8506616 100644 --- a/clams/develop/templates/app/metadata.py.template +++ b/clams/develop/templates/app/metadata.py.template @@ -39,6 +39,9 @@ def appmetadata() -> AppMetadata: # this trick can also be useful (replace ANALYZER_NAME with the pypi dist name) analyzer_version=[l.strip().rsplit('==')[-1] for l in open(pathlib.Path(__file__).parent / 'requirements.txt').readlines() if re.match(r'^ANALYZER_NAME==', l)][0], analyzer_license="", # short name for a software license + # GPU memory requirements (in MB). Set to 0 if the app does not use GPU. + # gpu_mem_min=0, # minimum GPU memory required for minimal configuration parameters + # gpu_mem_rec=0, # recommended GPU memory for default parameters, must be equal or larger than gpu_mem_min ) # and then add I/O specifications: an app must have at least one input and one output metadata.add_input(DocumentTypes.Document) diff --git a/clams/restify/__init__.py b/clams/restify/__init__.py index ad522b8..76c5696 100644 --- a/clams/restify/__init__.py +++ b/clams/restify/__init__.py @@ -3,7 +3,7 @@ from flask_restful import Resource, Api from mmif import Mmif -from clams.app import ClamsApp +from clams.app import ClamsApp, InsufficientVRAMError class Restifier(object): @@ -42,14 +42,55 @@ def run(self, **options): def serve_production(self, **options): """ Runs the CLAMS app as a flask webapp, using a production-ready web server (gunicorn, https://docs.gunicorn.org/en/stable/#). - + :param options: any additional options to pass to the web server. 
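+
+        For example, a GPU app that keeps a single persistent model loaded can
+        turn off the default per-request worker recycling (a sketch; ``MyApp``
+        is a placeholder for your :class:`~clams.app.ClamsApp` subclass)::
+
+            Restifier(MyApp()).serve_production(max_requests=0)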
""" import gunicorn.app.base import multiprocessing + import os def number_of_workers(): - return (multiprocessing.cpu_count() * 2) + 1 # +1 to make sure at least two workers are running + # Allow override via environment variable + if 'CLAMS_WORKERS' in os.environ: + return int(os.environ['CLAMS_WORKERS']) + + cpu_workers = (multiprocessing.cpu_count() * 2) + 1 + + # Get GPU memory requirement from app metadata + # Use gpu_mem_typ (typical usage) for worker calculation + try: + metadata = self.cla.metadata + gpu_mem_mb = metadata.gpu_mem_typ # typical usage determines how many workers fit + except Exception: + gpu_mem_mb = 0 + + if gpu_mem_mb <= 0: + return cpu_workers + + # Calculate workers based on total VRAM of the first CUDA device (no other GPUs are considered for now) + # Use nvidia-smi instead of torch to avoid initializing CUDA in parent process before fork + try: + import subprocess + import shutil + if shutil.which('nvidia-smi'): + result = subprocess.run( + ['nvidia-smi', '--query-gpu=memory.total', '--format=csv,noheader,nounits', '-i', '0'], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + total_vram_mb = float(result.stdout.strip()) + vram_workers = max(1, int(total_vram_mb // gpu_mem_mb)) + workers = min(vram_workers, cpu_workers) + self.cla.logger.info( + f"GPU detected: {total_vram_mb:.0f} MB VRAM, " + f"app requires {gpu_mem_mb} MB, " + f"using {workers} workers (max {vram_workers} by VRAM, {cpu_workers} by CPU)" + ) + return workers + except Exception: + pass + + return cpu_workers class ProductionApplication(gunicorn.app.base.BaseApplication): @@ -58,9 +99,16 @@ def __init__(self, app, host, port, **options): 'bind': f'{host}:{port}', 'workers': number_of_workers(), 'threads': 2, + # disable timeout for long-running GPU workloads (default 30s is too short) + 'timeout': 0, # because the default is 'None' 'accesslog': '-', # errorlog, however, is redirected to stderr by default since 19.2, so no need to set + # log level is warning by default + 'loglevel': os.environ.get('CLAMS_LOGLEVEL', 'warning').lower(), + # default to 1 to free GPU memory after each request + # developers can override via serve_production(max_requests=N) for single-model apps + 'max_requests': 1, } self.options.update(options) self.application = app @@ -75,6 +123,13 @@ def load_config(self): def load(self): return self.application + # Log max_requests setting + max_req = options.get('max_requests', 1) # default is 1, meaning workers are killed after each request + if max_req == 0: + self.cla.logger.info("Worker recycling: disabled (workers persist)") + else: + self.cla.logger.info(f"Worker recycling: after {max_req} request(s)") + ProductionApplication(self.flask_app, self.host, self.port, **options).run() def serve_development(self, **options): @@ -144,6 +199,9 @@ def post(self) -> Response: return Response(response="Invalid input data. 
See below for validation error.\n\n" + str(e), status=500, mimetype='text/plain') try: return self.json_to_response(self.cla.annotate(raw_data, **raw_params)) + except InsufficientVRAMError as e: + self.cla.logger.warning(f"Request rejected due to insufficient VRAM: {e}") + return self.json_to_response(self.cla.record_error(raw_data, **raw_params).serialize(pretty=True), status=503) except Exception: self.cla.logger.exception("Error in annotation") return self.json_to_response(self.cla.record_error(raw_data, **raw_params).serialize(pretty=True), status=500) diff --git a/documentation/clamsapp.md b/documentation/clamsapp.md index 27d1a6e..0f2443a 100644 --- a/documentation/clamsapp.md +++ b/documentation/clamsapp.md @@ -209,6 +209,17 @@ $ python app.py * Be default, the app will be running in *debugging* mode, but you can change it to *production* mode by passing `--production` option to support larger traffic volume. * As you might have noticed, the default `CMD` in the prebuilt containers is `python app.py --production --port 5000`. +##### Environment variables for production mode + +When running in production mode, the following environment variables can be used to configure the app server: + +| Variable | Description | Default | +|----------|-------------|---------| +| `CLAMS_WORKERS` | Number of gunicorn worker processes | Auto-calculated based on CPU cores and GPU memory | +| `CLAMS_LOGLEVEL` | Logging verbosity level (`debug`, `info`, `warning`, `error`) | `warning` | + +By default, the number of workers is calculated as `(CPU cores × 2) + 1`. For GPU-based apps, see [GPU Memory Management](gpu-apps.md) for details on automatic worker scaling and VRAM management. + #### `metadata.py`: Getting app metadata Running `metadata.py` will print out the app metadata in JSON format. diff --git a/documentation/gpu-apps.md b/documentation/gpu-apps.md new file mode 100644 index 0000000..d9dcc82 --- /dev/null +++ b/documentation/gpu-apps.md @@ -0,0 +1,333 @@ +## GPU Memory Management for CLAMS Apps + +This document covers GPU memory management features in the CLAMS SDK for developers building CUDA-based applications. + +### Overview + +CLAMS apps that use GPU acceleration face memory management challenges when running as HTTP servers with multiple workers. +Each gunicorn worker loads models independently into GPU VRAM, which can cause out-of-memory (OOM) errors. + +:::{note} +The memory profiling features (peak usage tracking) require **PyTorch** since they use `torch.cuda` APIs. +Worker calculation and VRAM availability checking use `nvidia-smi` and work with any framework, but the system requires PyTorch to be installed. +TensorFlow-based apps should set conservative (high) VRAM usage values in app metadata since profiling won't track TensorFlow allocations. + +All the VRAM-related log messages are set to `info` level. +::: + +The CLAMS SDK provides: +1. **Metadata fields** for declaring GPU memory requirements +2. **Automatic worker scaling** based on available VRAM +3. **Runtime VRAM checking** to reject requests when memory is insufficient +4. 
**Memory profiling** to optimize future requests + +### Declaring GPU Memory Requirements + +App developers should declare GPU memory requirements in the app metadata using two fields: + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `gpu_mem_min` | int | 0 | Minimum GPU memory required to run the app (MB) | +| `gpu_mem_typ` | int | 0 | Typical GPU memory usage with default parameters (MB) | + +#### Example + +```python +from clams.app import ClamsApp +from clams.appmetadata import AppMetadata + +class MyGPUApp(ClamsApp): + def _appmetadata(self): + metadata = AppMetadata( + name="My GPU App", + description="An app that uses GPU acceleration", + app_license="MIT", + identifier="my-gpu-app", + url="https://example.com/my-gpu-app", + gpu_mem_min=4000, # 4GB minimum + gpu_mem_typ=6000, # 6GB typical usage + ) + # ... add inputs/outputs/parameters + return metadata +``` + +#### General Guidelines for Setting Values + +- **`gpu_mem_min`**: The absolute minimum VRAM needed to load the smallest supported model configuration. 0 (the default) means the app does not use GPU. + +- **`gpu_mem_typ`**: Expected VRAM usage with default parameters. This value is used for automatic worker calculation and displayed to users to help them understand resource requirements. Must be >= `gpu_mem_min`. + +If `gpu_mem_typ` is set lower than `gpu_mem_min`, the SDK will automatically correct it and issue a warning. + +### Automatic Worker Calculation + +When running in production mode (gunicorn), the SDK automatically calculates the optimal number of workers based on: + +1. CPU cores: `(cores × 2) + 1` +2. Available VRAM: `total_vram / gpu_mem_typ` + +The final worker count is the minimum of these two values, ensuring workers don't exceed available GPU memory. Using `gpu_mem_typ` (typical usage) rather than `gpu_mem_min` provides more realistic worker counts for typical workloads. + +##### Example Calculation + +For a system with: +- 8 CPU cores → 17 CPU-based workers +- 24GB VRAM, app typically uses 6GB (`gpu_mem_typ=6000`) → 4 VRAM-based workers + +Result: 4 workers (limited by VRAM) + +#### Worker Recycling + +By default, workers are recycled after each request (`max_requests=1`). This ensures GPU memory is fully released between requests, which is important for: +- Apps that load different models based on parameters +- Preventing memory fragmentation over time +- Ensuring accurate VRAM availability checks + +For apps with a single persistent model, developers can disable recycling for better performance: + +```python +# In app.py +if __name__ == '__main__': + restifier = Restifier(MyApp()) + restifier.serve_production(max_requests=0) # Workers persist indefinitely +``` + +#### Overriding Worker Count + +Use the `CLAMS_WORKERS` environment variable to override automatic calculation: + +```bash +# Set fixed number of workers +CLAMS_WORKERS=2 python app.py --production + +# In Docker +docker run -e CLAMS_WORKERS=2 -p 5000:5000 +``` + +```bash +CLAMS_LOGLEVEL=info python app.py --production +``` + +### Runtime VRAM Checking + +Beyond worker calculation, the SDK performs runtime VRAM checks before each annotation request. This catches cases where: +- Other processes are using GPU memory +- Previous requests haven't fully released memory +- Memory fragmentation reduces effective available space + +#### How It Works + +1. 
**Before annotation**: The SDK estimates required VRAM based on: + - Historical measurements from previous runs (with 20% buffer) + - Conservative estimate (80% of total VRAM) for first request + +2. **If insufficient VRAM**: The request is rejected with `InsufficientVRAMError` + +3. **After annotation**: Peak memory usage is recorded for future estimates + +#### HTTP Response + +When VRAM is insufficient, the REST API returns: +- **Status**: 503 Service Unavailable +- **Body**: Error message describing the shortage + +This allows clients to implement retry logic with backoff. + +### Memory Profiling + +The SDK automatically profiles and caches memory usage per parameter combination. + +#### Profile Storage + +Profiles are stored in: +``` +$XDG_CACHE_HOME/clams/memory_profiles//.txt +``` + +If `XDG_CACHE_HOME` is not set, defaults to `~/.cache`. In containers based on `clams-python-*` base images, this is typically `/cache/clams/memory_profiles/`. + +Each profile contains a single integer: the peak memory usage in bytes. + +#### Profile Behavior + +- **First request**: Uses conservative estimate (80% of total VRAM) +- **Subsequent requests**: Uses historical measurement × 1.2 buffer +- **Updates**: Only when new peak exceeds stored value + +This approach becomes more accurate over time while maintaining safety margins. + +### Error Handling + +#### InsufficientVRAMError + +A custom exception raised when VRAM is insufficient: + +```python +from clams.app import InsufficientVRAMError + +try: + result = app.annotate(mmif_input) +except InsufficientVRAMError as e: + # Handle insufficient memory + print(f"Not enough GPU memory: {e}") +``` + +This exception inherits from `RuntimeError` for backward compatibility. + +#### Best Practices + +1. **Catch the exception** in custom code that calls `annotate()` directly +2. **Implement retry logic** when receiving HTTP 503 +3. **Monitor memory usage** using the `hwFetch` parameter + +### Monitoring with hwFetch + +Enable hardware information in responses to monitor GPU usage: + +```bash +# Via HTTP query parameter +curl -X POST "http://localhost:5000/?hwFetch=true" -d@input.mmif + +# Via CLI +python cli.py --hwFetch true input.mmif output.mmif +``` + +Response metadata will include: +```json +{ + "app-metadata": { + "hwFetch": "NVIDIA RTX 4090, 20480 MB available, 3584 MB peak used" + } +} +``` + +### Conditions for VRAM Checking + +VRAM checking is only performed when all conditions are met: + +1. PyTorch is installed (`import torch` succeeds) +2. CUDA is available (`torch.cuda.is_available()` returns True) +3. App declares GPU requirements (`gpu_mem_min > 0`) + +Apps without GPU requirements (default `gpu_mem_min=0`) skip all VRAM checks. + +:::{important} +The VRAM checking system requires PyTorch to be installed. TensorFlow-based apps with PyTorch installed will get worker calculation and VRAM availability checking (via `nvidia-smi`), but memory profiling will only track PyTorch allocations, not TensorFlow allocations. For accurate profiling, TensorFlow apps should set conservative `gpu_mem_typ` values based on manual measurements. +::: + +### Model Loading Strategy + +#### Single Model + +Load the model in `__init__` so it's ready when requests arrive: + +```python +class MyGPUApp(ClamsApp): + def __init__(self): + super().__init__() + self.model = load_model() # Load once per worker + + def _annotate(self, mmif, **params): + result = self.model.predict(...) 
# Model already loaded + return mmif +``` + +Each gunicorn worker calls `__init__` independently, so each worker gets its own model copy. Worker count is limited by `gpu_mem_typ` to prevent OOM. +In this case, it's generally recommended to use a `max_requests` value that's larger than 1 to save model loading time. + +#### Multiple Model Variants + +For apps supporting different model sizes (e.g., tiny/base/large), use lazy loading with caching: + +```python +class WhisperApp(ClamsApp): + def __init__(self): + super().__init__() + self.models = {} # Cache for loaded models + + def _annotate(self, mmif, modelSize='base', **params): + if modelSize not in self.models: + self.models[modelSize] = whisper.load_model(modelSize) + + model = self.models[modelSize] + # use model... + return mmif +``` + +**Considerations for multiple models:** +- Set `gpu_mem_min` for the smallest supported model (absolute minimum to run) +- Set `gpu_mem_typ` for the largest commonly-used model (this determines worker count) +- Historical profiles are keyed by parameter hash, so different model sizes get separate profiles +- Multiple models may accumulate in memory within a single worker (consider enabling worker recycling with `max_requests=1`) + +### Memory Optimization Tips + +1. **Clear cache between requests**: The SDK calls `torch.cuda.empty_cache()` after annotation + +2. **Use appropriate batch sizes**: Smaller batches use less memory but may be slower + +3. **Consider model variants**: Offer parameters for different model sizes (e.g., base/large/xl) + +4. **Test on target hardware**: Memory usage varies by GPU architecture + +5. **Set accurate metadata values**: Measure actual usage rather than guessing + +### Migration Guide + +To add GPU memory management to an existing app: + +1. **Measure memory usage**: Run your app and note peak VRAM usage + +2. **Update metadata**: Add `gpu_mem_min` and `gpu_mem_typ` fields + +3. **Test worker scaling**: Run in production mode and verify worker count + +4. **Test rejection logic**: Simulate low VRAM scenarios + +5. 
**Update documentation**: Inform users of GPU requirements + +### Troubleshooting + +#### Workers not scaling correctly + +- Verify `gpu_mem_typ` is set in metadata (not 0) - this determines worker count +- Check PyTorch is installed and CUDA is available +- Use `CLAMS_WORKERS` to override if needed + +#### Requests being rejected unexpectedly + +- Check available VRAM with `nvidia-smi` +- Clear GPU memory from other processes +- Profile cache may have outdated high values (delete `~/.cache/clams/memory_profiles/` or `$XDG_CACHE_HOME/clams/memory_profiles/`) + +#### OOM errors despite worker limits + +- `gpu_mem_typ` may be set too low, allowing too many workers +- Memory fragmentation; try restarting workers +- Other processes consuming VRAM + +### API Reference + +#### AppMetadata Fields + +```python +gpu_mem_min: int = 0 # Minimum GPU memory (MB) +gpu_mem_typ: int = 0 # Typical GPU memory usage (MB) +``` + +#### Exception Classes + +```python +class InsufficientVRAMError(RuntimeError): + """Raised when insufficient GPU memory is available.""" + pass +``` + +#### Internal Methods + +These methods are used internally but documented for reference: + +- `ClamsApp._get_estimated_vram_usage(**params)` - Get estimated VRAM for parameters +- `ClamsApp._record_vram_usage(params, peak_bytes)` - Record peak usage +- `ClamsApp._check_vram_available(required_bytes)` - Check if VRAM sufficient +- `ClamsApp._get_available_vram()` - Get current available VRAM diff --git a/documentation/index.rst b/documentation/index.rst index 135da15..3f9fdd1 100644 --- a/documentation/index.rst +++ b/documentation/index.rst @@ -10,6 +10,7 @@ Welcome to CLAMS Python SDK documentation! introduction input-output runtime-params + gpu-apps appmetadata appdirectory cli diff --git a/requirements.txt b/requirements.txt index 8a44892..12d786c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -mmif-python==1.1.2 +mmif-python==1.2.0 Flask>=2 Flask-RESTful>=0.3.9 diff --git a/tests/test_clamsapp.py b/tests/test_clamsapp.py index 8318f1b..cc74bc5 100644 --- a/tests/test_clamsapp.py +++ b/tests/test_clamsapp.py @@ -297,13 +297,13 @@ def test_annotate_returns_invalid_mmif(self): def test_open_document_location(self): mmif = ExampleInputMMIF.get_rawmmif() - with self.app.open_document_location(mmif.documents['t1']) as f: + with self.app.open_document_location(mmif['t1']) as f: self.assertEqual(f.read(), ExampleInputMMIF.EXAMPLE_TEXT) def test_open_document_location_custom_opener(self): from PIL import Image mmif = ExampleInputMMIF.get_rawmmif() - with self.app.open_document_location(mmif.documents['i1'], Image.open) as f: + with self.app.open_document_location(mmif['i1'], Image.open) as f: self.assertEqual(f.size, (200, 71)) def test_refine_parameters(self): diff --git a/tests/test_vram.py b/tests/test_vram.py new file mode 100644 index 0000000..c93d367 --- /dev/null +++ b/tests/test_vram.py @@ -0,0 +1,307 @@ +import pathlib +import shutil +import tempfile +import unittest +import warnings +from typing import Union +from unittest.mock import patch, MagicMock + +import pytest +from mmif import Mmif, DocumentTypes, AnnotationTypes + +import clams.app +import clams.restify +from clams.app import ClamsApp, InsufficientVRAMError +from clams.appmetadata import AppMetadata + +# Skip entire file if nvidia-smi not available +pytestmark = pytest.mark.skipif( + shutil.which('nvidia-smi') is None, + reason="nvidia-smi not available - no CUDA device" +) + + +class GPUExampleClamsApp(clams.app.ClamsApp): + """Example app with 
GPU memory requirements declared.""" + + def _appmetadata(self) -> Union[dict, AppMetadata]: + metadata = AppMetadata( + name="GPU Example App", + description="Test app with GPU memory requirements", + app_license="MIT", + identifier="gpu-example-app", + url="https://example.com/gpu-app", + gpu_mem_min=2000, # 2GB minimum + gpu_mem_typ=4000, # 4GB typical + ) + metadata.add_input(DocumentTypes.VideoDocument) + metadata.add_output(AnnotationTypes.TimeFrame) + return metadata + + def _annotate(self, mmif, **kwargs): + if not isinstance(mmif, Mmif): + mmif = Mmif(mmif, validate=False) + new_view = mmif.new_view() + self.sign_view(new_view, kwargs) + new_view.new_contain(AnnotationTypes.TimeFrame) + return mmif + + +class NonGPUExampleClamsApp(clams.app.ClamsApp): + """Example app without GPU memory requirements (gpu_mem_min=0).""" + + def _appmetadata(self) -> Union[dict, AppMetadata]: + metadata = AppMetadata( + name="Non-GPU Example App", + description="Test app without GPU requirements", + app_license="MIT", + identifier="non-gpu-example-app", + url="https://example.com/non-gpu-app", + ) + metadata.add_input(DocumentTypes.TextDocument) + metadata.add_output(AnnotationTypes.TimeFrame) + return metadata + + def _annotate(self, mmif, **kwargs): + if not isinstance(mmif, Mmif): + mmif = Mmif(mmif, validate=False) + new_view = mmif.new_view() + self.sign_view(new_view, kwargs) + new_view.new_contain(AnnotationTypes.TimeFrame) + return mmif + + +class TestVRAMManagement(unittest.TestCase): + + def setUp(self): + self.gpu_app = GPUExampleClamsApp() + self.non_gpu_app = NonGPUExampleClamsApp() + + # ===== A. Pure Logic Tests ===== + + def test_profile_path_structure(self): + """Profile path includes sanitized app identifier.""" + param_hash = "abc123def456" + path = self.gpu_app._get_profile_path(param_hash) + + self.assertIn('.cache', str(path)) + self.assertIn('clams', str(path)) + self.assertIn('memory_profiles', str(path)) + self.assertIn(param_hash, str(path)) + self.assertTrue(str(path).endswith('.txt')) + + def test_profile_path_sanitization(self): + """URLs with / and : are properly sanitized in path.""" + param_hash = "test123" + path = self.gpu_app._get_profile_path(param_hash) + + # App identifier has slashes and colons that should be replaced + path_str = str(path) + # After sanitization, no / or : should be in the app_id part + app_id_part = path.parent.name + self.assertNotIn('/', app_id_part) + self.assertNotIn(':', app_id_part) + + def test_insufficient_vram_error(self): + """InsufficientVRAMError can be raised and caught.""" + with self.assertRaises(InsufficientVRAMError): + raise InsufficientVRAMError("Test error message") + + # Also inherits from RuntimeError + with self.assertRaises(RuntimeError): + raise InsufficientVRAMError("Test error message") + + def test_http_503_on_vram_error(self): + """RestAPI returns 503 for InsufficientVRAMError.""" + app = clams.restify.Restifier(GPUExampleClamsApp()).test_client() + + # Mock the annotate method to raise InsufficientVRAMError + with patch.object(GPUExampleClamsApp, 'annotate', + side_effect=InsufficientVRAMError("Not enough VRAM")): + mmif = Mmif(validate=False) + from mmif import Document + doc = Document({'@type': DocumentTypes.VideoDocument, + 'properties': {'id': 'v1', 'location': '/test.mp4'}}) + mmif.add_document(doc) + + res = app.post('/', data=mmif.serialize()) + self.assertEqual(res.status_code, 503) + self.assertIn('Not enough VRAM', res.get_data(as_text=True)) + + # ===== B. 
Mocked CUDA Tests ===== + + def test_check_vram_available_sufficient(self): + """Returns True when sufficient VRAM available.""" + mock_props = MagicMock() + mock_props.total_memory = 24 * 1024**3 # 24GB + + with patch('torch.cuda.is_available', return_value=True), \ + patch('torch.cuda.current_device', return_value=0), \ + patch('torch.cuda.get_device_properties', return_value=mock_props), \ + patch('torch.cuda.memory_allocated', return_value=1 * 1024**3), \ + patch('torch.cuda.memory_reserved', return_value=1 * 1024**3): + + # 24GB - 1GB = 23GB available, requesting 6GB + result = ClamsApp._check_vram_available(6 * 1024**3) + self.assertTrue(result) + + def test_check_vram_available_insufficient(self): + """Returns False when insufficient VRAM available.""" + mock_props = MagicMock() + mock_props.total_memory = 8 * 1024**3 # 8GB + + with patch('torch.cuda.is_available', return_value=True), \ + patch('torch.cuda.current_device', return_value=0), \ + patch('torch.cuda.get_device_properties', return_value=mock_props), \ + patch('torch.cuda.memory_allocated', return_value=6 * 1024**3), \ + patch('torch.cuda.memory_reserved', return_value=6 * 1024**3): + + # 8GB - 6GB = 2GB available, requesting 6GB (+ 10% margin) + result = ClamsApp._check_vram_available(6 * 1024**3) + self.assertFalse(result) + + def test_get_available_vram(self): + """Returns correct available VRAM calculation.""" + mock_props = MagicMock() + mock_props.total_memory = 16 * 1024**3 # 16GB + + with patch('torch.cuda.is_available', return_value=True), \ + patch('torch.cuda.current_device', return_value=0), \ + patch('torch.cuda.get_device_properties', return_value=mock_props), \ + patch('torch.cuda.memory_allocated', return_value=4 * 1024**3), \ + patch('torch.cuda.memory_reserved', return_value=5 * 1024**3): + + # Should use max(allocated, reserved) = 5GB + # Available = 16GB - 5GB = 11GB + result = ClamsApp._get_available_vram() + self.assertEqual(result, 11 * 1024**3) + + def test_get_estimated_vram_first_request(self): + """Uses conservative 80% when no historical profile exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + with patch.object(self.gpu_app, '_get_profile_path') as mock_path: + # Profile doesn't exist + profile_file = pathlib.Path(tmpdir) / 'memory_abc123.txt' + mock_path.return_value = profile_file + + mock_props = MagicMock() + mock_props.total_memory = 24 * 1024**3 # 24GB + + with patch('torch.cuda.is_available', return_value=True), \ + patch('torch.cuda.current_device', return_value=0), \ + patch('torch.cuda.get_device_properties', return_value=mock_props): + + result = self.gpu_app._get_estimated_vram_usage(model='large') + + self.assertIsNotNone(result) + self.assertEqual(result['source'], 'conservative-first-request') + # Should be 80% of 24GB + expected = int(24 * 1024**3 * 0.8) + self.assertEqual(result['size_bytes'], expected) + + def test_get_estimated_vram_historical(self): + """Uses historical measurement × 1.2 when profile exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + with patch.object(self.gpu_app, '_get_profile_path') as mock_path: + # Create profile with historical value + profile_file = pathlib.Path(tmpdir) / 'memory_abc123.txt' + profile_file.parent.mkdir(parents=True, exist_ok=True) + historical_peak = 3 * 1024**3 # 3GB + profile_file.write_text(str(historical_peak)) + mock_path.return_value = profile_file + + result = self.gpu_app._get_estimated_vram_usage(model='large') + + self.assertIsNotNone(result) + self.assertEqual(result['source'], 'historical') + # Should 
be historical × 1.2 + expected = int(historical_peak * 1.2) + self.assertEqual(result['size_bytes'], expected) + + def test_record_vram_usage_creates_file(self): + """Profile file is created with peak value.""" + with tempfile.TemporaryDirectory() as tmpdir: + with patch.object(self.gpu_app, '_get_profile_path') as mock_path: + profile_file = pathlib.Path(tmpdir) / 'subdir' / 'memory_abc123.txt' + mock_path.return_value = profile_file + + peak_bytes = 3 * 1024**3 + self.gpu_app._record_vram_usage({'model': 'large'}, peak_bytes) + + self.assertTrue(profile_file.exists()) + self.assertEqual(int(profile_file.read_text()), peak_bytes) + + def test_record_vram_usage_updates_higher(self): + """Only updates profile if new peak is higher.""" + with tempfile.TemporaryDirectory() as tmpdir: + with patch.object(self.gpu_app, '_get_profile_path') as mock_path: + profile_file = pathlib.Path(tmpdir) / 'memory_abc123.txt' + profile_file.parent.mkdir(parents=True, exist_ok=True) + + # Initial value + initial_peak = 5 * 1024**3 + profile_file.write_text(str(initial_peak)) + mock_path.return_value = profile_file + + # Try to record lower value - should not update + self.gpu_app._record_vram_usage({'model': 'large'}, 3 * 1024**3) + self.assertEqual(int(profile_file.read_text()), initial_peak) + + # Record higher value - should update + higher_peak = 7 * 1024**3 + self.gpu_app._record_vram_usage({'model': 'large'}, higher_peak) + self.assertEqual(int(profile_file.read_text()), higher_peak) + + def test_vram_check_skipped_when_no_gpu_mem_min(self): + """VRAM checking is skipped when gpu_mem_min=0.""" + # non_gpu_app has gpu_mem_min=0, so should skip VRAM checking + self.assertEqual(self.non_gpu_app.metadata.gpu_mem_min, 0) + + # _get_estimated_vram_usage should still work but won't be called + # during annotation because the condition check will fail + + # ===== C. AppMetadata Tests ===== + + def test_gpu_mem_fields_default_zero(self): + """GPU memory fields default to 0.""" + metadata = AppMetadata( + name="Test App", + description="Test", + app_license="MIT", + identifier="test-app", + url="https://example.com", + ) + metadata.add_input(DocumentTypes.TextDocument) + metadata.add_output(AnnotationTypes.TimeFrame) + + self.assertEqual(metadata.gpu_mem_min, 0) + self.assertEqual(metadata.gpu_mem_typ, 0) + + def test_gpu_mem_typ_validation(self): + """Warning issued when gpu_mem_typ < gpu_mem_min, auto-corrected.""" + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + metadata = AppMetadata( + name="Test App", + description="Test", + app_license="MIT", + identifier="test-app", + url="https://example.com", + gpu_mem_min=4000, # 4GB min + gpu_mem_typ=2000, # 2GB typical (less than min!) + ) + metadata.add_input(DocumentTypes.TextDocument) + metadata.add_output(AnnotationTypes.TimeFrame) + + # Should have issued a warning + self.assertEqual(len(w), 1) + self.assertIn('gpu_mem_typ', str(w[0].message)) + self.assertIn('gpu_mem_min', str(w[0].message)) + + # Should have auto-corrected + self.assertEqual(metadata.gpu_mem_typ, metadata.gpu_mem_min) + + +if __name__ == '__main__': + unittest.main()