From d87fa65c1ff2454dc36836586a7df2033c746051 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 28 Aug 2025 11:11:42 -0400 Subject: [PATCH 1/2] Add tools for working with Nomad API Added nomad_memory_monitor.sh and purge_dispatch_jobs.py. The memory monitor clears old evalutaions from the Nomad API's server/s once active memory gets above a certain level. This is necessary to keep the server responsive and functioning. Also added purge _dispatch_jobs.py which allows you to quickly purge jobs after a batch has been run that haven't been garbage collected --- tools/nomad_memory_monitor.sh | 83 ++++++++ tools/purge_dispatch_jobs.py | 361 ++++++++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+) create mode 100755 tools/nomad_memory_monitor.sh create mode 100755 tools/purge_dispatch_jobs.py diff --git a/tools/nomad_memory_monitor.sh b/tools/nomad_memory_monitor.sh new file mode 100755 index 0000000..ed45824 --- /dev/null +++ b/tools/nomad_memory_monitor.sh @@ -0,0 +1,83 @@ +#!/bin/bash + +## Nomad Memory Monitoring and Garbage Collection Script + +## This script monitors Nomad memory usage and triggers garbage collection when needed. +# Set the memory threshold in GiB at which garbage collection should trigger +MEMORY_THRESHOLD_GIB=6 + +MEMORY_THRESHOLD_BYTES=$(( MEMORY_THRESHOLD_GIB * 1024 * 1024 * 1024 )) + +get_nomad_alloc_bytes() { + local metrics_json + metrics_json=$(nomad operator metrics -json 2>&1 | grep -v "Unable to retrieve credentials") + if [ ${PIPESTATUS[0]} -ne 0 ]; then + echo "Error: 'nomad operator metrics' command failed" >&2 + return 1 + fi + # alloc_bytes indicates the number of bytes currently allocated by the nomad server + local alloc_bytes + alloc_bytes=$(echo "$metrics_json" | jq -r '.Gauges[] | select(.Name == "nomad.runtime.alloc_bytes") | .Value') + + if [[ -z "$alloc_bytes" || "$alloc_bytes" == "null" ]]; then + echo "Error: Could not parse 'nomad.runtime.alloc_bytes' from Nomad." >&2 + return 2 + fi + echo "$alloc_bytes" +} + +start_nomad_gc() { + # Set default log file path if not provided + local log_file="${1:-nomad_memory_usage.log}" + echo "Starting Nomad garbage collection background process..." + echo "Memory usage logs will be written to: $log_file" + + ( + export NOMAD_TOKEN="${NOMAD_TOKEN}" + export NOMAD_ADDR="${NOMAD_ADDR:-http://nomad-server-test.test.nextgenwaterprediction.com:4646}" + + while true; do + local current_alloc_bytes + current_alloc_bytes=$(get_nomad_alloc_bytes) + local exit_code=$? + + if [[ $exit_code -eq 0 ]]; then + # Write memory usage to log file + echo "$(date): Current alloc_bytes: $((current_alloc_bytes / 1024 / 1024 / 1024))GB | Threshold: $((MEMORY_THRESHOLD_BYTES / 1024 / 1024 / 1024))GB" >> "$log_file" + + # Direct comparison: trigger if allocated bytes are greater than or equal to the threshold + if [[ $current_alloc_bytes -ge $MEMORY_THRESHOLD_BYTES ]]; then + echo "$(date): Memory usage ($((current_alloc_bytes / 1024 / 1024 / 1024))GB) has exceeded the threshold ($((MEMORY_THRESHOLD_BYTES / 1024 / 1024 / 1024))GB). Running 'nomad system gc'..." >> "$log_file" + nomad system gc + fi + else + echo "$(date): Warning - Could not check Nomad memory usage (exit code: $exit_code)" >> "$log_file" + fi + sleep 120 + done + ) & + + NOMAD_GC_PID=$! + echo "Nomad GC background process started with PID: $NOMAD_GC_PID" + + # Function to cleanup background process on script exit + cleanup_nomad_gc() { + if [[ -n "$NOMAD_GC_PID" ]]; then + echo "Stopping Nomad GC background process (PID: $NOMAD_GC_PID)..." + kill $NOMAD_GC_PID 2>/dev/null + fi + } + + # Set trap to cleanup on script exit + trap cleanup_nomad_gc EXIT +} + +# If script is run directly (not sourced), start the memory monitoring +if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then + echo "=== Nomad Memory Monitor ===" + start_nomad_gc + + # Keep the script running + echo "Memory monitoring active. Press Ctrl+C to stop." + wait $NOMAD_GC_PID +fi diff --git a/tools/purge_dispatch_jobs.py b/tools/purge_dispatch_jobs.py new file mode 100755 index 0000000..2f78f5d --- /dev/null +++ b/tools/purge_dispatch_jobs.py @@ -0,0 +1,361 @@ +""" +Script to purge Nomad dispatch jobs using the API. +Replicates the functionality of: +for job in $(nomad job status agreement_maker | grep "dispatch-" | awk '{print $1}'); do + echo "Purging $job"; + nomad job stop -purge $job; +done + +But using the Nomad HTTP API for better performance. +By default purges dispatch jobs from all pipeline job definitions. +""" + +import argparse +import asyncio +import logging +import os +import sys +from typing import List, Optional +from urllib.parse import urljoin + +import aiohttp +import nomad +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +# Add src to path to import config +sys.path.insert( + 0, os.path.join(os.path.dirname(os.path.dirname(__file__)), "src") +) +from default_config import ( + AGREEMENT_MAKER_JOB_NAME, + FIM_MOSAICKER_JOB_NAME, + HAND_INUNDATOR_JOB_NAME, + NOMAD_ADDRESS, + NOMAD_NAMESPACE, + NOMAD_TOKEN, +) + +# All pipeline job names +PIPELINE_JOBS = [ + AGREEMENT_MAKER_JOB_NAME, + FIM_MOSAICKER_JOB_NAME, + HAND_INUNDATOR_JOB_NAME, + "pipeline", # Add pipeline job +] + +logger = logging.getLogger(__name__) + + +class NomadPurger: + def __init__( + self, + nomad_addr: str, + token: Optional[str] = None, + namespace: str = "default", + ): + self.nomad_addr = nomad_addr.rstrip("/") + self.token = token + self.namespace = namespace + + # Initialize the nomad client for some operations + from urllib.parse import urlparse + + parsed = urlparse(nomad_addr) + self.client = nomad.Nomad( + host=parsed.hostname, + port=parsed.port, + verify=False, + token=token or None, + namespace=namespace or None, + ) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type( + (aiohttp.ClientError, nomad.api.exceptions.BaseNomadException) + ), + ) + async def _api_call( + self, session: aiohttp.ClientSession, method: str, path: str, **kwargs + ) -> dict: + """Make a Nomad API call with retry logic.""" + url = urljoin(self.nomad_addr, path) + headers = kwargs.pop("headers", {}) + + if self.token: + headers["X-Nomad-Token"] = self.token + + params = kwargs.pop("params", {}) + if self.namespace and self.namespace != "default": + params["namespace"] = self.namespace + + async with session.request( + method, url, headers=headers, params=params, **kwargs + ) as response: + response.raise_for_status() + return await response.json() + + async def get_all_dispatch_jobs( + self, parent_job_names: List[str] + ) -> List[dict]: + """Get all dispatch jobs for multiple parent jobs.""" + logger.info( + f"Getting dispatch jobs for parent jobs: {', '.join(parent_job_names)}" + ) + + async with aiohttp.ClientSession() as session: + try: + # Get all jobs at once + all_jobs = await self._api_call(session, "GET", "/v1/jobs") + + dispatch_jobs = [] + + for job in all_jobs: + job_id = job.get("ID", "") + # Check if this is a dispatch job (contains dispatch-) + if "dispatch-" in job_id: + # Check if it belongs to any of our parent jobs + for parent_name in parent_job_names: + if job_id.startswith(parent_name): + dispatch_jobs.append(job) + break + + logger.info(f"Found {len(dispatch_jobs)} dispatch jobs total") + return dispatch_jobs + + except Exception as e: + logger.error(f"Error getting dispatch jobs: {e}") + raise + + async def purge_job( + self, session: aiohttp.ClientSession, job_id: str, dry_run: bool = False + ) -> bool: + """Purge a single job.""" + try: + if dry_run: + logger.info(f"[DRY RUN] Would purge job: {job_id}") + return True + + logger.info(f"Purging job: {job_id}") + + # Stop and purge the job + await self._api_call( + session, "DELETE", f"/v1/job/{job_id}", params={"purge": "true"} + ) + + logger.info(f"Successfully purged job: {job_id}") + return True + + except aiohttp.ClientResponseError as e: + if e.status == 404: + logger.warning( + f"Job {job_id} not found (may have already been purged)" + ) + return True + logger.error( + f"Failed to purge job {job_id}: HTTP {e.status} - {e.message}" + ) + return False + except Exception as e: + logger.error(f"Failed to purge job {job_id}: {e}") + return False + + async def purge_all_dispatch_jobs( + self, + parent_job_names: List[str], + dry_run: bool = False, + batch_size: int = 5, + batch_delay: float = 1.0, + ) -> tuple[int, int]: + """Purge all dispatch jobs for the given parent jobs with rate limiting.""" + dispatch_jobs = await self.get_all_dispatch_jobs(parent_job_names) + + if not dispatch_jobs: + logger.info("No dispatch jobs found to purge") + return 0, 0 + + logger.info( + f"Starting to purge {len(dispatch_jobs)} dispatch jobs in batches of {batch_size}" + ) + + successful = 0 + failed = 0 + + async with aiohttp.ClientSession( + connector=aiohttp.TCPConnector( + limit=10 + ), # Increase connection pool + timeout=aiohttp.ClientTimeout(total=30), + ) as session: + # Create a semaphore to limit concurrent operations + semaphore = asyncio.Semaphore(batch_size) + + async def purge_single_job(job): + async with semaphore: + job_id = job.get("ID", "unknown") + success = await self.purge_job(session, job_id, dry_run) + return job_id, success + + # Process jobs in batches + for i in range(0, len(dispatch_jobs), batch_size): + batch = dispatch_jobs[i : i + batch_size] + batch_num = i // batch_size + 1 + total_batches = ( + len(dispatch_jobs) + batch_size - 1 + ) // batch_size + + logger.info( + f"Processing batch {batch_num}/{total_batches} ({len(batch)} jobs)" + ) + + # Execute batch concurrently + tasks = [purge_single_job(job) for job in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Count results + for result in results: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + failed += 1 + else: + job_id, success = result + if success: + successful += 1 + else: + failed += 1 + + # Add delay between batches to avoid overwhelming the server + if i + batch_size < len(dispatch_jobs) and batch_delay > 0: + logger.debug( + f"Waiting {batch_delay} seconds before next batch..." + ) + await asyncio.sleep(batch_delay) + + logger.info(f"Purge complete: {successful} successful, {failed} failed") + return successful, failed + + +async def main(): + """ + Script to purge Nomad dispatch jobs using the API. + Replicates the functionality of: + for job in $(nomad job status agreement_maker | grep "dispatch-" | awk '{print $1}'); do + echo "Purging $job"; + nomad job stop -purge $job; + done + + But using the Nomad HTTP API for better performance. + By default purges dispatch jobs from all pipeline job definitions. + """ + + parser = argparse.ArgumentParser( + description="Purge Nomad dispatch jobs using the API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + %(prog)s # Purge dispatch jobs from all pipeline jobs + %(prog)s --job-name agreement_maker # Purge only agreement_maker dispatch jobs + %(prog)s --dry-run # Show what would be purged + %(prog)s --nomad-addr http://nomad.example.com:4646 + +Default pipeline jobs: {", ".join(PIPELINE_JOBS)} + """, + ) + + parser.add_argument( + "--job-name", + help="Specific parent job name to purge dispatch jobs for. If not specified, purges from all pipeline jobs.", + ) + parser.add_argument( + "--nomad-addr", + default=os.getenv("NOMAD_ADDR", ""), + help=f"Nomad server address", + ) + parser.add_argument( + "--nomad-token", + default=os.getenv("NOMAD_TOKEN", NOMAD_TOKEN), + help="Nomad authentication token", + ) + parser.add_argument( + "--namespace", + default=os.getenv("NOMAD_NAMESPACE", "default"), + help=f"Nomad namespace (default: {NOMAD_NAMESPACE})", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be purged without actually doing it", + ) + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose logging" + ) + parser.add_argument( + "--batch-size", + type=int, + default=500, + help="Number of jobs to purge concurrently (default: 5)", + ) + parser.add_argument( + "--batch-delay", + type=float, + default=0.05, + help="Delay in seconds between batches (default: 1.0)", + ) + + args = parser.parse_args() + + # Configure logging + log_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig( + level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" + ) + + # Determine which jobs to target + if args.job_name: + target_jobs = [args.job_name] + logger.info(f"Targeting specific job: {args.job_name}") + else: + target_jobs = PIPELINE_JOBS + logger.info(f"Targeting all pipeline jobs: {', '.join(target_jobs)}") + + # Create purger and run + purger = NomadPurger( + nomad_addr=args.nomad_addr, + token=args.nomad_token if args.nomad_token else None, + namespace=args.namespace, + ) + + try: + successful, failed = await purger.purge_all_dispatch_jobs( + target_jobs, + dry_run=args.dry_run, + batch_size=args.batch_size, + batch_delay=args.batch_delay, + ) + + if failed > 0: + logger.error( + f"Some operations failed: {successful} successful, {failed} failed" + ) + sys.exit(1) + else: + logger.info( + f"All operations completed successfully: {successful} jobs processed" + ) + + except KeyboardInterrupt: + logger.info("Operation cancelled by user") + sys.exit(130) + except Exception as e: + logger.error(f"Unexpected error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) From 45229386711e58797ad87e83798694d7f22a53e8 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Wed, 10 Sep 2025 15:21:48 -0400 Subject: [PATCH 2/2] Reformat line lengths --- tools/purge_dispatch_jobs.py | 76 +++++++++--------------------------- 1 file changed, 19 insertions(+), 57 deletions(-) diff --git a/tools/purge_dispatch_jobs.py b/tools/purge_dispatch_jobs.py index 2f78f5d..79a89c6 100755 --- a/tools/purge_dispatch_jobs.py +++ b/tools/purge_dispatch_jobs.py @@ -28,9 +28,7 @@ ) # Add src to path to import config -sys.path.insert( - 0, os.path.join(os.path.dirname(os.path.dirname(__file__)), "src") -) +sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), "src")) from default_config import ( AGREEMENT_MAKER_JOB_NAME, FIM_MOSAICKER_JOB_NAME, @@ -77,13 +75,9 @@ def __init__( @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type( - (aiohttp.ClientError, nomad.api.exceptions.BaseNomadException) - ), + retry=retry_if_exception_type((aiohttp.ClientError, nomad.api.exceptions.BaseNomadException)), ) - async def _api_call( - self, session: aiohttp.ClientSession, method: str, path: str, **kwargs - ) -> dict: + async def _api_call(self, session: aiohttp.ClientSession, method: str, path: str, **kwargs) -> dict: """Make a Nomad API call with retry logic.""" url = urljoin(self.nomad_addr, path) headers = kwargs.pop("headers", {}) @@ -95,19 +89,13 @@ async def _api_call( if self.namespace and self.namespace != "default": params["namespace"] = self.namespace - async with session.request( - method, url, headers=headers, params=params, **kwargs - ) as response: + async with session.request(method, url, headers=headers, params=params, **kwargs) as response: response.raise_for_status() return await response.json() - async def get_all_dispatch_jobs( - self, parent_job_names: List[str] - ) -> List[dict]: + async def get_all_dispatch_jobs(self, parent_job_names: List[str]) -> List[dict]: """Get all dispatch jobs for multiple parent jobs.""" - logger.info( - f"Getting dispatch jobs for parent jobs: {', '.join(parent_job_names)}" - ) + logger.info(f"Getting dispatch jobs for parent jobs: {', '.join(parent_job_names)}") async with aiohttp.ClientSession() as session: try: @@ -133,9 +121,7 @@ async def get_all_dispatch_jobs( logger.error(f"Error getting dispatch jobs: {e}") raise - async def purge_job( - self, session: aiohttp.ClientSession, job_id: str, dry_run: bool = False - ) -> bool: + async def purge_job(self, session: aiohttp.ClientSession, job_id: str, dry_run: bool = False) -> bool: """Purge a single job.""" try: if dry_run: @@ -145,22 +131,16 @@ async def purge_job( logger.info(f"Purging job: {job_id}") # Stop and purge the job - await self._api_call( - session, "DELETE", f"/v1/job/{job_id}", params={"purge": "true"} - ) + await self._api_call(session, "DELETE", f"/v1/job/{job_id}", params={"purge": "true"}) logger.info(f"Successfully purged job: {job_id}") return True except aiohttp.ClientResponseError as e: if e.status == 404: - logger.warning( - f"Job {job_id} not found (may have already been purged)" - ) + logger.warning(f"Job {job_id} not found (may have already been purged)") return True - logger.error( - f"Failed to purge job {job_id}: HTTP {e.status} - {e.message}" - ) + logger.error(f"Failed to purge job {job_id}: HTTP {e.status} - {e.message}") return False except Exception as e: logger.error(f"Failed to purge job {job_id}: {e}") @@ -180,17 +160,13 @@ async def purge_all_dispatch_jobs( logger.info("No dispatch jobs found to purge") return 0, 0 - logger.info( - f"Starting to purge {len(dispatch_jobs)} dispatch jobs in batches of {batch_size}" - ) + logger.info(f"Starting to purge {len(dispatch_jobs)} dispatch jobs in batches of {batch_size}") successful = 0 failed = 0 async with aiohttp.ClientSession( - connector=aiohttp.TCPConnector( - limit=10 - ), # Increase connection pool + connector=aiohttp.TCPConnector(limit=10), # Increase connection pool timeout=aiohttp.ClientTimeout(total=30), ) as session: # Create a semaphore to limit concurrent operations @@ -206,13 +182,9 @@ async def purge_single_job(job): for i in range(0, len(dispatch_jobs), batch_size): batch = dispatch_jobs[i : i + batch_size] batch_num = i // batch_size + 1 - total_batches = ( - len(dispatch_jobs) + batch_size - 1 - ) // batch_size + total_batches = (len(dispatch_jobs) + batch_size - 1) // batch_size - logger.info( - f"Processing batch {batch_num}/{total_batches} ({len(batch)} jobs)" - ) + logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} jobs)") # Execute batch concurrently tasks = [purge_single_job(job) for job in batch] @@ -232,9 +204,7 @@ async def purge_single_job(job): # Add delay between batches to avoid overwhelming the server if i + batch_size < len(dispatch_jobs) and batch_delay > 0: - logger.debug( - f"Waiting {batch_delay} seconds before next batch..." - ) + logger.debug(f"Waiting {batch_delay} seconds before next batch...") await asyncio.sleep(batch_delay) logger.info(f"Purge complete: {successful} successful, {failed} failed") @@ -292,9 +262,7 @@ async def main(): action="store_true", help="Show what would be purged without actually doing it", ) - parser.add_argument( - "--verbose", "-v", action="store_true", help="Enable verbose logging" - ) + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") parser.add_argument( "--batch-size", type=int, @@ -312,9 +280,7 @@ async def main(): # Configure logging log_level = logging.DEBUG if args.verbose else logging.INFO - logging.basicConfig( - level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" - ) + logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s") # Determine which jobs to target if args.job_name: @@ -340,14 +306,10 @@ async def main(): ) if failed > 0: - logger.error( - f"Some operations failed: {successful} successful, {failed} failed" - ) + logger.error(f"Some operations failed: {successful} successful, {failed} failed") sys.exit(1) else: - logger.info( - f"All operations completed successfully: {successful} jobs processed" - ) + logger.info(f"All operations completed successfully: {successful} jobs processed") except KeyboardInterrupt: logger.info("Operation cancelled by user")