From 6bcea5d1f23ddf0d9e215e8132a9bf2e10d49d4f Mon Sep 17 00:00:00 2001 From: Mustafa Sadedil Date: Thu, 11 Sep 2025 00:05:50 +0300 Subject: [PATCH 1/4] Extract database name from query instead of path for mssql --- job.go | 75 ++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/job.go b/job.go index 64cdab20..ca798e6f 100644 --- a/job.go +++ b/job.go @@ -2,6 +2,9 @@ package main import ( "context" + "crypto/rsa" + "crypto/x509" + "encoding/pem" "fmt" "net/url" "os" @@ -9,9 +12,6 @@ import ( "strconv" "strings" "time" - "crypto/rsa" - "crypto/x509" - "encoding/pem" _ "github.com/ClickHouse/clickhouse-go/v2" // register the ClickHouse driver "github.com/cenkalti/backoff" @@ -356,6 +356,25 @@ func (j *Job) updateConnections() { if u.User != nil { user = u.User.Username() } + + // For SQL Server connections, + // url.path is reserved for sql server instance not the database name + // database name can be specified in multiple ways: + // 1. Query parameter: database=dbname + // 2. Query parameter: initial catalog=dbname + database := "" + if strings.HasPrefix(conn, "sqlserver://") { + // Check for 'database' parameter first + if dbParam := getQueryStringCaseInsensitive(u.Query(), "database"); dbParam != "" { + database = dbParam + } else if catalogParam := getQueryStringCaseInsensitive(u.Query(), "initial catalog"); catalogParam != "" { + // 'initial catalog' is an alternative to 'database' parameter + database = catalogParam + } + } else { + database = strings.TrimPrefix(u.Path, "/") + } + // we expose some of the connection variables as labels, so we need to // remember them newConn := &connection{ @@ -363,7 +382,7 @@ func (j *Job) updateConnections() { url: conn, driver: u.Scheme, host: u.Host, - database: strings.TrimPrefix(u.Path, "/"), + database: database, user: user, } if newConn.driver == "athena" { @@ -382,18 +401,18 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to parse Snowflake URL", "url", conn, "err", err) continue } - + queryParams := u.Query() privateKeyPath := os.ExpandEnv(queryParams.Get("private_key_file")) - + cfg := &gosnowflake.Config{ - Account: u.Host, - User: u.User.Username(), - Role: queryParams.Get("role"), + Account: u.Host, + User: u.User.Username(), + Role: queryParams.Get("role"), Database: queryParams.Get("database"), - Schema: queryParams.Get("schema"), + Schema: queryParams.Get("schema"), } - + if privateKeyPath != "" { // RSA key auth keyBytes, err := os.ReadFile(privateKeyPath) @@ -401,13 +420,13 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to read private key file", "path", privateKeyPath, "err", err) continue } - + keyBlock, _ := pem.Decode(keyBytes) if keyBlock == nil { level.Error(j.log).Log("msg", "Failed to decode PEM block", "path", privateKeyPath) continue } - + var privateKey *rsa.PrivateKey if parsedKey, err := x509.ParsePKCS8PrivateKey(keyBlock.Bytes); err == nil { privateKey, _ = parsedKey.(*rsa.PrivateKey) @@ -417,16 +436,16 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to parse private key", "err", err) continue } - + cfg.Authenticator = gosnowflake.AuthTypeJwt cfg.PrivateKey = privateKey - + dsn, err := gosnowflake.DSN(cfg) if err != nil { level.Error(j.log).Log("msg", "Failed to create Snowflake DSN with RSA", "err", err) continue } - + newConn.snowflakeConfig = cfg newConn.snowflakeDSN = dsn newConn.host = u.Host @@ -441,20 +460,20 @@ func (j *Job) updateConnections() { cfg.Port = port } } - + dsn, err := gosnowflake.DSN(cfg) if err != nil { level.Error(j.log).Log("msg", "Failed to create Snowflake DSN with password", "err", err) continue } - + newConn.conn, err = sqlx.Open("snowflake", dsn) if err != nil { level.Error(j.log).Log("msg", "Failed to open Snowflake connection", "err", err) continue } } - + j.conns = append(j.conns, newConn) continue } @@ -632,21 +651,21 @@ func (c *connection) connect(job *Job) error { } c.tokenExpirationTime = time.Now().Add(time.Hour) } - + db, err := sqlx.Open("snowflake", c.snowflakeDSN) if err != nil { return fmt.Errorf("failed to open Snowflake connection: %w (host: %s)", err, c.host) } - + db.SetMaxOpenConns(1) db.SetMaxIdleConns(0) db.SetConnMaxLifetime(30 * time.Minute) - + if err := db.Ping(); err != nil { db.Close() return fmt.Errorf("failed to ping Snowflake: %w (host: %s)", err, c.host) } - + c.conn = db return nil } @@ -683,3 +702,13 @@ func (c *connection) connect(job *Job) error { c.conn = conn return nil } + +func getQueryStringCaseInsensitive(values url.Values, key string) string { + key = strings.ToLower(key) + for k, v := range values { + if strings.ToLower(k) == key && len(v) > 0 { + return v[0] + } + } + return "" +} From bbbb48d41ad7cd23c06a62db9ee7d3812e70c1fb Mon Sep 17 00:00:00 2001 From: Mustafa Sadedil Date: Thu, 11 Sep 2025 00:25:03 +0300 Subject: [PATCH 2/4] Whitespace fix --- job.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/job.go b/job.go index ca798e6f..c0c9f6f0 100644 --- a/job.go +++ b/job.go @@ -2,9 +2,6 @@ package main import ( "context" - "crypto/rsa" - "crypto/x509" - "encoding/pem" "fmt" "net/url" "os" @@ -12,6 +9,9 @@ import ( "strconv" "strings" "time" + "crypto/rsa" + "crypto/x509" + "encoding/pem" _ "github.com/ClickHouse/clickhouse-go/v2" // register the ClickHouse driver "github.com/cenkalti/backoff" @@ -401,10 +401,10 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to parse Snowflake URL", "url", conn, "err", err) continue } - + queryParams := u.Query() privateKeyPath := os.ExpandEnv(queryParams.Get("private_key_file")) - + cfg := &gosnowflake.Config{ Account: u.Host, User: u.User.Username(), @@ -412,7 +412,7 @@ func (j *Job) updateConnections() { Database: queryParams.Get("database"), Schema: queryParams.Get("schema"), } - + if privateKeyPath != "" { // RSA key auth keyBytes, err := os.ReadFile(privateKeyPath) @@ -420,13 +420,13 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to read private key file", "path", privateKeyPath, "err", err) continue } - + keyBlock, _ := pem.Decode(keyBytes) if keyBlock == nil { level.Error(j.log).Log("msg", "Failed to decode PEM block", "path", privateKeyPath) continue } - + var privateKey *rsa.PrivateKey if parsedKey, err := x509.ParsePKCS8PrivateKey(keyBlock.Bytes); err == nil { privateKey, _ = parsedKey.(*rsa.PrivateKey) @@ -436,16 +436,16 @@ func (j *Job) updateConnections() { level.Error(j.log).Log("msg", "Failed to parse private key", "err", err) continue } - + cfg.Authenticator = gosnowflake.AuthTypeJwt cfg.PrivateKey = privateKey - + dsn, err := gosnowflake.DSN(cfg) if err != nil { level.Error(j.log).Log("msg", "Failed to create Snowflake DSN with RSA", "err", err) continue } - + newConn.snowflakeConfig = cfg newConn.snowflakeDSN = dsn newConn.host = u.Host @@ -460,20 +460,20 @@ func (j *Job) updateConnections() { cfg.Port = port } } - + dsn, err := gosnowflake.DSN(cfg) if err != nil { level.Error(j.log).Log("msg", "Failed to create Snowflake DSN with password", "err", err) continue } - + newConn.conn, err = sqlx.Open("snowflake", dsn) if err != nil { level.Error(j.log).Log("msg", "Failed to open Snowflake connection", "err", err) continue } } - + j.conns = append(j.conns, newConn) continue } @@ -651,21 +651,21 @@ func (c *connection) connect(job *Job) error { } c.tokenExpirationTime = time.Now().Add(time.Hour) } - + db, err := sqlx.Open("snowflake", c.snowflakeDSN) if err != nil { return fmt.Errorf("failed to open Snowflake connection: %w (host: %s)", err, c.host) } - + db.SetMaxOpenConns(1) db.SetMaxIdleConns(0) db.SetConnMaxLifetime(30 * time.Minute) - + if err := db.Ping(); err != nil { db.Close() return fmt.Errorf("failed to ping Snowflake: %w (host: %s)", err, c.host) } - + c.conn = db return nil } From 1d643880a86334c218f3e58329b05a80081cd8ff Mon Sep 17 00:00:00 2001 From: Mustafa Sadedil Date: Thu, 11 Sep 2025 00:26:34 +0300 Subject: [PATCH 3/4] Whitespace fix 2 --- job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job.go b/job.go index c0c9f6f0..7a762adc 100644 --- a/job.go +++ b/job.go @@ -651,7 +651,7 @@ func (c *connection) connect(job *Job) error { } c.tokenExpirationTime = time.Now().Add(time.Hour) } - + db, err := sqlx.Open("snowflake", c.snowflakeDSN) if err != nil { return fmt.Errorf("failed to open Snowflake connection: %w (host: %s)", err, c.host) From ade9e44fc6c1debe6748f750dd0931479af9a552 Mon Sep 17 00:00:00 2001 From: Mustafa Sadedil Date: Thu, 11 Sep 2025 00:27:29 +0300 Subject: [PATCH 4/4] Whitespace fix 3 --- job.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/job.go b/job.go index 7a762adc..44e2bd06 100644 --- a/job.go +++ b/job.go @@ -651,21 +651,21 @@ func (c *connection) connect(job *Job) error { } c.tokenExpirationTime = time.Now().Add(time.Hour) } - + db, err := sqlx.Open("snowflake", c.snowflakeDSN) if err != nil { return fmt.Errorf("failed to open Snowflake connection: %w (host: %s)", err, c.host) } - + db.SetMaxOpenConns(1) db.SetMaxIdleConns(0) db.SetConnMaxLifetime(30 * time.Minute) - + if err := db.Ping(); err != nil { db.Close() return fmt.Errorf("failed to ping Snowflake: %w (host: %s)", err, c.host) } - + c.conn = db return nil }