diff --git a/docs/guides/README.md b/docs/guides/README.md new file mode 100644 index 0000000..d0a46eb --- /dev/null +++ b/docs/guides/README.md @@ -0,0 +1,147 @@ +# ML Workflow Guides + +Learn how to build common machine learning workflows using Argo Connectors with both YAML and the Hera Python SDK. + +## Overview + +These guides demonstrate real-world ML workflows using the available connectors in this repository. Each guide includes: + +- **Architecture diagram** showing the workflow structure +- **Python (Hera) examples** for programmatic workflows +- **YAML examples** for direct Argo Workflows usage +- **Best practices** for production deployments +- **Real code** using actual Databricks notebooks or Spark jobs + +## Available Guides + +### [Data Ingestion](data-ingestion.md) +Build data ingestion pipelines to load, validate, and prepare raw data for processing. + +**Connectors used**: Databricks + +### [Feature Engineering](feature-engineering.md) +Create scalable feature engineering pipelines for ML model training. + +**Connectors used**: Databricks + +### [Model Training](model-training.md) +Train machine learning models at scale with distributed compute. + +**Connectors used**: Databricks + +**Coming soon**: PyTorch, TensorFlow, Ray connectors + +### [Batch Inference](batch-inference.md) +Score large datasets with trained models in production. + +**Connectors used**: Databricks + +### [Multi-Step Pipelines](multi-step-pipelines.md) +Build end-to-end ML pipelines by chaining multiple connectors together. + +**Connectors used**: Databricks, Spark + +### [Passing Data Between Steps](passing-data-between-steps.md) +Learn how to pass data and parameters between workflow steps. 
+ +**Connectors used**: Databricks, Spark + +## ML Workflow Patterns + +### ETL Pattern +```mermaid +graph LR + A[Extract] --> B[Transform] + B --> C[Load] + style B fill:#e1f5ff +``` + +**Guides**: Data Ingestion, Feature Engineering + +### Training Pattern +```mermaid +graph LR + A[Load Data] --> B[Preprocess] + B --> C[Train Model] + C --> D[Evaluate] + D --> E[Register Model] + style C fill:#e1f5ff +``` + +**Guides**: Model Training + +### Inference Pattern +```mermaid +graph LR + A[Load Model] --> B[Load Data] + B --> C[Score] + C --> D[Save Predictions] + style C fill:#e1f5ff +``` + +**Guides**: Batch Inference + +### End-to-End Pipeline +```mermaid +graph LR + A[Ingest] --> B[Features] + B --> C[Train] + C --> D[Evaluate] + D --> E[Deploy] + E --> F[Inference] + style C fill:#e1f5ff + style F fill:#e1f5ff +``` + +**Guides**: Multi-Step Pipelines + +## Choosing the Right Connector + +### For Databricks Users +Use the **Databricks connector** when you: +- Already have notebooks in Databricks +- Need managed Spark clusters +- Want serverless compute +- Require Databricks-specific features (Unity Catalog, Delta Lake, etc.) + +### For Kubernetes-Native Spark +Use the **Apache Spark connector** when you: +- Want to run Spark entirely on Kubernetes +- Don't need Databricks-specific features +- Have existing Spark JARs or Python scripts +- Prefer open-source tooling + +### Combining Connectors +Many workflows use both: +- Databricks for interactive development and notebooks +- Spark for production batch jobs +- Different connectors for different stages of the pipeline + +## Best Practices Across All Guides + +### 1. Parameterize Everything +Use workflow parameters for configuration: +```python +arguments=[ + Parameter(name="environment", value="production"), + Parameter(name="date", value="2024-01-01"), +] +``` + +### 2. Handle Failures +Add retry strategies and error handling to production workflows. + +### 3. 
Monitor Progress +Use workflow outputs to track job progress and results. + +### 4. Optimize Resources +Choose appropriate cluster sizes and scaling strategies for your workload. + +### 5. Use Version Control +Store your workflow definitions in Git alongside your data/ML code. + +## Next Steps + +- Start with [Data Ingestion](data-ingestion.md) to build your first pipeline +- Learn about [Passing Data Between Steps](passing-data-between-steps.md) for complex workflows +- Explore [connector documentation](../connectors/README.md) for detailed parameter references diff --git a/docs/guides/data-ingestion.md b/docs/guides/data-ingestion.md new file mode 100644 index 0000000..6ef75d5 --- /dev/null +++ b/docs/guides/data-ingestion.md @@ -0,0 +1,637 @@ +# Data Ingestion Pipeline + +Learn how to build production data ingestion pipelines to extract, validate, and load data using Argo Connectors. + +## Overview + +Data ingestion is the first step in most ML workflows - loading raw data from various sources, validating quality, and preparing it for downstream processing. + +## Use Cases + +- **Batch data loads**: Daily/hourly ingestion from databases, APIs, or file systems +- **Data validation**: Check schema, completeness, and data quality +- **Format conversion**: Convert between formats (CSV to Parquet, JSON to Delta, etc.) 
+- **Incremental loads**: Load only new or changed data + +## Architecture + +```mermaid +graph LR + A[Data Source] --> B[Extract] + B --> C[Validate] + C --> D[Transform] + D --> E[Load to Storage] + E --> F[Data Catalog] + + style B fill:#e1f5ff + style C fill:#fff4e1 +``` + +## Basic Data Ingestion + +### CSV to Parquet Conversion + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="data-ingestion-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="source-path", value="s3://raw-data/2024-01-01/data.csv"), + Parameter(name="target-path", value="s3://processed-data/2024-01-01/"), + ] +) as w: + with Steps(name="main"): + Step( + name="ingest-and-convert", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/csv-to-parquet", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "i3.xlarge", + "new-cluster-num-workers": "3", + "args": "{{workflow.parameters.source-path}},{{workflow.parameters.target-path}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: data-ingestion- +spec: + entrypoint: main + arguments: + parameters: + - name: source-path + value: "s3://raw-data/2024-01-01/data.csv" + - name: target-path + value: "s3://processed-data/2024-01-01/" + + templates: + - name: main + steps: + - - name: ingest-and-convert + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/csv-to-parquet" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + 
value: "i3.xlarge"
+              - name: new-cluster-num-workers
+                value: "3"
+              - name: args
+                value: "{{workflow.parameters.source-path}},{{workflow.parameters.target-path}}"
+```
+{% endtab %}
+{% endtabs %}
+
+### Databricks Notebook: CSV to Parquet
+
+```python
+# Databricks notebook: /Users/data-team/csv-to-parquet
+
+# COMMAND ----------
+# Get parameters from workflow args (comma-separated positional args)
+# The workflow passes positional args as a single comma-separated string,
+# exposed to the notebook through an "args" widget
+dbutils.widgets.text("args", "")
+args_string = dbutils.widgets.get("args")
+
+# Parse comma-separated arguments
+args = args_string.split(",") if args_string else []
+source_path = args[0] if len(args) > 0 else ""
+target_path = args[1] if len(args) > 1 else ""
+
+print(f"Source: {source_path}")
+print(f"Target: {target_path}")
+
+# COMMAND ----------
+# Read CSV data
+df = (spark.read
+    .option("header", "true")
+    .option("inferSchema", "true")
+    .csv(source_path))
+
+print(f"Loaded {df.count():,} records")
+df.printSchema()
+
+# COMMAND ----------
+# Write as Parquet
+(df.write
+    .mode("overwrite")
+    .parquet(target_path))
+
+print(f"Saved to {target_path}")
+```
+
+## Data Validation Pipeline
+
+Validate data quality before loading:
+
+{% tabs %}
+{% tab title="Python (Hera)" %}
+```python
+from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter
+
+with Workflow(
+    generate_name="data-ingestion-with-validation-",
+    namespace="default",
+    entrypoint="main",
+    arguments=[
+        Parameter(name="source-path", value="s3://raw-data/2024-01-01/"),
+        Parameter(name="target-path", value="s3://processed-data/2024-01-01/"),
+    ]
+) as w:
+    with Steps(name="main"):
+        # Step 1: Load and validate
+        Step(
+            name="validate-data",
+            template_ref=TemplateRef(
+                name="databricks-connector",
+                template="run-job",
+                cluster_scope=False,
+            ),
+            arguments={
+                "code-path": "/Users/data-team/validate-schema",
+                "task-type": "notebook",
+                "cluster-mode": "Serverless",
+                "args":
"{{workflow.parameters.source-path}}", + } + ) + + # Step 2: Transform and load + Step( + name="transform-load", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/transform-load", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.xlarge", + "scaling-type": "autoscale", + "min-workers": "2", + "max-workers": "8", + "args": "{{workflow.parameters.source-path}},{{workflow.parameters.target-path}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: data-ingestion-with-validation- +spec: + entrypoint: main + arguments: + parameters: + - name: source-path + value: "s3://raw-data/2024-01-01/" + - name: target-path + value: "s3://processed-data/2024-01-01/" + templates: + - name: main + steps: + # Step 1: Load and validate + - - name: validate-data + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/validate-schema" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: args + value: "{{workflow.parameters.source-path}}" + + # Step 2: Transform and load + - - name: transform-load + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/transform-load" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "2" + - name: max-workers + value: "8" + - name: args + value: "{{workflow.parameters.source-path}},{{workflow.parameters.target-path}}" +``` +{% endtab %} +{% endtabs 
%} + +## Incremental Data Loading + +Load only new or changed data: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter +from datetime import datetime, timedelta + +# Get yesterday's date +yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + +with Workflow( + generate_name="incremental-load-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="load-date", value=yesterday), + ] +) as w: + with Steps(name="main"): + Step( + name="incremental-load", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/incremental-load", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "i3.xlarge", + "scaling-type": "autoscale", + "min-workers": "2", + "max-workers": "10", + "args": "{{workflow.parameters.load-date}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: incremental-load- +spec: + entrypoint: main + arguments: + parameters: + - name: load-date + value: "2024-01-01" + + templates: + - name: main + steps: + - - name: incremental-load + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/incremental-load" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "i3.xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "2" + - name: max-workers + value: "10" + - name: args + value: "{{workflow.parameters.load-date}}" +``` +{% endtab %} +{% endtabs %} + +### Databricks Notebook: Incremental Load + +```python +# Databricks notebook: 
/Users/data-team/incremental-load
+
+# COMMAND ----------
+# Get parameters from workflow args (positional argument)
+dbutils.widgets.text("args", "")
+args_string = dbutils.widgets.get("args")
+
+# Parse the date from args (first positional argument)
+load_date = args_string.strip() if args_string else ""
+
+print(f"Loading data for: {load_date}")
+
+# COMMAND ----------
+# Read incremental data
+source_path = f"s3://raw-data/date={load_date}/"
+df_new = spark.read.parquet(source_path)
+
+print(f"New records: {df_new.count():,}")
+
+# COMMAND ----------
+# Read existing data
+target_path = "s3://processed-data/daily/"
+try:
+    df_existing = spark.read.parquet(target_path)
+    print(f"Existing records: {df_existing.count():,}")
+except Exception:
+    # First run: the target path does not exist yet
+    df_existing = None
+    print("No existing data found")
+
+# COMMAND ----------
+# Merge and deduplicate
+if df_existing is not None:
+    df_merged = df_new.union(df_existing).dropDuplicates(["id"])
+else:
+    df_merged = df_new
+
+print(f"Total records after merge: {df_merged.count():,}")
+
+# COMMAND ----------
+# Write back
+# Only overwrite the specific partition(s) being written
+spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
+
+(df_merged.write
+    .mode("overwrite")
+    .partitionBy("date")
+    .parquet(target_path))
+
+print("Data loaded successfully")
+```
+
+## Multi-Source Ingestion
+
+Ingest from multiple sources in parallel:
+
+{% tabs %}
+{% tab title="Python (Hera)" %}
+```python
+from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter
+
+with Workflow(
+    generate_name="multi-source-ingestion-",
+    namespace="default",
+    entrypoint="main",
+    arguments=[
+        Parameter(name="date", value="2024-01-01"),
+    ]
+) as w:
+    with Steps(name="main") as s:
+        # Ingest from multiple sources in parallel
+        with s.parallel():
+            Step(
+                name="ingest-database-a",
+                template_ref=TemplateRef(
+                    name="databricks-connector",
+                    template="run-job",
+                    cluster_scope=False,
+                ),
+                arguments={
+                    "code-path":
"/Users/data-team/ingest-database-a", + "task-type": "notebook", + "cluster-mode": "Serverless", + "args": "{{workflow.parameters.date}}", + } + ) + + Step( + name="ingest-database-b", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/ingest-database-b", + "task-type": "notebook", + "cluster-mode": "Serverless", + "args": "{{workflow.parameters.date}}", + } + ) + + Step( + name="ingest-api-data", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/ingest-api", + "task-type": "notebook", + "cluster-mode": "Serverless", + "args": "{{workflow.parameters.date}}", + } + ) + + # Merge all sources + Step( + name="merge-sources", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/merge-all-sources", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "new-cluster-num-workers": "4", + "args": "{{workflow.parameters.date}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: multi-source-ingestion- +spec: + entrypoint: main + arguments: + parameters: + - name: date + value: "2024-01-01" + + templates: + - name: main + steps: + # Ingest from multiple sources in parallel + - - name: ingest-database-a + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/ingest-database-a" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: args + value: "{{workflow.parameters.date}}" + + - name: ingest-database-b + templateRef: + name: databricks-connector + template: 
run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/ingest-database-b" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: args + value: "{{workflow.parameters.date}}" + + - name: ingest-api-data + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/ingest-api" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: args + value: "{{workflow.parameters.date}}" + + # Merge all sources + - - name: merge-sources + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/merge-all-sources" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: new-cluster-num-workers + value: "4" + - name: args + value: "{{workflow.parameters.date}}" +``` +{% endtab %} +{% endtabs %} + +## Best Practices + +### 1. Use Serverless for Small Files +For files < 10GB, serverless is fastest: +```python +arguments={ + "cluster-mode": "Serverless", +} +``` + +### 2. Partition Large Datasets +Write data partitioned for efficient querying: +```python +# In your Databricks notebook +(df.write + .mode("overwrite") + .partitionBy("date", "region") + .parquet(target_path)) +``` + +### 3. Validate Before Processing +Check schema and data quality first: +```python +# Validate expected columns exist +expected_cols = ["id", "timestamp", "value"] +actual_cols = df.columns +missing = set(expected_cols) - set(actual_cols) + +if missing: + raise ValueError(f"Missing columns: {missing}") +``` + +### 4. 
Handle Schema Evolution +Use merge schema for evolving data: +```python +spark.conf.set("spark.sql.parquet.mergeSchema", "true") +``` + +## Next Steps + +- [Feature Engineering](feature-engineering.md) - Process ingested data +- [Model Training](model-training.md) - Train models on your data +- [Multi-Step Pipelines](multi-step-pipelines.md) - Build complete pipelines +- [Passing Data Between Steps](passing-data-between-steps.md) - Connect workflow steps diff --git a/docs/guides/feature-engineering.md b/docs/guides/feature-engineering.md new file mode 100644 index 0000000..832fa66 --- /dev/null +++ b/docs/guides/feature-engineering.md @@ -0,0 +1,397 @@ +# Feature Engineering Pipeline + +Learn how to build scalable feature engineering pipelines using Argo Connectors to transform raw data into ML-ready features. + +## Overview + +Feature engineering is the process of transforming raw data into features that machine learning models can use. This guide shows you how to build distributed feature engineering pipelines using Databricks and Spark connectors. 
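Before scaling feature logic out to a cluster, it often helps to prototype it locally. As a minimal sketch (the table schema and column names here are illustrative assumptions, not a real dataset), a per-customer lag and a trailing 7-day rolling sum look like this in plain pandas:

```python
import pandas as pd

# Illustrative transactions table; the schema is an assumption for this sketch
df = pd.DataFrame({
    "customer_id": ["a", "a", "a", "b", "b"],
    "transaction_date": pd.to_datetime(
        ["2024-01-01", "2024-01-03", "2024-01-07", "2024-01-02", "2024-01-05"]
    ),
    "amount": [10.0, 20.0, 30.0, 5.0, 15.0],
}).sort_values(["customer_id", "transaction_date"])

# Lag feature: each customer's previous transaction amount
df["prev_amount"] = df.groupby("customer_id")["amount"].shift(1)

# Rolling feature: spend over a trailing 7-day window per customer
df["spend_7d"] = (
    df.set_index("transaction_date")
    .groupby("customer_id")["amount"]
    .rolling("7D")
    .sum()
    .reset_index(drop=True)
)

print(df[["customer_id", "prev_amount", "spend_7d"]])
```

The PySpark equivalents (window functions partitioned by customer, with `F.lag` and range-based frames) distribute the same logic across a cluster.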
+ +## Use Cases + +- **Time-based features**: Create rolling windows, lags, and temporal aggregations +- **Aggregations**: Compute customer-level or product-level statistics +- **Encoding**: One-hot encoding, target encoding, embeddings +- **Feature stores**: Build reusable feature pipelines + +## Architecture + +```mermaid +graph LR + A[Raw Data] --> B[Clean & Filter] + B --> C[Compute Features] + C --> D[Join Features] + D --> E[Feature Store] + + style C fill:#e1f5ff +``` + +## Basic Feature Engineering + +### Computing Customer Features + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="feature-engineering-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="input-path", value="s3://data/transactions/"), + Parameter(name="output-path", value="s3://features/customer-features/"), + Parameter(name="lookback-days", value="90"), + ] +) as w: + with Steps(name="main"): + Step( + name="compute-features", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/customer-features", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "scaling-type": "autoscale", + "min-workers": "4", + "max-workers": "16", + "args": "{{workflow.parameters.input-path}},{{workflow.parameters.output-path}},{{workflow.parameters.lookback-days}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: feature-engineering- +spec: + entrypoint: main + arguments: + parameters: + - name: input-path + value: "s3://data/transactions/" + - name: output-path + value: "s3://features/customer-features/" + - name: lookback-days + value: "90" + + templates: + - name: main + steps: 
+ - - name: compute-features + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/customer-features" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "4" + - name: max-workers + value: "16" + - name: args + value: "{{workflow.parameters.input-path}},{{workflow.parameters.output-path}},{{workflow.parameters.lookback-days}}" +``` +{% endtab %} +{% endtabs %} + +### Databricks Notebook: Customer Features + +```python +# Databricks notebook: /Users/ml-team/customer-features + +# COMMAND ---------- +dbutils.widgets.text("input_path", "") +dbutils.widgets.text("output_path", "") +dbutils.widgets.text("lookback_days", "90") + +input_path = dbutils.widgets.get("input_path") +output_path = dbutils.widgets.get("output_path") +lookback_days = int(dbutils.widgets.get("lookback_days")) + +# COMMAND ---------- +from pyspark.sql import functions as F +from pyspark.sql.window import Window +from datetime import datetime, timedelta + +# Load transaction data +df_transactions = spark.read.parquet(input_path) + +# Filter to lookback period +cutoff_date = datetime.now() - timedelta(days=lookback_days) +df_recent = df_transactions.filter(F.col("transaction_date") >= cutoff_date) + +print(f"Transactions in last {lookback_days} days: {df_recent.count():,}") + +# COMMAND ---------- +# Compute aggregation features +customer_features = df_recent.groupBy("customer_id").agg( + # Transaction counts + F.count("*").alias("transaction_count"), + F.countDistinct("product_id").alias("unique_products"), + + # Monetary features + F.sum("amount").alias("total_spend"), + F.avg("amount").alias("avg_transaction_value"), + F.max("amount").alias("max_transaction_value"), + 
F.min("amount").alias("min_transaction_value"), + F.stddev("amount").alias("stddev_transaction_value"), + + # Temporal features + F.max("transaction_date").alias("last_transaction_date"), + F.min("transaction_date").alias("first_transaction_date"), +) + +# COMMAND ---------- +# Compute derived features +customer_features = customer_features.withColumn( + "avg_days_between_purchases", + F.datediff(F.col("last_transaction_date"), F.col("first_transaction_date")) / + F.when(F.col("transaction_count") > 1, F.col("transaction_count") - 1).otherwise(F.lit(None)) +).withColumn( + "days_since_last_purchase", + F.datediff(F.current_date(), F.col("last_transaction_date")) +) + +# COMMAND ---------- +# Add window-based features (percentile ranks) +window = Window.orderBy("total_spend") + +customer_features = customer_features.withColumn( + "spend_percentile", + F.percent_rank().over(window) +) + +# COMMAND ---------- +# Save features +(customer_features.write + .mode("overwrite") + .parquet(output_path)) + +print(f"Saved features for {customer_features.count():,} customers to {output_path}") +``` + +## Advanced: Multi-Stage Feature Pipeline + +Build features in stages for complex transformations: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="multi-stage-features-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="date", value="2024-01-01"), + ] +) as w: + with Steps(name="main"): + # Stage 1: Raw aggregations + Step( + name="raw-aggregations", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/01-raw-aggregations", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.xlarge", + "scaling-type": "autoscale", + "min-workers": "3", + "max-workers": "12", 
+ "args": "{{workflow.parameters.date}}", + } + ) + + # Stage 2: Time-series features + Step( + name="time-series-features", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/02-time-series-features", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.xlarge", + "scaling-type": "autoscale", + "min-workers": "3", + "max-workers": "12", + "args": "{{workflow.parameters.date}}", + } + ) + + # Stage 3: Join and finalize + Step( + name="join-features", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/03-join-features", + "task-type": "notebook", + "cluster-mode": "Serverless", + "args": "{{workflow.parameters.date}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: multi-stage-features- +spec: + entrypoint: main + arguments: + parameters: + - name: date + value: "2024-01-01" + + templates: + - name: main + steps: + # Stage 1: Raw aggregations + - - name: raw-aggregations + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/01-raw-aggregations" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "3" + - name: max-workers + value: "12" + - name: args + value: "{{workflow.parameters.date}}" + + # Stage 2: Time-series features + - - name: time-series-features + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: 
"/Users/ml-team/02-time-series-features" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "3" + - name: max-workers + value: "12" + - name: args + value: "{{workflow.parameters.date}}" + + # Stage 3: Join and finalize + - - name: join-features + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/03-join-features" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: args + value: "{{workflow.parameters.date}}" +``` +{% endtab %} +{% endtabs %} + +## Best Practices + +### 1. Use Memory-Optimized Instances +Feature engineering often involves aggregations: +```python +arguments={ + "new-cluster-node-type": "r5.2xlarge", # Memory-optimized +} +``` + +### 2. Cache Intermediate Results +For iterative feature development: +```python +# In your notebook +df_features.cache() +df_features.count() # Materialize cache +``` + +### 3. 
Monitor Feature Distributions +Check for data drift: +```python +# Summary statistics +df_features.describe().show() + +# Check for nulls +df_features.select([F.sum(F.col(c).isNull().cast("int")).alias(c) + for c in df_features.columns]).show() +``` + +## Next Steps + +- [Model Training](model-training.md) - Train models with your engineered features +- [Batch Inference](batch-inference.md) - Use features for predictions +- [Data Ingestion](data-ingestion.md) - Load raw data for feature engineering +- [Multi-Step Pipelines](multi-step-pipelines.md) - Build end-to-end workflows diff --git a/docs/guides/model-training.md b/docs/guides/model-training.md new file mode 100644 index 0000000..46c549f --- /dev/null +++ b/docs/guides/model-training.md @@ -0,0 +1,495 @@ +# Model Training Pipeline + +Learn how to build scalable model training pipelines using Argo Connectors to train machine learning models with distributed compute. + +## Overview + +Model training involves fitting ML models to your data. This guide shows you how to build training pipelines that scale from experimentation to production using available connectors. 
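Stripped of the Databricks and MLflow scaffolding, the core train/evaluate loop is only a few lines. A sketch on synthetic data (the feature matrix and churn-style label are fabricated placeholders, not a real dataset):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Synthetic stand-in for a real feature table (an assumption for this sketch)
rng = np.random.default_rng(42)
X = rng.normal(size=(500, 5))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)  # label driven by two features

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

accuracy = accuracy_score(y_test, model.predict(X_test))
print(f"Test accuracy: {accuracy:.3f}")
```

In the full notebooks that follow, this loop runs inside `mlflow.start_run(...)` so parameters and metrics are tracked per run.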
+ +## Use Cases + +- **Training at scale**: Train models on large datasets with distributed computing +- **Hyperparameter tuning**: Run multiple training jobs with different configurations +- **Model comparison**: Train multiple model types in parallel +- **Scheduled retraining**: Automatically retrain models on new data + +## Architecture + +```mermaid +graph LR + A[Training Data] --> B[Load & Split] + B --> C[Train Model] + C --> D[Evaluate] + D --> E[Register Model] + + style C fill:#e1f5ff +``` + +## Basic Model Training + +### Train with Databricks and MLflow + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="model-training-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="training-data", value="s3://ml-data/features/2024-01-01/"), + Parameter(name="model-name", value="churn-prediction"), + Parameter(name="experiment-name", value="/Shared/experiments/churn"), + ] +) as w: + with Steps(name="main"): + Step( + name="train-model", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/train-churn-model", + "task-type": "notebook", + "cluster-mode": "New", + + # Use larger instances for training + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.4xlarge", + "new-cluster-num-workers": "4", + + # Pass training configuration + "args": "{{workflow.parameters.training-data}},{{workflow.parameters.model-name}},{{workflow.parameters.experiment-name}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: model-training- +spec: + entrypoint: main + arguments: + parameters: + - name: training-data + value: "s3://ml-data/features/2024-01-01/" + - name: model-name + value: "churn-prediction" + - name: 
experiment-name + value: "/Shared/experiments/churn" + + templates: + - name: main + steps: + - - name: train-model + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/train-churn-model" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.4xlarge" + - name: new-cluster-num-workers + value: "4" + - name: args + value: "{{workflow.parameters.training-data}},{{workflow.parameters.model-name}},{{workflow.parameters.experiment-name}}" +``` +{% endtab %} +{% endtabs %} + +### Databricks Notebook: Train Model + +```python +# Databricks notebook: /Users/ml-team/train-churn-model + +# COMMAND ---------- +dbutils.widgets.text("training_data", "") +dbutils.widgets.text("model_name", "") +dbutils.widgets.text("experiment_name", "") + +training_data = dbutils.widgets.get("training_data") +model_name = dbutils.widgets.get("model_name") +experiment_name = dbutils.widgets.get("experiment_name") + +# COMMAND ---------- +import mlflow +import mlflow.sklearn +from sklearn.ensemble import RandomForestClassifier +from sklearn.model_selection import train_test_split +from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score + +# Set MLflow experiment +mlflow.set_experiment(experiment_name) + +# COMMAND ---------- +# Load training data +df = spark.read.parquet(training_data) +df_pandas = df.toPandas() + +# Prepare features and target +X = df_pandas.drop(['customer_id', 'churned'], axis=1) +y = df_pandas['churned'] + +X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 +) + +print(f"Training samples: {len(X_train):,}") +print(f"Test samples: {len(X_test):,}") + +# COMMAND ---------- +# Train model with MLflow tracking +with mlflow.start_run(run_name=f"{model_name}-training"): + # Log parameters + n_estimators 
= 100 + max_depth = 10 + + mlflow.log_param("n_estimators", n_estimators) + mlflow.log_param("max_depth", max_depth) + mlflow.log_param("training_samples", len(X_train)) + + # Train model + model = RandomForestClassifier( + n_estimators=n_estimators, + max_depth=max_depth, + random_state=42 + ) + model.fit(X_train, y_train) + + # Evaluate + y_pred = model.predict(X_test) + + accuracy = accuracy_score(y_test, y_pred) + precision = precision_score(y_test, y_pred) + recall = recall_score(y_test, y_pred) + f1 = f1_score(y_test, y_pred) + + # Log metrics + mlflow.log_metric("accuracy", accuracy) + mlflow.log_metric("precision", precision) + mlflow.log_metric("recall", recall) + mlflow.log_metric("f1_score", f1) + + print(f"Accuracy: {accuracy:.4f}") + print(f"Precision: {precision:.4f}") + print(f"Recall: {recall:.4f}") + print(f"F1 Score: {f1:.4f}") + + # Log model + mlflow.sklearn.log_model(model, "model") + + # Register model + model_uri = f"runs:/{mlflow.active_run().info.run_id}/model" + mlflow.register_model(model_uri, model_name) + + print(f"Model registered as: {model_name}") +``` + +## GPU-Accelerated Training + +> **Note**: This example demonstrates the pattern. The PyTorch and TensorFlow connectors are coming soon. For now, use Databricks with GPU instances. 
+ +### Deep Learning with GPU Instances + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="gpu-training-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="training-data", value="s3://ml-data/images/"), + Parameter(name="model-name", value="image-classifier"), + ] +) as w: + with Steps(name="main"): + Step( + name="train-on-gpu", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/train-image-model", + "task-type": "notebook", + "cluster-mode": "New", + + # GPU-enabled Databricks Runtime + "new-cluster-spark-version": "13.3.x-gpu-ml-scala2.12", + "new-cluster-node-type": "g4dn.xlarge", # 1 T4 GPU + "new-cluster-num-workers": "2", + + "args": "{{workflow.parameters.training-data}},{{workflow.parameters.model-name}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: gpu-training- +spec: + entrypoint: main + arguments: + parameters: + - name: training-data + value: "s3://ml-data/images/" + - name: model-name + value: "image-classifier" + + templates: + - name: main + steps: + - - name: train-on-gpu + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/train-image-model" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-gpu-ml-scala2.12" + - name: new-cluster-node-type + value: "g4dn.xlarge" + - name: new-cluster-num-workers + value: "2" + - name: args + value: "{{workflow.parameters.training-data}},{{workflow.parameters.model-name}}" +``` +{% endtab %} +{% endtabs %} + +> **Coming Soon**: Dedicated PyTorch, TensorFlow, and Ray connectors for distributed training + 
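The `/Users/ml-team/train-image-model` notebook referenced above isn't part of this repository; until the dedicated connectors arrive, its body can be plain PyTorch running on the GPU cluster. A minimal sketch of the kind of training step such a notebook might contain — the model, synthetic data, and hyperparameters below are illustrative placeholders, not repository code:

```python
# Hypothetical training step for a GPU notebook like train-image-model.
# Model, data, and hyperparameters here are placeholders for illustration.
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(0)
device = "cuda" if torch.cuda.is_available() else "cpu"

# Stand-in classifier; a real notebook would load images from the
# training-data path passed in via the workflow args.
model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(3 * 32 * 32, 64),
    nn.ReLU(),
    nn.Linear(64, 10),
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

# Synthetic 32x32 RGB images and labels in place of a real dataset.
images = torch.randn(256, 3, 32, 32)
labels = torch.randint(0, 10, (256,))
loader = DataLoader(TensorDataset(images, labels), batch_size=64, shuffle=True)

def train_one_epoch() -> float:
    """Run one pass over the data and return the mean training loss."""
    model.train()
    total = 0.0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()
        total += loss.item() * x.size(0)
    return total / len(loader.dataset)

first_loss = train_one_epoch()
for _ in range(4):
    last_loss = train_one_epoch()
print(f"loss: {first_loss:.3f} -> {last_loss:.3f}")
```

In a real notebook you would log the loss and the trained model with MLflow, as in the churn example earlier in this guide.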
+## Hyperparameter Tuning + +Run multiple training jobs with different hyperparameters: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +# Hyperparameter configurations +configs = [ + {"n_estimators": "100", "max_depth": "10"}, + {"n_estimators": "200", "max_depth": "15"}, + {"n_estimators": "300", "max_depth": "20"}, +] + +with Workflow( + generate_name="hyperparameter-tuning-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="training-data", value="s3://ml-data/features/"), + ] +) as w: + with Steps(name="main") as s: + # Train models in parallel with different configs + with s.parallel(): + for i, config in enumerate(configs): + Step( + name=f"train-config-{i}", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/train-with-config", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "new-cluster-num-workers": "2", + "args": f"{{{{workflow.parameters.training-data}}}},{config['n_estimators']},{config['max_depth']}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: hyperparameter-tuning- +spec: + entrypoint: main + arguments: + parameters: + - name: training-data + value: "s3://ml-data/features/" + + templates: + - name: main + steps: + # Train models in parallel + - - name: train-config-0 + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/train-with-config" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: 
new-cluster-num-workers
              value: "2"
            - name: args
              value: "{{workflow.parameters.training-data}},100,10"

        - name: train-config-1
          templateRef:
            name: databricks-connector
            template: run-job
          arguments:
            parameters:
            - name: code-path
              value: "/Users/ml-team/train-with-config"
            - name: task-type
              value: "notebook"
            - name: cluster-mode
              value: "New"
            - name: new-cluster-spark-version
              value: "13.3.x-scala2.12"
            - name: new-cluster-node-type
              value: "r5.2xlarge"
            - name: new-cluster-num-workers
              value: "2"
            - name: args
              value: "{{workflow.parameters.training-data}},200,15"

        - name: train-config-2
          templateRef:
            name: databricks-connector
            template: run-job
          arguments:
            parameters:
            - name: code-path
              value: "/Users/ml-team/train-with-config"
            - name: task-type
              value: "notebook"
            - name: cluster-mode
              value: "New"
            - name: new-cluster-spark-version
              value: "13.3.x-scala2.12"
            - name: new-cluster-node-type
              value: "r5.2xlarge"
            - name: new-cluster-num-workers
              value: "2"
            - name: args
              value: "{{workflow.parameters.training-data}},300,20"
```
{% endtab %}
{% endtabs %}

## Best Practices

### 1. Use MLflow for Experiment Tracking
Track all experiments in your training notebooks:
```python
import mlflow
import mlflow.sklearn

# params, metrics, and model come from your training code
with mlflow.start_run():
    mlflow.log_params(params)
    mlflow.log_metrics(metrics)
    mlflow.sklearn.log_model(model, "model")
```

### 2. Split Data Properly
Use consistent random seeds for reproducibility:
```python
train, test = df.randomSplit([0.8, 0.2], seed=42)
```

### 3. Monitor Training Progress
Log intermediate metrics:
```python
for epoch in range(num_epochs):
    # Training code
    mlflow.log_metric("train_loss", loss, step=epoch)
```

### 4. 
Save Model Artifacts +Use MLflow to version models: +```python +mlflow.register_model(model_uri, model_name) +``` + +## Coming Soon + +The following connectors will enable additional training workflows: + +### PyTorch Connector +Train deep learning models with PyTorch: +- Distributed training across multiple GPUs +- Integration with PyTorch Lightning +- Automatic checkpoint management + +### TensorFlow Connector +Train neural networks with TensorFlow: +- TensorFlow Distributed training +- TensorBoard integration +- Model serving preparation + +### Ray Connector +Scalable hyperparameter tuning: +- Ray Tune for hyperparameter search +- Distributed training with Ray Train +- Automatic resource allocation + +### Weights & Biases Connector +Enhanced experiment tracking: +- Automatic logging and visualization +- Model versioning +- Collaboration features + +## Next Steps + +- [Batch Inference](batch-inference.md) - Use trained models for predictions +- [Feature Engineering](feature-engineering.md) - Prepare data for training +- [Multi-Step Pipelines](multi-step-pipelines.md) - Build end-to-end ML workflows +- [Databricks Examples](../connectors/databricks/hera-examples.md) - More patterns diff --git a/docs/guides/multi-step-pipelines.md b/docs/guides/multi-step-pipelines.md new file mode 100644 index 0000000..d3fddfb --- /dev/null +++ b/docs/guides/multi-step-pipelines.md @@ -0,0 +1,767 @@ +# Multi-Step Pipelines + +Learn how to build end-to-end ML pipelines by chaining multiple connectors and workflow steps together. + +## Overview + +Real-world ML workflows typically involve multiple stages: data ingestion, feature engineering, model training, evaluation, and deployment. This guide shows you how to orchestrate these stages into cohesive pipelines. 
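The evaluation gate in the architecture diagram below ("Meets Criteria?") maps onto Argo's `when` conditions. A sketch of the idea using the `state` output parameter the Databricks connector already exposes — the `register` and `alert` templates are hypothetical placeholders, and a quality-based gate would instead need your evaluation notebook to emit its own output parameter:

```yaml
# Fragment: only register the model when evaluation succeeded,
# otherwise alert the team. Both branches read the same output.
- - name: register-model
    template: register
    when: "{{steps.evaluate-model.outputs.parameters.state}} == SUCCESS"
  - name: alert-team
    template: alert
    when: "{{steps.evaluate-model.outputs.parameters.state}} != SUCCESS"
```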
+ +## Use Cases + +- **End-to-end ML pipelines**: From raw data to production models +- **Multi-stage processing**: Sequential data transformations +- **Conditional workflows**: Different paths based on results +- **Parallel processing**: Run independent tasks simultaneously + +## Architecture + +```mermaid +graph TB + A[Ingest Data] --> B[Validate] + B --> C[Feature Engineering] + C --> D[Train Model] + D --> E{Meets Criteria?} + E -->|Yes| F[Register Model] + E -->|No| G[Alert Team] + F --> H[Batch Inference] + + style C fill:#e1f5ff + style D fill:#e1f5ff + style H fill:#e1f5ff +``` + +## Complete ML Pipeline + +### From Data to Production Model + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter +from datetime import datetime + +today = datetime.now().strftime("%Y-%m-%d") + +with Workflow( + generate_name="ml-pipeline-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="date", value=today), + Parameter(name="model-name", value="churn-predictor"), + ] +) as w: + with Steps(name="main"): + # Step 1: Data Ingestion + Step( + name="ingest-data", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/01-ingest", + "task-type": "notebook", + "cluster-mode": "Serverless", + "run-name": "ingest-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.date}}", + } + ) + + # Step 2: Data Validation + Step( + name="validate-data", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/02-validate", + "task-type": "notebook", + "cluster-mode": "Serverless", + "run-name": "validate-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.date}}", + } + ) + + # Step 3: Feature Engineering + Step( + name="feature-engineering", + template_ref=TemplateRef( + 
name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/03-features", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "scaling-type": "autoscale", + "min-workers": "4", + "max-workers": "16", + "run-name": "features-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.date}}", + } + ) + + # Step 4: Model Training + Step( + name="train-model", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/04-train", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.4xlarge", + "new-cluster-num-workers": "4", + "run-name": "train-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.date}},{{workflow.parameters.model-name}}", + } + ) + + # Step 5: Model Evaluation + Step( + name="evaluate-model", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/05-evaluate", + "task-type": "notebook", + "cluster-mode": "Serverless", + "run-name": "evaluate-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.model-name}}", + } + ) + + # Step 6: Batch Inference (production deployment) + Step( + name="batch-inference", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/06-inference", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "scaling-type": "autoscale", + "min-workers": "5", + "max-workers": "20", + "run-name": "inference-{{workflow.parameters.date}}", + "args": 
"{{workflow.parameters.model-name}},{{workflow.parameters.date}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ml-pipeline- +spec: + entrypoint: main + arguments: + parameters: + - name: date + value: "2024-01-01" + - name: model-name + value: "churn-predictor" + + templates: + - name: main + steps: + # Step 1: Data Ingestion + - - name: ingest-data + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/01-ingest" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: run-name + value: "ingest-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}}" + + # Step 2: Data Validation + - - name: validate-data + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/02-validate" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: run-name + value: "validate-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}}" + + # Step 3: Feature Engineering + - - name: feature-engineering + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/03-features" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "4" + - name: max-workers + value: "16" + - name: run-name + value: "features-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}}" + + # Step 4: Model Training + - - name: train-model + templateRef: + name: databricks-connector + template: run-job + 
arguments: + parameters: + - name: code-path + value: "/Users/ml-team/04-train" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.4xlarge" + - name: new-cluster-num-workers + value: "4" + - name: run-name + value: "train-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}},{{workflow.parameters.model-name}}" + + # Step 5: Model Evaluation + - - name: evaluate-model + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/05-evaluate" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: run-name + value: "evaluate-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.model-name}}" + + # Step 6: Batch Inference + - - name: batch-inference + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/06-inference" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "5" + - name: max-workers + value: "20" + - name: run-name + value: "inference-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.model-name}},{{workflow.parameters.date}}" +``` +{% endtab %} +{% endtabs %} + +## Combining Databricks and Spark Connectors + +Use Databricks for development and Spark for production jobs: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="hybrid-pipeline-", + namespace="default", + entrypoint="main" +) as w: + with Steps(name="main"): + # Use 
Databricks for feature engineering (interactive development) + Step( + name="feature-engineering", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/feature-engineering", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.xlarge", + "new-cluster-num-workers": "3", + } + ) + + # Use Spark on K8s for production batch job + Step( + name="batch-processing", + template_ref=TemplateRef( + name="spark-data-connector-jvm", + template="spark", + cluster_scope=False, + ), + arguments={ + "type": "Scala", + "mainClass": "com.company.BatchProcessor", + "mainApplicationFile": "s3://jars/batch-processor.jar", + "namespace": "default", + "globalImage": "gcr.io/spark-operator/spark:v3.1.1", + "executorInstances": "10", + "driverMemory": "4096m", + "executorMemory": "8192m", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: hybrid-pipeline- +spec: + entrypoint: main + templates: + - name: main + steps: + # Use Databricks for feature engineering + - - name: feature-engineering + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/feature-engineering" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.xlarge" + - name: new-cluster-num-workers + value: "3" + + # Use Spark on K8s for production batch + - - name: batch-processing + templateRef: + name: spark-data-connector-jvm + template: spark + arguments: + parameters: + - name: type + value: "Scala" + - name: mainClass + value: "com.company.BatchProcessor" + - name: mainApplicationFile + value: "s3://jars/batch-processor.jar" + - name: 
namespace + value: "default" + - name: globalImage + value: "gcr.io/spark-operator/spark:v3.1.1" + - name: executorInstances + value: "10" + - name: driverMemory + value: "4096m" + - name: executorMemory + value: "8192m" +``` +{% endtab %} +{% endtabs %} + +## Scheduled Daily ML Pipeline + +Run your pipeline on a schedule using Argo's CronWorkflow: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import CronWorkflow, Steps, Step, TemplateRef, Parameter +from datetime import datetime + +with CronWorkflow( + name="daily-ml-pipeline", + namespace="default", + entrypoint="main", + schedule="0 2 * * *", # Daily at 2 AM + timezone="America/New_York", + arguments=[ + Parameter(name="model-name", value="daily-churn-model"), + ] +) as w: + with Steps(name="main"): + # Ingest yesterday's data + Step( + name="ingest", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/data-team/daily-ingest", + "task-type": "notebook", + "cluster-mode": "Serverless", + } + ) + + # Feature engineering + Step( + name="features", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/daily-features", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.xlarge", + "scaling-type": "autoscale", + "min-workers": "2", + "max-workers": "8", + } + ) + + # Train model + Step( + name="train", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/daily-train", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "new-cluster-num-workers": "3", + "args": "{{workflow.parameters.model-name}}", + } + ) + +w.create() +``` 
+{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: daily-ml-pipeline +spec: + schedule: "0 2 * * *" # Daily at 2 AM + timezone: "America/New_York" + workflowSpec: + entrypoint: main + arguments: + parameters: + - name: model-name + value: "daily-churn-model" + + templates: + - name: main + steps: + # Ingest yesterday's data + - - name: ingest + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/data-team/daily-ingest" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + + # Feature engineering + - - name: features + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/daily-features" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.xlarge" + - name: scaling-type + value: "autoscale" + - name: min-workers + value: "2" + - name: max-workers + value: "8" + + # Train model + - - name: train + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/daily-train" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: new-cluster-num-workers + value: "3" + - name: args + value: "{{workflow.parameters.model-name}}" +``` +{% endtab %} +{% endtabs %} + +## Parallel Processing + +Process multiple entities in parallel for faster pipelines: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +regions = ["us-west", "us-east", "eu-west", "ap-south"] + +with Workflow( + 
generate_name="regional-pipeline-", + namespace="default", + entrypoint="main" +) as w: + with Steps(name="main") as s: + # Process each region in parallel + with s.parallel(): + for region in regions: + Step( + name=f"process-{region}", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/regional-processing", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "i3.xlarge", + "new-cluster-num-workers": "3", + "args": region, + } + ) + + # Aggregate results from all regions + Step( + name="aggregate-regions", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/aggregate-all", + "task-type": "notebook", + "cluster-mode": "Serverless", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: regional-pipeline- +spec: + entrypoint: main + templates: + - name: main + steps: + # Process each region in parallel + - - name: process-us-west + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/regional-processing" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "i3.xlarge" + - name: new-cluster-num-workers + value: "3" + - name: args + value: "us-west" + + - name: process-us-east + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/regional-processing" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: 
new-cluster-node-type + value: "i3.xlarge" + - name: new-cluster-num-workers + value: "3" + - name: args + value: "us-east" + + # ... other regions ... + + # Aggregate results + - - name: aggregate-regions + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/aggregate-all" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" +``` +{% endtab %} +{% endtabs %} + +## Best Practices + +### 1. Use Appropriate Cluster Sizes per Stage +- **Ingestion/Validation**: Serverless or small clusters +- **Feature Engineering**: Memory-optimized with autoscaling +- **Training**: Larger, stable clusters +- **Inference**: Autoscaling for variable load + +### 2. Name Runs Descriptively +Include workflow context in run names: +```python +arguments={ + "run-name": "features-{{workflow.name}}-{{workflow.parameters.date}}", +} +``` + +### 3. Add Error Notifications +Configure email notifications for failures: +```python +arguments={ + "email-notifications": "ml-team@company.com,oncall@company.com", +} +``` + +### 4. Monitor End-to-End Duration +Track pipeline execution time in Argo UI. + +## Next Steps + +- [Passing Data Between Steps](passing-data-between-steps.md) - Learn about outputs and artifacts +- [Data Ingestion](data-ingestion.md) - Start your pipeline +- [Model Training](model-training.md) - Training stage details +- [Batch Inference](batch-inference.md) - Production scoring \ No newline at end of file diff --git a/docs/guides/passing-data-between-steps.md b/docs/guides/passing-data-between-steps.md new file mode 100644 index 0000000..fe45be8 --- /dev/null +++ b/docs/guides/passing-data-between-steps.md @@ -0,0 +1,1173 @@ +# Passing Data Between Steps + +Learn how to pass data, parameters, and artifacts between workflow steps when using Argo Connectors. + +## Overview + +Multi-step workflows often need to share information between steps. 
Argo Workflows provides three mechanisms for this: + +1. **Parameters**: Small pieces of metadata (job IDs, URLs, metrics) +2. **Artifacts**: Large data files via S3/GCS artifact repository +3. **Volumes**: Shared storage for parallel reads/writes (fastest for large files) + +## Using Output Parameters + +Connectors expose output parameters that subsequent steps can reference. + +### Databricks Connector Outputs + +The Databricks connector provides these output parameters: + +| Output | Description | Example | +|--------|-------------|---------| +| `run-id` | Databricks run ID | `12345` | +| `run-url` | Link to Databricks UI | `https://workspace.databricks.com/...` | +| `result` | Job result/output | `{"status": "success"}` | +| `state` | Final job state | `SUCCESS` | +| `json` | Full job details | Complete JSON response | + +### Spark Connector Outputs + +The Spark connector provides these output parameters: + +| Output | Description | Example | +|--------|-------------|---------| +| `spark-job-name` | SparkApplication name | `spark-pi-abc123` | +| `spark-job-namespace` | Kubernetes namespace | `default` | + +## Basic Parameter Passing + +### Accessing Connector Outputs + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, script + +with Workflow( + generate_name="pipeline-with-outputs-", + namespace="default", + entrypoint="main" +) as w: + @script() + def process_results(run_id: str, run_url: str, state: str): + print(f"Databricks Run ID: {run_id}") + print(f"Databricks UI: {run_url}") + print(f"Job State: {state}") + + # Your custom logic here + if state != "SUCCESS": + raise Exception("Job failed!") + + with Steps(name="main"): + # Step 1: Run Databricks job + Step( + name="databricks-job", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/process-data", + "task-type": "notebook", + 
"cluster-mode": "Serverless", + } + ) + + # Step 2: Use outputs from Step 1 + process_results( + arguments={ + "run_id": "{{steps.databricks-job.outputs.parameters.run-id}}", + "run_url": "{{steps.databricks-job.outputs.parameters.run-url}}", + "state": "{{steps.databricks-job.outputs.parameters.state}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pipeline-with-outputs- +spec: + entrypoint: main + templates: + - name: main + steps: + # Step 1: Run Databricks job + - - name: databricks-job + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/team/process-data" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + + # Step 2: Use outputs + - - name: process-results + template: print-info + arguments: + parameters: + - name: run-id + value: "{{steps.databricks-job.outputs.parameters.run-id}}" + - name: run-url + value: "{{steps.databricks-job.outputs.parameters.run-url}}" + - name: state + value: "{{steps.databricks-job.outputs.parameters.state}}" + + - name: print-info + inputs: + parameters: + - name: run-id + - name: run-url + - name: state + container: + image: alpine:latest + command: [sh, -c] + args: + - | + echo "Databricks Run ID: {{inputs.parameters.run-id}}" + echo "Databricks UI: {{inputs.parameters.run-url}}" + echo "Job State: {{inputs.parameters.state}}" +``` +{% endtab %} +{% endtabs %} + +## Passing Parameters Between Connectors + +### From Databricks to Spark + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef + +with Workflow( + generate_name="databricks-to-spark-", + namespace="default", + entrypoint="main" +) as w: + with Steps(name="main"): + # Step 1: Process with Databricks + Step( + name="databricks-processing", + template_ref=TemplateRef( + 
name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/prepare-data", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "i3.xlarge", + "new-cluster-num-workers": "2", + } + ) + + # Step 2: Additional processing with Spark on K8s + Step( + name="spark-processing", + template_ref=TemplateRef( + name="spark-data-connector-python", + template="spark", + cluster_scope=False, + ), + arguments={ + "mainApplicationFile": "s3://scripts/process.py", + "namespace": "default", + "globalImage": "gcr.io/spark-operator/spark-py:v3.1.1", + "executorInstances": "5", + "driverMemory": "2048m", + "executorMemory": "4096m", + } + ) + + # Step 3: Cleanup Spark resources + Step( + name="cleanup-spark", + template_ref=TemplateRef( + name="spark-delete-resource", + template="spark-delete", + cluster_scope=False, + ), + arguments={ + "namespace": "{{steps.spark-processing.outputs.parameters.spark-job-namespace}}", + "spark-job-name": "{{steps.spark-processing.outputs.parameters.spark-job-name}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: databricks-to-spark- +spec: + entrypoint: main + templates: + - name: main + steps: + # Step 1: Databricks processing + - - name: databricks-processing + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/team/prepare-data" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "i3.xlarge" + - name: new-cluster-num-workers + value: "2" + + # Step 2: Spark processing + - - name: spark-processing + templateRef: + name: spark-data-connector-python + template: spark + arguments: + parameters: + - name: 
mainApplicationFile + value: "s3://scripts/process.py" + - name: namespace + value: "default" + - name: globalImage + value: "gcr.io/spark-operator/spark-py:v3.1.1" + - name: executorInstances + value: "5" + - name: driverMemory + value: "2048m" + - name: executorMemory + value: "4096m" + + # Step 3: Cleanup + - - name: cleanup-spark + templateRef: + name: spark-delete-resource + template: spark-delete + arguments: + parameters: + - name: namespace + value: "{{steps.spark-processing.outputs.parameters.spark-job-namespace}}" + - name: spark-job-name + value: "{{steps.spark-processing.outputs.parameters.spark-job-name}}" +``` +{% endtab %} +{% endtabs %} + +## Using Workflow-Level Parameters + +Share parameters across multiple steps: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="shared-params-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="date", value="2024-01-01"), + Parameter(name="environment", value="production"), + Parameter(name="region", value="us-west-2"), + ] +) as w: + with Steps(name="main"): + # All steps can access workflow parameters + Step( + name="step1", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/step1", + "task-type": "notebook", + "cluster-mode": "Serverless", + "run-name": "step1-{{workflow.parameters.environment}}-{{workflow.parameters.date}}", + "args": "{{workflow.parameters.date}},{{workflow.parameters.region}}", + } + ) + + Step( + name="step2", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/step2", + "task-type": "notebook", + "cluster-mode": "Serverless", + "run-name": "step2-{{workflow.parameters.environment}}-{{workflow.parameters.date}}", + "args": 
"{{workflow.parameters.date}},{{workflow.parameters.region}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: shared-params- +spec: + entrypoint: main + arguments: + parameters: + - name: date + value: "2024-01-01" + - name: environment + value: "production" + - name: region + value: "us-west-2" + + templates: + - name: main + steps: + - - name: step1 + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/team/step1" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: run-name + value: "step1-{{workflow.parameters.environment}}-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}},{{workflow.parameters.region}}" + + - - name: step2 + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/team/step2" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "Serverless" + - name: run-name + value: "step2-{{workflow.parameters.environment}}-{{workflow.parameters.date}}" + - name: args + value: "{{workflow.parameters.date}},{{workflow.parameters.region}}" +``` +{% endtab %} +{% endtabs %} + +## Working with Artifacts + +While connectors primarily use parameters, you can combine them with artifact passing for large data: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import ( + Workflow, + Steps, + Step, + TemplateRef, + script, + Artifact +) + +with Workflow( + generate_name="pipeline-with-artifacts-", + namespace="default", + entrypoint="main" +) as w: + @script( + outputs=[ + Artifact(name="model-path", path="/tmp/model_path.txt") + ] + ) + def save_model_path(): + """Save model path for next step""" + model_path = "s3://models/churn-model-v1/" + with open("/tmp/model_path.txt", "w") as f: + 
        f.write(model_path)

    @script(
        inputs=[
            Artifact(name="model-path", path="/tmp/model_path.txt")
        ]
    )
    def use_model_path():
        """Use model path from previous step"""
        with open("/tmp/model_path.txt", "r") as f:
            model_path = f.read()
        print(f"Using model from: {model_path}")

    with Steps(name="main"):
        # Step 1: Train and save model path
        Step(
            name="train",
            template_ref=TemplateRef(
                name="databricks-connector",
                template="run-job",
                cluster_scope=False,
            ),
            arguments={
                "code-path": "/Users/ml-team/train",
                "task-type": "notebook",
                "cluster-mode": "Serverless",
            }
        )

        save_model_path(name="save-path")

        # Step 2: Use the model path
        # Note: artifacts must be passed as Artifact objects with `from_`;
        # a plain dict entry would be treated as a parameter, not an artifact
        use_model_path(
            name="use-path",
            arguments=[
                Artifact(
                    name="model-path",
                    from_="{{steps.save-path.outputs.artifacts.model-path}}",
                )
            ]
        )

w.create()
```
{% endtab %}

{% tab title="YAML" %}
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-with-artifacts-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: train
            templateRef:
              name: databricks-connector
              template: run-job
            arguments:
              parameters:
                - name: code-path
                  value: "/Users/ml-team/train"
                - name: task-type
                  value: "notebook"
                - name: cluster-mode
                  value: "Serverless"

        - - name: save-path
            template: save-model-path

        - - name: use-path
            template: use-model-path
            arguments:
              artifacts:
                - name: model-path
                  from: "{{steps.save-path.outputs.artifacts.model-path}}"

    - name: save-model-path
      outputs:
        artifacts:
          - name: model-path
            path: /tmp/model_path.txt
      script:
        image: python:3.9
        command: [python]
        source: |
          model_path = "s3://models/churn-model-v1/"
          with open("/tmp/model_path.txt", "w") as f:
              f.write(model_path)

    - name: use-model-path
      inputs:
        artifacts:
          - name: model-path
            path: /tmp/model_path.txt
      script:
        image: python:3.9
        command: [python]
        source: |
          with open("/tmp/model_path.txt", "r") as f:
              model_path = f.read()
+ print(f"Using model from: {model_path}") +``` +{% endtab %} +{% endtabs %} + +## Passing Data Through Storage + +The recommended pattern for large data is to write to cloud storage and pass the path: + +### Writing Results to S3 + +In your Databricks notebook: +```python +# Save results to S3 +output_path = f"s3://ml-data/results/{run_id}/" +df_results.write.mode("overwrite").parquet(output_path) + +# Return the path via dbutils +dbutils.notebook.exit(output_path) +``` + +Then reference it in the next step: +```python +arguments={ + "args": "{{steps.previous-step.outputs.parameters.result}}", # Contains the S3 path +} +``` + +## Pattern: Metadata Passing + +Pass metadata between steps while data stays in storage: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import Workflow, Steps, Step, TemplateRef, Parameter + +with Workflow( + generate_name="metadata-passing-", + namespace="default", + entrypoint="main", + arguments=[ + Parameter(name="base-path", value="s3://ml-data/pipeline/"), + ] +) as w: + with Steps(name="main"): + # Step 1: Process and save to S3 + Step( + name="process", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/process", + "task-type": "notebook", + "cluster-mode": "Serverless", + "args": "{{workflow.parameters.base-path}}", + } + ) + + # Step 2: Reference S3 path from output + # The notebook should use dbutils.notebook.exit(path) to return the path + Step( + name="validate", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/team/validate", + "task-type": "notebook", + "cluster-mode": "Serverless", + # Use the path returned from previous step + "args": "{{steps.process.outputs.parameters.result}}", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: 
Workflow
metadata:
  generateName: metadata-passing-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: base-path
        value: "s3://ml-data/pipeline/"

  templates:
    - name: main
      steps:
        - - name: process
            templateRef:
              name: databricks-connector
              template: run-job
            arguments:
              parameters:
                - name: code-path
                  value: "/Users/team/process"
                - name: task-type
                  value: "notebook"
                - name: cluster-mode
                  value: "Serverless"
                - name: args
                  value: "{{workflow.parameters.base-path}}"

        - - name: validate
            templateRef:
              name: databricks-connector
              template: run-job
            arguments:
              parameters:
                - name: code-path
                  value: "/Users/team/validate"
                - name: task-type
                  value: "notebook"
                - name: cluster-mode
                  value: "Serverless"
                - name: args
                  value: "{{steps.process.outputs.parameters.result}}"
```
{% endtab %}
{% endtabs %}

### Databricks Notebook Pattern

```python
# Databricks notebook: /Users/team/process
from datetime import datetime

# COMMAND ----------
dbutils.widgets.text("base_path", "")
base_path = dbutils.widgets.get("base_path")

# COMMAND ----------
# Process data
df = spark.read.parquet(f"{base_path}/input/")

# ... processing logic ...

# COMMAND ----------
# Save results
output_path = f"{base_path}/processed/run-{datetime.now().strftime('%Y%m%d%H%M%S')}/"
df.write.mode("overwrite").parquet(output_path)

# COMMAND ----------
# Return the output path to Argo
dbutils.notebook.exit(output_path)
```

## Using Volumes for Parallel Data Access

For workflows that need to share large files across parallel steps, volumes provide significantly better performance than artifact repositories.
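The volume examples in this section stage large test files with `dd if=/dev/zero of=FILE bs=1 count=0 seek=10G`, which allocates a *sparse* file: the apparent size is huge, but almost nothing is written to disk. If you want to reproduce that trick locally before wiring it into a workflow, here is a minimal Python equivalent (a standalone sketch; the file path is illustrative and uses a 1 GiB size to keep local experiments cheap):

```python
import os
import tempfile

# Allocate a sparse file: extend its size without writing any data blocks,
# mirroring `dd if=/dev/zero of=FILE bs=1 count=0 seek=1G`.
path = os.path.join(tempfile.gettempdir(), "LARGE_FILE")
with open(path, "wb") as f:
    f.truncate(1 * 1024**3)  # apparent size: 1 GiB

apparent = os.path.getsize(path)         # reported size: 1 GiB
on_disk = os.stat(path).st_blocks * 512  # actual allocation, near zero when sparse
print(f"apparent={apparent} bytes, on disk={on_disk} bytes")

os.remove(path)  # clean up the test file
```

On filesystems without sparse-file support the full size is actually allocated, so start small when experimenting outside the cluster.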
+ +### Performance Comparison + +Based on [Pipekit's ArgoCon 2023 talk](https://www.youtube.com/watch?v=QZI-LXJGWYI): + +| Method | Time to Share 10GB File Across 3 Parallel Steps | Use Case | +|--------|------|----------| +| **NFS Volume** | ~20 seconds | Fast parallel reads/writes | +| **S3 Artifacts** | ~7 minutes | Simpler setup, slower | + +**Use volumes when:** +- Multiple parallel steps need to read the same large file +- You need fast data sharing between steps +- Steps need to write to shared storage simultaneously + +**Use artifacts (S3/GCS) when:** +- Simple sequential workflows +- Smaller files (< 1GB) +- You want managed storage without volume provisioning + +### NFS Volume Pattern + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import ( + Workflow, + Steps, + Step, + script, + Parameter, + models as m +) + +with Workflow( + generate_name="volume-sharing-", + namespace="default", + entrypoint="main", + volume_claim_templates=[ + m.PersistentVolumeClaim( + metadata=m.ObjectMeta(name="workdir"), + spec=m.PersistentVolumeClaimSpec( + access_modes=["ReadWriteMany"], + storage_class_name="nfs", # Requires NFS provisioner + resources=m.ResourceRequirements( + requests={"storage": "50Gi"} + ) + ) + ) + ] +) as w: + @script( + volume_mounts=[m.VolumeMount(name="workdir", mount_path="/workdir")] + ) + def create_large_file(): + """Create a large file that parallel steps will read""" + import subprocess + # Create 10GB file + subprocess.run([ + "dd", "if=/dev/zero", "of=/workdir/LARGE_FILE", + "bs=1", "count=0", "seek=10G" + ]) + print("Large file created") + + @script( + volume_mounts=[m.VolumeMount(name="workdir", mount_path="/workdir")] + ) + def process_file(index: str): + """Process the shared file in parallel""" + import subprocess + import os + + print(f"Worker {index} starting") + + # Verify file exists + result = subprocess.run(["ls", "-lah", "/workdir"], capture_output=True, text=True) + print(result.stdout) + + # Check 
file size + result = subprocess.run(["du", "-h", "/workdir/LARGE_FILE"], capture_output=True, text=True) + print(f"File size: {result.stdout}") + + # Create new file (each worker creates their own) + subprocess.run([ + "dd", "if=/dev/zero", f"of=/workdir/OUTPUT_{index}", + "bs=1", "count=0", "seek=5G" + ]) + print(f"Worker {index} created output file") + + with Steps(name="main") as s: + # Step 1: Create large file + create_large_file(name="setup") + + # Step 2: Process in parallel (all can read the same file) + with s.parallel(): + for i in range(1, 4): + process_file( + name=f"worker-{i}", + arguments={"index": str(i)} + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: volume-sharing- +spec: + entrypoint: main + + # Define shared volume + volumeClaimTemplates: + - metadata: + name: workdir + spec: + accessModes: ["ReadWriteMany"] + storageClassName: nfs # Requires NFS provisioner + resources: + requests: + storage: 50Gi + + templates: + - name: main + dag: + tasks: + # Create large file first + - name: setup + template: create-file + + # Process in parallel + - name: worker-1 + template: process-file + arguments: + parameters: + - name: index + value: "1" + depends: setup + + - name: worker-2 + template: process-file + arguments: + parameters: + - name: index + value: "2" + depends: setup + + - name: worker-3 + template: process-file + arguments: + parameters: + - name: index + value: "3" + depends: setup + + - name: create-file + container: + image: alpine + command: [sh, -c] + args: + - | + cd /workdir + dd if=/dev/zero of=LARGE_FILE bs=1 count=0 seek=10G + echo "Large file created" + volumeMounts: + - name: workdir + mountPath: /workdir + + - name: process-file + inputs: + parameters: + - name: index + container: + image: alpine + command: [sh, -c] + args: + - | + echo "Worker {{inputs.parameters.index}} starting" + cd /workdir + ls -lah + du -h LARGE_FILE + echo 
"Creating output file" + dd if=/dev/zero of=OUTPUT_{{inputs.parameters.index}} bs=1 count=0 seek=5G + echo "Worker {{inputs.parameters.index}} done" + volumeMounts: + - name: workdir + mountPath: /workdir +``` +{% endtab %} +{% endtabs %} + +### Setting Up NFS Provisioner + +To use the volume pattern, you need an NFS provisioner in your cluster: + +```bash +# Install nfs-server-provisioner +helm repo add nfs-ganesha-server-and-external-provisioner \ + https://kubernetes-sigs.github.io/nfs-ganesha-server-and-external-provisioner/ +helm install nfs-provisioner nfs-ganesha-server-and-external-provisioner/nfs-server-provisioner \ + --set persistence.enabled=true \ + --set persistence.size=200Gi \ + --set storageClass.name=nfs +``` + +### Combining Volumes with Connectors + +Use volumes to share data between custom steps and connector steps: + +{% tabs %} +{% tab title="Python (Hera)" %} +```python +from hera.workflows import ( + Workflow, + Steps, + Step, + TemplateRef, + script, + Parameter, + models as m +) + +with Workflow( + generate_name="connector-with-volume-", + namespace="default", + entrypoint="main", + volume_claim_templates=[ + m.PersistentVolumeClaim( + metadata=m.ObjectMeta(name="shared-data"), + spec=m.PersistentVolumeClaimSpec( + access_modes=["ReadWriteMany"], + storage_class_name="nfs", + resources=m.ResourceRequirements( + requests={"storage": "100Gi"} + ) + ) + ) + ] +) as w: + @script( + volume_mounts=[m.VolumeMount(name="shared-data", mount_path="/data")] + ) + def prepare_data(): + """Prepare training data and save to shared volume""" + import pandas as pd + import os + + # Generate or download data + data = pd.DataFrame({ + 'feature1': range(1000000), + 'feature2': range(1000000), + 'target': [i % 2 for i in range(1000000)] + }) + + # Save to shared volume + data.to_parquet("/data/training_data.parquet") + print(f"Saved {len(data):,} rows to /data/training_data.parquet") + + @script( + volume_mounts=[m.VolumeMount(name="shared-data", 
mount_path="/data")] + ) + def upload_to_s3(): + """Upload prepared data to S3 for Databricks to access""" + import subprocess + + # Use AWS CLI or boto3 to upload + subprocess.run([ + "aws", "s3", "cp", + "/data/training_data.parquet", + "s3://ml-data/prepared/training_data.parquet" + ]) + print("Uploaded to S3") + + with Steps(name="main"): + # Step 1: Prepare data locally + prepare_data(name="prep") + + # Step 2: Upload to S3 + upload_to_s3(name="upload") + + # Step 3: Train model in Databricks using the S3 data + Step( + name="train", + template_ref=TemplateRef( + name="databricks-connector", + template="run-job", + cluster_scope=False, + ), + arguments={ + "code-path": "/Users/ml-team/train-model", + "task-type": "notebook", + "cluster-mode": "New", + "new-cluster-spark-version": "13.3.x-scala2.12", + "new-cluster-node-type": "r5.2xlarge", + "new-cluster-num-workers": "3", + "args": "s3://ml-data/prepared/training_data.parquet", + } + ) + +w.create() +``` +{% endtab %} + +{% tab title="YAML" %} +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: connector-with-volume- +spec: + entrypoint: main + + volumeClaimTemplates: + - metadata: + name: shared-data + spec: + accessModes: ["ReadWriteMany"] + storageClassName: nfs + resources: + requests: + storage: 100Gi + + templates: + - name: main + steps: + - - name: prep + template: prepare-data + + - - name: upload + template: upload-to-s3 + + - - name: train + templateRef: + name: databricks-connector + template: run-job + arguments: + parameters: + - name: code-path + value: "/Users/ml-team/train-model" + - name: task-type + value: "notebook" + - name: cluster-mode + value: "New" + - name: new-cluster-spark-version + value: "13.3.x-scala2.12" + - name: new-cluster-node-type + value: "r5.2xlarge" + - name: new-cluster-num-workers + value: "3" + - name: args + value: "s3://ml-data/prepared/training_data.parquet" + + - name: prepare-data + script: + image: python:3.9 + command: 
[python] + source: | + import pandas as pd + + data = pd.DataFrame({ + 'feature1': range(1000000), + 'feature2': range(1000000), + 'target': [i % 2 for i in range(1000000)] + }) + + data.to_parquet("/data/training_data.parquet") + print(f"Saved {len(data):,} rows") + volumeMounts: + - name: shared-data + mountPath: /data + + - name: upload-to-s3 + container: + image: amazon/aws-cli + command: [sh, -c] + args: + - | + aws s3 cp /data/training_data.parquet s3://ml-data/prepared/training_data.parquet + echo "Uploaded to S3" + volumeMounts: + - name: shared-data + mountPath: /data +``` +{% endtab %} +{% endtabs %} + +### When to Use Volumes vs Artifacts + +**Use Volumes (NFS/EFS) when:** +- Multiple parallel steps read the same large file (10GB+) +- Performance is critical (~20 seconds vs ~7 minutes for 10GB) +- Steps need to write to shared storage simultaneously +- Working with intermediate model checkpoints + +**Use Artifacts (S3/GCS) when:** +- Sequential workflows (one step at a time) +- Smaller files (< 1GB) +- Simpler setup preferred (no volume provisioner needed) +- Long-term storage required + +**Use Cloud Storage (S3/GCS directly) when:** +- Working with Databricks/Spark connectors (they access S3 natively) +- Very large datasets (100GB+) +- Data needs to persist beyond workflow execution +- Multiple workflows need to access the same data + +### Volume Access Modes + +| Access Mode | Description | Use Case | +|-------------|-------------|----------| +| `ReadWriteOnce` (RWO) | Single node read-write | Not suitable for parallel steps | +| `ReadWriteMany` (RWX) | Multiple nodes read-write | **Required for parallel access** | +| `ReadOnlyMany` (ROX) | Multiple nodes read-only | Parallel reads of static data | + +**Important**: Use `ReadWriteMany` for parallel workflows. This requires a storage class that supports RWX (like NFS, EFS, or Azure Files). + +## Best Practices + +### 1. 
Choose the Right Data Passing Method + +**Parameters** (< 1MB): +- Job IDs, URLs, status codes +- File paths, S3 URIs +- Configuration values +- Small result sets + +**Volumes** (large files, parallel access): +- Intermediate datasets shared across parallel workers +- Model checkpoints during distributed training +- Large files that multiple steps need to read + +**Artifacts** (sequential, moderate size): +- Model files (1-10GB) +- Datasets for sequential processing +- Results that need long-term storage + +**Cloud Storage** (very large, persistent): +- Training datasets (100GB+) +- Model registries +- Production predictions +- Data lakes + +### 2. Design for Failure +Make steps idempotent - they should produce the same result if re-run: +```python +# Use overwrite mode +df.write.mode("overwrite").parquet(path) +``` + +### 3. Document Data Contracts +Clearly define what each step outputs: +```python +# In your notebook +""" +Outputs: +- S3 Path: s3://bucket/path/ +- Schema: customer_id (string), score (double), timestamp (timestamp) +""" +``` + +### 4. 
Consider Cost vs Performance + +**Volumes (NFS/EFS)**: +- Faster for large files +- Incurs storage costs while workflow runs +- Cleaned up when workflow completes + +**Artifacts (S3/GCS)**: +- Slower data transfer +- Cheaper long-term storage +- Persists after workflow completes + +## Additional Resources + +- **[Pipekit ArgoCon Talk](https://www.youtube.com/watch?v=QZI-LXJGWYI)**: Configuring Volumes for Parallel Workflow Reads and Writes +- **[Example Workflows](https://github.com/pipekit/talk-demos/tree/main/argocon-demos/2023-configuring-volumes-for-parallel-workflow-reads-and-writes)**: Complete demo code +- **[NFS CI Example](https://github.com/pipekit/argo-workflows-ci-example)**: Working example with nfs-server-provisioner + +## Next Steps + +- [Multi-Step Pipelines](multi-step-pipelines.md) - Build complex workflows +- [Data Ingestion](data-ingestion.md) - Start your pipeline +- [Batch Inference](batch-inference.md) - Use outputs for predictions +- [Feature Engineering](feature-engineering.md) - Process data at scale