#!/bin/bash

## Nomad Memory Monitoring and Garbage Collection Script
##
## Polls the Nomad server's Go-runtime heap usage via `nomad operator
## metrics` and runs `nomad system gc` whenever allocated bytes cross
## MEMORY_THRESHOLD_GIB. Can be sourced (exposes start_nomad_gc) or run
## directly as a foreground monitor.

# Memory threshold in GiB at which garbage collection should trigger.
MEMORY_THRESHOLD_GIB=6

MEMORY_THRESHOLD_BYTES=$(( MEMORY_THRESHOLD_GIB * 1024 * 1024 * 1024 ))

# Print the server's current alloc_bytes gauge on stdout.
# Returns: 1 if the nomad CLI call fails, 2 if the metric cannot be parsed.
get_nomad_alloc_bytes() {
    local metrics_json
    # Keep stderr (e.g. "Unable to retrieve credentials" warnings) out of the
    # JSON stream instead of merging it with 2>&1 and filtering with grep.
    # BUG FIX: the previous version tested ${PIPESTATUS[0]} *after* a command
    # substitution assignment, which reports the status of the assignment
    # (i.e. the last pipeline stage, grep -v) — never the nomad command — so
    # a nomad failure could not be detected. Test the substitution directly.
    if ! metrics_json=$(nomad operator metrics -json 2>/dev/null); then
        echo "Error: 'nomad operator metrics' command failed" >&2
        return 1
    fi

    # alloc_bytes is the number of bytes currently allocated by the nomad
    # server's Go runtime. 'floor' truncates the JSON number so the value is
    # always safe for bash integer arithmetic (gauge values may be floats).
    local alloc_bytes
    alloc_bytes=$(jq -r '.Gauges[] | select(.Name == "nomad.runtime.alloc_bytes") | .Value | floor' <<<"$metrics_json")

    if [[ -z "$alloc_bytes" || "$alloc_bytes" == "null" ]]; then
        echo "Error: Could not parse 'nomad.runtime.alloc_bytes' from Nomad." >&2
        return 2
    fi
    echo "$alloc_bytes"
}

# Start the polling loop in the background and install an EXIT trap that
# stops it. $1 (optional): log file path; defaults to nomad_memory_usage.log.
# Sets NOMAD_GC_PID to the background process's PID.
start_nomad_gc() {
    local log_file="${1:-nomad_memory_usage.log}"
    echo "Starting Nomad garbage collection background process..."
    echo "Memory usage logs will be written to: $log_file"

    (
        # Use :- so an unset NOMAD_TOKEN does not trip `set -u` in callers.
        export NOMAD_TOKEN="${NOMAD_TOKEN:-}"
        export NOMAD_ADDR="${NOMAD_ADDR:-http://nomad-server-test.test.nextgenwaterprediction.com:4646}"

        while true; do
            local current_alloc_bytes
            current_alloc_bytes=$(get_nomad_alloc_bytes)
            local exit_code=$?

            if [[ $exit_code -eq 0 ]]; then
                # Write memory usage to log file
                echo "$(date): Current alloc_bytes: $((current_alloc_bytes / 1024 / 1024 / 1024))GB | Threshold: $((MEMORY_THRESHOLD_BYTES / 1024 / 1024 / 1024))GB" >> "$log_file"

                # Trigger GC once allocated bytes reach the threshold.
                if [[ $current_alloc_bytes -ge $MEMORY_THRESHOLD_BYTES ]]; then
                    echo "$(date): Memory usage ($((current_alloc_bytes / 1024 / 1024 / 1024))GB) has exceeded the threshold ($((MEMORY_THRESHOLD_BYTES / 1024 / 1024 / 1024))GB). Running 'nomad system gc'..." >> "$log_file"
                    nomad system gc
                fi
            else
                echo "$(date): Warning - Could not check Nomad memory usage (exit code: $exit_code)" >> "$log_file"
            fi
            sleep 120
        done
    ) &

    NOMAD_GC_PID=$!
    echo "Nomad GC background process started with PID: $NOMAD_GC_PID"

    # Stop the background poller whenever the enclosing script exits.
    cleanup_nomad_gc() {
        if [[ -n "$NOMAD_GC_PID" ]]; then
            echo "Stopping Nomad GC background process (PID: $NOMAD_GC_PID)..."
            kill "$NOMAD_GC_PID" 2>/dev/null
        fi
    }

    trap cleanup_nomad_gc EXIT
}

