diff --git a/src/madengine/tools/update_perf_csv.py b/src/madengine/tools/update_perf_csv.py
index 5e32e3e2..b19600a7 100644
--- a/src/madengine/tools/update_perf_csv.py
+++ b/src/madengine/tools/update_perf_csv.py
@@ -116,9 +116,13 @@ def handle_multiple_results(
         row = common_info_json.copy()
         model = r.pop("model")
         row["model"] = model_name + "_" + str(model)
-        row.update(r)
+
+        # Extract all columns from CSV result to ensure proper column alignment
+        # This ensures all result columns (benchmark, tp, inp, out, dtype, etc.) are captured
+        for key, value in r.items():
+            row[key] = value
 
-        if row["performance"] is not None and pd.notna(row["performance"]):
+        if row.get("performance") is not None and pd.notna(row.get("performance")):
             row["status"] = "SUCCESS"
         else:
             row["status"] = "FAILURE"
@@ -126,11 +130,15 @@ def handle_multiple_results(
         final_multiple_results_df = pd.concat(
             [final_multiple_results_df, pd.DataFrame(row, index=[0])], ignore_index=True
         )
-        # Reorder columns according to existing perf csv
-        columns = perf_csv_df.columns.tolist()
-        # Add any additional columns to the end
-        columns = columns + [col for col in final_multiple_results_df.columns if col not in columns]
-        final_multiple_results_df = final_multiple_results_df[columns]
+
+    # Reorder columns according to existing perf csv (moved outside loop for efficiency)
+    if not final_multiple_results_df.empty:
+        desired_columns = perf_csv_df.columns.tolist()
+        # Add any additional columns from final_multiple_results_df
+        desired_columns = desired_columns + [col for col in final_multiple_results_df.columns if col not in desired_columns]
+        # Only select columns that actually exist in final_multiple_results_df to avoid KeyError
+        available_columns = [col for col in desired_columns if col in final_multiple_results_df.columns]
+        final_multiple_results_df = final_multiple_results_df[available_columns]
     perf_entry_df_to_csv(final_multiple_results_df)
 
     if perf_csv_df.empty:
diff --git a/src/madengine/tools/update_perf_super.py b/src/madengine/tools/update_perf_super.py
new file mode 100644
index 00000000..a22fa314
--- /dev/null
+++ b/src/madengine/tools/update_perf_super.py
@@ -0,0 +1,327 @@
+"""Module to update the perf_entry_super.json file with enhanced performance data.
+
+This module is used to update the perf_entry_super.json file with performance data
+that includes configuration information from config files.
+
+Copyright (c) Advanced Micro Devices, Inc. All rights reserved.
+"""
+
+# built-in imports
+import json
+import os
+import typing
+# third-party imports
+import pandas as pd
+# MAD Engine imports
+from madengine.utils.config_parser import ConfigParser
+
+
+def read_json(js: str) -> dict:
+    """Read a JSON file.
+
+    Args:
+        js: The path to the JSON file.
+
+    Returns:
+        The JSON dictionary.
+    """
+    with open(js, 'r') as f:
+        return json.load(f)
+
+
+def write_json(data: typing.Union[dict, list], output_path: str) -> None:
+    """Write data to a JSON file.
+
+    Args:
+        data: The data to write (dict or list).
+        output_path: The path to the output JSON file.
+    """
+    with open(output_path, 'w') as f:
+        json.dump(data, f, indent=2)
+
+
+def load_perf_super_json(perf_super_json: str) -> list:
+    """Load existing perf_entry_super.json file.
+
+    Args:
+        perf_super_json: Path to perf_entry_super.json file.
+
+    Returns:
+        List of performance records, or empty list if file doesn't exist.
+ """ + if not os.path.exists(perf_super_json): + return [] + + try: + data = read_json(perf_super_json) + # Ensure it's a list + if isinstance(data, list): + return data + else: + return [data] + except Exception as e: + print(f"Warning: Could not load existing perf_entry_super.json: {e}") + return [] + + +def handle_multiple_results_super( + perf_super_list: list, + multiple_results: str, + common_info: str, + model_name: str, + config_parser: ConfigParser + ) -> list: + """Handle multiple results with config matching. + + Args: + perf_super_list: List of existing performance records. + multiple_results: The path to the multiple results CSV file. + common_info: The path to the common info JSON file. + model_name: The model name. + config_parser: ConfigParser instance for loading configs. + + Returns: + Updated list of performance records with configs. + """ + # Load multiple results CSV + multiple_results_df = pd.read_csv(multiple_results) + multiple_results_df.columns = multiple_results_df.columns.str.strip() + + # Check required columns + required_cols = ['model', 'performance', 'metric'] + for col in required_cols: + if col not in multiple_results_df.columns: + raise RuntimeError(f"{multiple_results} file is missing the {col} column") + + # Load common info + common_info_json = read_json(common_info) + + # Parse config file from args if present + configs_data = None + if 'args' in common_info_json and common_info_json['args']: + # Try to extract config path from args + scripts_path = common_info_json.get('pipeline', '') + configs_data = config_parser.parse_and_load( + common_info_json['args'], + scripts_path + ) + + # Process each result row + for result_row in multiple_results_df.to_dict(orient="records"): + record = common_info_json.copy() + + # Update model name + result_model = result_row.pop("model") + record["model"] = f"{model_name}_{result_model}" + + # Extract metadata fields that should be at top level + record["performance"] = result_row.get("performance") + record["metric"] = result_row.get("metric") + + # Set status based on performance + if record.get("performance") is not None and pd.notna(record.get("performance")): + record["status"] = "SUCCESS" + else: + record["status"] = "FAILURE" + + # Store all result data in multi_results field (structured data) + # This captures additional metrics beyond the primary performance metric + record["multi_results"] = result_row + + # Match config to this specific result + if configs_data: + if isinstance(configs_data, list): + # For CSV configs with multiple rows, try to match + matched_config = config_parser.match_config_to_result( + configs_data, + result_row, + result_model + ) + record["configs"] = matched_config + else: + # For JSON/YAML configs, use as-is + record["configs"] = configs_data + else: + record["configs"] = None + + perf_super_list.append(record) + + return perf_super_list + + +def handle_single_result_super( + perf_super_list: list, + single_result: str + ) -> list: + """Handle a single result. + + Args: + perf_super_list: List of existing performance records. + single_result: The path to the single result JSON file. + + Returns: + Updated list of performance records. 
+ """ + single_result_json = read_json(single_result) + + # Ensure configs field exists (may be None) + if "configs" not in single_result_json: + single_result_json["configs"] = None + + perf_super_list.append(single_result_json) + return perf_super_list + + +def handle_exception_result_super( + perf_super_list: list, + exception_result: str + ) -> list: + """Handle an exception result. + + Args: + perf_super_list: List of existing performance records. + exception_result: The path to the exception result JSON file. + + Returns: + Updated list of performance records. + """ + exception_result_json = read_json(exception_result) + + # Ensure configs field exists (may be None) + if "configs" not in exception_result_json: + exception_result_json["configs"] = None + + perf_super_list.append(exception_result_json) + return perf_super_list + + +def update_perf_super_json( + perf_super_json: str, + multiple_results: typing.Optional[str] = None, + single_result: typing.Optional[str] = None, + exception_result: typing.Optional[str] = None, + common_info: typing.Optional[str] = None, + model_name: typing.Optional[str] = None, + scripts_base_dir: typing.Optional[str] = None, + cumulative_json: typing.Optional[str] = None, + ) -> None: + """Update the perf_entry_super.json file with the latest performance data. + + Args: + perf_super_json: Path to perf_entry_super.json file (latest run only). + multiple_results: Path to multiple results CSV file. + single_result: Path to single result JSON file. + exception_result: Path to exception result JSON file. + common_info: Path to common info JSON file. + model_name: The model name. + scripts_base_dir: Base directory for scripts (for config file resolution). + cumulative_json: Path to cumulative perf_super.json file (all runs). If provided, + results will be appended to this file in addition to perf_entry_super.json. + """ + print(f"Updating perf_entry_super.json with enhanced performance data") + + # Start with empty list for latest run + perf_super_list = [] + + # Create config parser + config_parser = ConfigParser(scripts_base_dir=scripts_base_dir) + + # Handle different result types + if multiple_results: + perf_super_list = handle_multiple_results_super( + perf_super_list, + multiple_results, + common_info, + model_name, + config_parser, + ) + elif single_result: + perf_super_list = handle_single_result_super(perf_super_list, single_result) + elif exception_result: + perf_super_list = handle_exception_result_super( + perf_super_list, exception_result + ) + else: + print("No results to update in perf_entry_super.json") + return + + # Write latest run to perf_entry_super.json + write_json(perf_super_list, perf_super_json) + print(f"Successfully updated {perf_super_json} (latest run)") + + # Export latest run to CSV + export_perf_super_to_csv(perf_super_json) + + # Update cumulative database if path provided + if cumulative_json: + cumulative_list = load_perf_super_json(cumulative_json) + cumulative_list.extend(perf_super_list) + write_json(cumulative_list, cumulative_json) + print(f"Successfully updated {cumulative_json} (cumulative - {len(cumulative_list)} total entries)") + export_perf_super_to_csv(cumulative_json) + + +def export_perf_super_to_csv(perf_super_json: str) -> None: + """Export perf_entry_super.json to CSV format. + + Flattens nested structures (multi_results, configs) and exports to CSV. + + Args: + perf_super_json: Path to perf_entry_super.json file. 
+ """ + if not os.path.exists(perf_super_json): + print(f"Warning: {perf_super_json} does not exist. Skipping CSV export.") + return + + try: + data = read_json(perf_super_json) + + # Handle both single dict and list + if isinstance(data, dict): + data = [data] + elif not isinstance(data, list): + print(f"Warning: Unexpected data type in {perf_super_json}") + return + + if len(data) == 0: + print(f"Warning: No data in {perf_super_json}") + return + + # Flatten nested structures for CSV + flattened_data = [] + for record in data: + flat_record = {} + + for key, value in record.items(): + if key == 'multi_results' and isinstance(value, dict): + # Expand multi_results to top level with prefix + for mr_key, mr_value in value.items(): + flat_record[f"mr_{mr_key}"] = mr_value + elif key == 'configs' and value is not None: + # Convert configs to JSON string + flat_record['configs'] = json.dumps(value) + elif isinstance(value, (list, dict)): + # Convert other complex types to JSON strings + flat_record[key] = json.dumps(value) if value else None + else: + flat_record[key] = value + + flattened_data.append(flat_record) + + # Create DataFrame and export + df = pd.DataFrame(flattened_data) + + # Reorder columns: put important ones first + priority_cols = ['model', 'status', 'performance', 'metric'] + other_cols = [col for col in df.columns if col not in priority_cols] + ordered_cols = [col for col in priority_cols if col in df.columns] + other_cols + df = df[ordered_cols] + + # Export to CSV + csv_filename = perf_super_json.replace('.json', '.csv') + df.to_csv(csv_filename, index=False) + print(f"Successfully exported {csv_filename}") + + except Exception as e: + print(f"Error exporting {perf_super_json} to CSV: {e}") + diff --git a/src/madengine/utils/config_parser.py b/src/madengine/utils/config_parser.py new file mode 100644 index 00000000..ec988570 --- /dev/null +++ b/src/madengine/utils/config_parser.py @@ -0,0 +1,436 @@ +"""Config Parser Module for MAD Engine. + +This module provides utilities to parse configuration files from model arguments +and load them in various formats (CSV, JSON, YAML). + +Handles multiple repository patterns: +- Standalone repos (MAD, MAD-private): ./scripts/model/configs/ +- Submodule in MAD-internal: ./scripts/{MAD|MAD-private}/model/configs/ + +Copyright (c) Advanced Micro Devices, Inc. All rights reserved. +""" + +import os +import re +import json +import logging +import typing +from pathlib import Path + +import pandas as pd + +try: + import yaml + YAML_AVAILABLE = True +except ImportError: + YAML_AVAILABLE = False + +LOGGER = logging.getLogger(__name__) + + +class ConfigParser: + """Parser for model configuration files. + + This class handles parsing configuration files in various formats + (CSV, JSON, YAML) that are referenced in model arguments. + + Supports three usage patterns when run from MAD-internal CI: + 1. MAD-internal models: ./scripts/model/configs/ + 2. MAD submodule: ./scripts/MAD/model/configs/ + 3. MAD-private submodule: ./scripts/MAD-private/model/configs/ + + Also works when run standalone in MAD or MAD-private repos. + """ + + # Known repository/submodule names to detect + KNOWN_REPOS = ['MAD', 'MAD-private', 'MAD-internal'] + + def __init__(self, scripts_base_dir: typing.Optional[str] = None): + """Initialize ConfigParser. 
+
+        Args:
+            scripts_base_dir: Base directory for scripts
+                (e.g., "scripts/MAD-private/pyt_atom")
+        """
+        self.scripts_base_dir = scripts_base_dir
+        self._path_cache = {}  # Cache resolved paths
+
+    def _extract_repo_root(self, path: str) -> typing.Optional[str]:
+        """Extract repository root from a scripts path.
+
+        Examples:
+            "scripts/MAD-private/pyt_atom" -> "scripts/MAD-private"
+            "scripts/MAD/vllm" -> "scripts/MAD"
+            "scripts/model" -> "scripts"
+
+        Args:
+            path: Full or partial path containing scripts directory
+
+        Returns:
+            Repository root path, or None if not identifiable
+        """
+        if not path:
+            return None
+
+        parts = Path(path).parts
+
+        # Find 'scripts' in the path
+        try:
+            scripts_idx = parts.index('scripts')
+        except ValueError:
+            return None
+
+        # Check if next part after 'scripts' is a known repo name
+        if scripts_idx + 1 < len(parts):
+            next_part = parts[scripts_idx + 1]
+            if next_part in self.KNOWN_REPOS:
+                # It's a submodule: scripts/MAD-private or scripts/MAD
+                return os.path.join(*parts[:scripts_idx + 2])
+            else:
+                # It's MAD-internal's own models: scripts/model -> scripts
+                return os.path.join(*parts[:scripts_idx + 1])
+
+        # Just 'scripts' directory
+        return os.path.join(*parts[:scripts_idx + 1])
+
+    def _build_candidate_paths(
+        self,
+        config_path: str,
+        model_scripts_path: typing.Optional[str] = None
+    ) -> typing.List[str]:
+        """Build list of candidate paths to try in priority order.
+
+        Args:
+            config_path: Relative config path (e.g., "configs/default.csv")
+            model_scripts_path: Path to model script file
+
+        Returns:
+            List of full paths to try, in order of priority
+        """
+        candidates = []
+
+        # Priority 1: Relative to model's immediate directory
+        # scripts/MAD-private/pyt_atom + configs/default.csv
+        if model_scripts_path:
+            scripts_dir = os.path.dirname(model_scripts_path)
+            if scripts_dir:
+                candidates.append(os.path.join(scripts_dir, config_path))
+
+        # Priority 2: Relative to scripts_base_dir
+        # scripts/MAD-private/pyt_atom + configs/default.csv
+        if self.scripts_base_dir:
+            candidates.append(os.path.join(self.scripts_base_dir, config_path))
+
+        # Priority 3: Relative to repository root (for shared configs)
+        # This handles: scripts/MAD-private/pyt_atom -> scripts/MAD-private/configs/
+        if self.scripts_base_dir:
+            repo_root = self._extract_repo_root(self.scripts_base_dir)
+            if repo_root:
+                candidates.append(os.path.join(repo_root, config_path))
+
+        if model_scripts_path:
+            scripts_dir = os.path.dirname(model_scripts_path)
+            if scripts_dir:
+                repo_root = self._extract_repo_root(scripts_dir)
+                if repo_root:
+                    candidates.append(os.path.join(repo_root, config_path))
+
+        # Priority 4: Walk up from model's directory
+        # Try parent directories up to repo root
+        if model_scripts_path:
+            scripts_dir = os.path.dirname(model_scripts_path)
+            repo_root = self._extract_repo_root(scripts_dir)
+            if repo_root and scripts_dir:
+                candidates.extend(
+                    self._walk_up_between(config_path, scripts_dir, repo_root)
+                )
+
+        # Priority 5: Walk up from scripts_base_dir
+        if self.scripts_base_dir:
+            repo_root = self._extract_repo_root(self.scripts_base_dir)
+            if repo_root:
+                candidates.extend(
+                    self._walk_up_between(config_path, self.scripts_base_dir, repo_root)
+                )
+
+        # Remove duplicates while preserving order
+        seen = set()
+        unique_candidates = []
+        for path in candidates:
+            normalized = os.path.normpath(path)
+            if normalized not in seen:
+                seen.add(normalized)
+                unique_candidates.append(normalized)
+
+        return unique_candidates
+
+    def _walk_up_between(
+        self,
+        config_path: str,
+        start_dir: str,
+        stop_dir: str
+    ) -> typing.List[str]:
+        """Generate candidate paths by walking up from start to stop directory.
+
+        Args:
+            config_path: Relative config path
+            start_dir: Starting directory
+            stop_dir: Stop at this directory (inclusive)
+
+        Returns:
+            List of candidate paths
+        """
+        candidates = []
+        current = os.path.abspath(start_dir)
+        stop = os.path.abspath(stop_dir)
+
+        while current.startswith(stop):
+            parent = os.path.dirname(current)
+            if parent == current:  # Reached root
+                break
+            current = parent
+            candidates.append(os.path.join(current, config_path))
+            if current == stop:  # Reached stop directory
+                break
+
+        return candidates
+
+    def parse_config_from_args(
+        self,
+        args_string: str,
+        model_scripts_path: typing.Optional[str] = None
+    ) -> typing.Optional[str]:
+        """Extract and resolve config file path from model arguments.
+
+        Resolution strategy:
+        1. If absolute path -> verify it exists
+        2. Try model's immediate directory
+        3. Try scripts_base_dir
+        4. Try repository root (scripts/MAD-private/, scripts/MAD/, scripts/)
+        5. Walk up from model directory to repo root
+
+        This handles all cases:
+        - MAD-internal models: scripts/model/configs/default.csv
+        - MAD submodule: scripts/MAD/model/configs/default.csv
+        - MAD-private submodule: scripts/MAD-private/model/configs/default.csv
+        - Shared configs at repo level: scripts/MAD-private/configs/default.csv
+
+        Args:
+            args_string: The args field from models.json
+            model_scripts_path: Path to the model's script file (e.g., run.py)
+
+        Returns:
+            Full path to config file, or None if not found
+        """
+        if not args_string:
+            return None
+
+        # Look for --config argument
+        config_match = re.search(r'--config\s+([^\s]+)', args_string)
+        if not config_match:
+            return None
+
+        config_path = config_match.group(1)
+
+        # Check cache first
+        cache_key = f"{config_path}::{model_scripts_path}::{self.scripts_base_dir}"
+        if cache_key in self._path_cache:
+            cached_path = self._path_cache[cache_key]
+            if os.path.exists(cached_path):
+                return cached_path
+            else:
+                del self._path_cache[cache_key]
+
+        # Handle absolute paths
+        if os.path.isabs(config_path):
+            if os.path.exists(config_path):
+                self._path_cache[cache_key] = config_path
+                return config_path
+            else:
+                LOGGER.warning(f"Absolute config path does not exist: {config_path}")
+                return None
+
+        # Build and try candidate paths
+        candidates = self._build_candidate_paths(config_path, model_scripts_path)
+
+        for candidate in candidates:
+            LOGGER.debug(f"Trying config path: {candidate}")
+            if os.path.exists(candidate):
+                LOGGER.info(f"Found config file at: {candidate}")
+                self._path_cache[cache_key] = candidate
+                return candidate
+
+        # Not found
+        LOGGER.warning(
+            f"Config file not found: {config_path}\n"
+            f"  model_scripts_path: {model_scripts_path}\n"
+            f"  scripts_base_dir: {self.scripts_base_dir}\n"
+            f"  Tried {len(candidates)} locations:\n"
+            + "\n".join(f"    - {c}" for c in candidates[:5])
+            + (f"\n    ... and {len(candidates)-5} more" if len(candidates) > 5 else "")
+        )
+        return None
+
+    def load_config_file(
+        self,
+        config_path: str
+    ) -> typing.Optional[typing.Union[typing.List[dict], dict]]:
+        """Load and parse a configuration file.
+ + Args: + config_path: Full path to the config file + + Returns: + For CSV: List of dicts (one per row, excluding empty rows) + For JSON/YAML: Dict or list as-is from file + None if file cannot be loaded + """ + if not config_path or not os.path.exists(config_path): + return None + + file_ext = Path(config_path).suffix.lower() + + try: + if file_ext == '.csv': + return self._load_csv(config_path) + elif file_ext == '.json': + return self._load_json(config_path) + elif file_ext in ['.yaml', '.yml']: + return self._load_yaml(config_path) + else: + LOGGER.warning(f"Unsupported config file format: {file_ext}") + return None + except Exception as e: + LOGGER.error(f"Error loading config file {config_path}: {e}") + return None + + def _load_csv(self, config_path: str) -> typing.List[dict]: + """Load CSV config file. + + Args: + config_path: Path to CSV file + + Returns: + List of dicts, one per row (excluding completely empty rows) + """ + df = pd.read_csv(config_path) + + # Remove rows that are completely empty (all NaN) + # This handles blank lines in CSV files + df = df.dropna(how='all') + + # Convert NaN to None for JSON serialization + df = df.where(pd.notnull(df), None) + + # Convert to list of dicts + configs = df.to_dict(orient='records') + + LOGGER.info(f"Loaded {len(configs)} config entries from {config_path}") + + return configs + + def _load_json(self, config_path: str) -> typing.Union[dict, list]: + """Load JSON config file. + + Args: + config_path: Path to JSON file + + Returns: + Dict or list from JSON file + """ + with open(config_path, 'r') as f: + return json.load(f) + + def _load_yaml(self, config_path: str) -> typing.Union[dict, list]: + """Load YAML config file. + + Args: + config_path: Path to YAML file + + Returns: + Dict or list from YAML file + """ + if not YAML_AVAILABLE: + raise ImportError("PyYAML is not installed. Cannot load YAML config files.") + + with open(config_path, 'r') as f: + return yaml.safe_load(f) + + def match_config_to_result( + self, + configs_list: typing.List[dict], + result_data: dict, + model_name: str + ) -> typing.Optional[dict]: + """Match a specific result to its corresponding config. + + For CSV configs with multiple rows (like vllm), match based on + model name and other identifiable fields. + + Args: + configs_list: List of config dicts (from CSV rows) + result_data: Single result row data + model_name: The model name from result + + Returns: + Matching config dict, or None if no match found + """ + if not configs_list: + return None + + # For single config, return it + if len(configs_list) == 1: + return configs_list[0] + + # For multiple configs, try to match based on common fields + for config in configs_list: + # Try to match on 'model' field if it exists in both + if 'model' in config and 'model' in result_data: + # Compare normalized versions + config_model = str(config['model']).replace('/', '_').replace('-', '_').lower() + result_model = str(result_data['model']).replace('/', '_').replace('-', '_').lower() + if config_model in result_model or result_model in config_model: + # Additional checks for benchmark type if available + if 'benchmark' in config and 'benchmark' in result_data: + if config['benchmark'] == result_data['benchmark']: + return config + else: + return config + + # If no match found, return first config as fallback + LOGGER.warning(f"Could not match config for result: {model_name}. 
Using first config.") + return configs_list[0] + + def parse_and_load( + self, + args_string: str, + model_scripts_path: str = None + ) -> typing.Optional[typing.Union[typing.List[dict], dict]]: + """Parse config path from args and load the config file. + + Convenience method that combines parse_config_from_args and load_config_file. + + Args: + args_string: The args field from models.json + model_scripts_path: Path to the model's script file + + Returns: + Config data (list of dicts for CSV, dict for JSON/YAML), or None + """ + config_path = self.parse_config_from_args(args_string, model_scripts_path) + if not config_path: + return None + + return self.load_config_file(config_path) + + +def get_config_parser(scripts_base_dir: typing.Optional[str] = None) -> ConfigParser: + """Factory function to create a ConfigParser instance. + + Args: + scripts_base_dir: Base directory for scripts + + Returns: + ConfigParser instance + """ + return ConfigParser(scripts_base_dir=scripts_base_dir)