Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/clgraph/orchestrators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
Supported orchestrators:
- Airflow (2.x and 3.x)
- Dagster (1.x)
- Prefect (2.x and 3.x)

Example:
from clgraph import Pipeline
from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator
from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

Expand All @@ -21,14 +22,20 @@
# Generate Dagster assets
dagster = DagsterOrchestrator(pipeline)
assets = dagster.to_assets(executor=execute_sql, group_name="analytics")

# Generate Prefect flow
prefect = PrefectOrchestrator(pipeline)
flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline")
"""

from .airflow import AirflowOrchestrator
from .base import BaseOrchestrator
from .dagster import DagsterOrchestrator
from .prefect import PrefectOrchestrator

__all__ = [
"BaseOrchestrator",
"AirflowOrchestrator",
"DagsterOrchestrator",
"PrefectOrchestrator",
]
247 changes: 247 additions & 0 deletions src/clgraph/orchestrators/prefect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""
Prefect orchestrator integration for clgraph.

Converts clgraph pipelines to Prefect flows and deployments.
Supports Prefect 2.x and 3.x.
"""

from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from .base import BaseOrchestrator

if TYPE_CHECKING:
pass


class PrefectOrchestrator(BaseOrchestrator):
    """
    Converts clgraph pipelines to Prefect flows.

    Uses the Prefect @flow and @task decorators which are compatible
    across both Prefect 2.x and 3.x versions.

    Example:
        from clgraph.orchestrators import PrefectOrchestrator

        orchestrator = PrefectOrchestrator(pipeline)
        flow_fn = orchestrator.to_flow(
            executor=execute_sql,
            flow_name="my_pipeline",
        )

        # Run the flow
        flow_fn()
    """

    def to_flow(
        self,
        executor: Callable[[str], None],
        flow_name: str,
        description: Optional[str] = None,
        retries: int = 2,
        retry_delay_seconds: int = 60,
        timeout_seconds: Optional[int] = None,
        tags: Optional[List[str]] = None,
        **flow_kwargs,
    ):
        """
        Create Prefect Flow from the pipeline.

        Converts the pipeline's table dependency graph into a Prefect flow
        where each SQL query becomes a task with proper dependencies.

        Args:
            executor: Function that executes SQL (takes sql string)
            flow_name: Name for the Prefect flow
            description: Optional flow description (auto-generated if not provided)
            retries: Number of retries for failed tasks (default: 2)
            retry_delay_seconds: Delay between retries in seconds (default: 60)
            timeout_seconds: Optional task timeout in seconds
            tags: Optional list of tags for filtering
            **flow_kwargs: Additional flow parameters (version, task_runner, etc.)

        Returns:
            Prefect Flow function

        Raises:
            ImportError: If Prefect is not installed.

        Examples:
            # Basic usage
            flow_fn = orchestrator.to_flow(
                executor=execute_sql,
                flow_name="my_pipeline"
            )

            # Run the flow
            flow_fn()

            # Advanced usage with all parameters
            flow_fn = orchestrator.to_flow(
                executor=execute_sql,
                flow_name="my_pipeline",
                description="Daily analytics pipeline",
                retries=3,
                retry_delay_seconds=120,
                tags=["analytics", "daily"],
            )

        Note:
            - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0'
            - Tasks are automatically wired based on table dependencies
            - Use to_deployment() for scheduled execution
        """
        try:
            from prefect import flow, task
        except ImportError as e:
            raise ImportError(
                "Prefect is required for flow generation. "
                "Install it with: pip install 'prefect>=2.0'"
            ) from e

        table_graph = self.table_graph

        # The graph is static for the lifetime of this flow, so compute the
        # execution order once instead of re-sorting on every flow run.
        execution_order = table_graph.topological_sort()

        # Auto-generate description
        if description is None:
            query_count = len(table_graph.queries)
            table_count = len(table_graph.tables)
            description = (
                f"Pipeline with {query_count} queries operating on "
                f"{table_count} tables. Generated by clgraph."
            )

        # Default tags
        task_tags = tags if tags is not None else ["clgraph"]

        # Task factory using a closure so each task captures its own
        # query_id/sql bindings (avoids the late-binding-closure pitfall).
        def make_task(query_id: str, sql: str, exec_fn: Callable):
            task_name = self._sanitize_name(query_id)

            @task(
                name=task_name,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
                timeout_seconds=timeout_seconds,
                tags=task_tags,
            )
            def sql_task():
                """Execute SQL query."""
                exec_fn(sql)
                return query_id

            return sql_task

        # Build one task callable per query, in dependency order.
        task_callables: Dict[str, Any] = {}
        for query_id in execution_order:
            query = table_graph.queries[query_id]
            task_callables[query_id] = make_task(query_id, query.sql, executor)

        # Create flow with dependencies
        @flow(
            name=flow_name,
            description=description,
            **flow_kwargs,
        )
        def pipeline_flow():
            """Generated pipeline flow."""
            task_futures: Dict[str, Any] = {}

            for query_id in execution_order:
                query = table_graph.queries[query_id]

                # Collect upstream futures. Deduplicate producers: a single
                # upstream query may create several of this query's source
                # tables, and we only need to wait on its future once.
                wait_for = []
                seen_producers = set()
                for source_table in query.source_tables:
                    table_node = table_graph.tables.get(source_table)
                    if table_node is None:
                        # External/source table with no producing query.
                        continue
                    producer = table_node.created_by
                    if (
                        producer
                        and producer in task_futures
                        and producer not in seen_producers
                    ):
                        seen_producers.add(producer)
                        wait_for.append(task_futures[producer])

                # Submit task with dependencies
                if wait_for:
                    task_futures[query_id] = task_callables[query_id].submit(
                        wait_for=wait_for
                    )
                else:
                    task_futures[query_id] = task_callables[query_id].submit()

            # Wait for all tasks to complete and collect results
            results = {qid: future.result() for qid, future in task_futures.items()}
            return results

        return pipeline_flow

    def to_deployment(
        self,
        executor: Callable[[str], None],
        flow_name: str,
        deployment_name: str,
        cron: Optional[str] = None,
        interval_seconds: Optional[int] = None,
        work_pool_name: Optional[str] = None,
        tags: Optional[List[str]] = None,
        **kwargs,
    ):
        """
        Create Prefect Deployment from the pipeline for scheduled execution.

        Args:
            executor: Function that executes SQL (takes sql string)
            flow_name: Name for the Prefect flow
            deployment_name: Name for the deployment
            cron: Cron schedule (e.g., "0 0 * * *" for daily at midnight).
                Takes precedence over interval_seconds if both are given.
            interval_seconds: Interval schedule in seconds (alternative to cron)
            work_pool_name: Work pool to use for execution
            tags: Optional list of tags for the deployment
            **kwargs: Additional parameters passed to to_flow()

        Returns:
            Prefect RunnerDeployment instance (Prefect 3.x)

        Raises:
            ImportError: If Prefect is not installed.

        Examples:
            # Create deployment with cron schedule
            deployment = orchestrator.to_deployment(
                executor=execute_sql,
                flow_name="my_pipeline",
                deployment_name="daily_run",
                cron="0 0 * * *",  # Daily at midnight
            )

            # For Prefect 3.x, use serve() to run locally or deploy() to push to server
            # deployment.apply() for Prefect 2.x compatibility

        Note:
            - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0'
            - Prefect 3.x: Returns RunnerDeployment, use with serve() or deploy()
            - Use work_pool_name to specify execution environment
        """
        # Redundant with the check in to_flow(), but kept so the error
        # message names deployment generation specifically.
        try:
            from prefect import flow as flow_decorator  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "Prefect is required for deployment generation. "
                "Install it with: pip install 'prefect>=2.0'"
            ) from e

        # Create flow
        flow_fn = self.to_flow(executor=executor, flow_name=flow_name, **kwargs)

        # Build deployment configuration for Prefect 3.x using flow.to_deployment()
        deployment_config: Dict[str, Any] = {
            "name": deployment_name,
        }

        # Explicit `is not None` checks so interval_seconds=0 is not silently
        # dropped by truthiness; cron wins when both schedules are supplied.
        if cron is not None:
            deployment_config["cron"] = cron
        elif interval_seconds is not None:
            deployment_config["interval"] = interval_seconds

        if work_pool_name:
            deployment_config["work_pool_name"] = work_pool_name

        if tags:
            deployment_config["tags"] = tags

        # Use flow.to_deployment() for Prefect 3.x
        deployment = flow_fn.to_deployment(**deployment_config)

        return deployment


__all__ = ["PrefectOrchestrator"]
Loading
Loading