Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mmif/utils/cli/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \
describe_mmif_collection
# gen_param_hash is imported for backward compatibility
from mmif.utils.workflow_helper import generate_param_hash


def get_pipeline_specs(mmif_file: Union[str, Path]):
Expand Down
178 changes: 105 additions & 73 deletions mmif/utils/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def _get_profile_data(view) -> dict:
# need to convert to milliseconds integer
time_obj = datetime.datetime.strptime(running_time_str, "%H:%M:%S.%f").time()
milliseconds = (time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second) * 1000 + time_obj.microsecond // 1000
return {"runningTime": milliseconds}
return {"runningTimeMS": milliseconds}


def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
Expand All @@ -183,16 +183,16 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:

* ``workflowId``: A unique identifier for the workflow, based on the
sequence of app executions (app, version, parameter hashes). App
executions with errors or warnings are excluded from this identifier.
executions with errors are excluded from this identifier. App
executions with warnings are still considered successful for the purpose of this identifier.
* ``stats``:
* ``appCount``: Total number of identified app executions.
* ``errorViews``: A list of view IDs that reported errors.
* ``warningViews``: A list of view IDs that reported warnings.
* ``emptyViews``: A list of view IDs that contain no annotations.
* ``annotationCount``: A dictionary with the ``total`` number of
annotations across all app executions.
* ``annotationCountByType``: A dictionary mapping each annotation
type to its ``total`` count across all app executions.
type to its count, plus a ``total`` key for the sum of all
annotations across all app executions.
* ``apps``: A list of objects, where each object represents one app
execution. It includes metadata, profiling, and aggregated statistics
for all views generated by that execution. A special entry for views
Expand Down Expand Up @@ -247,10 +247,8 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
}
total_annotations_in_exec = sum(execution_ann_counter.values())
if total_annotations_in_exec > 0:
app_data['annotationCount'] = {'total': total_annotations_in_exec}
app_data['annotationCountByType'] = {
at_type: {'total': count} for at_type, count in execution_ann_counter.items()
}
app_data['annotationCountByType'] = dict(execution_ann_counter)
app_data['annotationCountByType']['total'] = total_annotations_in_exec
grouped_apps.append(app_data)

# Handle unassigned and problematic views
Expand All @@ -277,16 +275,19 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
})

# aggregate total annotation counts
total_annotations = 0
total_annotations_by_type = defaultdict(lambda: {'total': 0})
total_annotations_by_type = Counter()
for execution in grouped_apps:
# Only aggregate from actual apps, not the special unassigned entry
if execution.get('app') != "http://apps.clams.ai/non-existing-app/v1":
if 'annotationCount' in execution:
total_annotations += execution['annotationCount']['total']
if 'annotationCountByType' in execution:
for at_type, data in execution['annotationCountByType'].items():
total_annotations_by_type[at_type]['total'] += data['total']
exec_counts = execution['annotationCountByType'].copy()
del exec_counts['total']
total_annotations_by_type.update(Counter(exec_counts))

final_total_annotations = sum(total_annotations_by_type.values())
final_annotation_counts = dict(total_annotations_by_type)
if final_total_annotations > 0:
final_annotation_counts['total'] = final_total_annotations

