Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 38 additions & 25 deletions nipyapi/ci/verify_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,17 @@ def _verify_controllers(process_group_id: str) -> List[Dict[str, Any]]:


def _verify_processors(process_group_id: str) -> List[Dict[str, Any]]:
"""Verify all processors directly in a process group."""
"""Verify all processors in a process group and its descendants."""
processors = nipyapi.canvas.list_all_processors(process_group_id)
# Filter to only processors directly in this PG (not descendants)
processors = [p for p in processors if p.component.parent_group_id == process_group_id]
log.debug("Found %d processors in PG", len(processors))
log.debug("Found %d processors in PG and descendants", len(processors))
return [_verify_single_processor(p) for p in processors]


def verify_config(
process_group_id: Optional[str] = None,
verify_controllers: bool = True,
verify_processors: bool = True,
only_failures: bool = False,
) -> dict:
"""
Verify configuration of all components in a process group.
Expand All @@ -113,10 +112,15 @@ def verify_config(
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
verify_controllers: Verify controller services (default: True)
verify_processors: Verify processors (default: True)
only_failures: Only include failed components in results (default: False).
When True, controller_results and processor_results contain only
items with success=False. Reduces output size for large process groups.

Returns:
dict with keys: verified ("true"/"false"), failed_count,
controller_results, processor_results, summary, and process_group_name.
When only_failures=True, also includes controllers_checked and
processors_checked counts.
Caller should check verified or failed_count to determine next steps.

Raises:
Expand All @@ -127,6 +131,9 @@ def verify_config(
# CLI usage
nipyapi ci verify_config --process-group-id <pg-id>

# Only show failures (cleaner output for large flows)
nipyapi ci verify_config --process-group-id <pg-id> --only_failures

# Programmatic usage
result = nipyapi.ci.verify_config(process_group_id)
if result["verified"] == "true":
Expand All @@ -143,49 +150,55 @@ def verify_config(
process_group = nipyapi.canvas.get_process_group(process_group_id, "id")
if not process_group:
raise ValueError(f"Process group not found: {process_group_id}")

pg_name = process_group.component.name
log.debug("Found process group: %s", pg_name)
log.debug("Found process group: %s", process_group.component.name)

# Verify components
controller_results = _verify_controllers(process_group_id) if verify_controllers else []
processor_results = _verify_processors(process_group_id) if verify_processors else []
all_results = controller_results + processor_results

# Count failures (results with success=False, excluding skipped)
failed_count = sum(
1 for r in controller_results + processor_results if r.get("success") is False
)

# Log failures
for r in controller_results + processor_results:
# Count and log failures (results with success=False, excluding skipped)
failed_count = sum(1 for r in all_results if r.get("success") is False)
for r in all_results:
if r.get("success") is False:
log.warning("%s verification FAILED", r["name"])

# Build summary
total_verified = len(controller_results) + len(processor_results)
passed = total_verified - failed_count
total_verified = len(all_results)
summary = (
f"All {total_verified} components passed verification"
if failed_count == 0
else f"{passed}/{total_verified} passed, {failed_count} failed"
else f"{total_verified - failed_count}/{total_verified} passed, {failed_count} failed"
)

log.info("Verification complete: %s", summary)

# Build result with optional filtering
result = {
"verified": "true" if failed_count == 0 else "false",
"failed_count": failed_count,
"controller_results": controller_results,
"processor_results": processor_results,
"controller_results": (
[r for r in controller_results if r.get("success") is False]
if only_failures
else controller_results
),
"processor_results": (
[r for r in processor_results if r.get("success") is False]
if only_failures
else processor_results
),
"summary": summary,
"process_group_name": pg_name,
"process_group_name": process_group.component.name,
}

# Add checked counts when filtering (so caller knows the scope)
if only_failures:
result["controllers_checked"] = len(controller_results)
result["processors_checked"] = len(processor_results)

# Add error key for CLI exit code detection when verification fails
if failed_count > 0:
failed_names = [
r["name"] for r in controller_results + processor_results if r.get("success") is False
]
result["error"] = f"Verification failed for: {', '.join(failed_names)}"
result["error"] = "Verification failed for: {}".format(
", ".join(r["name"] for r in all_results if r.get("success") is False)
)

return result
Loading