246 changes: 246 additions & 0 deletions internal/pkg/object/command/dbt/dbt.go
@@ -0,0 +1,246 @@
package dbt

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"

	heimdallContext "github.com/patterninc/heimdall/pkg/context"
	"github.com/patterninc/heimdall/pkg/object/cluster"
	"github.com/patterninc/heimdall/pkg/object/job"
	"github.com/patterninc/heimdall/pkg/plugin"
)

// commandContext represents command-level configuration from the YAML config file
type commandContext struct {
	DbtProjectPath string `yaml:"dbt_project_path,omitempty" json:"dbt_project_path,omitempty"`
	DbtProfilesDir string `yaml:"dbt_profiles_dir,omitempty" json:"dbt_profiles_dir,omitempty"`
}

// jobContext represents job-level configuration from Airflow
type jobContext struct {
	Command     string            `json:"command"` // build, run, test, snapshot
	Selector    string            `json:"selector,omitempty"`
	Models      []string          `json:"models,omitempty"`
	Exclude     []string          `json:"exclude,omitempty"`
	Threads     int               `json:"threads,omitempty"`
	FullRefresh bool              `json:"full_refresh,omitempty"`
	Vars        map[string]string `json:"vars,omitempty"`
	Target      string            `json:"target,omitempty"` // prod, dev, etc.
}

// clusterContext represents Snowflake cluster configuration
type clusterContext struct {
	Account    string `yaml:"account,omitempty" json:"account,omitempty"`
	User       string `yaml:"user,omitempty" json:"user,omitempty"`
	Database   string `yaml:"database,omitempty" json:"database,omitempty"`
	Warehouse  string `yaml:"warehouse,omitempty" json:"warehouse,omitempty"`
	PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"`
	Role       string `yaml:"role,omitempty" json:"role,omitempty"`
	Schema     string `yaml:"schema,omitempty" json:"schema,omitempty"`
}

func New(cmdCtx *heimdallContext.Context) (plugin.Handler, error) {
	cmd := &commandContext{}

	// Parse command context from YAML config
	if cmdCtx != nil {
		if err := cmdCtx.Unmarshal(cmd); err != nil {
			return nil, err
		}
	}

	return cmd.handler, nil
}

func (c *commandContext) handler(ctx context.Context, r *plugin.Runtime, j *job.Job, cl *cluster.Cluster) error {
	// Parse cluster context (Snowflake credentials)
	clusterContext := &clusterContext{}
	if cl.Context != nil {
		if err := cl.Context.Unmarshal(clusterContext); err != nil {
			return fmt.Errorf("failed to unmarshal cluster context: %w", err)
		}
	}

	// Parse job context (dbt command parameters)
	jobContext := &jobContext{}
	if err := j.Context.Unmarshal(jobContext); err != nil {
		return fmt.Errorf("failed to unmarshal job context: %w", err)
	}

	// Set defaults
	if jobContext.Threads == 0 {
		jobContext.Threads = 4
	}
	if jobContext.Target == "" {
		jobContext.Target = "prod"
	}
	if jobContext.Command == "" {
		jobContext.Command = "build"
	}

	// Generate profiles.yml
	profilesContent, err := generateProfilesYML(clusterContext, jobContext.Target)
	if err != nil {
		return fmt.Errorf("failed to generate profiles.yml: %w", err)
	}

	// Ensure profiles directory exists
	if err := os.MkdirAll(c.DbtProfilesDir, 0755); err != nil {
		return fmt.Errorf("failed to create profiles directory: %w", err)
	}

	// Write profiles.yml
	profilesPath := filepath.Join(c.DbtProfilesDir, "profiles.yml")
	if err := os.WriteFile(profilesPath, []byte(profilesContent), 0600); err != nil {
		return fmt.Errorf("failed to write profiles.yml: %w", err)
	}

	// Build dbt command
	dbtCmd := buildDbtCommand(jobContext, c.DbtProjectPath, c.DbtProfilesDir)

	// Execute dbt command
	cmd := exec.CommandContext(ctx, dbtCmd[0], dbtCmd[1:]...)
	cmd.Dir = c.DbtProjectPath
	cmd.Env = append(os.Environ(),
		fmt.Sprintf("DBT_PROFILES_DIR=%s", c.DbtProfilesDir),
	)

	// Redirect output to runtime's stdout/stderr
	cmd.Stdout = r.Stdout
	cmd.Stderr = r.Stderr

	// Execute and wait
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("dbt command failed: %w", err)
	}

	return nil
}

func generateProfilesYML(clusterCtx *clusterContext, target string) (string, error) {
	if clusterCtx.Account == "" || clusterCtx.User == "" || clusterCtx.Database == "" || clusterCtx.Warehouse == "" {
		return "", fmt.Errorf("missing required Snowflake connection parameters")
	}

	if clusterCtx.PrivateKey == "" {
		return "", fmt.Errorf("missing private_key path")
	}

	role := clusterCtx.Role
	if role == "" {
		role = "PUBLIC" // Default role if not specified
	}
Comment on lines +132 to +135

**Copilot AI** (Jan 9, 2026): The default role 'PUBLIC' may not be appropriate for production use. PUBLIC typically has limited permissions in Snowflake. Consider either requiring the role to be specified in the cluster configuration or using a more descriptive error message to indicate that the role should be configured.

Suggested change:

```diff
-	role := clusterCtx.Role
-	if role == "" {
-		role = "PUBLIC" // Default role if not specified
-	}
+	if clusterCtx.Role == "" {
+		return "", fmt.Errorf("missing required Snowflake role; please configure a role instead of relying on the default PUBLIC role")
+	}
+	role := clusterCtx.Role
```

	schema := clusterCtx.Schema
	if schema == "" {
		schema = "PUBLIC" // Default schema if not specified
	}

	// Generate profiles.yml content
	profiles := fmt.Sprintf(`analytics_poc_profile:
**Copilot AI** (Jan 9, 2026): The profile name 'analytics_poc_profile' is hardcoded. This should be configurable through the command context to support different dbt projects with different profile names. Consider adding a 'dbt_profile_name' field to the commandContext struct and using it here.

Suggested change:

```diff
-	profiles := fmt.Sprintf(`analytics_poc_profile:
+	profileName := os.Getenv("DBT_PROFILE_NAME")
+	if profileName == "" {
+		profileName = "analytics_poc_profile"
+	}
+	profiles := fmt.Sprintf(profileName+`:
```
  target: %s
  outputs:
    prod:
      type: snowflake
      account: %s
      user: %s
      role: %s
      database: %s
      warehouse: %s
      schema: %s
      authenticator: snowflake_jwt
      private_key_path: %s
      threads: 4
**Copilot AI** (Jan 9, 2026): The threads value is hardcoded to 4 in both the prod and dev outputs. This should use the jobContext.Threads value that's passed as a parameter to allow per-job configuration of thread count. The threads parameter is already being set in the dbt command itself (line 214), but it would be better to use it in the profile as well for consistency.
      client_session_keep_alive: false
    dev:
      type: snowflake
      account: %s
      user: %s
      role: %s
      database: analytics_dev_db
**Copilot AI** (Jan 9, 2026): The database name 'analytics_dev_db' is hardcoded for the dev target. This should be derived from the cluster context or made configurable. Consider using the same database as prod with a different schema, or adding a 'dev_database' field to the cluster context.
      warehouse: %s
      schema: %s
      authenticator: snowflake_jwt
      private_key_path: %s
      threads: 4
      client_session_keep_alive: false
`,
		target,
		// prod output
		clusterCtx.Account,
		clusterCtx.User,
		role,
		clusterCtx.Database,
		clusterCtx.Warehouse,
		schema,
		clusterCtx.PrivateKey,
		// dev output
		clusterCtx.Account,
		clusterCtx.User,
		role,
		clusterCtx.Warehouse,
		schema,
		clusterCtx.PrivateKey,
	)

	return profiles, nil
}
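The three review comments above (hardcoded profile name, hardcoded threads, hardcoded dev database) all point the same way. A minimal sketch of a parameterized generator, where `generateProfileTarget`, `profileName`, `threads`, and the per-target `database` are hypothetical names threaded in from the command, job, and cluster contexts (not part of this PR):

```go
// Hypothetical refactor sketch: the profile name, thread count, and
// per-target database become parameters instead of constants, so one
// helper can emit both the prod and dev output blocks.
func generateProfileTarget(profileName, target string, c *clusterContext, database, role, schema string, threads int) string {
	return fmt.Sprintf(`%s:
  target: %s
  outputs:
    %s:
      type: snowflake
      account: %s
      user: %s
      role: %s
      database: %s
      warehouse: %s
      schema: %s
      authenticator: snowflake_jwt
      private_key_path: %s
      threads: %d
      client_session_keep_alive: false
`, profileName, target, target, c.Account, c.User, role, database, c.Warehouse, schema, c.PrivateKey, threads)
}
```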

func buildDbtCommand(jobCtx *jobContext, projectPath, profilesDir string) []string {
	cmd := []string{"dbt", jobCtx.Command}

	// Add selector
	if jobCtx.Selector != "" {
		cmd = append(cmd, "--selector", jobCtx.Selector)
	}

	// Add models
	if len(jobCtx.Models) > 0 {
		cmd = append(cmd, "--models")
		cmd = append(cmd, jobCtx.Models...)
	}

	// Add exclude
	if len(jobCtx.Exclude) > 0 {
		cmd = append(cmd, "--exclude")
		cmd = append(cmd, jobCtx.Exclude...)
	}

	// Add threads
	if jobCtx.Threads > 0 {
		cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
	}
Comment on lines +213 to +215

**Copilot AI** (Jan 9, 2026): The threads parameter is being added to the command when it's greater than 0, but lines 74-76 ensure a default of 4 is always set. This means the condition 'if jobCtx.Threads > 0' will always be true. Consider changing the condition or removing it since threads will always have a value.

Suggested change:

```diff
-	if jobCtx.Threads > 0 {
-		cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
-	}
+	cmd = append(cmd, "--threads", fmt.Sprintf("%d", jobCtx.Threads))
```

	// Add full refresh
	if jobCtx.FullRefresh {
		cmd = append(cmd, "--full-refresh")
	}

	// Add vars
	if len(jobCtx.Vars) > 0 {
		varsStr := ""
		for k, v := range jobCtx.Vars {
			if varsStr != "" {
				varsStr += ","
			}
			varsStr += fmt.Sprintf("%s:%s", k, v)
		}
		cmd = append(cmd, "--vars", fmt.Sprintf("{%s}", varsStr))
Comment on lines +223 to +231

**Copilot AI** (Jan 9, 2026): The vars are formatted as comma-separated key:value pairs, but dbt expects JSON format for the --vars argument. This should be properly JSON-encoded instead. For example, if vars contains {"key1": "value1", "key2": "value2"}, the current implementation would produce "{key1:value1,key2:value2}" which is not valid JSON. It should produce '{"key1":"value1","key2":"value2"}' instead.
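A minimal sketch of that fix using encoding/json (assuming `Vars` stays a `map[string]string`; note that `buildDbtCommand` would also need to return an error for this to fit):

```go
// Sketch: JSON-encode the vars map so dbt receives valid JSON,
// e.g. {"key1":"value1","key2":"value2"}. Requires importing "encoding/json".
if len(jobCtx.Vars) > 0 {
	varsJSON, err := json.Marshal(jobCtx.Vars)
	if err != nil {
		return nil, fmt.Errorf("failed to encode dbt vars: %w", err)
	}
	cmd = append(cmd, "--vars", string(varsJSON))
}
```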
	}

	// Add target
	if jobCtx.Target != "" {
		cmd = append(cmd, "--target", jobCtx.Target)
	}
Comment on lines +235 to +237

**Copilot AI** (Jan 9, 2026): The target parameter is being added to the command even though it's already set in the handler function with a default value on lines 77-79. This means the condition 'if jobCtx.Target != ""' will always be true. Consider removing this redundant check.

Suggested change:

```diff
-	if jobCtx.Target != "" {
-		cmd = append(cmd, "--target", jobCtx.Target)
-	}
+	cmd = append(cmd, "--target", jobCtx.Target)
```

	// Add project dir
	cmd = append(cmd, "--project-dir", projectPath)

	// Add profiles dir
	cmd = append(cmd, "--profiles-dir", profilesDir)

	return cmd
}
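As a trace of this function: for the job context and command configuration shown in the README below, `buildDbtCommand` would produce roughly:

```sh
dbt build --selector hourly_models --threads 4 --target prod --project-dir /opt/dbt/analytics_dbt_poc --profiles-dir /opt/dbt/profiles
```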
106 changes: 106 additions & 0 deletions plugins/dbt/README.md
@@ -0,0 +1,106 @@
# dbt Plugin for Heimdall

This plugin enables Heimdall to execute dbt commands against Snowflake.

## Overview

The dbt plugin allows Airflow DAGs to trigger dbt transformations through Heimdall, maintaining centralized credential management and job tracking.

## Features

- Execute dbt commands (`build`, `run`, `test`, `snapshot`)
- Support for selectors and model specifications
- Dynamic profiles.yml generation from cluster context
- Snowflake authentication via JWT (private key)
- Structured logging and error reporting

## Configuration

### Command Configuration

```yaml
commands:
- name: dbt-0.0.1
status: active
plugin: dbt
version: 0.0.1
description: Run dbt commands against Snowflake
context:
dbt_project_path: /opt/dbt/analytics_dbt_poc
dbt_profiles_dir: /opt/dbt/profiles
tags:
- type:dbt
cluster_tags:
- type:snowflake
```

### Cluster Configuration

Uses existing Snowflake cluster configuration:

```yaml
clusters:
- name: snowflake-heimdall-wh
context:
account: pattern
user: heimdall__app_user
database: iceberg_db
warehouse: heimdall_wh
private_key: /etc/pattern.d/snowflake_key.pem
tags:
- type:snowflake
- data:prod
```

## Job Context

When submitting a job from Airflow, provide the following context:

```json
{
"command": "build",
"selector": "hourly_models",
"threads": 4,
"target": "prod",
"full_refresh": false
}
```

### Supported Parameters

- `command` (string): dbt command to run (`build`, `run`, `test`, `snapshot`); defaults to `build`
- `selector` (string, optional): Selector name from `selectors.yml`
- `models` ([]string, optional): List of specific models to run
- `exclude` ([]string, optional): Models to exclude
- `threads` (int): Number of threads for parallel execution; defaults to `4`
- `full_refresh` (bool): Whether to perform a full refresh
- `vars` (map[string]string, optional): dbt variables
- `target` (string): Target environment (`prod`, `dev`); defaults to `prod`
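
For example, a job context combining model selection, exclusions, and variables (illustrative values):

```json
{
  "command": "run",
  "models": ["stg_orders", "fct_orders"],
  "exclude": ["deprecated_orders"],
  "threads": 8,
  "vars": {"run_date": "2026-01-09"},
  "target": "dev",
  "full_refresh": true
}
```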

## Authentication

The plugin uses Snowflake JWT authentication:

1. Reads private key from cluster context path
2. Generates profiles.yml with JWT authenticator
3. Executes dbt with proper credentials
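
For the example cluster configuration above, the generated profiles.yml would look roughly like this (prod output shown; `role` and `schema` fall back to `PUBLIC` when unset):

```yaml
analytics_poc_profile:
  target: prod
  outputs:
    prod:
      type: snowflake
      account: pattern
      user: heimdall__app_user
      role: PUBLIC
      database: iceberg_db
      warehouse: heimdall_wh
      schema: PUBLIC
      authenticator: snowflake_jwt
      private_key_path: /etc/pattern.d/snowflake_key.pem
      threads: 4
      client_session_keep_alive: false
```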

## Example Usage from Airflow

```python
from pattern.operators.dbt import DbtOperator

dbt_task = DbtOperator(
task_id="dbt_build_hourly",
command="build",
selector="hourly_models",
threads=4,
dag=dag
)
```

## Requirements

- dbt-core >= 1.7.0
- dbt-snowflake >= 1.7.0
- Valid Snowflake credentials in cluster configuration
**Copilot AI** (Jan 9, 2026): The documentation states that dbt-core and dbt-snowflake version >= 1.7.0 are required, but there's no version validation in the code. Consider adding a version check during initialization to provide clear error messages if the required versions are not available, or documenting how version compatibility is enforced.

Suggested change:

```diff
 - Valid Snowflake credentials in cluster configuration
+
+**Note:** The dbt plugin does not currently perform runtime validation of installed `dbt-core` or `dbt-snowflake` versions. Ensure your environment or dependency management (e.g., requirements.txt, Poetry, or container image) provides `dbt-core` and `dbt-snowflake` versions >= 1.7.0 before using this plugin.
```
11 changes: 11 additions & 0 deletions plugins/dbt/dbt.go
@@ -0,0 +1,11 @@
package main

import (
	"github.com/patterninc/heimdall/internal/pkg/object/command/dbt"
	"github.com/patterninc/heimdall/pkg/context"
	"github.com/patterninc/heimdall/pkg/plugin"
)

func New(ctx *context.Context) (plugin.Handler, error) {
	return dbt.New(ctx)
}