diff --git a/src/clgraph/orchestrators/__init__.py b/src/clgraph/orchestrators/__init__.py index edf0311..32cb64c 100644 --- a/src/clgraph/orchestrators/__init__.py +++ b/src/clgraph/orchestrators/__init__.py @@ -7,10 +7,11 @@ Supported orchestrators: - Airflow (2.x and 3.x) - Dagster (1.x) +- Prefect (2.x and 3.x) Example: from clgraph import Pipeline - from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator + from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery") @@ -21,14 +22,20 @@ # Generate Dagster assets dagster = DagsterOrchestrator(pipeline) assets = dagster.to_assets(executor=execute_sql, group_name="analytics") + + # Generate Prefect flow + prefect = PrefectOrchestrator(pipeline) + flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline") """ from .airflow import AirflowOrchestrator from .base import BaseOrchestrator from .dagster import DagsterOrchestrator +from .prefect import PrefectOrchestrator __all__ = [ "BaseOrchestrator", "AirflowOrchestrator", "DagsterOrchestrator", + "PrefectOrchestrator", ] diff --git a/src/clgraph/orchestrators/prefect.py b/src/clgraph/orchestrators/prefect.py new file mode 100644 index 0000000..d75f4ab --- /dev/null +++ b/src/clgraph/orchestrators/prefect.py @@ -0,0 +1,247 @@ +""" +Prefect orchestrator integration for clgraph. + +Converts clgraph pipelines to Prefect flows and deployments. +Supports Prefect 2.x and 3.x. +""" + +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional + +from .base import BaseOrchestrator + +if TYPE_CHECKING: + pass + + +class PrefectOrchestrator(BaseOrchestrator): + """ + Converts clgraph pipelines to Prefect flows. + + Uses the Prefect @flow and @task decorators which are compatible + across both Prefect 2.x and 3.x versions. + + Example: + from clgraph.orchestrators import PrefectOrchestrator + + orchestrator = PrefectOrchestrator(pipeline) + flow_fn = orchestrator.to_flow( + executor=execute_sql, + flow_name="my_pipeline", + ) + + # Run the flow + flow_fn() + """ + + def to_flow( + self, + executor: Callable[[str], None], + flow_name: str, + description: Optional[str] = None, + retries: int = 2, + retry_delay_seconds: int = 60, + timeout_seconds: Optional[int] = None, + tags: Optional[List[str]] = None, + **flow_kwargs, + ): + """ + Create Prefect Flow from the pipeline. + + Converts the pipeline's table dependency graph into a Prefect flow + where each SQL query becomes a task with proper dependencies. + + Args: + executor: Function that executes SQL (takes sql string) + flow_name: Name for the Prefect flow + description: Optional flow description (auto-generated if not provided) + retries: Number of retries for failed tasks (default: 2) + retry_delay_seconds: Delay between retries in seconds (default: 60) + timeout_seconds: Optional task timeout in seconds + tags: Optional list of tags for filtering + **flow_kwargs: Additional flow parameters (version, task_runner, etc.) + + Returns: + Prefect Flow function + + Examples: + # Basic usage + flow_fn = orchestrator.to_flow( + executor=execute_sql, + flow_name="my_pipeline" + ) + + # Run the flow + flow_fn() + + # Advanced usage with all parameters + flow_fn = orchestrator.to_flow( + executor=execute_sql, + flow_name="my_pipeline", + description="Daily analytics pipeline", + retries=3, + retry_delay_seconds=120, + tags=["analytics", "daily"], + ) + + Note: + - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0' + - Tasks are automatically wired based on table dependencies + - Use to_deployment() for scheduled execution + """ + try: + from prefect import flow, task + except ImportError as e: + raise ImportError( + "Prefect is required for flow generation. " + "Install it with: pip install 'prefect>=2.0'" + ) from e + + table_graph = self.table_graph + + # Auto-generate description + if description is None: + query_count = len(table_graph.queries) + table_count = len(table_graph.tables) + description = ( + f"Pipeline with {query_count} queries operating on " + f"{table_count} tables. Generated by clgraph." + ) + + # Default tags + task_tags = tags if tags is not None else ["clgraph"] + + # Create task factory with closure pattern + def make_task(query_id: str, sql: str, exec_fn: Callable): + task_name = self._sanitize_name(query_id) + + @task( + name=task_name, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + timeout_seconds=timeout_seconds, + tags=task_tags, + ) + def sql_task(): + """Execute SQL query.""" + exec_fn(sql) + return query_id + + return sql_task + + # Build task callables for each query + task_callables: Dict[str, Any] = {} + for query_id in table_graph.topological_sort(): + query = table_graph.queries[query_id] + task_callables[query_id] = make_task(query_id, query.sql, executor) + + # Create flow with dependencies + @flow( + name=flow_name, + description=description, + **flow_kwargs, + ) + def pipeline_flow(): + """Generated pipeline flow.""" + task_futures: Dict[str, Any] = {} + + for query_id in table_graph.topological_sort(): + query = table_graph.queries[query_id] + + # Find upstream dependencies + wait_for = [] + for source_table in query.source_tables: + if source_table in table_graph.tables: + table_node = table_graph.tables[source_table] + if table_node.created_by and table_node.created_by in task_futures: + wait_for.append(task_futures[table_node.created_by]) + + # Submit task with dependencies + if wait_for: + task_futures[query_id] = task_callables[query_id].submit(wait_for=wait_for) + else: + task_futures[query_id] = task_callables[query_id].submit() + + # Wait for all tasks to complete and collect results + results = {qid: future.result() for qid, future in task_futures.items()} + return results + + return pipeline_flow + + def to_deployment( + self, + executor: Callable[[str], None], + flow_name: str, + deployment_name: str, + cron: Optional[str] = None, + interval_seconds: Optional[int] = None, + work_pool_name: Optional[str] = None, + tags: Optional[List[str]] = None, + **kwargs, + ): + """ + Create Prefect Deployment from the pipeline for scheduled execution. + + Args: + executor: Function that executes SQL (takes sql string) + flow_name: Name for the Prefect flow + deployment_name: Name for the deployment + cron: Cron schedule (e.g., "0 0 * * *" for daily at midnight) + interval_seconds: Interval schedule in seconds (alternative to cron) + work_pool_name: Work pool to use for execution + tags: Optional list of tags for the deployment + **kwargs: Additional parameters passed to to_flow() + + Returns: + Prefect RunnerDeployment instance (Prefect 3.x) + + Examples: + # Create deployment with cron schedule + deployment = orchestrator.to_deployment( + executor=execute_sql, + flow_name="my_pipeline", + deployment_name="daily_run", + cron="0 0 * * *", # Daily at midnight + ) + + # For Prefect 3.x, use serve() to run locally or deploy() to push to server + # deployment.apply() for Prefect 2.x compatibility + + Note: + - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0' + - Prefect 3.x: Returns RunnerDeployment, use with serve() or deploy() + - Use work_pool_name to specify execution environment + """ + try: + from prefect import flow as flow_decorator # noqa: F401 + except ImportError as e: + raise ImportError( + "Prefect is required for deployment generation. " + "Install it with: pip install 'prefect>=2.0'" + ) from e + + # Create flow + flow_fn = self.to_flow(executor=executor, flow_name=flow_name, **kwargs) + + # Build deployment configuration for Prefect 3.x using flow.to_deployment() + deployment_config: Dict[str, Any] = { + "name": deployment_name, + } + + if cron: + deployment_config["cron"] = cron + elif interval_seconds: + deployment_config["interval"] = interval_seconds + + if work_pool_name: + deployment_config["work_pool_name"] = work_pool_name + + if tags: + deployment_config["tags"] = tags + + # Use flow.to_deployment() for Prefect 3.x + deployment = flow_fn.to_deployment(**deployment_config) + + return deployment + + +__all__ = ["PrefectOrchestrator"] diff --git a/src/clgraph/pipeline.py b/src/clgraph/pipeline.py index eb1e926..42bc559 100644 --- a/src/clgraph/pipeline.py +++ b/src/clgraph/pipeline.py @@ -2371,6 +2371,136 @@ def to_dagster_job( **job_kwargs, ) + # ======================================================================== + # Orchestrator Methods - Prefect + # ======================================================================== + + def to_prefect_flow( + self, + executor: Callable[[str], None], + flow_name: str, + description: Optional[str] = None, + retries: int = 2, + retry_delay_seconds: int = 60, + timeout_seconds: Optional[int] = None, + tags: Optional[List[str]] = None, + **flow_kwargs, + ): + """ + Create Prefect Flow from this pipeline. + + Converts the pipeline's table dependency graph into a Prefect flow + where each SQL query becomes a task with proper dependencies. + + Args: + executor: Function that executes SQL (takes sql string) + flow_name: Name for the Prefect flow + description: Optional flow description (auto-generated if not provided) + retries: Number of retries for failed tasks (default: 2) + retry_delay_seconds: Delay between retries in seconds (default: 60) + timeout_seconds: Optional task timeout in seconds + tags: Optional list of tags for filtering + **flow_kwargs: Additional flow parameters (version, task_runner, etc.) + + Returns: + Prefect Flow function + + Examples: + # Basic usage + def execute_sql(sql: str): + from clickhouse_driver import Client + Client('localhost').execute(sql) + + flow_fn = pipeline.to_prefect_flow( + executor=execute_sql, + flow_name="my_pipeline" + ) + + # Run the flow + flow_fn() + + # Advanced usage with all parameters + flow_fn = pipeline.to_prefect_flow( + executor=execute_sql, + flow_name="my_pipeline", + description="Daily analytics pipeline", + retries=3, + retry_delay_seconds=120, + tags=["analytics", "daily"], + ) + + Note: + - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0' + - Tasks are automatically wired based on table dependencies + - Use to_prefect_deployment() for scheduled execution + """ + from .orchestrators import PrefectOrchestrator + + return PrefectOrchestrator(self).to_flow( + executor=executor, + flow_name=flow_name, + description=description, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + timeout_seconds=timeout_seconds, + tags=tags, + **flow_kwargs, + ) + + def to_prefect_deployment( + self, + executor: Callable[[str], None], + flow_name: str, + deployment_name: str, + cron: Optional[str] = None, + interval_seconds: Optional[int] = None, + work_pool_name: Optional[str] = None, + **kwargs, + ): + """ + Create Prefect Deployment from this pipeline for scheduled execution. + + Args: + executor: Function that executes SQL (takes sql string) + flow_name: Name for the Prefect flow + deployment_name: Name for the deployment + cron: Cron schedule (e.g., "0 0 * * *" for daily at midnight) + interval_seconds: Interval schedule in seconds (alternative to cron) + work_pool_name: Work pool to use for execution + **kwargs: Additional parameters passed to to_prefect_flow() + + Returns: + Prefect Deployment instance + + Examples: + # Create deployment with cron schedule + deployment = pipeline.to_prefect_deployment( + executor=execute_sql, + flow_name="my_pipeline", + deployment_name="daily_run", + cron="0 0 * * *", # Daily at midnight + ) + + # Apply the deployment + deployment.apply() + + Note: + - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0' + - Deployment must be applied to register with Prefect server + - Use work_pool_name to specify execution environment + """ + from .orchestrators import PrefectOrchestrator + + return PrefectOrchestrator(self).to_deployment( + executor=executor, + flow_name=flow_name, + deployment_name=deployment_name, + cron=cron, + interval_seconds=interval_seconds, + work_pool_name=work_pool_name, + **kwargs, + ) + # ======================================================================== # Validation Methods # ======================================================================== diff --git a/tests/test_prefect_integration.py b/tests/test_prefect_integration.py new file mode 100644 index 0000000..0364ac0 --- /dev/null +++ b/tests/test_prefect_integration.py @@ -0,0 +1,360 @@ +""" +Tests for Prefect integration (to_prefect_flow, to_prefect_deployment). +""" + +import pytest + +from clgraph.pipeline import Pipeline + +# Check if Prefect is available +try: + import prefect # noqa: F401 + from prefect import flow, task # noqa: F401 + + PREFECT_AVAILABLE = True +except ImportError: + PREFECT_AVAILABLE = False + + +class TestToPrefectFlowBasic: + """Basic tests for to_prefect_flow method.""" + + def test_requires_prefect(self): + """Test that to_prefect_flow raises error when Prefect is not installed.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + try: + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="test_flow") + # If we get here, prefect is installed + assert flow_fn is not None + except ImportError as e: + # Expected if prefect not installed + assert "Prefect is required" in str(e) + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_basic_flow_generation(self): + """Test basic flow generation from pipeline.""" + queries = [ + ("staging", "CREATE TABLE staging AS SELECT 1 as id, 'Alice' as name"), + ( + "analytics", + "CREATE TABLE analytics AS SELECT id, name FROM staging WHERE id = 1", + ), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="test_pipeline") + + # Should return a flow function + assert flow_fn is not None + assert flow_fn.name == "test_pipeline" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_with_custom_description(self): + """Test flow with custom description.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow( + executor=mock_executor, + flow_name="custom_flow", + description="My custom description", + ) + + assert flow_fn.description == "My custom description" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_auto_generated_description(self): + """Test that description is auto-generated when not provided.""" + queries = [ + ("q1", "CREATE TABLE t1 AS SELECT 1"), + ("q2", "CREATE TABLE t2 AS SELECT 2"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="auto_desc_flow") + + assert "2 queries" in flow_fn.description + assert "clgraph" in flow_fn.description + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_with_custom_retries(self): + """Test flow with custom retry settings.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow( + executor=mock_executor, + flow_name="retry_flow", + retries=5, + retry_delay_seconds=120, + ) + + assert flow_fn is not None + assert flow_fn.name == "retry_flow" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_with_tags(self): + """Test flow with custom tags.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow( + executor=mock_executor, + flow_name="tagged_flow", + tags=["production", "critical"], + ) + + assert flow_fn is not None + + +class TestToPrefectFlowDependencies: + """Test dependency handling in Prefect flows.""" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_linear_dependencies(self): + """Test linear dependency chain A -> B -> C.""" + queries = [ + ("step1", "CREATE TABLE step1 AS SELECT 1 as id"), + ("step2", "CREATE TABLE step2 AS SELECT * FROM step1"), + ("step3", "CREATE TABLE step3 AS SELECT * FROM step2"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="linear_flow") + + assert flow_fn is not None + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_diamond_dependencies(self): + """Test diamond pattern: A -> B, A -> C, B -> D, C -> D.""" + queries = [ + ("source", "CREATE TABLE source AS SELECT 1 as id"), + ("left", "CREATE TABLE left AS SELECT * FROM source"), + ("right", "CREATE TABLE right AS SELECT * FROM source"), + ("final", "CREATE TABLE final AS SELECT * FROM left, right"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="diamond_flow") + + assert flow_fn is not None + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_parallel_independent_queries(self): + """Test parallel execution of independent queries.""" + queries = [ + ("table1", "CREATE TABLE table1 AS SELECT 1 as id"), + ("table2", "CREATE TABLE table2 AS SELECT 2 as id"), + ("table3", "CREATE TABLE table3 AS SELECT 3 as id"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="parallel_flow") + + assert flow_fn is not None + + +class TestToPrefectFlowExecution: + """Test Prefect flow execution.""" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_execution_tracking(self): + """Test that queries execute in correct order.""" + executed_queries = [] + + def tracking_executor(sql: str): + executed_queries.append(sql) + + queries = [ + ("staging", "CREATE TABLE staging AS SELECT 1"), + ("analytics", "CREATE TABLE analytics AS SELECT * FROM staging"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + flow_fn = pipeline.to_prefect_flow(executor=tracking_executor, flow_name="tracking_flow") + + # Run the flow + flow_fn() + + # Check that both queries were executed + assert len(executed_queries) == 2 + # Staging must execute before analytics + staging_idx = next(i for i, sql in enumerate(executed_queries) if "staging" in sql.lower()) + analytics_idx = next( + i for i, sql in enumerate(executed_queries) if "analytics" in sql.lower() + ) + assert staging_idx < analytics_idx + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_flow_returns_results(self): + """Test that flow returns query IDs as results.""" + + def mock_executor(sql: str): + pass + + queries = [ + ("query1", "CREATE TABLE table1 AS SELECT 1"), + ("query2", "CREATE TABLE table2 AS SELECT 2"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + flow_fn = pipeline.to_prefect_flow(executor=mock_executor, flow_name="result_flow") + + result = flow_fn() + + assert "query1" in result + assert "query2" in result + + +class TestToPrefectDeployment: + """Tests for to_prefect_deployment method.""" + + def test_requires_prefect_for_deployment(self): + """Test that to_prefect_deployment raises error when Prefect is not installed.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + try: + deployment = pipeline.to_prefect_deployment( + executor=mock_executor, + flow_name="test_flow", + deployment_name="test_deployment", + ) + # If we get here, prefect is installed + assert deployment is not None + except ImportError as e: + # Expected if prefect not installed + assert "Prefect is required" in str(e) + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_basic_deployment_creation(self): + """Test basic deployment creation.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + deployment = pipeline.to_prefect_deployment( + executor=mock_executor, + flow_name="deploy_flow", + deployment_name="test_deployment", + ) + + assert deployment is not None + # In Prefect 3.x, RunnerDeployment has 'name' attribute + assert deployment.name == "test_deployment" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_deployment_with_cron_schedule(self): + """Test deployment with cron schedule.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + deployment = pipeline.to_prefect_deployment( + executor=mock_executor, + flow_name="scheduled_flow", + deployment_name="scheduled_deployment", + cron="0 0 * * *", # Daily at midnight + ) + + assert deployment is not None + # In Prefect 3.x, schedules are stored differently + assert deployment.schedules is not None or deployment.cron is not None + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_deployment_with_interval_schedule(self): + """Test deployment with interval schedule.""" + queries = [("query1", "CREATE TABLE table1 AS SELECT 1 as id")] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + deployment = pipeline.to_prefect_deployment( + executor=mock_executor, + flow_name="interval_flow", + deployment_name="interval_deployment", + interval_seconds=3600, # Every hour + ) + + assert deployment is not None + # In Prefect 3.x, interval schedules are stored differently + assert deployment.schedules is not None or deployment.interval is not None + + +class TestPrefectOrchestratorDirectUsage: + """Test using PrefectOrchestrator directly.""" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_direct_orchestrator_usage(self): + """Test using PrefectOrchestrator directly instead of pipeline methods.""" + from clgraph.orchestrators import PrefectOrchestrator + + queries = [ + ("staging", "CREATE TABLE staging AS SELECT 1 as id"), + ("analytics", "CREATE TABLE analytics AS SELECT * FROM staging"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + def mock_executor(sql: str): + pass + + orchestrator = PrefectOrchestrator(pipeline) + flow_fn = orchestrator.to_flow(executor=mock_executor, flow_name="direct_flow") + + assert flow_fn is not None + assert flow_fn.name == "direct_flow" + + @pytest.mark.skipif(not PREFECT_AVAILABLE, reason="Prefect not installed") + def test_sanitize_name(self): + """Test that task names are properly sanitized.""" + from clgraph.orchestrators import PrefectOrchestrator + + queries = [ + ("my-query.name", "CREATE TABLE my_table AS SELECT 1"), + ] + pipeline = Pipeline(queries, dialect="bigquery") + + orchestrator = PrefectOrchestrator(pipeline) + + # Check sanitization + sanitized = orchestrator._sanitize_name("my-query.name") + assert sanitized == "my_query_name" + assert "-" not in sanitized + assert "." not in sanitized