From c4f1fb09b1c4bdb0589c73e3cfc912dcde61b9de Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Thu, 20 Nov 2025 11:20:17 -0500 Subject: [PATCH 1/4] added basic `mmif describe` cli a --- mmif/utils/cli/__init__.py | 1 + mmif/utils/cli/describe.py | 299 +++++++++++++++++++++++++++++++++++++ 2 files changed, 300 insertions(+) create mode 100644 mmif/utils/cli/describe.py diff --git a/mmif/utils/cli/__init__.py b/mmif/utils/cli/__init__.py index 77f6b74b..24855994 100644 --- a/mmif/utils/cli/__init__.py +++ b/mmif/utils/cli/__init__.py @@ -1,3 +1,4 @@ +from mmif.utils.cli import describe from mmif.utils.cli import rewind from mmif.utils.cli import source diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py new file mode 100644 index 00000000..92e26864 --- /dev/null +++ b/mmif/utils/cli/describe.py @@ -0,0 +1,299 @@ +import argparse +import hashlib +import json +import sys +import textwrap +from pathlib import Path +from typing import Union, List, Tuple, Optional + +from mmif import Mmif + + +def split_appname_appversion(long_app_id: str) -> Tuple[str, str]: + """ + Split app name and version from a long app identifier. + + Assumes the identifier looks like "uri://APP_DOMAIN/APP_NAME/APP_VERSION" + + :param long_app_id: Full app identifier URI + :return: Tuple of (app_name, app_version) + """ + app_path = Path(long_app_id).parts + app_name = app_path[2] if len(app_path) > 2 else None + app_version = app_path[3] if len(app_path) > 3 else None + if app_version is not None and app_name.endswith(app_version): + app_name = app_name[:-len(app_version) - 1] + if app_version == 'unresolvable': + app_version = None + return app_name, app_version + + +def generate_param_hash(params: dict) -> str: + """ + Generate MD5 hash from a parameter dictionary. + + Parameters are sorted alphabetically, joined as key=value pairs, + and hashed using MD5. This is not for security purposes, only for + generating consistent identifiers. 
+ + :param params: Dictionary of parameters + :return: MD5 hash string (32 hex characters) + """ + if not params: + param_string = "" + else: + param_list = ['='.join([k, str(v)]) for k, v in params.items()] + param_list.sort() + param_string = ','.join(param_list) + return hashlib.md5(param_string.encode('utf-8')).hexdigest() + + +def get_pipeline_specs( + mmif_file: Union[str, Path] +) -> List[Tuple[str, str, dict, Optional[int], Optional[dict], int, dict]]: + """ + Read a MMIF file and extract the pipeline specification from it. + + :param mmif_file: Path to the MMIF file + :return: List of tuples containing (view_id, app_name, configs, + running_time_ms, running_hardware, annotation_count, + annotations_by_type) for each view in the pipeline + """ + if not isinstance(mmif_file, (str, Path)): + raise ValueError( + "MMIF file path must be a string or a Path object." + ) + + with open(mmif_file, "r") as f: + mmif_str = f.read() + + data = Mmif(mmif_str) + spec = [] + + for view in data.views: + # Skip views with errors, warnings, or no annotations + if view.has_error() or view.has_warnings(): + continue + elif len(view.annotations) == 0: + continue + + app = view.metadata.get("app") + configs = view.metadata.get("appConfiguration", {}) + + # Parse running time from hh:mm:ss.ms format to milliseconds + # Support both new (appProfiling.runningTime) and old (appRunningTime) + running_time = None + time_str = None + if "appProfiling" in view.metadata: + profiling = view.metadata["appProfiling"] + if isinstance(profiling, dict) and "runningTime" in profiling: + time_str = profiling["runningTime"] + elif "appRunningTime" in view.metadata: + time_str = view.metadata["appRunningTime"] + + if time_str: + try: + rest, ms = time_str.split(".") + running_time = sum( + int(x) * 60**i + for i, x in enumerate(reversed(rest.split(":"))) + ) * 1000 + int(ms) + except (ValueError, AttributeError): + # If parsing fails, leave as None + pass + + # Support both new 
(appProfiling.hardware) and old (appRunningHardware) + running_hardware = None + if "appProfiling" in view.metadata: + profiling = view.metadata["appProfiling"] + if isinstance(profiling, dict) and "hardware" in profiling: + running_hardware = profiling["hardware"] + elif "appRunningHardware" in view.metadata: + running_hardware = view.metadata["appRunningHardware"] + + # Count annotations and group by type + annotation_count = len(view.annotations) + annotations_by_type = {} + for annotation in view.annotations: + at_type = str(annotation.at_type) + annotations_by_type[at_type] = annotations_by_type.get( + at_type, 0 + ) + 1 + + spec.append(( + view.id, app, configs, running_time, running_hardware, + annotation_count, annotations_by_type + )) + + return spec + + +def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: + """ + Generate a pipeline identifier string from a MMIF file. + + The identifier follows the storage directory structure format: + app_name/version/param_hash/app_name2/version2/param_hash2/... + + Uses view.metadata.parameters (raw user-passed values) for hashing + to ensure reproducibility. + + :param mmif_file: Path to the MMIF file + :return: Pipeline identifier string + """ + if not isinstance(mmif_file, (str, Path)): + raise ValueError( + "MMIF file path must be a string or a Path object." 
+ ) + + with open(mmif_file, "r") as f: + mmif_str = f.read() + + data = Mmif(mmif_str) + segments = [] + + for view in data.views: + # Skip views with errors, warnings, or no annotations + if view.has_error() or view.has_warnings(): + continue + elif len(view.annotations) == 0: + continue + + app = view.metadata.get("app") + app_name, app_version = split_appname_appversion(app) + + # Use raw parameters for reproducibility + try: + param_dict = view.metadata.parameters + except (KeyError, AttributeError): + param_dict = {} + + param_hash = generate_param_hash(param_dict) + + # Build segment: app_name/version/hash + version_str = app_version if app_version else "unversioned" + segments.append(f"{app_name}/{version_str}/{param_hash}") + + return '/'.join(segments) + + +def describe_argparser(): + """ + Returns two strings: one-line description of the argparser, and + additional material, which will be shown in `clams --help` and + `clams <subcmd> --help`, respectively. + """ + oneliner = ( + 'provides CLI to describe the pipeline specification from a MMIF ' + 'file.' + ) + additional = textwrap.dedent(""" + MMIF describe extracts pipeline information from a MMIF file, including + app names, runtime configurations, and runtime profiling statistics + for each view in the processing pipeline.""") + return oneliner, oneliner + '\n\n' + additional + + +def prep_argparser(**kwargs): + parser = argparse.ArgumentParser( + description=describe_argparser()[1], + formatter_class=argparse.RawDescriptionHelpFormatter, + **kwargs + ) + parser.add_argument( + "MMIF_FILE", + nargs="?", + type=argparse.FileType("r"), + default=None if sys.stdin.isatty() else sys.stdin, + help='input MMIF file path, or STDIN if `-` or not provided.' + ) + parser.add_argument( + "-o", "--output", + type=argparse.FileType("w"), + default=sys.stdout, + help='output file path, or STDOUT if not provided.' 
+ ) + parser.add_argument( + "-p", "--pretty", + action="store_true", + help="Pretty-print JSON output" + ) + return parser + + +def main(args): + # Read MMIF content + mmif_content = args.MMIF_FILE.read() + + # For file input, we need to handle the path + # If input is from stdin, create a temp file + import tempfile + with tempfile.NamedTemporaryFile( + mode='w', suffix='.mmif', delete=False + ) as tmp: + tmp.write(mmif_content) + tmp_path = tmp.name + + try: + spec = get_pipeline_specs(tmp_path) + pipeline_id = generate_pipeline_identifier(tmp_path) + + # Convert to JSON-serializable format and calculate stats + views = {} + annotation_count_stats = {"total": 0} + annotation_count_by_type = {} + + for (view_id, app, configs, running_time, running_hardware, + annotation_count, annotations_by_type) in spec: + entry = { + "app": app, + "appConfiguration": configs, + } + # Output in new appProfiling format + if running_time is not None or running_hardware is not None: + profiling = {} + if running_time is not None: + profiling["runningTime"] = running_time + if running_hardware is not None: + profiling["hardware"] = running_hardware + entry["appProfiling"] = profiling + + views[view_id] = entry + + # Build annotation count stats + annotation_count_stats["total"] += annotation_count + annotation_count_stats[view_id] = annotation_count + + # Build annotation count by type stats + for at_type, count in annotations_by_type.items(): + if at_type not in annotation_count_by_type: + annotation_count_by_type[at_type] = {"total": 0} + annotation_count_by_type[at_type]["total"] += count + annotation_count_by_type[at_type][view_id] = count + + output = { + "pipeline_id": pipeline_id, + "stats": { + "viewCount": len(views), + "annotationCount": annotation_count_stats, + "annotationCountByType": annotation_count_by_type + }, + "views": views + } + + # Write output + if args.pretty: + json.dump(output, args.output, indent=2) + else: + json.dump(output, args.output) + 
args.output.write('\n') + finally: + # Clean up temp file + import os + os.unlink(tmp_path) + + +if __name__ == "__main__": + parser = prep_argparser() + args = parser.parse_args() + main(args) From 9348b6832aa4c03a7a2895ef95da295df404e421 Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Thu, 20 Nov 2025 11:59:03 -0500 Subject: [PATCH 2/4] updated other CLI modules' argparser for consistency a --- mmif/utils/cli/rewind.py | 12 ++++++------ mmif/utils/cli/source.py | 13 ++++--------- tests/test_utils_cli.py | 8 ++++---- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/mmif/utils/cli/rewind.py b/mmif/utils/cli/rewind.py index 570b0893..e5bd4ed3 100644 --- a/mmif/utils/cli/rewind.py +++ b/mmif/utils/cli/rewind.py @@ -76,14 +76,14 @@ def describe_argparser(): def prep_argparser(**kwargs): parser = argparse.ArgumentParser(description=describe_argparser()[1], formatter_class=argparse.RawDescriptionHelpFormatter, **kwargs) - parser.add_argument("IN_MMIF_FILE", + parser.add_argument("MMIF_FILE", nargs="?", type=argparse.FileType("r"), default=None if sys.stdin.isatty() else sys.stdin, help='input MMIF file path, or STDIN if `-` or not provided.') - parser.add_argument("OUT_MMIF_FILE", - nargs="?", type=argparse.FileType("w"), + parser.add_argument("-o", "--output", + type=argparse.FileType("w"), default=sys.stdout, - help='output MMIF file path, or STDOUT if `-` or not provided.') + help='output file path, or STDOUT if not provided.') parser.add_argument("-p", '--pretty', action='store_true', help="Pretty-print rewound MMIF") parser.add_argument("-n", '--number', default="0", type=int, @@ -95,7 +95,7 @@ def prep_argparser(**kwargs): def main(args): - mmif_obj = mmif.Mmif(args.IN_MMIF_FILE.read()) + mmif_obj = mmif.Mmif(args.MMIF_FILE.read()) if args.number == 0: # If user doesn't know how many views to rewind, give them choices. 
choice = prompt_user(mmif_obj) @@ -104,7 +104,7 @@ def main(args): if not isinstance(choice, int) or choice <= 0: raise ValueError(f"Only can rewind by a positive number of views. Got {choice}.") - args.OUT_MMIF_FILE.write(rewind_mmif(mmif_obj, choice, args.mode == 'view').serialize(pretty=args.pretty)) + args.output.write(rewind_mmif(mmif_obj, choice, args.mode == 'view').serialize(pretty=args.pretty)) if __name__ == "__main__": diff --git a/mmif/utils/cli/source.py b/mmif/utils/cli/source.py index 7e662f9f..1e6b6b1b 100644 --- a/mmif/utils/cli/source.py +++ b/mmif/utils/cli/source.py @@ -258,10 +258,9 @@ def prep_argparser(**kwargs): ) parser.add_argument( '-o', '--output', - default=None, - action='store', - nargs='?', - help='A name of a file to capture a generated MMIF json. When not given, MMIF is printed to stdout.' + type=argparse.FileType('w'), + default=sys.stdout, + help='output file path, or STDOUT if not provided.' ) scheme_help = 'A scheme to associate with the document location URI. When not given, the default scheme is `file://`.' 
if len(discovered_docloc_plugins) > 0: @@ -279,12 +278,8 @@ def prep_argparser(**kwargs): def main(args): - if args.output: - out_f = open(args.output, 'w') - else: - out_f = sys.stdout mmif = generate_source_mmif_from_file(windows_path=False, **vars(args)) - out_f.write(mmif) + args.output.write(mmif) return mmif if __name__ == '__main__': diff --git a/tests/test_utils_cli.py b/tests/test_utils_cli.py index adb8e857..32f4c8bc 100644 --- a/tests/test_utils_cli.py +++ b/tests/test_utils_cli.py @@ -44,11 +44,11 @@ def get_params(self): return params def generate_source_mmif(self): - - # to suppress output (otherwise, set to stdout by default + + # to suppress output (otherwise, set to stdout by default) args = self.parser.parse_args(self.get_params()) - args.output = os.devnull - + args.output = open(os.devnull, 'w') + return source.main(args) def test_accept_file_paths(self): From 43e2e140482425ac33765c250eb5c3527ecee8c7 Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Thu, 20 Nov 2025 11:34:52 -0500 Subject: [PATCH 3/4] updated handling of "null"-views in `describe` module --- mmif/utils/cli/describe.py | 68 ++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py index 92e26864..a8fa6530 100644 --- a/mmif/utils/cli/describe.py +++ b/mmif/utils/cli/describe.py @@ -50,14 +50,23 @@ def generate_param_hash(params: dict) -> str: def get_pipeline_specs( mmif_file: Union[str, Path] -) -> List[Tuple[str, str, dict, Optional[int], Optional[dict], int, dict]]: +) -> Tuple[ + List[Tuple[str, str, dict, Optional[int], Optional[dict], int, dict]], + List[str], List[str], List[str] +]: """ Read a MMIF file and extract the pipeline specification from it. + Extracts app configurations, profiling data, and annotation statistics + for each contentful view. Views with errors, warnings, or no annotations + are tracked separately. 
+ :param mmif_file: Path to the MMIF file - :return: List of tuples containing (view_id, app_name, configs, + :return: Tuple of (spec_list, error_views, warning_views, empty_views) + where spec_list contains tuples of (view_id, app_name, configs, running_time_ms, running_hardware, annotation_count, - annotations_by_type) for each view in the pipeline + annotations_by_type) for each contentful view, and the three + lists contain view IDs for error/warning/empty views respectively """ if not isinstance(mmif_file, (str, Path)): raise ValueError( @@ -69,12 +78,20 @@ def get_pipeline_specs( data = Mmif(mmif_str) spec = [] + error_views = [] + warning_views = [] + empty_views = [] for view in data.views: - # Skip views with errors, warnings, or no annotations - if view.has_error() or view.has_warnings(): + # Track error, warning, and empty views (mutually exclusive) + if view.has_error(): + error_views.append(view.id) + continue + elif view.has_warnings(): + warning_views.append(view.id) continue elif len(view.annotations) == 0: + empty_views.append(view.id) continue app = view.metadata.get("app") @@ -125,7 +142,7 @@ def get_pipeline_specs( annotation_count, annotations_by_type )) - return spec + return spec, error_views, warning_views, empty_views def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: @@ -136,7 +153,8 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: app_name/version/param_hash/app_name2/version2/param_hash2/... Uses view.metadata.parameters (raw user-passed values) for hashing - to ensure reproducibility. + to ensure reproducibility. Views with errors or warnings are excluded + from the identifier; empty views (no annotations) are included. 
:param mmif_file: Path to the MMIF file :return: Pipeline identifier string @@ -153,11 +171,9 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: segments = [] for view in data.views: - # Skip views with errors, warnings, or no annotations + # Skip views with errors or warnings if view.has_error() or view.has_warnings(): continue - elif len(view.annotations) == 0: - continue app = view.metadata.get("app") app_name, app_version = split_appname_appversion(app) @@ -188,9 +204,17 @@ def describe_argparser(): 'file.' ) additional = textwrap.dedent(""" - MMIF describe extracts pipeline information from a MMIF file, including - app names, runtime configurations, and runtime profiling statistics - for each view in the processing pipeline.""") + MMIF describe extracts pipeline information from a MMIF file and outputs + a JSON summary including: + + - pipeline_id: unique identifier for the pipeline based on apps, versions, + and parameter hashes (excludes error/warning views) + - stats: annotation counts (total and per-view), counts by annotation type, + and lists of error/warning/empty view IDs + - views: map of view IDs to app configurations and profiling data + + Views with errors or warnings are tracked but excluded from the pipeline + identifier and annotation statistics.""") return oneliner, oneliner + '\n\n' + additional @@ -222,6 +246,17 @@ def prep_argparser(**kwargs): def main(args): + """ + Main entry point for the describe CLI command. 
+ + Reads a MMIF file and outputs a JSON summary containing: + - pipeline_id: unique identifier for the pipeline + - stats: view counts, annotation counts (total/per-view/per-type), + and lists of error/warning/empty view IDs + - views: map of view IDs to app configurations and profiling data + + :param args: Parsed command-line arguments + """ # Read MMIF content mmif_content = args.MMIF_FILE.read() @@ -235,7 +270,9 @@ def main(args): tmp_path = tmp.name try: - spec = get_pipeline_specs(tmp_path) + spec, error_views, warning_views, empty_views = get_pipeline_specs( + tmp_path + ) pipeline_id = generate_pipeline_identifier(tmp_path) # Convert to JSON-serializable format and calculate stats @@ -275,6 +312,9 @@ def main(args): "pipeline_id": pipeline_id, "stats": { "viewCount": len(views), + "errorViews": error_views, + "warningViews": warning_views, + "emptyViews": empty_views, "annotationCount": annotation_count_stats, "annotationCountByType": annotation_count_by_type }, From f7eedfa6337b441c39af5b5370242ed2baf02435 Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Thu, 20 Nov 2025 11:53:28 -0500 Subject: [PATCH 4/4] fixed type hints, changed time format when profiling is found --- mmif/utils/cli/describe.py | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py index a8fa6530..ab95205d 100644 --- a/mmif/utils/cli/describe.py +++ b/mmif/utils/cli/describe.py @@ -9,19 +9,22 @@ from mmif import Mmif -def split_appname_appversion(long_app_id: str) -> Tuple[str, str]: +def split_appname_appversion( + long_app_id: str +) -> Tuple[Optional[str], Optional[str]]: """ Split app name and version from a long app identifier. 
Assumes the identifier looks like "uri://APP_DOMAIN/APP_NAME/APP_VERSION" :param long_app_id: Full app identifier URI - :return: Tuple of (app_name, app_version) + :return: Tuple of (app_name, app_version), either may be None if not found """ app_path = Path(long_app_id).parts app_name = app_path[2] if len(app_path) > 2 else None app_version = app_path[3] if len(app_path) > 3 else None - if app_version is not None and app_name.endswith(app_version): + if (app_version is not None and app_name is not None + and app_name.endswith(app_version)): app_name = app_name[:-len(app_version) - 1] if app_version == 'unresolvable': app_version = None @@ -51,7 +54,7 @@ def generate_param_hash(params: dict) -> str: def get_pipeline_specs( mmif_file: Union[str, Path] ) -> Tuple[ - List[Tuple[str, str, dict, Optional[int], Optional[dict], int, dict]], + List[Tuple[str, Optional[str], dict, Optional[str], Optional[dict], int, dict]], List[str], List[str], List[str] ]: """ @@ -97,27 +100,15 @@ def get_pipeline_specs( app = view.metadata.get("app") configs = view.metadata.get("appConfiguration", {}) - # Parse running time from hh:mm:ss.ms format to milliseconds + # Get running time string (H:MM:SS.microseconds format) # Support both new (appProfiling.runningTime) and old (appRunningTime) running_time = None - time_str = None if "appProfiling" in view.metadata: profiling = view.metadata["appProfiling"] if isinstance(profiling, dict) and "runningTime" in profiling: - time_str = profiling["runningTime"] + running_time = profiling["runningTime"] elif "appRunningTime" in view.metadata: - time_str = view.metadata["appRunningTime"] - - if time_str: - try: - rest, ms = time_str.split(".") - running_time = sum( - int(x) * 60**i - for i, x in enumerate(reversed(rest.split(":"))) - ) * 1000 + int(ms) - except (ValueError, AttributeError): - # If parsing fails, leave as None - pass + running_time = view.metadata["appRunningTime"] # Support both new (appProfiling.hardware) and old 
(appRunningHardware) running_hardware = None @@ -176,6 +167,8 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: continue app = view.metadata.get("app") + if app is None: + continue app_name, app_version = split_appname_appversion(app) # Use raw parameters for reproducibility @@ -187,8 +180,9 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: param_hash = generate_param_hash(param_dict) # Build segment: app_name/version/hash + name_str = app_name if app_name else "unknown" version_str = app_version if app_version else "unversioned" - segments.append(f"{app_name}/{version_str}/{param_hash}") + segments.append(f"{name_str}/{version_str}/{param_hash}") return '/'.join(segments)