return {
"workflowId": workflow_id,
Expand All @@ -295,8 +296,7 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
"errorViews": error_view_ids,
"warningViews": warning_view_ids,
"emptyViews": empty_view_ids,
"annotationCount": {"total": total_annotations},
"annotationCountByType": dict(total_annotations_by_type)
"annotationCountByType": final_annotation_counts
},
"apps": grouped_apps
}
Expand All @@ -314,25 +314,21 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:
* ``mmifCountByStatus``: A dictionary summarizing the processing status of
all MMIF files in the collection. It includes:
* ``total``: Total number of MMIF files found.
* ``successful``: Number of MMIF files processed without errors or warnings.
* ``successful``: Number of MMIF files processed without errors (may contain warnings).
* ``withErrors``: Number of MMIF files containing app executions that reported errors.
* ``withWarnings``: Number of MMIF files containing app executions that reported warnings.
* ``invalid``: Number of files that failed to be parsed as valid MMIF.
* ``mmifCountByWorkflow``: A dictionary mapping each unique ``workflowId``
(from the single MMIF reports) to the ``count`` of MMIF files that share that workflow.
* ``appProfilings``: A dictionary summarizing the aggregated performance
statistics for each unique app found across the collection. Each entry,
keyed by the app's URI, includes:
* ``avgRunningTimeMS``: Average running time in milliseconds.
* ``minRunningTimeMS``: Minimum running time in milliseconds.
* ``maxRunningTimeMS``: Maximum running time in milliseconds.
* ``stdevRunningTimeMS``: Standard deviation of running times in milliseconds
(0 if only one execution).
* ``workflows``: A list of "workflow" objects found in the "successful" MMIF files (files with errors
are excluded), where each object contains:
* ``workflowId``: The unique identifier for the workflow.
* ``apps``: A list of app objects, each with ``app`` (name+ver identifier),
``appConfiguration``, and ``appProfiling`` statistics (avg, min, max, stdev running times)
aggregated per workflow.
* ``mmifs``: A list of MMIF file basenames belonging to this workflow.
* ``mmifCount``: The number of MMIF files in this workflow.
* ``annotationCountByType``: A dictionary aggregating annotation counts
across the entire collection. It includes:
* ``total``: The grand total number of annotations across all MMIF files.
* Individual entries keyed by annotation type URI, each showing the
``total`` count for that specific annotation type.
across the entire collection. It includes a ``total`` key for the grand
total, plus integer counts for each individual annotation type.

