Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/clgraph/orchestrators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Dagster (1.x)
- Prefect (2.x and 3.x)
- Kestra (YAML-based declarative workflows)
- Mage (notebook-style block-based pipelines)

Example:
from clgraph import Pipeline
Expand All @@ -17,6 +18,7 @@
DagsterOrchestrator,
PrefectOrchestrator,
KestraOrchestrator,
MageOrchestrator,
)

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
Expand All @@ -36,18 +38,24 @@
# Generate Kestra flow YAML
kestra = KestraOrchestrator(pipeline)
yaml_content = kestra.to_flow(flow_id="my_pipeline", namespace="clgraph")

# Generate Mage pipeline
mage = MageOrchestrator(pipeline)
files = mage.to_pipeline_files(pipeline_name="my_pipeline")
"""

from .airflow import AirflowOrchestrator
from .base import BaseOrchestrator
from .dagster import DagsterOrchestrator
from .kestra import KestraOrchestrator
from .mage import MageOrchestrator
from .prefect import PrefectOrchestrator

__all__ = [
"BaseOrchestrator",
"AirflowOrchestrator",
"DagsterOrchestrator",
"KestraOrchestrator",
"MageOrchestrator",
"PrefectOrchestrator",
]
304 changes: 304 additions & 0 deletions src/clgraph/orchestrators/mage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
"""
Mage orchestrator integration for clgraph.