# If run directly (not sourced), start monitoring and block until stopped.
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    echo "=== Nomad Memory Monitor ==="
    start_nomad_gc

    echo "Memory monitoring active. Press Ctrl+C to stop."
    wait "$NOMAD_GC_PID"
fi
+""" + +import argparse +import asyncio +import logging +import os +import sys +from typing import List, Optional +from urllib.parse import urljoin + +import aiohttp +import nomad +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +# Add src to path to import config +sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), "src")) +from default_config import ( + AGREEMENT_MAKER_JOB_NAME, + FIM_MOSAICKER_JOB_NAME, + HAND_INUNDATOR_JOB_NAME, + NOMAD_ADDRESS, + NOMAD_NAMESPACE, + NOMAD_TOKEN, +) + +# All pipeline job names +PIPELINE_JOBS = [ + AGREEMENT_MAKER_JOB_NAME, + FIM_MOSAICKER_JOB_NAME, + HAND_INUNDATOR_JOB_NAME, + "pipeline", # Add pipeline job +] + +logger = logging.getLogger(__name__) + + +class NomadPurger: + def __init__( + self, + nomad_addr: str, + token: Optional[str] = None, + namespace: str = "default", + ): + self.nomad_addr = nomad_addr.rstrip("/") + self.token = token + self.namespace = namespace + + # Initialize the nomad client for some operations + from urllib.parse import urlparse + + parsed = urlparse(nomad_addr) + self.client = nomad.Nomad( + host=parsed.hostname, + port=parsed.port, + verify=False, + token=token or None, + namespace=namespace or None, + ) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type((aiohttp.ClientError, nomad.api.exceptions.BaseNomadException)), + ) + async def _api_call(self, session: aiohttp.ClientSession, method: str, path: str, **kwargs) -> dict: + """Make a Nomad API call with retry logic.""" + url = urljoin(self.nomad_addr, path) + headers = kwargs.pop("headers", {}) + + if self.token: + headers["X-Nomad-Token"] = self.token + + params = kwargs.pop("params", {}) + if self.namespace and self.namespace != "default": + params["namespace"] = self.namespace + + async with session.request(method, url, headers=headers, params=params, **kwargs) as response: + 
response.raise_for_status() + return await response.json() + + async def get_all_dispatch_jobs(self, parent_job_names: List[str]) -> List[dict]: + """Get all dispatch jobs for multiple parent jobs.""" + logger.info(f"Getting dispatch jobs for parent jobs: {', '.join(parent_job_names)}") + + async with aiohttp.ClientSession() as session: + try: + # Get all jobs at once + all_jobs = await self._api_call(session, "GET", "/v1/jobs") + + dispatch_jobs = [] + + for job in all_jobs: + job_id = job.get("ID", "") + # Check if this is a dispatch job (contains dispatch-) + if "dispatch-" in job_id: + # Check if it belongs to any of our parent jobs + for parent_name in parent_job_names: + if job_id.startswith(parent_name): + dispatch_jobs.append(job) + break + + logger.info(f"Found {len(dispatch_jobs)} dispatch jobs total") + return dispatch_jobs + + except Exception as e: + logger.error(f"Error getting dispatch jobs: {e}") + raise + + async def purge_job(self, session: aiohttp.ClientSession, job_id: str, dry_run: bool = False) -> bool: + """Purge a single job.""" + try: + if dry_run: + logger.info(f"[DRY RUN] Would purge job: {job_id}") + return True + + logger.info(f"Purging job: {job_id}") + + # Stop and purge the job + await self._api_call(session, "DELETE", f"/v1/job/{job_id}", params={"purge": "true"}) + + logger.info(f"Successfully purged job: {job_id}") + return True + + except aiohttp.ClientResponseError as e: + if e.status == 404: + logger.warning(f"Job {job_id} not found (may have already been purged)") + return True + logger.error(f"Failed to purge job {job_id}: HTTP {e.status} - {e.message}") + return False + except Exception as e: + logger.error(f"Failed to purge job {job_id}: {e}") + return False + + async def purge_all_dispatch_jobs( + self, + parent_job_names: List[str], + dry_run: bool = False, + batch_size: int = 5, + batch_delay: float = 1.0, + ) -> tuple[int, int]: + """Purge all dispatch jobs for the given parent jobs with rate limiting.""" + 
dispatch_jobs = await self.get_all_dispatch_jobs(parent_job_names) + + if not dispatch_jobs: + logger.info("No dispatch jobs found to purge") + return 0, 0 + + logger.info(f"Starting to purge {len(dispatch_jobs)} dispatch jobs in batches of {batch_size}") + + successful = 0 + failed = 0 + + async with aiohttp.ClientSession( + connector=aiohttp.TCPConnector(limit=10), # Increase connection pool + timeout=aiohttp.ClientTimeout(total=30), + ) as session: + # Create a semaphore to limit concurrent operations + semaphore = asyncio.Semaphore(batch_size) + + async def purge_single_job(job): + async with semaphore: + job_id = job.get("ID", "unknown") + success = await self.purge_job(session, job_id, dry_run) + return job_id, success + + # Process jobs in batches + for i in range(0, len(dispatch_jobs), batch_size): + batch = dispatch_jobs[i : i + batch_size] + batch_num = i // batch_size + 1 + total_batches = (len(dispatch_jobs) + batch_size - 1) // batch_size + + logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} jobs)") + + # Execute batch concurrently + tasks = [purge_single_job(job) for job in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Count results + for result in results: + if isinstance(result, Exception): + logger.error(f"Task failed with exception: {result}") + failed += 1 + else: + job_id, success = result + if success: + successful += 1 + else: + failed += 1 + + # Add delay between batches to avoid overwhelming the server + if i + batch_size < len(dispatch_jobs) and batch_delay > 0: + logger.debug(f"Waiting {batch_delay} seconds before next batch...") + await asyncio.sleep(batch_delay) + + logger.info(f"Purge complete: {successful} successful, {failed} failed") + return successful, failed + + +async def main(): + """ + Script to purge Nomad dispatch jobs using the API. 
+ Replicates the functionality of: + for job in $(nomad job status agreement_maker | grep "dispatch-" | awk '{print $1}'); do + echo "Purging $job"; + nomad job stop -purge $job; + done + + But using the Nomad HTTP API for better performance. + By default purges dispatch jobs from all pipeline job definitions. + """ + + parser = argparse.ArgumentParser( + description="Purge Nomad dispatch jobs using the API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + %(prog)s # Purge dispatch jobs from all pipeline jobs + %(prog)s --job-name agreement_maker # Purge only agreement_maker dispatch jobs + %(prog)s --dry-run # Show what would be purged + %(prog)s --nomad-addr http://nomad.example.com:4646 + +Default pipeline jobs: {", ".join(PIPELINE_JOBS)} + """, + ) + + parser.add_argument( + "--job-name", + help="Specific parent job name to purge dispatch jobs for. If not specified, purges from all pipeline jobs.", + ) + parser.add_argument( + "--nomad-addr", + default=os.getenv("NOMAD_ADDR", ""), + help=f"Nomad server address", + ) + parser.add_argument( + "--nomad-token", + default=os.getenv("NOMAD_TOKEN", NOMAD_TOKEN), + help="Nomad authentication token", + ) + parser.add_argument( + "--namespace", + default=os.getenv("NOMAD_NAMESPACE", "default"), + help=f"Nomad namespace (default: {NOMAD_NAMESPACE})", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be purged without actually doing it", + ) + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") + parser.add_argument( + "--batch-size", + type=int, + default=500, + help="Number of jobs to purge concurrently (default: 5)", + ) + parser.add_argument( + "--batch-delay", + type=float, + default=0.05, + help="Delay in seconds between batches (default: 1.0)", + ) + + args = parser.parse_args() + + # Configure logging + log_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=log_level, 
format="%(asctime)s - %(levelname)s - %(message)s") + + # Determine which jobs to target + if args.job_name: + target_jobs = [args.job_name] + logger.info(f"Targeting specific job: {args.job_name}") + else: + target_jobs = PIPELINE_JOBS + logger.info(f"Targeting all pipeline jobs: {', '.join(target_jobs)}") + + # Create purger and run + purger = NomadPurger( + nomad_addr=args.nomad_addr, + token=args.nomad_token if args.nomad_token else None, + namespace=args.namespace, + ) + + try: + successful, failed = await purger.purge_all_dispatch_jobs( + target_jobs, + dry_run=args.dry_run, + batch_size=args.batch_size, + batch_delay=args.batch_delay, + ) + + if failed > 0: + logger.error(f"Some operations failed: {successful} successful, {failed} failed") + sys.exit(1) + else: + logger.info(f"All operations completed successfully: {successful} jobs processed") + + except KeyboardInterrupt: + logger.info("Operation cancelled by user") + sys.exit(130) + except Exception as e: + logger.error(f"Unexpected error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main())