# Add dbt plugin for Heimdall-based dbt execution #78
@@ -0,0 +1,246 @@
```go
package dbt

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"

	heimdallContext "github.com/patterninc/heimdall/pkg/context"
	"github.com/patterninc/heimdall/pkg/object/cluster"
	"github.com/patterninc/heimdall/pkg/object/job"
	"github.com/patterninc/heimdall/pkg/plugin"
)

// commandContext represents command-level configuration from the YAML config file
type commandContext struct {
	DbtProjectPath string `yaml:"dbt_project_path,omitempty" json:"dbt_project_path,omitempty"`
	DbtProfilesDir string `yaml:"dbt_profiles_dir,omitempty" json:"dbt_profiles_dir,omitempty"`
}

// jobContext represents job-level configuration from Airflow
type jobContext struct {
	Command     string            `json:"command"` // build, run, test, snapshot
	Selector    string            `json:"selector,omitempty"`
	Models      []string          `json:"models,omitempty"`
	Exclude     []string          `json:"exclude,omitempty"`
	Threads     int               `json:"threads,omitempty"`
	FullRefresh bool              `json:"full_refresh,omitempty"`
	Vars        map[string]string `json:"vars,omitempty"`
	Target      string            `json:"target,omitempty"` // prod, dev, etc.
}

// clusterContext represents Snowflake cluster configuration
type clusterContext struct {
	Account    string `yaml:"account,omitempty" json:"account,omitempty"`
	User       string `yaml:"user,omitempty" json:"user,omitempty"`
	Database   string `yaml:"database,omitempty" json:"database,omitempty"`
	Warehouse  string `yaml:"warehouse,omitempty" json:"warehouse,omitempty"`
	PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"`
	Role       string `yaml:"role,omitempty" json:"role,omitempty"`
	Schema     string `yaml:"schema,omitempty" json:"schema,omitempty"`
}

func New(cmdCtx *heimdallContext.Context) (plugin.Handler, error) {
	cmd := &commandContext{}

	// Parse command context from YAML config
	if cmdCtx != nil {
		if err := cmdCtx.Unmarshal(cmd); err != nil {
			return nil, err
		}
	}

	return cmd.handler, nil
}

func (c *commandContext) handler(ctx context.Context, r *plugin.Runtime, j *job.Job, cl *cluster.Cluster) error {
	// Parse cluster context (Snowflake credentials)
	clusterContext := &clusterContext{}
	if cl.Context != nil {
		if err := cl.Context.Unmarshal(clusterContext); err != nil {
			return fmt.Errorf("failed to unmarshal cluster context: %w", err)
		}
	}

	// Parse job context (dbt command parameters)
	jobContext := &jobContext{}
	if err := j.Context.Unmarshal(jobContext); err != nil {
		return fmt.Errorf("failed to unmarshal job context: %w", err)
	}

	// Set defaults
	if jobContext.Threads == 0 {
		jobContext.Threads = 4
	}
	if jobContext.Target == "" {
		jobContext.Target = "prod"
	}
	if jobContext.Command == "" {
		jobContext.Command = "build"
	}

	// Generate profiles.yml
	profilesContent, err := generateProfilesYML(clusterContext, jobContext.Target)
	if err != nil {
		return fmt.Errorf("failed to generate profiles.yml: %w", err)
	}

	// Ensure profiles directory exists
	if err := os.MkdirAll(c.DbtProfilesDir, 0755); err != nil {
		return fmt.Errorf("failed to create profiles directory: %w", err)
	}

	// Write profiles.yml
	profilesPath := filepath.Join(c.DbtProfilesDir, "profiles.yml")
	if err := os.WriteFile(profilesPath, []byte(profilesContent), 0600); err != nil {
		return fmt.Errorf("failed to write profiles.yml: %w", err)
	}

	// Build dbt command
	dbtCmd := buildDbtCommand(jobContext, c.DbtProjectPath, c.DbtProfilesDir)

	// Execute dbt command
	cmd := exec.CommandContext(ctx, dbtCmd[0], dbtCmd[1:]...)
	cmd.Dir = c.DbtProjectPath
	cmd.Env = append(os.Environ(),
		fmt.Sprintf("DBT_PROFILES_DIR=%s", c.DbtProfilesDir),
	)

	// Redirect output to runtime's stdout/stderr
	cmd.Stdout = r.Stdout
	cmd.Stderr = r.Stderr

	// Execute and wait
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("dbt command failed: %w", err)
	}

	return nil
}

func generateProfilesYML(clusterCtx *clusterContext, target string) (string, error) {
	if clusterCtx.Account == "" || clusterCtx.User == "" || clusterCtx.Database == "" || clusterCtx.Warehouse == "" {
		return "", fmt.Errorf("missing required Snowflake connection parameters")
	}

	if clusterCtx.PrivateKey == "" {
		return "", fmt.Errorf("missing private_key path")
	}

	role := clusterCtx.Role
	if role == "" {
		role = "PUBLIC" // Default role if not specified
	}

	schema := clusterCtx.Schema
	if schema == "" {
		schema = "PUBLIC" // Default schema if not specified
	}

	// Generate profiles.yml content
	profiles := fmt.Sprintf(`analytics_poc_profile:
```
Suggested change:

```diff
-	profiles := fmt.Sprintf(`analytics_poc_profile:
+	profileName := os.Getenv("DBT_PROFILE_NAME")
+	if profileName == "" {
+		profileName = "analytics_poc_profile"
+	}
+	profiles := fmt.Sprintf(profileName+`:
```
**Copilot AI · Jan 9, 2026**

The threads value is hardcoded to 4 in both the prod and dev outputs. It should use the `jobContext.Threads` value that is passed as a parameter, to allow per-job configuration of the thread count. The threads parameter is already set on the dbt command itself (line 214); using it in the profile as well would keep the two consistent.
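A minimal sketch of that fix, assuming `generateProfilesYML` grows a `threads` parameter — the signature change and the template fragment below are illustrative, not the PR's actual code:

```go
// Illustrative only: thread the job's value into the template instead of
// hardcoding 4. Only the threads-related fragment of the template is shown.
func generateProfilesYML(clusterCtx *clusterContext, target string, threads int) (string, error) {
	profiles := fmt.Sprintf(`analytics_poc_profile:
  target: %s
  outputs:
    %s:
      threads: %d
`, target, target, threads)
	return profiles, nil
}
```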
**Copilot AI · Jan 9, 2026**

The database name `analytics_dev_db` is hardcoded for the dev target. It should be derived from the cluster context or made configurable: either reuse the same database as prod with a different schema, or add a `dev_database` field to the cluster context.
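One way to make that configurable, per the comment's last suggestion, is an extra cluster-context field; the field name below is hypothetical:

```go
// Hypothetical extension: a dev_database field on clusterContext so the dev
// target's database comes from configuration instead of a hardcoded name.
type clusterContext struct {
	// ...existing fields (Account, User, Database, Warehouse, PrivateKey, Role, Schema)...
	DevDatabase string `yaml:"dev_database,omitempty" json:"dev_database,omitempty"`
}
```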
**Copilot AI · Jan 9, 2026**

The threads parameter is added to the command whenever it is greater than 0, but lines 74-76 guarantee a default of 4 is always set, so the condition `if jobCtx.Threads > 0` is always true. Consider changing the condition or removing it, since threads will always have a value.

Suggested change:

```diff
-	if jobCtx.Threads > 0 {
-		cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
-	}
+	cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
```
**Copilot AI · Jan 9, 2026**

The vars are formatted as comma-separated `key:value` pairs, but dbt expects JSON for the `--vars` argument, so they should be properly JSON-encoded. For example, if vars contains `{"key1": "value1", "key2": "value2"}`, the current implementation produces `{key1:value1,key2:value2}`, which is not valid JSON; it should produce `{"key1":"value1","key2":"value2"}` instead.
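A sketch of the JSON-encoded approach using only the standard library; the `encodeVars` helper name is hypothetical and assumes `encoding/json` is imported:

```go
// encodeVars renders dbt variables as the JSON object dbt expects for --vars,
// e.g. {"key1":"value1","key2":"value2"}. json.Marshal cannot fail for a
// map[string]string, so discarding the error here is safe.
func encodeVars(vars map[string]string) string {
	b, _ := json.Marshal(vars)
	return string(b)
}
```

The call site would then be `cmd = append(cmd, "--vars", encodeVars(jobCtx.Vars))`.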
**Copilot AI · Jan 9, 2026**

The target parameter is added to the command even though it is already given a default value in the handler on lines 77-79, so the condition `if jobCtx.Target != ""` is always true. Consider removing the redundant check.

Suggested change:

```diff
-	if jobCtx.Target != "" {
-		cmd = append(cmd, "--target", jobCtx.Target)
-	}
+	cmd = append(cmd, "--target", jobCtx.Target)
```
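Pulling the review feedback together, a hypothetical `buildDbtCommand` might look like the sketch below. The function body is not visible in this diff, so the overall structure is an assumption; flag names follow the standard dbt CLI, and `encodeVars` is the helper sketched above.

```go
// Hypothetical reconstruction with the feedback applied: no redundant guards
// (threads and target always carry defaults) and vars encoded as JSON.
func buildDbtCommand(jobCtx *jobContext, projectPath, profilesDir string) []string {
	cmd := []string{"dbt", jobCtx.Command, "--project-dir", projectPath, "--profiles-dir", profilesDir}
	if jobCtx.Selector != "" {
		cmd = append(cmd, "--selector", jobCtx.Selector)
	}
	if len(jobCtx.Models) > 0 {
		cmd = append(cmd, "--select")
		cmd = append(cmd, jobCtx.Models...)
	}
	if len(jobCtx.Exclude) > 0 {
		cmd = append(cmd, "--exclude")
		cmd = append(cmd, jobCtx.Exclude...)
	}
	cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
	cmd = append(cmd, "--target", jobCtx.Target)
	if jobCtx.FullRefresh {
		cmd = append(cmd, "--full-refresh")
	}
	if len(jobCtx.Vars) > 0 {
		cmd = append(cmd, "--vars", encodeVars(jobCtx.Vars))
	}
	return cmd
}
```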
@@ -0,0 +1,106 @@
# DBT Plugin for Heimdall

This plugin enables Heimdall to execute dbt commands against Snowflake.

## Overview

The dbt plugin allows Airflow DAGs to trigger dbt transformations through Heimdall, maintaining centralized credential management and job tracking.

## Features

- Execute dbt commands (`build`, `run`, `test`, `snapshot`)
- Support for selectors and model specifications
- Dynamic profiles.yml generation from cluster context
- Snowflake authentication via JWT (private key)
- Structured logging and error reporting

## Configuration

### Command Configuration

```yaml
commands:
  - name: dbt-0.0.1
    status: active
    plugin: dbt
    version: 0.0.1
    description: Run dbt commands against Snowflake
    context:
      dbt_project_path: /opt/dbt/analytics_dbt_poc
      dbt_profiles_dir: /opt/dbt/profiles
    tags:
      - type:dbt
    cluster_tags:
      - type:snowflake
```

### Cluster Configuration

Uses the existing Snowflake cluster configuration:

```yaml
clusters:
  - name: snowflake-heimdall-wh
    context:
      account: pattern
      user: heimdall__app_user
      database: iceberg_db
      warehouse: heimdall_wh
      private_key: /etc/pattern.d/snowflake_key.pem
    tags:
      - type:snowflake
      - data:prod
```

## Job Context

When submitting a job from Airflow, provide the following context:

```json
{
  "command": "build",
  "selector": "hourly_models",
  "threads": 4,
  "target": "prod",
  "full_refresh": false
}
```
### Supported Parameters

- `command` (string): dbt command to run (`build`, `run`, `test`, `snapshot`); defaults to `build`
- `selector` (string, optional): Selector name from selectors.yml
- `models` ([]string, optional): List of specific models to run
- `exclude` ([]string, optional): Models to exclude
- `threads` (int): Number of threads for parallel execution; defaults to 4
- `full_refresh` (bool): Whether to perform a full refresh
- `vars` (map[string]string, optional): dbt variables
- `target` (string): Target environment (`prod`, `dev`); defaults to `prod`
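For instance, a job context combining several of these parameters could look like the following (model names, variables, and values are illustrative):

```json
{
  "command": "run",
  "models": ["stg_orders", "fct_daily_sales"],
  "exclude": ["deprecated_models"],
  "threads": 8,
  "target": "dev",
  "full_refresh": true,
  "vars": {"run_date": "2026-01-09"}
}
```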
## Authentication

The plugin uses Snowflake JWT authentication:

1. Reads the private key from the path in the cluster context
2. Generates profiles.yml with the JWT authenticator
3. Executes dbt with the resulting credentials
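As a rough illustration, the generated profiles.yml could resemble the sketch below. The exact template lives in `generateProfilesYML` and is only partially visible in this diff, so the profile name, field layout, and `private_key_path` key are assumptions based on standard dbt-snowflake key-pair profiles; the values echo the cluster configuration above and the code's defaults.

```yaml
# Illustrative sketch only — the real file is produced by generateProfilesYML.
analytics_poc_profile:
  target: prod
  outputs:
    prod:
      type: snowflake
      account: pattern
      user: heimdall__app_user
      role: PUBLIC                # code default when no role is configured
      database: iceberg_db
      warehouse: heimdall_wh
      schema: PUBLIC              # code default when no schema is configured
      threads: 4
      private_key_path: /etc/pattern.d/snowflake_key.pem
```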
## Example Usage from Airflow

```python
from pattern.operators.dbt import DbtOperator

dbt_task = DbtOperator(
    task_id="dbt_build_hourly",
    command="build",
    selector="hourly_models",
    threads=4,
    dag=dag,
)
```
## Requirements

- dbt-core >= 1.7.0
- dbt-snowflake >= 1.7.0
- Valid Snowflake credentials in cluster configuration
Suggested change:

```diff
 - Valid Snowflake credentials in cluster configuration
+
+**Note:** The dbt plugin does not currently perform runtime validation of installed `dbt-core` or `dbt-snowflake` versions. Ensure your environment or dependency management (e.g., requirements.txt, Poetry, or container image) provides `dbt-core` and `dbt-snowflake` versions >= 1.7.0 before using this plugin.
```
@@ -0,0 +1,11 @@
```go
package main

import (
	"github.com/patterninc/heimdall/internal/pkg/object/command/dbt"
	"github.com/patterninc/heimdall/pkg/context"
	"github.com/patterninc/heimdall/pkg/plugin"
)

func New(ctx *context.Context) (plugin.Handler, error) {
	return dbt.New(ctx)
}
```
**Copilot AI**

The default role `PUBLIC` may not be appropriate for production use; PUBLIC typically has limited permissions in Snowflake. Consider either requiring the role to be specified in the cluster configuration or using a more descriptive error message to indicate that the role should be configured.
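A minimal sketch of the stricter alternative, failing fast instead of defaulting:

```go
// Sketch: require an explicit role rather than silently falling back to
// PUBLIC, which usually lacks the privileges dbt needs in production.
if clusterCtx.Role == "" {
	return "", fmt.Errorf("snowflake role is not set in the cluster context; set 'role' explicitly instead of relying on PUBLIC")
}
```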