Converts clgraph pipelines to Mage pipeline files (metadata.yaml and block Python files).
Mage is a modern data pipeline tool with a notebook-style UI and block-based architecture.
"""

import re
from typing import Any, Dict, List, Optional

from .base import BaseOrchestrator

# Mapping of db_connector name to (import_statement, class_name)
# Each entry provides the import line emitted into generated block files and
# the mage_ai.io loader class name interpolated into the generated code.
DB_CONNECTOR_MAP: Dict[str, tuple] = {
    "clickhouse": (
        "from mage_ai.io.clickhouse import ClickHouse",
        "ClickHouse",
    ),
    "postgres": (
        "from mage_ai.io.postgres import Postgres",
        "Postgres",
    ),
    "bigquery": (
        "from mage_ai.io.bigquery import BigQuery",
        "BigQuery",
    ),
    "snowflake": (
        "from mage_ai.io.snowflake import Snowflake",
        "Snowflake",
    ),
}

# Whitelist for connection names: they are interpolated into generated Python
# source, so only alphanumerics, underscores, and hyphens are accepted.
_CONNECTION_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")


class MageOrchestrator(BaseOrchestrator):
    """
    Converts clgraph pipelines to Mage pipelines.

    Mage uses a block-based architecture where each SQL query becomes a block
    (either data_loader or transformer). Dependencies are expressed via
    upstream_blocks and downstream_blocks.

    Example:
        from clgraph.orchestrators import MageOrchestrator

        orchestrator = MageOrchestrator(pipeline)
        files = orchestrator.to_pipeline_files(
            pipeline_name="my_pipeline",
        )

        # Write files to Mage project
        import yaml
        with open("pipelines/my_pipeline/metadata.yaml", "w") as f:
            yaml.dump(files["metadata.yaml"], f)
        for name, code in files["blocks"].items():
            with open(f"pipelines/my_pipeline/{name}.py", "w") as f:
                f.write(code)
    """

    def _upstream_block_names(self, query) -> List[str]:
        """
        Return sanitized names of the blocks that produce this query's inputs.

        Deduplicates while preserving first-seen order: a query that reads
        several tables created by the same upstream query must reference that
        upstream block exactly once, otherwise the generated upstream_blocks /
        downstream_blocks lists would contain duplicates and the generated
        transformer function would receive the wrong number of arguments.

        Source tables with no recorded creator (external tables) contribute
        no upstream block.
        """
        table_graph = self.table_graph
        seen = set()
        upstream: List[str] = []
        for source_table in query.source_tables:
            table_node = table_graph.tables.get(source_table)
            if table_node is None or not table_node.created_by:
                continue
            name = self._sanitize_name(table_node.created_by)
            if name not in seen:
                seen.add(name)
                upstream.append(name)
        return upstream

    def to_pipeline_config(
        self,
        pipeline_name: str,
        description: Optional[str] = None,
        pipeline_type: str = "python",
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Generate Mage pipeline configuration (metadata.yaml content).

        Args:
            pipeline_name: Name for the Mage pipeline
            description: Optional pipeline description (auto-generated if not provided)
            pipeline_type: Pipeline type (default: "python")
            **kwargs: Additional top-level keys merged into the config
                (note: these may override generated keys, including name/uuid)

        Returns:
            Dictionary representing metadata.yaml content
        """
        table_graph = self.table_graph

        if description is None:
            query_count = len(table_graph.queries)
            table_count = len(table_graph.tables)
            description = (
                f"Pipeline with {query_count} queries operating on "
                f"{table_count} tables. Generated by clgraph."
            )

        # block_name -> names of blocks that consume this block's output
        downstream_map: Dict[str, List[str]] = {}

        # Walk queries in dependency order so block ordering is stable and
        # upstream entries always precede their consumers.
        block_entries = []
        for query_id in table_graph.topological_sort():
            query = table_graph.queries[query_id]
            block_name = self._sanitize_name(query_id)
            upstream_blocks = self._upstream_block_names(query)

            # Register this block as downstream of each of its upstreams.
            for upstream_name in upstream_blocks:
                downstream_map.setdefault(upstream_name, []).append(block_name)

            block_entries.append(
                {
                    "name": block_name,
                    "upstream_blocks": upstream_blocks,
                }
            )

        # Build final block configs. A block with no upstream dependencies is
        # a data_loader; everything else is a transformer.
        blocks = [
            {
                "name": entry["name"],
                "uuid": entry["name"],
                "type": "data_loader" if not entry["upstream_blocks"] else "transformer",
                "upstream_blocks": entry["upstream_blocks"],
                "downstream_blocks": list(downstream_map.get(entry["name"], [])),
            }
            for entry in block_entries
        ]

        return {
            "name": pipeline_name,
            "uuid": pipeline_name,
            "description": description,
            "type": pipeline_type,
            "blocks": blocks,
            **kwargs,
        }

    def to_blocks(
        self,
        connection_name: str = "clickhouse_default",
        db_connector: str = "clickhouse",
    ) -> Dict[str, str]:
        """
        Generate Mage block Python files.

        Args:
            connection_name: Name of database connection in Mage io_config.yaml
            db_connector: Database connector type (clickhouse, postgres, bigquery, snowflake)

        Returns:
            Dictionary mapping block_name -> block_code

        Raises:
            ValueError: If connection_name contains unsafe characters or
                db_connector is not in DB_CONNECTOR_MAP.
        """
        table_graph = self.table_graph
        blocks: Dict[str, str] = {}

        for query_id in table_graph.topological_sort():
            query = table_graph.queries[query_id]
            block_name = self._sanitize_name(query_id)

            # Same upstream resolution as to_pipeline_config, so the
            # generated function arity matches the metadata.
            upstream_blocks = self._upstream_block_names(query)
            block_type = "data_loader" if not upstream_blocks else "transformer"

            blocks[block_name] = self._generate_block_code(
                block_name=block_name,
                block_type=block_type,
                sql=query.sql,
                query_id=query_id,
                upstream_blocks=upstream_blocks,
                connection_name=connection_name,
                db_connector=db_connector,
            )

        return blocks

    def _generate_block_code(
        self,
        block_name: str,
        block_type: str,
        sql: str,
        query_id: str,
        upstream_blocks: List[str],
        connection_name: str,
        db_connector: str = "clickhouse",
    ) -> str:
        """
        Generate Python code for a single Mage block.

        Args:
            block_name: Sanitized block name (used in the header comment)
            block_type: "data_loader" or "transformer"
            sql: SQL text embedded into the generated block
            query_id: Original clgraph query identifier
            upstream_blocks: Upstream block names (determines function arity)
            connection_name: Mage io_config.yaml profile name
            db_connector: Key into DB_CONNECTOR_MAP

        Raises:
            ValueError: If connection_name fails validation or db_connector
                is unsupported.
        """
        # Validate connection_name to prevent code injection: it is
        # interpolated into a string literal in the generated source.
        if not _CONNECTION_NAME_PATTERN.match(connection_name):
            raise ValueError(
                f"Invalid connection_name '{connection_name}': "
                "must contain only alphanumeric characters, underscores, and hyphens"
            )

        # Resolve db connector
        if db_connector not in DB_CONNECTOR_MAP:
            raise ValueError(
                f"Unsupported db_connector '{db_connector}'. "
                f"Supported: {', '.join(sorted(DB_CONNECTOR_MAP))}"
            )
        connector_import, connector_class = DB_CONNECTOR_MAP[db_connector]

        # Mage requires specific function names for each block type
        if block_type == "data_loader":
            decorator = "@data_loader"
            func_name = "load_data"
            imports = "from mage_ai.data_preparation.decorators import data_loader"
        else:
            decorator = "@transformer"
            func_name = "transform"
            imports = "from mage_ai.data_preparation.decorators import transformer"

        # Build function signature: transformers receive one positional
        # argument per upstream block.
        if upstream_blocks:
            args = ", ".join([f"data_{i}" for i in range(len(upstream_blocks))])
            func_args = f"({args}, *args, **kwargs)"
        else:
            func_args = "(*args, **kwargs)"

        # Escape the SQL so it round-trips through the generated triple-quoted
        # string literal: backslashes first (so the quote escapes added next
        # are not double-escaped), then any literal triple quotes. Without the
        # backslash pass, sequences like \n inside the SQL would be
        # re-interpreted by Python, and a trailing backslash would produce
        # invalid generated syntax.
        escaped_sql = sql.replace("\\", "\\\\").replace('"""', '\\"\\"\\"')

        code = f'''"""
Block: {block_name}
Query ID: {query_id}
Type: {block_type}
Generated by clgraph
"""
{imports}
{connector_import}


{decorator}
def {func_name}{func_args}:
    """
    Execute SQL query via {connector_class}.
    """
    sql = """
{escaped_sql}
"""

    with {connector_class}.with_config(config_profile="{connection_name}") as loader:
        loader.execute(sql)

    return {{"status": "success", "query_id": "{query_id}"}}
'''
        return code

    def to_pipeline_files(
        self,
        pipeline_name: str,
        description: Optional[str] = None,
        connection_name: str = "clickhouse_default",
        db_connector: str = "clickhouse",
    ) -> Dict[str, Any]:
        """
        Generate complete Mage pipeline file structure.

        Args:
            pipeline_name: Name for the Mage pipeline
            description: Optional pipeline description
            connection_name: Database connection name in Mage io_config.yaml
            db_connector: Database connector type (clickhouse, postgres, bigquery, snowflake)

        Returns:
            Dictionary with file structure:
            {
                "metadata.yaml": <dict>,
                "blocks": {
                    "block1.py": <code>,
                    "block2.py": <code>,
                }
            }
        """
        config = self.to_pipeline_config(
            pipeline_name=pipeline_name,
            description=description,
        )

        blocks = self.to_blocks(
            connection_name=connection_name,
            db_connector=db_connector,
        )

        return {
            "metadata.yaml": config,
            "blocks": blocks,
        }


