11 changes: 11 additions & 0 deletions src/clgraph/__init__.py
@@ -18,11 +18,17 @@
from .agent import AgentResult, LineageAgent, QuestionType
from .diff import ColumnDiff, PipelineDiff

# Import execution functionality
from .execution import PipelineExecutor

# Import export functionality
from .export import CSVExporter, JSONExporter

# Import validation models
from .models import IssueCategory, IssueSeverity, ValidationIssue

# Import orchestrator integrations
from .orchestrators import AirflowOrchestrator, DagsterOrchestrator
from .parser import (
    ColumnEdge,
    ColumnLineageGraph,
@@ -177,4 +183,9 @@
"BASIC_TOOLS",
"LLM_TOOLS",
"ALL_TOOLS",
# Orchestrator integrations
"AirflowOrchestrator",
"DagsterOrchestrator",
# Execution
"PipelineExecutor",
]
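With these exports in place, the new entry points are importable directly from the package root, for example:

    from clgraph import AirflowOrchestrator, DagsterOrchestrator, PipelineExecutor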
285 changes: 285 additions & 0 deletions src/clgraph/execution.py
@@ -0,0 +1,285 @@
"""
Pipeline execution module for clgraph.

Provides synchronous and asynchronous execution of SQL pipelines
with concurrent execution within dependency levels.
"""

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple

if TYPE_CHECKING:
    from .pipeline import Pipeline


class PipelineExecutor:
"""
Executes clgraph pipelines with concurrent execution support.

Provides both synchronous and asynchronous execution modes,
with automatic parallelization within dependency levels.

Example:
from clgraph.execution import PipelineExecutor

executor = PipelineExecutor(pipeline)
result = executor.run(execute_sql, max_workers=4)

# Or async
result = await executor.async_run(async_execute_sql, max_workers=4)
"""

    def __init__(self, pipeline: "Pipeline") -> None:
        """
        Initialize executor with a Pipeline instance.

        Args:
            pipeline: The clgraph Pipeline to execute
        """
        self.pipeline = pipeline
        self.table_graph = pipeline.table_graph

    def get_execution_levels(self) -> List[List[str]]:
        """
        Group queries into levels for concurrent execution.

        Level 0: queries with no dependencies on other queries.
        Level 1: queries whose dependencies are all in Level 0.
        Level 2: queries whose dependencies are all in Levels 0-1, and so on.

        Queries in the same level can run concurrently.

        Returns:
            List of levels, where each level is a list of query IDs
        """
        levels: List[List[str]] = []
        completed = set()

        while len(completed) < len(self.table_graph.queries):
            current_level = []

            for query_id, query in self.table_graph.queries.items():
                if query_id in completed:
                    continue

                # A query is ready when every query that creates one of
                # its source tables has already completed.
                dependencies_met = True
                for source_table in query.source_tables:
                    # Find the query that creates this table, if any
                    table_node = self.table_graph.tables.get(source_table)
                    if table_node and table_node.created_by:
                        if table_node.created_by not in completed:
                            dependencies_met = False
                            break

                if dependencies_met:
                    current_level.append(query_id)

            if not current_level:
                # No progress was made, so the remaining queries form a cycle
                raise RuntimeError("Circular dependency detected in pipeline")

            levels.append(current_level)
            completed.update(current_level)

        return levels

    def run(
        self,
        executor: Callable[[str], None],
        max_workers: int = 4,
        verbose: bool = True,
    ) -> Dict[str, Any]:
        """
        Execute the pipeline synchronously, running the queries within
        each dependency level concurrently.

        Args:
            executor: Function that executes a single SQL string
            max_workers: Maximum number of concurrent workers (default: 4)
            verbose: Print progress (default: True)

        Returns:
            dict with execution results: {
                "completed": list of completed query IDs,
                "failed": list of (query_id, error) tuples,
                "elapsed_seconds": total execution time,
                "total_queries": total number of queries
            }

        Example:
            def execute_sql(sql: str):
                import duckdb

                conn = duckdb.connect()
                conn.execute(sql)

            result = executor.run(execute_sql, max_workers=4)
            print(f"Completed {len(result['completed'])} queries")
        """
        if verbose:
            print(f"🚀 Starting pipeline execution ({len(self.table_graph.queries)} queries)")
            print()

        # Track completed and failed queries
        completed = set()
        failed: List[Tuple[str, str]] = []
        start_time = time.time()

        # Group queries by level for concurrent execution
        levels = self.get_execution_levels()

        # Execute level by level
        for level_num, level_queries in enumerate(levels, 1):
            if verbose:
                print(f"📊 Level {level_num}: {len(level_queries)} queries")

            # Execute the queries in this level concurrently
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = {}

                for query_id in level_queries:
                    query = self.table_graph.queries[query_id]
                    future = pool.submit(executor, query.sql)
                    futures[future] = query_id

                # Collect results as the workers finish
                for future in as_completed(futures):
                    query_id = futures[future]

                    try:
                        future.result()
                        completed.add(query_id)

                        if verbose:
                            print(f"  ✅ {query_id}")
                    except Exception as e:
                        failed.append((query_id, str(e)))

                        if verbose:
                            print(f"  ❌ {query_id}: {e}")

            if verbose:
                print()

        elapsed = time.time() - start_time

        # Summary
        if verbose:
            print("=" * 60)
            print(f"✅ Pipeline completed in {elapsed:.2f}s")
            print(f"   Successful: {len(completed)}")
            print(f"   Failed: {len(failed)}")
            if failed:
                print("\n⚠️ Failed queries:")
                for query_id, error in failed:
                    print(f"  - {query_id}: {error}")
            print("=" * 60)

        return {
            "completed": list(completed),
            "failed": failed,
            "elapsed_seconds": elapsed,
            "total_queries": len(self.table_graph.queries),
        }

    async def async_run(
        self,
        executor: Callable[[str], Awaitable[None]],
        max_workers: int = 4,
        verbose: bool = True,
    ) -> Dict[str, Any]:
        """
        Execute the pipeline asynchronously, running the queries within
        each dependency level concurrently.

        Args:
            executor: Async function that executes a single SQL string
            max_workers: Maximum number of concurrent tasks, enforced
                via a semaphore (default: 4)
            verbose: Print progress (default: True)

        Returns:
            dict with execution results: {
                "completed": list of completed query IDs,
                "failed": list of (query_id, error) tuples,
                "elapsed_seconds": total execution time,
                "total_queries": total number of queries
            }

        Example:
            async def execute_sql(sql: str):
                # Your async database connection
                await async_conn.execute(sql)

            result = await executor.async_run(execute_sql, max_workers=4)
            print(f"Completed {len(result['completed'])} queries")
        """
        if verbose:
            print(f"🚀 Starting async pipeline execution ({len(self.table_graph.queries)} queries)")
            print()

        # Track completed and failed queries
        completed = set()
        failed: List[Tuple[str, str]] = []
        start_time = time.time()

        # Group queries by level for concurrent execution
        levels = self.get_execution_levels()

        # Semaphore caps how many queries run at once within a level
        semaphore = asyncio.Semaphore(max_workers)

        async def execute_with_semaphore(query_id: str, sql: str) -> None:
            """Execute one query under the concurrency semaphore."""
            async with semaphore:
                try:
                    await executor(sql)
                    completed.add(query_id)
                    if verbose:
                        print(f"  ✅ {query_id}")
                except Exception as e:
                    failed.append((query_id, str(e)))
                    if verbose:
                        print(f"  ❌ {query_id}: {e}")

        # Execute level by level
        for level_num, level_queries in enumerate(levels, 1):
            if verbose:
                print(f"📊 Level {level_num}: {len(level_queries)} queries")

            # Launch all queries in this level concurrently
            tasks = []
            for query_id in level_queries:
                query = self.table_graph.queries[query_id]
                tasks.append(execute_with_semaphore(query_id, query.sql))

            # Wait for every task in this level to complete
            await asyncio.gather(*tasks)

            if verbose:
                print()

        elapsed = time.time() - start_time

        # Summary
        if verbose:
            print("=" * 60)
            print(f"✅ Pipeline completed in {elapsed:.2f}s")
            print(f"   Successful: {len(completed)}")
            print(f"   Failed: {len(failed)}")
            if failed:
                print("\n⚠️ Failed queries:")
                for query_id, error in failed:
                    print(f"  - {query_id}: {error}")
            print("=" * 60)

        return {
            "completed": list(completed),
            "failed": failed,
            "elapsed_seconds": elapsed,
            "total_queries": len(self.table_graph.queries),
        }


__all__ = ["PipelineExecutor"]
34 changes: 34 additions & 0 deletions src/clgraph/orchestrators/__init__.py
@@ -0,0 +1,34 @@
"""
Orchestrator integrations for clgraph.

This package provides integrations with various workflow orchestrators,
allowing clgraph pipelines to be deployed to production environments.

Supported orchestrators:
- Airflow (2.x and 3.x)
- Dagster (1.x)

Example:
    from clgraph import Pipeline
    from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator

    pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

    # Generate Airflow DAG
    airflow = AirflowOrchestrator(pipeline)
    dag = airflow.to_dag(executor=execute_sql, dag_id="my_pipeline")

    # Generate Dagster assets
    dagster = DagsterOrchestrator(pipeline)
    assets = dagster.to_assets(executor=execute_sql, group_name="analytics")
"""

from .airflow import AirflowOrchestrator
from .base import BaseOrchestrator
from .dagster import DagsterOrchestrator

__all__ = [
"BaseOrchestrator",
"AirflowOrchestrator",
"DagsterOrchestrator",
]
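The `execute_sql` callable in the docstring above is assumed rather than defined in this package. A minimal sketch of one for the BigQuery dialect used in the example (the client setup is an assumption, not part of this PR):

    from google.cloud import bigquery

    client = bigquery.Client()

    def execute_sql(sql: str) -> None:
        # Block until the job finishes so the orchestrator task only
        # succeeds once the statement has actually run
        client.query(sql).result()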