From 7a804f568c630d557a2f3c139fbb987e3fe12392 Mon Sep 17 00:00:00 2001
From: Keigh Rim
Date: Fri, 28 Nov 2025 11:58:53 -0500
Subject: [PATCH] refined mmif describers for better data presentation

---
 mmif/utils/cli/describe.py    |   2 +
 mmif/utils/workflow_helper.py | 178 ++++++++++++++++++++--------------
 tests/test_utils_cli.py       |   9 +-
 3 files changed, 111 insertions(+), 78 deletions(-)

diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py
index c42a98cc..eaf35856 100644
--- a/mmif/utils/cli/describe.py
+++ b/mmif/utils/cli/describe.py
@@ -7,6 +7,8 @@
 
 from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \
     describe_mmif_collection
+# gen_param_hash is imported for backward compatibility
+from mmif.utils.workflow_helper import generate_param_hash
 
 
 def get_pipeline_specs(mmif_file: Union[str, Path]):
diff --git a/mmif/utils/workflow_helper.py b/mmif/utils/workflow_helper.py
index 88d6f26f..013176a1 100644
--- a/mmif/utils/workflow_helper.py
+++ b/mmif/utils/workflow_helper.py
@@ -156,7 +156,7 @@ def _get_profile_data(view) -> dict:
         # need to convert to milliseconds integer
         time_obj = datetime.datetime.strptime(running_time_str, "%H:%M:%S.%f").time()
         milliseconds = (time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second) * 1000 + time_obj.microsecond // 1000
-        return {"runningTime": milliseconds}
+        return {"runningTimeMS": milliseconds}
 
 
 def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
@@ -183,16 +183,16 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
 
     * ``workflowId``: A unique identifier for the workflow, based on the
       sequence of app executions (app, version, parameter hashes). App
-      executions with errors or warnings are excluded from this identifier.
+      executions with errors are excluded from this identifier. App
+      executions with warnings are still considered successful for the purpose of this identifier.
     * ``stats``:
         * ``appCount``: Total number of identified app executions.
         * ``errorViews``: A list of view IDs that reported errors.
         * ``warningViews``: A list of view IDs that reported warnings.
         * ``emptyViews``: A list of view IDs that contain no annotations.
-        * ``annotationCount``: A dictionary with the ``total`` number of
-          annotations across all app executions.
         * ``annotationCountByType``: A dictionary mapping each annotation
-          type to its ``total`` count across all app executions.
+          type to its count, plus a ``total`` key for the sum of all
+          annotations across all app executions.
     * ``apps``: A list of objects, where each object represents one app
       execution. It includes metadata, profiling, and aggregated statistics
      for all views generated by that execution. A special entry for views
@@ -247,10 +247,8 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
         }
         total_annotations_in_exec = sum(execution_ann_counter.values())
         if total_annotations_in_exec > 0:
-            app_data['annotationCount'] = {'total': total_annotations_in_exec}
-            app_data['annotationCountByType'] = {
-                at_type: {'total': count} for at_type, count in execution_ann_counter.items()
-            }
+            app_data['annotationCountByType'] = dict(execution_ann_counter)
+            app_data['annotationCountByType']['total'] = total_annotations_in_exec
         grouped_apps.append(app_data)
 
     # Handle unassigned and problematic views
@@ -277,16 +275,19 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
         })
 
     # aggregate total annotation counts
-    total_annotations = 0
-    total_annotations_by_type = defaultdict(lambda: {'total': 0})
+    total_annotations_by_type = Counter()
     for execution in grouped_apps:
         # Only aggregate from actual apps, not the special unassigned entry
         if execution.get('app') != "http://apps.clams.ai/non-existing-app/v1":
-            if 'annotationCount' in execution:
-                total_annotations += execution['annotationCount']['total']
             if 'annotationCountByType' in execution:
-                for at_type, data in execution['annotationCountByType'].items():
-                    total_annotations_by_type[at_type]['total'] += data['total']
+                exec_counts = execution['annotationCountByType'].copy()
+                del exec_counts['total']
+                total_annotations_by_type.update(Counter(exec_counts))
+
+    final_total_annotations = sum(total_annotations_by_type.values())
+    final_annotation_counts = dict(total_annotations_by_type)
+    if final_total_annotations > 0:
+        final_annotation_counts['total'] = final_total_annotations
 
     return {
         "workflowId": workflow_id,
@@ -295,8 +296,7 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
             "errorViews": error_view_ids,
             "warningViews": warning_view_ids,
             "emptyViews": empty_view_ids,
-            "annotationCount": {"total": total_annotations},
-            "annotationCountByType": dict(total_annotations_by_type)
+            "annotationCountByType": final_annotation_counts
         },
         "apps": grouped_apps
     }
@@ -314,25 +314,21 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:
     * ``mmifCountByStatus``: A dictionary summarizing the processing status
       of all MMIF files in the collection. It includes:
         * ``total``: Total number of MMIF files found.
-        * ``successful``: Number of MMIF files processed without errors or warnings.
+        * ``successful``: Number of MMIF files processed without errors (may contain warnings).
         * ``withErrors``: Number of MMIF files containing app executions that reported errors.
         * ``withWarnings``: Number of MMIF files containing app executions that reported warnings.
         * ``invalid``: Number of files that failed to be parsed as valid MMIF.
-    * ``mmifCountByWorkflow``: A dictionary mapping each unique ``workflowId``
-      (from the single MMIF reports) to the ``count`` of MMIF files that share that workflow.
-    * ``appProfilings``: A dictionary summarizing the aggregated performance
-      statistics for each unique app found across the collection. Each entry,
-      keyed by the app's URI, includes:
-        * ``avgRunningTimeMS``: Average running time in milliseconds.
-        * ``minRunningTimeMS``: Minimum running time in milliseconds.
-        * ``maxRunningTimeMS``: Maximum running time in milliseconds.
-        * ``stdevRunningTimeMS``: Standard deviation of running times in milliseconds
-          (0 if only one execution).
+    * ``workflows``: A list of "workflow" objects found in the "successful" MMIF files (files with errors
+      are excluded), where each object contains:
+        * ``workflowId``: The unique identifier for the workflow.
+        * ``apps``: A list of app objects, each with ``app`` (name+ver identifier),
+          ``appConfiguration``, and ``appProfiling`` statistics (avg, min, max, stdev running times)
+          aggregated per workflow.
+        * ``mmifs``: A list of MMIF file basenames belonging to this workflow.
+        * ``mmifCount``: The number of MMIF files in this workflow.
     * ``annotationCountByType``: A dictionary aggregating annotation counts
-      across the entire collection. It includes:
-        * ``total``: The grand total number of annotations across all MMIF files.
-        * Individual entries keyed by annotation type URI, each showing the
-          ``total`` count for that specific annotation type.
+      across the entire collection. It includes a ``total`` key for the grand
+      total, plus integer counts for each individual annotation type.
 
     ---
     The docstring above is used to generate help messages for the CLI command.
@@ -342,24 +338,28 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:
 
     :return: A dictionary containing the summarized collection specification.
     """
     import statistics
-    from collections import defaultdict
+    from collections import defaultdict, Counter
 
-    mmif_files = Path(mmif_dir).glob('*.mmif')
+    mmif_files = list(Path(mmif_dir).glob('*.mmif'))
     status_summary = defaultdict(int)
-    status_summary['total'] = 0
+    status_summary['total'] = len(mmif_files)
     status_summary['successful'] = 0
     status_summary['withErrors'] = 0
     status_summary['withWarnings'] = 0
     status_summary['invalid'] = 0
 
-    workflow_analysis = defaultdict(lambda: {'count': 0})
-    app_profilings_raw = defaultdict(list)
-    annotation_counts = defaultdict(int)
-    annotation_counts['total'] = 0
+    aggregated_counts = Counter()
+
+    workflows_data = defaultdict(lambda: {
+        'mmifs': [],
+        'apps': defaultdict(lambda: {
+            'appConfiguration': None,  # Store the first config here
+            'execution_times': []
+        })
+    })
 
     for mmif_file in mmif_files:
-        status_summary['total'] += 1
         try:
             single_report = describe_single_mmif(mmif_file)
         except Exception as e:
@@ -368,43 +368,75 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:
 
         if single_report['stats']['errorViews']:
             status_summary['withErrors'] += 1
-        elif single_report['stats']['warningViews']:
+            continue  # Exclude from all other stats
+
+        # If we get here, the MMIF has no errors and is considered "successful"
+        status_summary['successful'] += 1
+        if single_report['stats']['warningViews']:
             status_summary['withWarnings'] += 1
-        else:
-            status_summary['successful'] += 1
 
-        # Workflow analysis
         wf_id = single_report['workflowId']
-        workflow_analysis[wf_id]['count'] += 1
-
-        # App performance and annotation raw data collection from "apps" list
-        for app_execution in single_report.get('apps', []):
-            # app profilings
-            app = app_execution.get('app')
-            profiling = app_execution.get('appProfiling', {})
-            running_time = profiling.get('runningTime')
-            if app and running_time is not None:
-                app_profilings_raw[app].append(running_time)
-
-            # annotation counts
-            annotation_counts['total'] += app_execution.get('annotationCount', {}).get('total', 0)
-            for at_type, data in app_execution.get('annotationCountByType', {}).items():
-                annotation_counts[at_type] += data.get('total', 0)
-
-    # Process app performance data
-    profiles = {}
-    for app, execution_times in app_profilings_raw.items():
-        if execution_times:
-            profiles[app] = {
-                'avgRunningTimeMS': statistics.mean(execution_times),
-                'minRunningTimeMS': min(execution_times),
-                'maxRunningTimeMS': max(execution_times),
-                'stdevRunningTimeMS': statistics.stdev(execution_times) if len(execution_times) > 1 else 0
+        workflows_data[wf_id]['mmifs'].append(Path(mmif_file).name)
+
+        # Aggregate annotation counts for successful mmifs
+        report_counts = single_report['stats'].get('annotationCountByType', {})
+        if 'total' in report_counts:
+            del report_counts['total']  # don't add the sub-total to the main counter
+        aggregated_counts.update(report_counts)
+
+        for app_exec in single_report.get('apps', []):
+            app_uri = app_exec.get('app')
+            # skip the special "unassigned" app
+            if app_uri and app_uri != "http://apps.clams.ai/non-existing-app/v1":
+                running_time = app_exec.get('appProfiling', {}).get('runningTimeMS')
+                if running_time is not None:
+                    workflows_data[wf_id]['apps'][app_uri]['execution_times'].append(running_time)
+
+                # Store the first non-empty app configuration we find for this app in this workflow
+                if workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] is None:
+                    config = app_exec.get('appConfiguration', {})
+                    if config:
+                        workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] = config
+
+    # Process collected data into the final output format
+    final_workflows_list = []
+    for wf_id, wf_data in sorted(workflows_data.items()):
+        workflow_object = {
+            'workflowId': wf_id,
+            'mmifs': sorted(wf_data['mmifs']),
+            'mmifCount': len(wf_data['mmifs']),
+            'apps': []
+        }
+
+        for app_uri, app_data in sorted(wf_data['apps'].items()):
+            times = app_data['execution_times']
+            if times:
+                profiling_stats = {
+                    'avgRunningTimeMS': statistics.mean(times),
+                    'minRunningTimeMS': min(times),
+                    'maxRunningTimeMS': max(times),
+                    'stdevRunningTimeMS': statistics.stdev(times) if len(times) > 1 else 0
+                }
+            else:
+                profiling_stats = {}
+
+            app_object = {
+                'app': app_uri,
+                'appConfiguration': app_data['appConfiguration'] or {},  # Default to empty dict
+                'appProfiling': profiling_stats
             }
+            workflow_object['apps'].append(app_object)
+
+        final_workflows_list.append(workflow_object)
+
+    # Finalize annotation counts
+    final_annotation_counts = dict(aggregated_counts)
+    grand_total = sum(final_annotation_counts.values())
+    if grand_total > 0:
+        final_annotation_counts['total'] = grand_total
 
     return {
         'mmifCountByStatus': dict(status_summary),
-        'mmifCountByWorkflow': {k: v for k, v in sorted(workflow_analysis.items())},
-        'appProfilings': profiles,
-        'annotationCountByType': dict(annotation_counts)
-    }
+        'workflows': final_workflows_list,
+        'annotationCountByType': final_annotation_counts
+    }
\ No newline at end of file
diff --git a/tests/test_utils_cli.py b/tests/test_utils_cli.py
index 572ce44a..fa0f8906 100644
--- a/tests/test_utils_cli.py
+++ b/tests/test_utils_cli.py
@@ -213,7 +213,7 @@ def test_describe_single_mmif_empty(self):
             result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file)
             self.assertEqual(result["stats"]["appCount"], 0)
             self.assertEqual(len(result["apps"]), 0)
-            self.assertEqual(result["stats"]["annotationCount"]["total"], 0)
+            self.assertEqual(result["stats"]["annotationCountByType"], {})
         finally:
             os.unlink(tmp_file)
 
@@ -231,7 +231,7 @@ def test_describe_single_mmif_one_app(self):
             app_exec = result["apps"][0]
             self.assertEqual(app_exec["app"], view.metadata.app)
             self.assertEqual(app_exec["viewIds"], [view.id])
-            self.assertEqual(app_exec["appProfiling"]["runningTime"], 1234)
+            self.assertEqual(app_exec["appProfiling"]["runningTimeMS"], 1234)
         finally:
             os.unlink(tmp_file)
 
@@ -294,9 +294,8 @@ def test_describe_collection_empty(self):
             output = mmif.utils.workflow_helper.describe_mmif_collection(dummy_dir)
             expected = {
                 'mmifCountByStatus': {'total': 0, 'successful': 0, 'withErrors': 0, 'withWarnings': 0, 'invalid': 0},
-                'mmifCountByWorkflow': {},
-                'appProfilings': {},
-                'annotationCountByType': {'total': 0}
+                'workflows': [],
+                'annotationCountByType': {}
             }
             self.assertEqual(output, expected)
         finally:
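
For reference, a minimal consumer sketch (not part of the patch) of how the reshaped collection report might be read; the directory path below is a placeholder, and the key names follow the docstrings added in this patch:

    from mmif.utils.workflow_helper import describe_mmif_collection

    # "path/to/mmif-dir" is a placeholder for a directory containing *.mmif files
    report = describe_mmif_collection("path/to/mmif-dir")
    print(report["mmifCountByStatus"])                       # total / successful / withErrors / withWarnings / invalid
    for wf in report["workflows"]:                           # one entry per unique workflowId; files with errors are excluded
        print(wf["workflowId"], wf["mmifCount"], wf["mmifs"])
        for app in wf["apps"]:
            timing = app["appProfiling"]                     # avg/min/max/stdev running times in milliseconds, per workflow
            print("  ", app["app"], timing.get("avgRunningTimeMS"))
    print(report["annotationCountByType"].get("total", 0))   # grand total; the 'total' key is omitted when zero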