# Explicit public API of this module.
__all__ = ["MageOrchestrator"]
9 changes: 4 additions & 5 deletions src/clgraph/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2612,10 +2612,10 @@ def to_kestra_flow(

def to_mage_pipeline(
self,
executor: Callable[[str], None],
pipeline_name: str,
description: Optional[str] = None,
connection_name: str = "clickhouse_default",
db_connector: str = "clickhouse",
) -> Dict[str, Any]:
"""
Generate Mage pipeline files from this pipeline.
Expand All @@ -2625,10 +2625,10 @@ def to_mage_pipeline(
data_loader or transformer).

Args:
executor: Function that executes SQL (for code reference)
pipeline_name: Name for the Mage pipeline
description: Optional pipeline description (auto-generated if not provided)
connection_name: Database connection name in Mage io_config.yaml
db_connector: Database connector type (clickhouse, postgres, bigquery, snowflake)

Returns:
Dictionary with pipeline file structure:
Expand All @@ -2640,7 +2640,6 @@ def to_mage_pipeline(
Examples:
# Generate Mage pipeline files
files = pipeline.to_mage_pipeline(
executor=execute_sql,
pipeline_name="enterprise_pipeline",
)

Expand All @@ -2656,15 +2655,15 @@ def to_mage_pipeline(
- First query (no dependencies) becomes data_loader block
- Subsequent queries become transformer blocks
- Dependencies are managed via upstream_blocks/downstream_blocks
- Requires mage-ai package and ClickHouse connection in io_config.yaml
- Requires mage-ai package and database connection in io_config.yaml
"""
from .orchestrators import MageOrchestrator

return MageOrchestrator(self).to_pipeline_files(
executor=executor,
pipeline_name=pipeline_name,
description=description,
connection_name=connection_name,
db_connector=db_connector,
)

# ========================================================================
Expand Down
Loading
Loading