From 41fc1f50ea3752bdc019732103518049765d10c8 Mon Sep 17 00:00:00 2001 From: apbari Date: Fri, 9 Jan 2026 14:36:12 +0530 Subject: [PATCH] feat: Add dbt plugin for Heimdall-based dbt execution - Add dbt plugin entry point and command handler - Implement dynamic profiles.yml generation - Support JWT authentication with Snowflake - Enable selector and model targeting - Stream dbt output to Heimdall logs --- internal/pkg/object/command/dbt/dbt.go | 246 +++++++++++++++++++++++++ plugins/dbt/README.md | 106 +++++++++++ plugins/dbt/dbt.go | 11 ++ 3 files changed, 363 insertions(+) create mode 100644 internal/pkg/object/command/dbt/dbt.go create mode 100644 plugins/dbt/README.md create mode 100644 plugins/dbt/dbt.go diff --git a/internal/pkg/object/command/dbt/dbt.go b/internal/pkg/object/command/dbt/dbt.go new file mode 100644 index 0000000..1751b7e --- /dev/null +++ b/internal/pkg/object/command/dbt/dbt.go @@ -0,0 +1,246 @@ +package dbt + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + + heimdallContext "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/plugin" +) + +// commandContext represents command-level configuration from the YAML config file +type commandContext struct { + DbtProjectPath string `yaml:"dbt_project_path,omitempty" json:"dbt_project_path,omitempty"` + DbtProfilesDir string `yaml:"dbt_profiles_dir,omitempty" json:"dbt_profiles_dir,omitempty"` +} + +// jobContext represents job-level configuration from Airflow +type jobContext struct { + Command string `json:"command"` // build, run, test, snapshot + Selector string `json:"selector,omitempty"` + Models []string `json:"models,omitempty"` + Exclude []string `json:"exclude,omitempty"` + Threads int `json:"threads,omitempty"` + FullRefresh bool `json:"full_refresh,omitempty"` + Vars map[string]string `json:"vars,omitempty"` + Target string `json:"target,omitempty"` // prod, dev, etc. 
+} + +// clusterContext represents Snowflake cluster configuration +type clusterContext struct { + Account string `yaml:"account,omitempty" json:"account,omitempty"` + User string `yaml:"user,omitempty" json:"user,omitempty"` + Database string `yaml:"database,omitempty" json:"database,omitempty"` + Warehouse string `yaml:"warehouse,omitempty" json:"warehouse,omitempty"` + PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"` + Role string `yaml:"role,omitempty" json:"role,omitempty"` + Schema string `yaml:"schema,omitempty" json:"schema,omitempty"` +} + +func New(cmdCtx *heimdallContext.Context) (plugin.Handler, error) { + cmd := &commandContext{} + + // Parse command context from YAML config + if cmdCtx != nil { + if err := cmdCtx.Unmarshal(cmd); err != nil { + return nil, err + } + } + + return cmd.handler, nil +} + +func (c *commandContext) handler(ctx context.Context, r *plugin.Runtime, j *job.Job, cl *cluster.Cluster) error { + // Parse cluster context (Snowflake credentials) + clusterContext := &clusterContext{} + if cl.Context != nil { + if err := cl.Context.Unmarshal(clusterContext); err != nil { + return fmt.Errorf("failed to unmarshal cluster context: %w", err) + } + } + + // Parse job context (dbt command parameters) + jobContext := &jobContext{} + if err := j.Context.Unmarshal(jobContext); err != nil { + return fmt.Errorf("failed to unmarshal job context: %w", err) + } + + // Set defaults + if jobContext.Threads == 0 { + jobContext.Threads = 4 + } + if jobContext.Target == "" { + jobContext.Target = "prod" + } + if jobContext.Command == "" { + jobContext.Command = "build" + } + + // Generate profiles.yml + profilesContent, err := generateProfilesYML(clusterContext, jobContext.Target) + if err != nil { + return fmt.Errorf("failed to generate profiles.yml: %w", err) + } + + // Ensure profiles directory exists + if err := os.MkdirAll(c.DbtProfilesDir, 0755); err != nil { + return fmt.Errorf("failed to create profiles directory: %w", err) + } + + // Write profiles.yml + profilesPath := filepath.Join(c.DbtProfilesDir, "profiles.yml") + if err := os.WriteFile(profilesPath, []byte(profilesContent), 0600); err != nil { + return fmt.Errorf("failed to write profiles.yml: %w", err) + } + + // Build dbt command + dbtCmd := buildDbtCommand(jobContext, c.DbtProjectPath, c.DbtProfilesDir) + + // Execute dbt command + cmd := exec.CommandContext(ctx, dbtCmd[0], dbtCmd[1:]...) 
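+	// Run dbt from the project directory and point DBT_PROFILES_DIR at the
+	// freshly generated profiles so dbt picks up the Snowflake connection.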
+	cmd.Dir = c.DbtProjectPath
+	cmd.Env = append(os.Environ(),
+		fmt.Sprintf("DBT_PROFILES_DIR=%s", c.DbtProfilesDir),
+	)
+
+	// Redirect output to runtime's stdout/stderr
+	cmd.Stdout = r.Stdout
+	cmd.Stderr = r.Stderr
+
+	// Execute and wait
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("dbt command failed: %w", err)
+	}
+
+	return nil
+}
+
+func generateProfilesYML(clusterCtx *clusterContext, target string) (string, error) {
+	if clusterCtx.Account == "" || clusterCtx.User == "" || clusterCtx.Database == "" || clusterCtx.Warehouse == "" {
+		return "", fmt.Errorf("missing required Snowflake connection parameters")
+	}
+
+	if clusterCtx.PrivateKey == "" {
+		return "", fmt.Errorf("missing private_key path")
+	}
+
+	role := clusterCtx.Role
+	if role == "" {
+		role = "PUBLIC" // Default role if not specified
+	}
+
+	schema := clusterCtx.Schema
+	if schema == "" {
+		schema = "PUBLIC" // Default schema if not specified
+	}
+
+	// Generate profiles.yml content
+	profiles := fmt.Sprintf(`analytics_poc_profile:
+  target: %s
+  outputs:
+    prod:
+      type: snowflake
+      account: %s
+      user: %s
+      role: %s
+      database: %s
+      warehouse: %s
+      schema: %s
+      authenticator: snowflake_jwt
+      private_key_path: %s
+      threads: 4
+      client_session_keep_alive: false
+    dev:
+      type: snowflake
+      account: %s
+      user: %s
+      role: %s
+      database: analytics_dev_db
+      warehouse: %s
+      schema: %s
+      authenticator: snowflake_jwt
+      private_key_path: %s
+      threads: 4
+      client_session_keep_alive: false
+`,
+		target,
+		// prod output
+		clusterCtx.Account,
+		clusterCtx.User,
+		role,
+		clusterCtx.Database,
+		clusterCtx.Warehouse,
+		schema,
+		clusterCtx.PrivateKey,
+		// dev output
+		clusterCtx.Account,
+		clusterCtx.User,
+		role,
+		clusterCtx.Warehouse,
+		schema,
+		clusterCtx.PrivateKey,
+	)
+
+	return profiles, nil
+}
+
+func buildDbtCommand(jobCtx *jobContext, projectPath, profilesDir string) []string {
+	cmd := []string{"dbt", jobCtx.Command}
+
+	// Add selector
+	if jobCtx.Selector != "" {
+		cmd = append(cmd, "--selector", jobCtx.Selector)
+	}
+
+	// Add models
+	if len(jobCtx.Models) > 0 {
+		cmd = append(cmd, "--models")
+		cmd = append(cmd, jobCtx.Models...)
+	}
+
+	// Add exclude
+	if len(jobCtx.Exclude) > 0 {
+		cmd = append(cmd, "--exclude")
+		cmd = append(cmd, jobCtx.Exclude...)
+	}
+
+	// Add threads
+	if jobCtx.Threads > 0 {
+		cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
+	}
+
+	// Add full refresh
+	if jobCtx.FullRefresh {
+		cmd = append(cmd, "--full-refresh")
+	}
+
+	// Add vars as a YAML flow mapping; dbt parses the --vars argument as YAML,
+	// so each entry needs a "key: value" separator and values are quoted.
+	if len(jobCtx.Vars) > 0 {
+		varsStr := ""
+		for k, v := range jobCtx.Vars {
+			if varsStr != "" {
+				varsStr += ", "
+			}
+			varsStr += fmt.Sprintf("%s: %q", k, v)
+		}
+		cmd = append(cmd, "--vars", fmt.Sprintf("{%s}", varsStr))
+	}
+
+	// Add target
+	if jobCtx.Target != "" {
+		cmd = append(cmd, "--target", jobCtx.Target)
+	}
+
+	// Add project dir
+	cmd = append(cmd, "--project-dir", projectPath)
+
+	// Add profiles dir
+	cmd = append(cmd, "--profiles-dir", profilesDir)
+
+	return cmd
+}
diff --git a/plugins/dbt/README.md b/plugins/dbt/README.md
new file mode 100644
index 0000000..672bae0
--- /dev/null
+++ b/plugins/dbt/README.md
@@ -0,0 +1,106 @@
+# dbt Plugin for Heimdall
+
+This plugin enables Heimdall to execute dbt commands against Snowflake.
+
+## Overview
+
+The dbt plugin lets Airflow DAGs trigger dbt transformations through Heimdall, keeping credential management and job tracking centralized.
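+
+## How it works
+
+At run time the plugin renders a `profiles.yml` from the cluster context, builds a dbt CLI invocation from the job context, and streams dbt's output into the Heimdall job log. The sketch below is a simplified, stand-alone illustration of the shape of that invocation; the project path, profiles directory, and selector are the example values used elsewhere in this README, not defaults built into the plugin:
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"os/exec"
+)
+
+func main() {
+	// Roughly the invocation assembled for a job context of
+	// {"command": "build", "selector": "hourly_models", "threads": 4, "target": "prod"}.
+	args := []string{
+		"dbt", "build",
+		"--selector", "hourly_models",
+		"--threads", "4",
+		"--target", "prod",
+		"--project-dir", "/opt/dbt/analytics_dbt_poc",
+		"--profiles-dir", "/opt/dbt/profiles",
+	}
+	cmd := exec.CommandContext(context.Background(), args[0], args[1:]...)
+	cmd.Dir = "/opt/dbt/analytics_dbt_poc"
+	cmd.Env = append(os.Environ(), "DBT_PROFILES_DIR=/opt/dbt/profiles")
+	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr // the plugin wires these to the Heimdall job log
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+```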
+
+## Features
+
+- Execute dbt commands (`build`, `run`, `test`, `snapshot`)
+- Support for selectors and model specifications
+- Dynamic profiles.yml generation from cluster context
+- Snowflake authentication via JWT (private key)
+- Structured logging and error reporting
+
+## Configuration
+
+### Command Configuration
+
+```yaml
+commands:
+  - name: dbt-0.0.1
+    status: active
+    plugin: dbt
+    version: 0.0.1
+    description: Run dbt commands against Snowflake
+    context:
+      dbt_project_path: /opt/dbt/analytics_dbt_poc
+      dbt_profiles_dir: /opt/dbt/profiles
+    tags:
+      - type:dbt
+    cluster_tags:
+      - type:snowflake
+```
+
+### Cluster Configuration
+
+The plugin reuses the existing Snowflake cluster configuration:
+
+```yaml
+clusters:
+  - name: snowflake-heimdall-wh
+    context:
+      account: pattern
+      user: heimdall__app_user
+      database: iceberg_db
+      warehouse: heimdall_wh
+      private_key: /etc/pattern.d/snowflake_key.pem
+    tags:
+      - type:snowflake
+      - data:prod
+```
+
+## Job Context
+
+When submitting a job from Airflow, provide the following context:
+
+```json
+{
+  "command": "build",
+  "selector": "hourly_models",
+  "threads": 4,
+  "target": "prod",
+  "full_refresh": false
+}
+```
+
+### Supported Parameters
+
+- `command` (string): dbt command to run (`build`, `run`, `test`, `snapshot`); defaults to `build`
+- `selector` (string, optional): Selector name from `selectors.yml`
+- `models` ([]string, optional): List of specific models to run
+- `exclude` ([]string, optional): Models to exclude
+- `threads` (int, optional): Number of threads for parallel execution; defaults to 4
+- `full_refresh` (bool, optional): Whether to perform a full refresh
+- `vars` (map[string]string, optional): dbt variables, passed via `--vars`
+- `target` (string, optional): Target environment (`prod`, `dev`); defaults to `prod`
+
+## Authentication
+
+The plugin uses Snowflake key pair (JWT) authentication:
+
+1. Takes the private key path from the cluster context
+2. Generates a profiles.yml that points dbt at that key with the JWT authenticator
+3. Executes dbt with the generated profile, streaming output to the Heimdall job log
+
+## Example Usage from Airflow
+
+```python
+from pattern.operators.dbt import DbtOperator
+
+dbt_task = DbtOperator(
+    task_id="dbt_build_hourly",
+    command="build",
+    selector="hourly_models",
+    threads=4,
+    dag=dag
+)
+```
+
+## Requirements
+
+- dbt-core >= 1.7.0
+- dbt-snowflake >= 1.7.0
+- Valid Snowflake credentials in cluster configuration
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
new file mode 100644
index 0000000..8588208
--- /dev/null
+++ b/plugins/dbt/dbt.go
@@ -0,0 +1,11 @@
+package main
+
+import (
+	"github.com/patterninc/heimdall/internal/pkg/object/command/dbt"
+	"github.com/patterninc/heimdall/pkg/context"
+	"github.com/patterninc/heimdall/pkg/plugin"
+)
+
+func New(ctx *context.Context) (plugin.Handler, error) {
+	return dbt.New(ctx)
+}
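
For illustration, a minimal test sketch of the argument mapping in `buildDbtCommand`; it assumes the `internal/pkg/object/command/dbt` package above (same package, so it can reach the unexported types), and the expected slice mirrors the flag order the implementation emits:

```go
package dbt

import (
	"reflect"
	"testing"
)

// TestBuildDbtCommandSketch checks the dbt CLI arguments produced for a
// typical job context (command, selector, threads, target).
func TestBuildDbtCommandSketch(t *testing.T) {
	jc := &jobContext{Command: "run", Selector: "hourly_models", Threads: 4, Target: "prod"}
	got := buildDbtCommand(jc, "/opt/dbt/project", "/opt/dbt/profiles")
	want := []string{
		"dbt", "run",
		"--selector", "hourly_models",
		"--threads", "4",
		"--target", "prod",
		"--project-dir", "/opt/dbt/project",
		"--profiles-dir", "/opt/dbt/profiles",
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected dbt args:\ngot  %v\nwant %v", got, want)
	}
}
```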