---
The docstring above is used to generate help messages for the CLI command.
Expand All @@ -342,24 +338,28 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:
:return: A dictionary containing the summarized collection specification.
"""
import statistics
from collections import defaultdict
from collections import defaultdict, Counter

mmif_files = Path(mmif_dir).glob('*.mmif')
mmif_files = list(Path(mmif_dir).glob('*.mmif'))

status_summary = defaultdict(int)
status_summary['total'] = 0
status_summary['total'] = len(mmif_files)
status_summary['successful'] = 0
status_summary['withErrors'] = 0
status_summary['withWarnings'] = 0
status_summary['invalid'] = 0

workflow_analysis = defaultdict(lambda: {'count': 0})
app_profilings_raw = defaultdict(list)
annotation_counts = defaultdict(int)
annotation_counts['total'] = 0
aggregated_counts = Counter()

workflows_data = defaultdict(lambda: {
'mmifs': [],
'apps': defaultdict(lambda: {
'appConfiguration': None, # Store the first config here
'execution_times': []
})
})

for mmif_file in mmif_files:
status_summary['total'] += 1
try:
single_report = describe_single_mmif(mmif_file)
except Exception as e:
Expand All @@ -368,43 +368,75 @@ def describe_mmif_collection(mmif_dir: Union[str, Path]) -> dict:

if single_report['stats']['errorViews']:
status_summary['withErrors'] += 1
elif single_report['stats']['warningViews']:
continue # Exclude from all other stats

# If we get here, the MMIF has no errors and is considered "successful"
status_summary['successful'] += 1
if single_report['stats']['warningViews']:
status_summary['withWarnings'] += 1
else:
status_summary['successful'] += 1

# Workflow analysis
wf_id = single_report['workflowId']
workflow_analysis[wf_id]['count'] += 1

# App performance and annotation raw data collection from "apps" list
for app_execution in single_report.get('apps', []):
# app profilings
app = app_execution.get('app')
profiling = app_execution.get('appProfiling', {})
running_time = profiling.get('runningTime')
if app and running_time is not None:
app_profilings_raw[app].append(running_time)

# annotation counts
annotation_counts['total'] += app_execution.get('annotationCount', {}).get('total', 0)
for at_type, data in app_execution.get('annotationCountByType', {}).items():
annotation_counts[at_type] += data.get('total', 0)

# Process app performance data
profiles = {}
for app, execution_times in app_profilings_raw.items():
if execution_times:
profiles[app] = {
'avgRunningTimeMS': statistics.mean(execution_times),
'minRunningTimeMS': min(execution_times),
'maxRunningTimeMS': max(execution_times),
'stdevRunningTimeMS': statistics.stdev(execution_times) if len(execution_times) > 1 else 0
workflows_data[wf_id]['mmifs'].append(Path(mmif_file).name)

# Aggregate annotation counts for successful mmifs
report_counts = single_report['stats'].get('annotationCountByType', {})
if 'total' in report_counts:
del report_counts['total'] # don't add the sub-total to the main counter
aggregated_counts.update(report_counts)

for app_exec in single_report.get('apps', []):
app_uri = app_exec.get('app')
# skip the special "unassigned" app
if app_uri and app_uri != "http://apps.clams.ai/non-existing-app/v1":
running_time = app_exec.get('appProfiling', {}).get('runningTimeMS')
if running_time is not None:
workflows_data[wf_id]['apps'][app_uri]['execution_times'].append(running_time)

# Store the first non-empty app configuration we find for this app in this workflow
if workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] is None:
config = app_exec.get('appConfiguration', {})
if config:
workflows_data[wf_id]['apps'][app_uri]['appConfiguration'] = config

# Process collected data into the final output format
final_workflows_list = []
for wf_id, wf_data in sorted(workflows_data.items()):
workflow_object = {
'workflowId': wf_id,
'mmifs': sorted(wf_data['mmifs']),
'mmifCount': len(wf_data['mmifs']),
'apps': []
}

for app_uri, app_data in sorted(wf_data['apps'].items()):
times = app_data['execution_times']
if times:
profiling_stats = {
'avgRunningTimeMS': statistics.mean(times),
'minRunningTimeMS': min(times),
'maxRunningTimeMS': max(times),
'stdevRunningTimeMS': statistics.stdev(times) if len(times) > 1 else 0
}
else:
profiling_stats = {}

app_object = {
'app': app_uri,
'appConfiguration': app_data['appConfiguration'] or {}, # Default to empty dict
'appProfiling': profiling_stats
}
workflow_object['apps'].append(app_object)

final_workflows_list.append(workflow_object)

# Finalize annotation counts
final_annotation_counts = dict(aggregated_counts)
grand_total = sum(final_annotation_counts.values())
if grand_total > 0:
final_annotation_counts['total'] = grand_total

return {
'mmifCountByStatus': dict(status_summary),
'mmifCountByWorkflow': {k: v for k, v in sorted(workflow_analysis.items())},
'appProfilings': profiles,
'annotationCountByType': dict(annotation_counts)
}
'workflows': final_workflows_list,
'annotationCountByType': final_annotation_counts
}
9 changes: 4 additions & 5 deletions tests/test_utils_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_describe_single_mmif_empty(self):
result = mmif.utils.workflow_helper.describe_single_mmif(tmp_file)
self.assertEqual(result["stats"]["appCount"], 0)
self.assertEqual(len(result["apps"]), 0)
self.assertEqual(result["stats"]["annotationCount"]["total"], 0)
self.assertEqual(result["stats"]["annotationCountByType"], {})
finally:
os.unlink(tmp_file)

Expand All @@ -231,7 +231,7 @@ def test_describe_single_mmif_one_app(self):
app_exec = result["apps"][0]
self.assertEqual(app_exec["app"], view.metadata.app)
self.assertEqual(app_exec["viewIds"], [view.id])
self.assertEqual(app_exec["appProfiling"]["runningTime"], 1234)
self.assertEqual(app_exec["appProfiling"]["runningTimeMS"], 1234)
finally:
os.unlink(tmp_file)

Expand Down Expand Up @@ -294,9 +294,8 @@ def test_describe_collection_empty(self):
output = mmif.utils.workflow_helper.describe_mmif_collection(dummy_dir)
expected = {
'mmifCountByStatus': {'total': 0, 'successful': 0, 'withErrors': 0, 'withWarnings': 0, 'invalid': 0},
'mmifCountByWorkflow': {},
'appProfilings': {},
'annotationCountByType': {'total': 0}
'workflows': [],
'annotationCountByType': {}
}
self.assertEqual(output, expected)
finally:
Expand Down
Loading