diff --git a/.gitignore b/.gitignore index 5dd9b09b..1fb8c951 100644 --- a/.gitignore +++ b/.gitignore @@ -84,3 +84,4 @@ documentation/_build/ /VERSION _issues +documentation/cli_help.rst diff --git a/documentation/autodoc/mmif.utils.rst b/documentation/autodoc/mmif.utils.rst index a9275023..8bd90cfd 100644 --- a/documentation/autodoc/mmif.utils.rst +++ b/documentation/autodoc/mmif.utils.rst @@ -1,12 +1,15 @@ mmif.utils package ================== -Package containing utility modules for handling different types of source -documents, and general implementation of common data structures and +Package containing utility modules for handling different types of source +documents, and general implementation of common data structures and algorithms. +Submodules +---------- + ``video_document_helper`` module ----------------------------------------- +-------------------------------- .. automodule:: mmif.utils.video_document_helper :members: @@ -14,26 +17,33 @@ algorithms. :show-inheritance: ``text_document_helper`` module ---------------------------------- +------------------------------- -.. automodule:: mmif.utils.sequence_helper +.. automodule:: mmif.utils.text_document_helper :members: :undoc-members: :show-inheritance: -``sequence_helper`` module ---------------------------------- +``timeunit_helper`` module +------------------------------- -.. automodule:: mmif.utils.sequence_helper +.. automodule:: mmif.utils.timeunit_helper :members: :undoc-members: :show-inheritance: ``sequence_helper`` module ---------------------------------- +-------------------------- .. automodule:: mmif.utils.sequence_helper :members: :undoc-members: :show-inheritance: +``workflow_helper`` module +-------------------------- + +.. 
automodule:: mmif.utils.workflow_helper + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/documentation/cli.rst b/documentation/cli.rst new file mode 100644 index 00000000..ce662493 --- /dev/null +++ b/documentation/cli.rst @@ -0,0 +1,20 @@ +.. _cli: + +``mmif`` shell command +====================== + +``mmif-python`` comes with a command line interface (CLI) that allows you to handle MMIF files. Many of these commands are designed to handle MMIF files in the context of CLAMS workflows. + +The CLI is installed as ``mmif`` shell command. To see the available commands, run + +.. code-block:: bash + + mmif --help + +.. contents:: + :local: + :backlinks: none + +The following documentation is automatically generated from the CLI help messages. + +.. include:: cli_help.rst diff --git a/documentation/conf.py b/documentation/conf.py index b1bbd4a0..ad8e4e05 100644 --- a/documentation/conf.py +++ b/documentation/conf.py @@ -5,6 +5,7 @@ # https://www.sphinx-doc.org/en/master/usage/configuration.html import datetime +import textwrap # -- Path setup -------------------------------------------------------------- @@ -110,4 +111,40 @@ def linkcode_resolve(domain, info): # 1. sphinx-mv/main.py know current version of the library by git tag, # but conf.py has no way to know that... # 2. target-versions.csv file can be read once and used in the for loop -# in sphinx-mv/main.py, but here it should be read in for each `docs` bulid. +# ... (previous content) + +def generate_cli_rst(app): + import mmif + from mmif import prep_argparser_and_subcmds, find_all_modules + + # Generate main help + os.environ['COLUMNS'] = '100' + parser, subparsers = prep_argparser_and_subcmds() + help_text = parser.format_help() + + content = [] + + content.append('Main Command\n') + content.append('------------\n\n') + content.append('.. 
code-block:: text\n\n') + content.append(textwrap.indent(help_text, ' ')) + content.append('\n\n') + + # Generate subcommand help + for cli_module in find_all_modules('mmif.utils.cli'): + cli_module_name = cli_module.__name__.rsplit('.')[-1] + subparser = cli_module.prep_argparser(prog=f'mmif {cli_module_name}') + sub_help = subparser.format_help() + + content.append(f'{cli_module_name}\n') + content.append('-' * len(cli_module_name) + '\n\n') + content.append('.. code-block:: text\n\n') + content.append(textwrap.indent(sub_help, ' ')) + content.append('\n\n') + + with open(proj_root_dir / 'documentation' / 'cli_help.rst', 'w') as f: + f.write(''.join(content)) + + +def setup(app): + app.connect('builder-inited', generate_cli_rst) diff --git a/documentation/index.rst b/documentation/index.rst index beb56aae..2648277c 100644 --- a/documentation/index.rst +++ b/documentation/index.rst @@ -8,6 +8,7 @@ Welcome to mmif-python's documentation! :caption: Contents introduction + cli plugins target-versions diff --git a/documentation/introduction.rst b/documentation/introduction.rst index cfd9eaf7..95508f3c 100644 --- a/documentation/introduction.rst +++ b/documentation/introduction.rst @@ -87,3 +87,9 @@ To get subcomponents, you can use various getters implemented in subclasses. For For a full list of available helper methods, please refer to :ref:`the API documentation `. +MMIF usage in CLAMS Workflows +----------------------------- + +In the context of CLAMS, a **Workflow** refers to the sequence of CLAMS applications that have been executed to generate the views and annotations within a MMIF file. + +When using the ``mmif-python`` SDK, a unique identifier for a workflow (``workflowId``) is calculated based on the applications involved. This identifier is constructed by concatenating the application name, version, and a hash of the runtime parameters for each step in the sequence. 
This ensures that the identifier uniquely represents not just the apps used, but their specific configurations, aiding in reproducibility. diff --git a/mmif/serialize/annotation.py b/mmif/serialize/annotation.py index 6f1471aa..6527f482 100644 --- a/mmif/serialize/annotation.py +++ b/mmif/serialize/annotation.py @@ -2,7 +2,7 @@ The :mod:`annotation` module contains the classes used to represent a MMIF annotation as a live Python object. -In MMIF, annotations are created by apps in a pipeline as a part +In MMIF, annotations are created by apps in a workflow as a part of a view. For documentation on how views are represented, see :mod:`mmif.serialize.view`. """ diff --git a/mmif/serialize/mmif.py b/mmif/serialize/mmif.py index 2b759765..9e94496d 100644 --- a/mmif/serialize/mmif.py +++ b/mmif/serialize/mmif.py @@ -635,7 +635,7 @@ def get_alignments(self, at_type1: Union[str, ThingTypesBase], at_type2: Union[s def get_views_for_document(self, doc_id: str) -> List[View]: """ Returns the list of all views that have annotations anchored on a particular document. - Note that when the document is inside a view (generated during the pipeline's running), + Note that when the document is inside a view (generated during the workflow's running), doc_id must be prefixed with the view_id. """ views = [] diff --git a/mmif/serialize/view.py b/mmif/serialize/view.py index 80b7a65b..1c3d8e39 100644 --- a/mmif/serialize/view.py +++ b/mmif/serialize/view.py @@ -2,7 +2,7 @@ The :mod:`view` module contains the classes used to represent a MMIF view as a live Python object. -In MMIF, views are created by apps in a pipeline that are annotating +In MMIF, views are created by apps in a workflow that are annotating data that was previously present in the MMIF file. 
The :class:`View` class is a high-level container that provides convenient diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py index ab95205d..eaf35856 100644 --- a/mmif/utils/cli/describe.py +++ b/mmif/utils/cli/describe.py @@ -1,190 +1,28 @@ import argparse -import hashlib import json import sys import textwrap from pathlib import Path -from typing import Union, List, Tuple, Optional +from typing import Union -from mmif import Mmif +from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \ + describe_mmif_collection +# gen_param_hash is imported for backward compatibility +from mmif.utils.workflow_helper import generate_param_hash -def split_appname_appversion( - long_app_id: str -) -> Tuple[Optional[str], Optional[str]]: - """ - Split app name and version from a long app identifier. - - Assumes the identifier looks like "uri://APP_DOMAIN/APP_NAME/APP_VERSION" - - :param long_app_id: Full app identifier URI - :return: Tuple of (app_name, app_version), either may be None if not found - """ - app_path = Path(long_app_id).parts - app_name = app_path[2] if len(app_path) > 2 else None - app_version = app_path[3] if len(app_path) > 3 else None - if (app_version is not None and app_name is not None - and app_name.endswith(app_version)): - app_name = app_name[:-len(app_version) - 1] - if app_version == 'unresolvable': - app_version = None - return app_name, app_version - - -def generate_param_hash(params: dict) -> str: - """ - Generate MD5 hash from a parameter dictionary. - - Parameters are sorted alphabetically, joined as key=value pairs, - and hashed using MD5. This is not for security purposes, only for - generating consistent identifiers. 
- - :param params: Dictionary of parameters - :return: MD5 hash string (32 hex characters) - """ - if not params: - param_string = "" - else: - param_list = ['='.join([k, str(v)]) for k, v in params.items()] - param_list.sort() - param_string = ','.join(param_list) - return hashlib.md5(param_string.encode('utf-8')).hexdigest() - - -def get_pipeline_specs( - mmif_file: Union[str, Path] -) -> Tuple[ - List[Tuple[str, Optional[str], dict, Optional[str], Optional[dict], int, dict]], - List[str], List[str], List[str] -]: - """ - Read a MMIF file and extract the pipeline specification from it. - - Extracts app configurations, profiling data, and annotation statistics - for each contentful view. Views with errors, warnings, or no annotations - are tracked separately. - - :param mmif_file: Path to the MMIF file - :return: Tuple of (spec_list, error_views, warning_views, empty_views) - where spec_list contains tuples of (view_id, app_name, configs, - running_time_ms, running_hardware, annotation_count, - annotations_by_type) for each contentful view, and the three - lists contain view IDs for error/warning/empty views respectively - """ - if not isinstance(mmif_file, (str, Path)): - raise ValueError( - "MMIF file path must be a string or a Path object." 
- ) - - with open(mmif_file, "r") as f: - mmif_str = f.read() - - data = Mmif(mmif_str) - spec = [] - error_views = [] - warning_views = [] - empty_views = [] - - for view in data.views: - # Track error, warning, and empty views (mutually exclusive) - if view.has_error(): - error_views.append(view.id) - continue - elif view.has_warnings(): - warning_views.append(view.id) - continue - elif len(view.annotations) == 0: - empty_views.append(view.id) - continue - - app = view.metadata.get("app") - configs = view.metadata.get("appConfiguration", {}) - - # Get running time string (H:MM:SS.microseconds format) - # Support both new (appProfiling.runningTime) and old (appRunningTime) - running_time = None - if "appProfiling" in view.metadata: - profiling = view.metadata["appProfiling"] - if isinstance(profiling, dict) and "runningTime" in profiling: - running_time = profiling["runningTime"] - elif "appRunningTime" in view.metadata: - running_time = view.metadata["appRunningTime"] - - # Support both new (appProfiling.hardware) and old (appRunningHardware) - running_hardware = None - if "appProfiling" in view.metadata: - profiling = view.metadata["appProfiling"] - if isinstance(profiling, dict) and "hardware" in profiling: - running_hardware = profiling["hardware"] - elif "appRunningHardware" in view.metadata: - running_hardware = view.metadata["appRunningHardware"] - - # Count annotations and group by type - annotation_count = len(view.annotations) - annotations_by_type = {} - for annotation in view.annotations: - at_type = str(annotation.at_type) - annotations_by_type[at_type] = annotations_by_type.get( - at_type, 0 - ) + 1 - - spec.append(( - view.id, app, configs, running_time, running_hardware, - annotation_count, annotations_by_type - )) - - return spec, error_views, warning_views, empty_views +def get_pipeline_specs(mmif_file: Union[str, Path]): + import warnings + warnings.warn("get_pipeline_specs is deprecated, use mmif.utils.workflow_helper.describe_single_mmif 
instead", + DeprecationWarning) + return describe_single_mmif(mmif_file) def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: - """ - Generate a pipeline identifier string from a MMIF file. - - The identifier follows the storage directory structure format: - app_name/version/param_hash/app_name2/version2/param_hash2/... - - Uses view.metadata.parameters (raw user-passed values) for hashing - to ensure reproducibility. Views with errors or warnings are excluded - from the identifier; empty views (no annotations) are included. - - :param mmif_file: Path to the MMIF file - :return: Pipeline identifier string - """ - if not isinstance(mmif_file, (str, Path)): - raise ValueError( - "MMIF file path must be a string or a Path object." - ) - - with open(mmif_file, "r") as f: - mmif_str = f.read() - - data = Mmif(mmif_str) - segments = [] - - for view in data.views: - # Skip views with errors or warnings - if view.has_error() or view.has_warnings(): - continue - - app = view.metadata.get("app") - if app is None: - continue - app_name, app_version = split_appname_appversion(app) - - # Use raw parameters for reproducibility - try: - param_dict = view.metadata.parameters - except (KeyError, AttributeError): - param_dict = {} - - param_hash = generate_param_hash(param_dict) - - # Build segment: app_name/version/hash - name_str = app_name if app_name else "unknown" - version_str = app_version if app_version else "unversioned" - segments.append(f"{name_str}/{version_str}/{param_hash}") - - return '/'.join(segments) + import warnings + warnings.warn("generate_pipeline_identifier is deprecated, use generate_workflow_identifier instead", + DeprecationWarning) + return generate_workflow_identifier(mmif_file) def describe_argparser(): @@ -194,22 +32,31 @@ def describe_argparser(): `clams --help`, respectively. """ oneliner = ( - 'provides CLI to describe the pipeline specification from a MMIF ' - 'file.' 
+ 'provides CLI to describe the workflow specification from a MMIF ' + 'file or a collection of MMIF files.' ) - additional = textwrap.dedent(""" - MMIF describe extracts pipeline information from a MMIF file and outputs - a JSON summary including: - - pipeline_id: unique identifier for the pipeline based on apps, versions, - and parameter hashes (excludes error/warning views) - - stats: annotation counts (total and per-view), counts by annotation type, - and lists of error/warning/empty view IDs - - views: map of view IDs to app configurations and profiling data - - Views with errors or warnings are tracked but excluded from the pipeline - identifier and annotation statistics.""") - return oneliner, oneliner + '\n\n' + additional + # get and clean docstrings + single_doc = describe_single_mmif.__doc__.split(':param')[0] + single_doc = textwrap.dedent(single_doc).strip() + collection_doc = describe_mmif_collection.__doc__.split(':param')[0] + collection_doc = textwrap.dedent(collection_doc).strip() + + additional = textwrap.dedent(f""" + This command extracts workflow information from a single MMIF file or + summarizes a directory of MMIF files. + + ========================== + For a single MMIF file + ========================== + {single_doc} + + =============================== + For a directory of MMIF files + =============================== + {collection_doc} + """) + return oneliner, oneliner + '\n\n' + additional.strip() def prep_argparser(**kwargs): @@ -221,9 +68,9 @@ def prep_argparser(**kwargs): parser.add_argument( "MMIF_FILE", nargs="?", - type=argparse.FileType("r"), + type=str, default=None if sys.stdin.isatty() else sys.stdin, - help='input MMIF file path, or STDIN if `-` or not provided.' + help='input MMIF file, a directory of MMIF files, or STDIN if `-` or not provided.' ) parser.add_argument( "-o", "--output", @@ -244,87 +91,47 @@ def main(args): Main entry point for the describe CLI command. 
Reads a MMIF file and outputs a JSON summary containing: - - pipeline_id: unique identifier for the pipeline + - workflow_id: unique identifier for the source and app sequence - stats: view counts, annotation counts (total/per-view/per-type), and lists of error/warning/empty view IDs - views: map of view IDs to app configurations and profiling data :param args: Parsed command-line arguments """ - # Read MMIF content - mmif_content = args.MMIF_FILE.read() - - # For file input, we need to handle the path - # If input is from stdin, create a temp file - import tempfile - with tempfile.NamedTemporaryFile( - mode='w', suffix='.mmif', delete=False - ) as tmp: - tmp.write(mmif_content) - tmp_path = tmp.name - - try: - spec, error_views, warning_views, empty_views = get_pipeline_specs( - tmp_path - ) - pipeline_id = generate_pipeline_identifier(tmp_path) - - # Convert to JSON-serializable format and calculate stats - views = {} - annotation_count_stats = {"total": 0} - annotation_count_by_type = {} - - for (view_id, app, configs, running_time, running_hardware, - annotation_count, annotations_by_type) in spec: - entry = { - "app": app, - "appConfiguration": configs, - } - # Output in new appProfiling format - if running_time is not None or running_hardware is not None: - profiling = {} - if running_time is not None: - profiling["runningTime"] = running_time - if running_hardware is not None: - profiling["hardware"] = running_hardware - entry["appProfiling"] = profiling - - views[view_id] = entry - - # Build annotation count stats - annotation_count_stats["total"] += annotation_count - annotation_count_stats[view_id] = annotation_count - - # Build annotation count by type stats - for at_type, count in annotations_by_type.items(): - if at_type not in annotation_count_by_type: - annotation_count_by_type[at_type] = {"total": 0} - annotation_count_by_type[at_type]["total"] += count - annotation_count_by_type[at_type][view_id] = count - - output = { - "pipeline_id": pipeline_id, 
- "stats": { - "viewCount": len(views), - "errorViews": error_views, - "warningViews": warning_views, - "emptyViews": empty_views, - "annotationCount": annotation_count_stats, - "annotationCountByType": annotation_count_by_type - }, - "views": views - } + output = {} + # if input is a directory + if isinstance(args.MMIF_FILE, str) and Path(args.MMIF_FILE).is_dir(): + output = describe_mmif_collection(args.MMIF_FILE) + # if input is a file or stdin + else: + # Read MMIF content + if hasattr(args.MMIF_FILE, 'read'): + mmif_content = args.MMIF_FILE.read() + else: + with open(args.MMIF_FILE, 'r') as f: + mmif_content = f.read() - # Write output + # For file input, we need to handle the path + # If input is from stdin, create a temp file + import tempfile + tmp_path = None + try: + with tempfile.NamedTemporaryFile( + mode='w', suffix='.mmif', delete=False + ) as tmp: + tmp.write(mmif_content) + tmp_path = Path(tmp.name) + output = describe_single_mmif(tmp_path) + finally: + if tmp_path and tmp_path.exists(): + tmp_path.unlink() + + if output: if args.pretty: json.dump(output, args.output, indent=2) else: json.dump(output, args.output) args.output.write('\n') - finally: - # Clean up temp file - import os - os.unlink(tmp_path) if __name__ == "__main__": diff --git a/mmif/utils/cli/rewind.py b/mmif/utils/cli/rewind.py index e5bd4ed3..1e038180 100644 --- a/mmif/utils/cli/rewind.py +++ b/mmif/utils/cli/rewind.py @@ -3,26 +3,26 @@ import textwrap import mmif +from mmif.utils.workflow_helper import group_views_by_app def prompt_user(mmif_obj: mmif.Mmif) -> int: """ Function to ask user to choose the rewind range. 
""" + grouped_apps = group_views_by_app(mmif_obj.views) + view_to_app_num = {} + for i, execution in enumerate(grouped_apps): + for view in execution: + view_to_app_num[view.id] = i + 1 - ## Give a user options (#, "app", "timestamp") - time order - n = len(mmif_obj.views) - i = 0 # option number - aname = "" - a = 0 # header print("\n" + "{:<8} {:<8} {:<30} {:<100}".format("view-num", "app-num", "timestamp", "app")) + view_num = 0 for view in reversed(mmif_obj.views): - if view.metadata.app != aname: - aname = view.metadata.app - a += 1 - i += 1 - print("{:<8} {:<8} {:<30} {:<100}".format(i, a, str(view.metadata.timestamp), str(view.metadata.app))) + view_num += 1 + app_exec_num = view_to_app_num.get(view.id, 'N/A') + print("{:<8} {:<15} {:<30} {:<100}".format(view_num, app_exec_num, str(view.metadata.timestamp), str(view.metadata.app))) ## User input return int(input("\nEnter the number to delete from that point by rewinding: ")) @@ -33,7 +33,7 @@ def rewind_mmif(mmif_obj: mmif.Mmif, choice: int, choice_is_viewnum: bool = True Rewind MMIF by deleting the last N views. The number of views to rewind is given as a number of "views", or number of "producer apps". By default, the number argument is interpreted as the number of "views". - Note that when the same app is repeatedly run in a CLAMS pipeline and produces multiple views in a row, + Note that when the same app is repeatedly run in a CLAMS workflow and produces multiple views in a row, rewinding in "app" mode will rewind all those views at once. 
:param mmif_obj: mmif object @@ -46,16 +46,9 @@ def rewind_mmif(mmif_obj: mmif.Mmif, choice: int, choice_is_viewnum: bool = True for vid in list(v.id for v in mmif_obj.views)[-1:-choice-1:-1]: mmif_obj.views._items.pop(vid) else: - app_count = 0 - cur_app = "" - vid_to_pop = [] - for v in reversed(mmif_obj.views): - vid_to_pop.append(v.id) - if app_count >= choice: - break - if v.metadata.app != cur_app: - app_count += 1 - cur_app = v.metadata.app + grouped_apps = group_views_by_app(mmif_obj.views) + executions_to_rewind = grouped_apps[-choice:] + vid_to_pop = [view.id for execution in executions_to_rewind for view in execution] for vid in vid_to_pop: mmif_obj.views._items.pop(vid) return mmif_obj @@ -66,7 +59,7 @@ def describe_argparser(): returns two strings: one-line description of the argparser, and addition material, which will be shown in `clams --help` and `clams --help`, respectively. """ - oneliner = 'provides CLI to rewind a MMIF from a CLAMS pipeline.' + oneliner = 'provides CLI to rewind a MMIF from a CLAMS workflow.' additional = textwrap.dedent(""" MMIF rewinder rewinds a MMIF by deleting the last N views. N can be specified as a number of views, or a number of producer apps. 
""") diff --git a/mmif/utils/cli/source.py b/mmif/utils/cli/source.py index 1e6b6b1b..3abd2e1f 100644 --- a/mmif/utils/cli/source.py +++ b/mmif/utils/cli/source.py @@ -233,7 +233,7 @@ def prep_argparser(**kwargs): name[len('mmif_docloc_'):]: importlib.import_module(name) for _, name, _ in pkgutil.iter_modules() if re.match(r'mmif[-_]docloc[-_]', name) } - parser = argparse.ArgumentParser(description=describe_argparser()[1], formatter_class=argparse.RawTextHelpFormatter, **kwargs) + parser = argparse.ArgumentParser(description=describe_argparser()[1], formatter_class=argparse.RawDescriptionHelpFormatter, **kwargs) parser.add_argument( 'documents', default=None, diff --git a/mmif/utils/workflow_helper.py b/mmif/utils/workflow_helper.py new file mode 100644 index 00000000..013176a1 --- /dev/null +++ b/mmif/utils/workflow_helper.py @@ -0,0 +1,442 @@ +import datetime +import hashlib +from collections import Counter, defaultdict +from pathlib import Path +from typing import List, Any, Tuple, Optional, Union +import itertools +from mmif import Mmif + + +def group_views_by_app(views: List[Any]) -> List[List[Any]]: + """ + Groups views into app executions based on app and timestamp. + + An "app" is a set of views produced by the same app at the + exact same timestamp. + """ + # Filter out views that don't have a timestamp or app, as they can't be grouped. 
+ groupable_views = [ + v for v in views + if v.metadata.get("app") and v.metadata.get("timestamp") is not None + ] + + # Sort views by timestamp first, then by app URI to ensure deterministic grouping + groupable_views.sort(key=lambda v: (v.metadata.timestamp, v.metadata.app)) + + # Group by app and timestamp + grouped_apps = [] + for key, group in itertools.groupby(groupable_views, key=lambda v: (v.metadata.app, v.metadata.timestamp)): + grouped_apps.append(list(group)) + + return grouped_apps + + +def _split_appname_appversion( + long_app_id: str +) -> Tuple[Optional[str], Optional[str]]: + """ + Split app name and version from a long app identifier. + + Assumes the identifier looks like "uri://APP_DOMAIN/APP_NAME/APP_VERSION" + + :param long_app_id: Full app identifier URI + :return: Tuple of (app_name, app_version), either may be None if not found + """ + app_path = Path(long_app_id).parts + app_name = app_path[2] if len(app_path) > 2 else None + app_version = app_path[3] if len(app_path) > 3 else None + if (app_version is not None and app_name is not None + and app_name.endswith(app_version)): + app_name = app_name[:-len(app_version) - 1] + if app_version == 'unresolvable': + app_version = None + return app_name, app_version + + +def generate_param_hash(params: dict) -> str: + """ + Generate MD5 hash from a parameter dictionary. + + Parameters are sorted alphabetically, joined as key=value pairs, + and hashed using MD5. This is not for security purposes, only for + generating consistent identifiers. + + :param params: Dictionary of parameters + :return: MD5 hash string (32 hex characters) + """ + if not params: + param_string = "" + else: + param_list = ['='.join([k, str(v)]) for k, v in params.items()] + param_list.sort() + param_string = ','.join(param_list) + return hashlib.md5(param_string.encode('utf-8')).hexdigest() + + +def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str: + """ + Generate a workflow identifier string from a MMIF file. 
+ + The identifier follows the storage directory structure format: + app_name/version/param_hash/app_name2/version2/param_hash2/... + + Uses view.metadata.parameters (raw user-passed values) for hashing + to ensure reproducibility. Views with errors or warnings are excluded + from the identifier; empty views are included. + """ + if not isinstance(mmif_file, (str, Path)): + raise ValueError( + "MMIF file path must be a string or a Path object." + ) + + with open(mmif_file, "r") as f: + mmif_str = f.read() + + data = Mmif(mmif_str) + segments = [] + + # First prefix is source information, sorted by document type + sources = Counter(doc.at_type.shortname for doc in data.documents) + segments.append('-'.join([f'{k}-{sources[k]}' for k in sorted(sources.keys())])) + + # Group views into runs + grouped_apps = group_views_by_app(data.views) + + for app_execution in grouped_apps: + # Use the first view in the run as representative for metadata + first_view = app_execution[0] + + # Skip runs where the representative view has errors or warnings + if first_view.has_error() or first_view.has_warnings(): + continue + + app = first_view.metadata.get("app") + if app is None: + continue + app_name, app_version = _split_appname_appversion(app) + + # Use raw parameters from the first view for reproducibility + try: + param_dict = first_view.metadata.parameters + except (KeyError, AttributeError): + param_dict = {} + + param_hash = generate_param_hash(param_dict) + + # Build segment: app_name/version/hash + name_str = app_name if app_name else "unknown" + version_str = app_version if app_version else "unversioned" + segments.append(f"{name_str}/{version_str}/{param_hash}") + + return '/'.join(segments) + + +def _get_profile_data(view) -> dict: + """ + Extract profiling data from a view's metadata. 
+ + :param view: MMIF view object + :return: Dictionary of profiling data + """ + # TODO (krim @ 2025-11-27): the GPU part relies heavily on how clams-python implements _cuda_memory_to_str funct + # also it's not clear how helpful vram usage in the describe output is + # So I'm not using vram records here. Perhaps should `describe` be moved to clams-python instead? + + # running time can be found two ways: either in appProfiling.runningTime or appRunningTime (legacy) key + profiling = view.metadata.get("appProfiling", {}) + if "runningTime" not in profiling: + running_time_str = view.metadata.get("appRunningTime") + else: + running_time_str = profiling.get("runningTime") + + if running_time_str is None: + return {} + + # the format is datetime.timedelta string, e.g. '0:00:02.345678' + # need to convert to milliseconds integer + time_obj = datetime.datetime.strptime(running_time_str, "%H:%M:%S.%f").time() + milliseconds = (time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second) * 1000 + time_obj.microsecond // 1000 + return {"runningTimeMS": milliseconds} + + +def describe_single_mmif(mmif_file: Union[str, Path]) -> dict: + """ + Reads a MMIF file and extracts the workflow specification from it. + + This function provides an app-centric summarization of the workflow. The + conceptual hierarchy is that a **workflow** is a sequence of **apps**, + and each **app** execution can produce one or more **views**. This function + groups views that share the same ``app`` and ``metadata.timestamp`` into + a single logical "app execution". + + .. note:: + For MMIF files generated by ``clams-python`` <= 1.3.3, all views + are independently timestamped. This means that even if multiple views + were generated by a single execution of an app, their + ``metadata.timestamp`` values will be unique. As a result, the grouping + logic will treat each view as a separate app execution. 
The change + that aligns timestamps for views from a single app execution is + implemented in `clams-python PR #271 + `_. + + The output format is a dictionary with the following keys: + + * ``workflowId``: A unique identifier for the workflow, based on the + sequence of app executions (app, version, parameter hashes). App + executions with errors are excluded from this identifier. App + executions with warnings are still considered successful for the purpose of this identifier. + * ``stats``: + * ``appCount``: Total number of identified app executions. + * ``errorViews``: A list of view IDs that reported errors. + * ``warningViews``: A list of view IDs that reported warnings. + * ``emptyViews``: A list of view IDs that contain no annotations. + * ``annotationCountByType``: A dictionary mapping each annotation + type to its count, plus a ``total`` key for the sum of all + annotations across all app executions. + * ``apps``: A list of objects, where each object represents one app + execution. It includes metadata, profiling, and aggregated statistics + for all views generated by that execution. A special entry for views + that could not be assigned to an execution will be at the end of the list. + + --- + The docstring above is used to generate help messages for the CLI command. + Do not remove the triple-dashed lines. + + :param mmif_file: Path to the MMIF file + :return: A dictionary containing the workflow specification. + """ + if not isinstance(mmif_file, (str, Path)): + raise ValueError( + "MMIF file path must be a string or a Path object." 
+ ) + + workflow_id = generate_workflow_identifier(mmif_file) + with open(mmif_file, "r") as f: + mmif_str = f.read() + + mmif = Mmif(mmif_str) + + error_view_ids = [] + warning_view_ids = [] + empty_view_ids = [] + + # Generate the new "apps" list + grouped_apps = [] + processed_view_ids = set() + view_groups = group_views_by_app(mmif.views) + for group in view_groups: + first_view = group[0] + # skip executions with errors or warnings + if first_view.has_error() or first_view.has_warnings(): + continue + + execution_ann_counter = Counter() + for view in group: + if len(view.annotations) == 0: + empty_view_ids.append(view.id) + execution_ann_counter.update(Counter(str(ann.at_type) for ann in view.annotations)) + + execution_view_ids = [v.id for v in group] + processed_view_ids.update(execution_view_ids) + + app_data = { + "app": first_view.metadata.app, + "viewIds": execution_view_ids, + "appConfiguration": first_view.metadata.get("appConfiguration", {}), + "appProfiling": _get_profile_data(first_view), + } + total_annotations_in_exec = sum(execution_ann_counter.values()) + if total_annotations_in_exec > 0: + app_data['annotationCountByType'] = dict(execution_ann_counter) + app_data['annotationCountByType']['total'] = total_annotations_in_exec + grouped_apps.append(app_data) + + # Handle unassigned and problematic views + all_view_ids = set(v.id for v in mmif.views) + + for view in mmif.views: + if view.id not in processed_view_ids: + if view.has_error(): + error_view_ids.append(view.id) + elif view.has_warnings(): + warning_view_ids.append(view.id) + elif len(view.annotations) == 0: + empty_view_ids.append(view.id) + + unassigned_view_ids = all_view_ids - processed_view_ids - set(error_view_ids) - set(warning_view_ids) + + # Store app_count before potentially adding the special entry + app_count = len(grouped_apps) + + if unassigned_view_ids: + grouped_apps.append({ + "app": "http://apps.clams.ai/non-existing-app/v1", + "viewIds": 
sorted(list(unassigned_view_ids)) + }) + + # aggregate total annotation counts + total_annotations_by_type = Counter() + for execution in grouped_apps: + # Only aggregate from actual apps, not the special unassigned entry + if execution.get('app') != "http://apps.clams.ai/non-existing-app/v1": + if 'annotationCountByType' in execution: + exec_counts = execution['annotationCountByType'].copy() + del exec_counts['total'] + total_annotations_by_type.update(Counter(exec_counts)) + + final_total_annotations = sum(total_annotations_by_type.values()) + final_annotation_counts = dict(total_annotations_by_type) + if final_total_annotations > 0: + final_annotation_counts['total'] = final_total_annotations + + return { + "workflowId": workflow_id, + "stats": { + "appCount": app_count, + "errorViews": error_view_ids, + "warningViews": warning_view_ids, + "emptyViews": empty_view_ids, + "annotationCountByType": final_annotation_counts + }, + "apps": grouped_apps + } + + +def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict: + """ + Reads all MMIF files in a directory and extracts a summarized workflow specification. + + This function provides an overview of a collection of MMIF files, aggregating + statistics across multiple files. + + The output format is a dictionary with the following keys: + + * ``mmifCountByStatus``: A dictionary summarizing the processing status of + all MMIF files in the collection. It includes: + * ``total``: Total number of MMIF files found. + * ``successful``: Number of MMIF files processed without errors (may contain warnings). + * ``withErrors``: Number of MMIF files containing app executions that reported errors. + * ``withWarnings``: Number of MMIF files containing app executions that reported warnings. + * ``invalid``: Number of files that failed to be parsed as valid MMIF. 
+ * ``workflows``: A list of "workflow" objects found in the "successful" MMIF files (files with errors + are excluded), where each object contains: + * ``workflowId``: The unique identifier for the workflow. + * ``apps``: A list of app objects, each with ``app`` (name+ver identifier), + ``appConfiguration``, and ``appProfiling`` statistics (avg, min, max, stdev running times) + aggregated per workflow. + * ``mmifs``: A list of MMIF file basenames belonging to this workflow. + * ``mmifCount``: The number of MMIF files in this workflow. + * ``annotationCountByType``: A dictionary aggregating annotation counts + across the entire collection. It includes a ``total`` key for the grand + total, plus integer counts for each individual annotation type. + + --- + The docstring above is used to generate help messages for the CLI command. + Do not remove the triple-dashed lines. + + :param mmif_dir: Path to the directory containing MMIF files. + :return: A dictionary containing the summarized collection specification. 
+ """ + import statistics + from collections import defaultdict, Counter + + mmif_files = list(Path(mmif_dir).glob('*.mmif')) + + status_summary = defaultdict(int) + status_summary['total'] = len(mmif_files) + status_summary['successful'] = 0 + status_summary['withErrors'] = 0 + status_summary['withWarnings'] = 0 + status_summary['invalid'] = 0 + + aggregated_counts = Counter() + + workflows_data = defaultdict(lambda: { + 'mmifs': [], + 'apps': defaultdict(lambda: { + 'appConfiguration': None, # Store the first config here + 'execution_times': [] + }) + }) + + for mmif_file in mmif_files: + try: + single_report = describe_single_mmif(mmif_file) + except Exception as e: + status_summary['invalid'] += 1 + continue + + if single_report['stats']['errorViews']: + status_summary['withErrors'] += 1 + continue # Exclude from all other stats + + # If we get here, the MMIF has no errors and is considered "successful" + status_summary['successful'] += 1 + if single_report['stats']['warningViews']: + status_summary['withWarnings'] += 1 + + wf_id = single_report['workflowId'] + workflows_data[wf_id]['mmifs'].append(Path(mmif_file).name) + + # Aggregate annotation counts for successful mmifs + report_counts = single_report['stats'].get('annotationCountByType', {}) + if 'total' in report_counts: + del report_counts['total'] # don't add the sub-total to the main counter + aggregated_counts.update(report_counts) + + for app_exec in single_report.get('apps', []): + app_uri = app_exec.get('app') + # skip the special "unassigned" app + if app_uri and app_uri != "http://apps.clams.ai/non-existing-app/v1": + running_time = app_exec.get('appProfiling', {}).get('runningTimeMS') + if running_time is not None: + workflows_data[wf_id]['apps'][app_uri]['execution_times'].append(running_time) + + # Store the first non-empty app configuration we find for this app in this workflow + if workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] is None: + config = app_exec.get('appConfiguration', 
{}) + if config: + workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] = config + + # Process collected data into the final output format + final_workflows_list = [] + for wf_id, wf_data in sorted(workflows_data.items()): + workflow_object = { + 'workflowId': wf_id, + 'mmifs': sorted(wf_data['mmifs']), + 'mmifCount': len(wf_data['mmifs']), + 'apps': [] + } + + for app_uri, app_data in sorted(wf_data['apps'].items()): + times = app_data['execution_times'] + if times: + profiling_stats = { + 'avgRunningTimeMS': statistics.mean(times), + 'minRunningTimeMS': min(times), + 'maxRunningTimeMS': max(times), + 'stdevRunningTimeMS': statistics.stdev(times) if len(times) > 1 else 0 + } + else: + profiling_stats = {} + + app_object = { + 'app': app_uri, + 'appConfiguration': app_data['appConfiguration'] or {}, # Default to empty dict + 'appProfiling': profiling_stats + } + workflow_object['apps'].append(app_object) + + final_workflows_list.append(workflow_object) + + # Finalize annotation counts + final_annotation_counts = dict(aggregated_counts) + grand_total = sum(final_annotation_counts.values()) + if grand_total > 0: + final_annotation_counts['total'] = grand_total + + return { + 'mmifCountByStatus': dict(status_summary), + 'workflows': final_workflows_list, + 'annotationCountByType': final_annotation_counts + } \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py index 3763f489..0c261fe7 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,7 @@ import pathlib import unittest +import tempfile +import json import pytest @@ -220,5 +222,66 @@ def test_slice_text(self): full_sliced_text) +class TestWorkflowHelper(unittest.TestCase): + + def setUp(self) -> None: + self.maxDiff = None + self.basic_mmif = Mmif( + '{"metadata": {"mmif": "http://mmif.clams.ai/1.0.0"}, "documents": [{"@type": "http://mmif.clams.ai/vocabulary/VideoDocument/v1", "properties": {"id": "d1", "mime": "video/mp4", "location": 
"file:///test/video.mp4"}}], "views": []}' + ) + + def create_temp_mmif_file(self, mmif_obj): + """Helper to create a temporary MMIF file.""" + import tempfile + import json + tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.mmif', delete=False) + if isinstance(mmif_obj, Mmif): + content_to_write = mmif_obj.serialize(pretty=False) + else: + content_to_write = json.dumps(mmif_obj) + tmp.write(content_to_write) + tmp.close() + return tmp.name + + def test_split_appname_appversion(self): + from mmif.utils.workflow_helper import _split_appname_appversion + app_name, app_version = _split_appname_appversion("http://apps.clams.ai/test-app/v1.0.0") + self.assertEqual(app_name, "test-app") + self.assertEqual(app_version, "v1.0.0") + + def test_generate_param_hash(self): + from mmif.utils.workflow_helper import generate_param_hash + params = {"param1": "value1", "param2": 42} + hash1 = generate_param_hash(params) + hash2 = generate_param_hash(params) + self.assertEqual(hash1, hash2) + params_reversed = {"param2": 42, "param1": "value1"} + hash3 = generate_param_hash(params_reversed) + self.assertEqual(hash1, hash3) + + def test_generate_workflow_identifier_grouped(self): + from mmif.vocabulary import AnnotationTypes + from mmif.utils import workflow_helper + view1 = self.basic_mmif.new_view() + view1.metadata.app = "http://apps.clams.ai/app1/v1.0.0" + view1.metadata.timestamp = "2024-01-01T12:00:00Z" + view2 = self.basic_mmif.new_view() + view2.metadata.app = "http://apps.clams.ai/app1/v1.0.0" + view2.metadata.timestamp = "2024-01-01T12:00:00Z" + view3 = self.basic_mmif.new_view() + view3.metadata.app = "http://apps.clams.ai/app2/v2.0.0" + view3.metadata.timestamp = "2024-01-01T12:01:00Z" + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + import os + try: + workflow_id = workflow_helper.generate_workflow_identifier(tmp_file) + segments = workflow_id.split('/') + self.assertEqual(len(segments), 7) + self.assertIn('app1', segments[1]) + self.assertIn('app2', 
segments[4]) + finally: + os.unlink(tmp_file) + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_utils_cli.py b/tests/test_utils_cli.py index 32f4c8bc..fa0f8906 100644 --- a/tests/test_utils_cli.py +++ b/tests/test_utils_cli.py @@ -1,11 +1,14 @@ import contextlib import io +import json import os +import tempfile import unittest.mock import mmif from mmif.utils.cli import rewind from mmif.utils.cli import source +from mmif.utils.cli import describe from mmif.serialize import Mmif from mmif.vocabulary import DocumentTypes, AnnotationTypes @@ -47,9 +50,9 @@ def generate_source_mmif(self): # to suppress output (otherwise, set to stdout by default) args = self.parser.parse_args(self.get_params()) - args.output = open(os.devnull, 'w') - - return source.main(args) + with open(os.devnull, 'w') as devnull: + args.output = devnull + return source.main(args) def test_accept_file_paths(self): self.docs.append("video:/a/b/c.mp4") @@ -136,9 +139,11 @@ def setUp(self): ) @staticmethod - def add_dummy_view(mmif: Mmif, appname: str): + def add_dummy_view(mmif: Mmif, appname: str, timestamp: str = None): v = mmif.new_view() v.metadata.app = appname + if timestamp: + v.metadata.timestamp = timestamp v.new_annotation(AnnotationTypes.Annotation) def test_view_rewind(self): @@ -155,16 +160,147 @@ def test_view_rewind(self): self.assertEqual(len(rewound.views), len(self.mmif_one.views)) def test_app_rewind(self): - # Regular Case - app_one_views = 3 - app_two_views = 2 - for i in range(app_one_views): - self.add_dummy_view(self.mmif_one, 'dummy_app_one') - for j in range(app_two_views): - self.add_dummy_view(self.mmif_one, 'dummy_app_two') - self.assertEqual(len(self.mmif_one.views), app_one_views + app_two_views) + # Create 3 app executions + # App 1 (T1): 2 views + self.add_dummy_view(self.mmif_one, 'dummy_app_one', '2024-01-01T12:00:00Z') + self.add_dummy_view(self.mmif_one, 'dummy_app_one', '2024-01-01T12:00:00Z') + # App 2 (T2): 1 view + 
self.add_dummy_view(self.mmif_one, 'dummy_app_two', '2024-01-01T12:01:00Z') + # App 3 (T3): 2 views + self.add_dummy_view(self.mmif_one, 'dummy_app_three', '2024-01-01T12:02:00Z') + self.add_dummy_view(self.mmif_one, 'dummy_app_three', '2024-01-01T12:02:00Z') + + self.assertEqual(len(self.mmif_one.views), 5) + + # Rewind 1 app execution (the 'dummy_app_three' execution) rewound = rewind.rewind_mmif(self.mmif_one, 1, choice_is_viewnum=False) - self.assertEqual(len(rewound.views), app_one_views) + + # 5 - 2 = 3 views should remain + self.assertEqual(len(rewound.views), 3) + + # Check that the correct views were removed + remaining_apps = {v.metadata.app for v in rewound.views} + self.assertNotIn('dummy_app_three', remaining_apps) + self.assertIn('dummy_app_one', remaining_apps) + self.assertIn('dummy_app_two', remaining_apps) + + +class TestDescribe(unittest.TestCase): + """Test suite for the describe CLI module.""" + + def setUp(self): + """Create test MMIF structures.""" + self.parser = describe.prep_argparser() + self.maxDiff = None + self.basic_mmif = Mmif( + '{"metadata": {"mmif": "http://mmif.clams.ai/1.0.0"}, "documents": [{"@type": "http://mmif.clams.ai/vocabulary/VideoDocument/v1", "properties": {"id": "d1", "mime": "video/mp4", "location": "file:///test/video.mp4"}}], "views": []}' + ) + + def create_temp_mmif_file(self, mmif_obj): + """Helper to create a temporary MMIF file.""" + tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.mmif', delete=False) + if isinstance(mmif_obj, Mmif): + content_to_write = mmif_obj.serialize(pretty=False) + else: + content_to_write = json.dumps(mmif_obj) + tmp.write(content_to_write) + tmp.close() + return tmp.name + + def test_describe_single_mmif_empty(self): + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file) + self.assertEqual(result["stats"]["appCount"], 0) + self.assertEqual(len(result["apps"]), 0) + 
self.assertEqual(result["stats"]["annotationCountByType"], {}) + finally: + os.unlink(tmp_file) + + def test_describe_single_mmif_one_app(self): + view = self.basic_mmif.new_view() + view.metadata.app = "http://apps.clams.ai/test-app/v1.0.0" + view.metadata.timestamp = "2024-01-01T12:00:00Z" + view.metadata.appProfiling = {"runningTime": "0:00:01.234"} + view.new_annotation(AnnotationTypes.TimeFrame) + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file) + self.assertEqual(result["stats"]["appCount"], 1) + self.assertEqual(len(result["apps"]), 1) + app_exec = result["apps"][0] + self.assertEqual(app_exec["app"], view.metadata.app) + self.assertEqual(app_exec["viewIds"], [view.id]) + self.assertEqual(app_exec["appProfiling"]["runningTimeMS"], 1234) + finally: + os.unlink(tmp_file) + + def test_describe_single_mmif_one_app_two_views(self): + view1 = self.basic_mmif.new_view() + view1.metadata.app = "http://apps.clams.ai/test-app/v1.0.0" + view1.metadata.timestamp = "2024-01-01T12:00:00Z" + view1.new_annotation(AnnotationTypes.TimeFrame) + view2 = self.basic_mmif.new_view() + view2.metadata.app = "http://apps.clams.ai/test-app/v1.0.0" + view2.metadata.timestamp = "2024-01-01T12:00:00Z" + view2.new_annotation(AnnotationTypes.TimeFrame) + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file) + self.assertEqual(result["stats"]["appCount"], 1) + self.assertEqual(len(result["apps"]), 1) + app_exec = result["apps"][0] + self.assertEqual(app_exec["viewIds"], [view1.id, view2.id]) + finally: + os.unlink(tmp_file) + + def test_describe_single_mmif_error_view(self): + view = self.basic_mmif.new_view() + view.metadata.app = "http://apps.clams.ai/test-app/v1.0.0" + view.metadata.timestamp = "2024-01-01T12:00:00Z" + view.metadata.error = {"message": "Something went wrong"} + tmp_file = self.create_temp_mmif_file(self.basic_mmif) 
+ try: + result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file) + self.assertEqual(result["stats"]["appCount"], 0) + self.assertEqual(len(result["apps"]), 0) + self.assertEqual(len(result["stats"]["errorViews"]), 1) + finally: + os.unlink(tmp_file) + + @unittest.mock.patch('jsonschema.validators.validate') + def test_describe_single_mmif_with_unassigned_views(self, mock_validate): + raw_mmif = json.loads(self.basic_mmif.serialize()) + raw_mmif['views'].append({'id': 'v1', 'metadata': {'app': 'http://apps.clams.ai/app1/v1.0.0', 'timestamp': '2024-01-01T12:00:00Z'}, 'annotations': []}) + raw_mmif['views'].append({'id': 'v2', 'metadata': {'app': 'http://apps.clams.ai/app2/v2.0.0'}, 'annotations': []}) + raw_mmif['views'].append({'id': 'v3', 'metadata': {'timestamp': '2024-01-01T12:01:00Z', 'app': ''}, 'annotations': []}) + tmp_file = self.create_temp_mmif_file(raw_mmif) + try: + result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file) + self.assertEqual(result['stats']['appCount'], 1) + self.assertEqual(len(result['apps']), 2) + special_entry = result['apps'][-1] + self.assertEqual(special_entry['app'], 'http://apps.clams.ai/non-existing-app/v1') + self.assertEqual(len(special_entry['viewIds']), 2) + self.assertIn('v2', special_entry['viewIds']) + self.assertIn('v3', special_entry['viewIds']) + finally: + os.unlink(tmp_file) + + def test_describe_collection_empty(self): + dummy_dir = 'dummy_mmif_collection' + os.makedirs(dummy_dir, exist_ok=True) + try: + output = mmif.utils.workflow_helper.describe_mmif_collection(dummy_dir) + expected = { + 'mmifCountByStatus': {'total': 0, 'successful': 0, 'withErrors': 0, 'withWarnings': 0, 'invalid': 0}, + 'workflows': [], + 'annotationCountByType': {} + } + self.assertEqual(output, expected) + finally: + os.rmdir(dummy_dir) + if __name__ == '__main__': unittest.main()