-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWorkflowRunner.py
More file actions
62 lines (51 loc) · 1.8 KB
/
WorkflowRunner.py
File metadata and controls
62 lines (51 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from agent.Orchestrator import Orchestrator
from typing import Any, Dict
import asyncio
async def run_data_quality_analysis(
goal: str,
reports_dir: str = "ge_reports",
max_rounds: int = 20,
enable_console: bool = True
) -> Dict[str, Any]:
"""
Convenience function to run complete data quality analysis.
Args:
goal: Data quality goal to analyze
reports_dir: Directory for storing reports
max_rounds: Maximum conversation rounds per phase
enable_console: Whether to show console output
Returns:
Dictionary with complete workflow results
"""
orchestrator = Orchestrator(
reports_dir=reports_dir,
max_rounds=max_rounds,
enable_console_output=enable_console
)
return await orchestrator.run_analysis(goal)
if __name__ == "__main__":
"""Example usage of the Orchestrator"""
async def main():
# Example data quality goal
goal = "Analyze missing values in the RIDEBOOKING table and assess data quality"
# Run analysis
results = await run_data_quality_analysis(
goal=goal,
reports_dir="ge_reports",
enable_console=True
)
# Print summary
print("\n" + "="*80)
print("WORKFLOW SUMMARY")
print("="*80)
print(f"Goal: {results['goal']}")
print(f"Success: {results['success']}")
if results.get("analysis"):
analysis = results["analysis"]
print(f"\nIssues Found: {len(analysis.issues)}")
print(f"Recommendations: {len(analysis.recommendations)}")
if results.get("report"):
print(f"\nFinal Report: Generated ✓")
print("="*80)
# Run the example
asyncio.run(main())