diff --git a/compose.yaml b/compose.yaml index 1027d4de..6945c090 100644 --- a/compose.yaml +++ b/compose.yaml @@ -20,6 +20,88 @@ services: 'PGPOOL_PARAMS_PORT': '7432' 'PGPOOL_PARAMS_BACKEND_HOSTNAME0': 'pg18' + supavisor-db: + profiles: ["supavisor"] + image: postgres:15 + shm_size: 128mb + environment: + POSTGRES_DB: pqgo + POSTGRES_USER: pqgo + POSTGRES_PASSWORD: unused + healthcheck: + test: ["CMD", "pg_isready", "-U", "pqgo"] + interval: 10s + timeout: 10s + retries: 5 + start_period: 5s + + supavisor: + profiles: ["supavisor"] + depends_on: + supavisor-db: + condition: service_healthy + image: supabase/supavisor:2.7.4 + command: sh -c '/app/bin/migrate && /app/bin/server' + environment: + PORT: 4000 + PROXY_PORT_SESSION: 5452 + PROXY_PORT_TRANSACTION: 6543 + CLUSTER_POSTGRES: "true" + DATABASE_URL: "ecto://pqgo:unused@supavisor-db:5432/pqgo" + SECRET_KEY_BASE: "12345678901234567890121234567890123456789012345678903212345678901234567890123456789032123456789012345678901234567890323456789032" + VAULT_ENC_KEY: "12345678901234567890123456789032" + API_JWT_SECRET: "dev" + METRICS_JWT_SECRET: "dev" + REGION: "local" + ERL_AFLAGS: -proto_dist inet_tcp + DB_POOL_SIZE: 50 + ports: + - '127.0.0.1:4000:4000' + - '127.0.0.1:5452:5452' + - '127.0.0.1:6543:6543' + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4000/api/health"] + interval: 10s + timeout: 10s + retries: 5 + start_period: 5s + + supavisor-create-tenant: + profiles: ["supavisor"] + depends_on: + supavisor: + condition: service_healthy + image: supabase/supavisor:2.7.4 + command: |- + sh -c ' + curl -sX PUT \ + "http://supavisor:4000/api/tenants/dev_tenant" \ + --header "Accept: */*" \ + --header "User-Agent: Thunder Client (https://www.thunderclient.com)" \ + --header "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJvbGUiOiJhbm9uIiwiaWF0IjoxNjQ1MTkyODI0LCJleHAiOjE5NjA3Njg4MjR9.M9jrxyvPLkUxWgOYSf5dNdJ8v_eRrq810ShFRT8N-6M" \ + --header "Content-Type: application/json" \ + --data-raw '\''{ + "tenant": { + "db_host": "supavisor-db", + "db_port": 5432, + "db_database": "pqgo", + "ip_version": "auto", + "enforce_ssl": false, + "require_user": false, + "auth_query": "SELECT rolname, rolpassword FROM pg_authid WHERE rolname=$1;", + "users": [ + { + "db_user": "pqgo", + "db_password": "unused", + "pool_size": 20, + "mode_type": "transaction", + "is_manager": true + } + ] + } + }'\'' + ' + pg18: image: 'postgres:18' ports: ['127.0.0.1:5432:5432'] diff --git a/conn_test.go b/conn_test.go index 852a608d..15b9cb0b 100644 --- a/conn_test.go +++ b/conn_test.go @@ -24,7 +24,11 @@ import ( ) func TestReconnect(t *testing.T) { - t.Parallel() + // Cannot run this test in parallel when using transaction-mode + // connection pooling. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db1 := pqtest.MustDB(t) tx, err := db1.Begin() if err != nil { @@ -156,6 +160,10 @@ func TestPgpass(t *testing.T) { } func TestExecNilSlice(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) _, err := db.Exec("create temp table x (b1 text, b2 text, b3 text)") @@ -203,6 +211,10 @@ func TestExecNilSlice(t *testing.T) { } func TestExec(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) _, err := db.Exec("CREATE TEMP TABLE temp (a int)") @@ -244,6 +256,12 @@ func TestExec(t *testing.T) { } func TestStatment(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) st, err := db.Prepare("SELECT 1") @@ -326,6 +344,12 @@ func TestParameterCountMismatch(t *testing.T) { // Test that EmptyQueryResponses are handled correctly. func TestEmptyQuery(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) res, err := db.Exec("") @@ -342,6 +366,7 @@ func TestEmptyQuery(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() cols, err := rows.Columns() if err != nil { t.Fatal(err) @@ -374,6 +399,7 @@ func TestEmptyQuery(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() cols, err = rows.Columns() if err != nil { t.Fatal(err) @@ -391,12 +417,19 @@ func TestEmptyQuery(t *testing.T) { // Test that rows.Columns() is correct even if there are no result rows. func TestEmptyResultSetColumns(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) rows, err := db.Query("SELECT 1 AS a, text 'bar' AS bar WHERE FALSE") if err != nil { t.Fatal(err) } + defer rows.Close() cols, err := rows.Columns() if err != nil { t.Fatal(err) @@ -422,6 +455,7 @@ func TestEmptyResultSetColumns(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() cols, err = rows.Columns() if err != nil { t.Fatal(err) @@ -522,6 +556,12 @@ func TestEncodeDecode(t *testing.T) { } func TestNoData(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) st, err := db.Prepare("SELECT 1 WHERE true = false") @@ -580,8 +620,14 @@ func TestErrorDuringStartup(t *testing.T) { if !ok { t.Fatalf("wrong error type %T: %[1]s", err) } - if e.Code.Name() != "invalid_authorization_specification" && e.Code.Name() != "invalid_password" { - t.Fatalf("wrong error code %q: %s", e.Code.Name(), err) + if pqtest.Supavisor() { + if e.Code.Name() != "internal_error" || !strings.Contains(err.Error(), "Tenant or user not found") { + t.Fatalf("wrong error code %q: %s", e.Code.Name(), err) + } + } else { + if e.Code.Name() != "invalid_authorization_specification" && e.Code.Name() != "invalid_password" { + t.Fatalf("wrong error code %q: %s", e.Code.Name(), err) + } } } @@ -794,6 +840,10 @@ func TestErrorOnQueryRowSimpleQuery(t *testing.T) { // Test the QueryRow bug workarounds in stmt.exec() and simpleQuery() func TestQueryRowBugWorkaround(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) // stmt.exec() @@ -867,6 +917,7 @@ func TestQueryRowBugWorkaround(t *testing.T) { if err != nil { t.Fatalf("query failed: %s", err) } + defer rows.Close() if !rows.Next() { t.Fatalf("expected at least one result row; got %s", rows.Err()) } @@ -940,6 +991,10 @@ func TestSimpleQueryWithoutResponse(t *testing.T) { } func TestBindError(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -948,8 +1003,9 @@ func TestBindError(t *testing.T) { t.Fatal(err) } - _, err = db.Query("select * from test where i=$1", "hhh") + rows, err := db.Query("select * from test where i=$1", "hhh") if err == nil { + rows.Close() t.Fatal("expected an error") } @@ -981,6 +1037,10 @@ func TestParseErrorInExtendedQuery(t *testing.T) { // TestReturning tests that an INSERT query using the RETURNING clause returns a row. func TestReturning(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -994,6 +1054,7 @@ func TestReturning(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() if !rows.Next() { t.Fatal("no rows") } @@ -1016,6 +1077,10 @@ func TestReturning(t *testing.T) { } func TestIssue186(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -1060,6 +1125,12 @@ func TestIssue186(t *testing.T) { } func TestIssue196(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -1220,6 +1291,7 @@ func TestNullAfterNonNull(t *testing.T) { if err != nil { t.Fatal(err) } + defer r.Close() var n sql.NullInt64 @@ -1282,6 +1354,10 @@ func Test64BitErrorChecking(t *testing.T) { } func TestCommit(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) _, err := db.Exec("CREATE TEMP TABLE temp (a int)") @@ -1333,6 +1409,10 @@ func TestErrorClass(t *testing.T) { } func TestRowsResultTag(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + type ResultTag interface { Result() driver.Result Tag() string @@ -1418,6 +1498,7 @@ func TestMultipleResult(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() type set struct { cols []string rowCount int @@ -1485,12 +1566,13 @@ func TestCopyInStmtAffectedRows(t *testing.T) { t.Parallel() db := pqtest.MustDB(t) - _, err := db.Exec("CREATE TEMP TABLE temp (a int)") + txn, err := db.BeginTx(context.TODO(), nil) if err != nil { t.Fatal(err) } + defer txn.Rollback() - txn, err := db.BeginTx(context.TODO(), nil) + _, err = txn.Exec("CREATE TEMP TABLE temp (a int)") if err != nil { t.Fatal(err) } @@ -1499,6 +1581,7 @@ func TestCopyInStmtAffectedRows(t *testing.T) { if err != nil { t.Fatal(err) } + defer copyStmt.Close() res, err := copyStmt.Exec() if err != nil { @@ -1510,6 +1593,16 @@ func TestCopyInStmtAffectedRows(t *testing.T) { } func TestConnPrepareContext(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() tests := []struct { @@ -1544,6 +1637,7 @@ func TestConnPrepareContext(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { db := pqtest.MustDB(t) @@ -1563,6 +1657,16 @@ func TestConnPrepareContext(t *testing.T) { } func TestStmtQueryContext(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + if !pqtest.Pgpool() { t.Parallel() } @@ -1599,6 +1703,7 @@ func TestStmtQueryContext(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { db := pqtest.MustDB(t) @@ -1610,7 +1715,11 @@ func TestStmtQueryContext(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = stmt.QueryContext(ctx) + defer stmt.Close() + rows, err := stmt.QueryContext(ctx) + if rows != nil { + defer rows.Close() + } pgErr := (*Error)(nil) switch { case (err != nil) != tt.cancelExpected: @@ -1623,6 +1732,16 @@ func TestStmtQueryContext(t *testing.T) { } func TestStmtExecContext(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + if !pqtest.Pgpool() { t.Parallel() } @@ -1659,6 +1778,7 @@ func TestStmtExecContext(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { db := pqtest.MustDB(t) @@ -1670,6 +1790,7 @@ func TestStmtExecContext(t *testing.T) { if err != nil { t.Fatal(err) } + defer stmt.Close() _, err = stmt.ExecContext(ctx) pgErr := (*Error)(nil) switch { @@ -1999,6 +2120,12 @@ func TestTxOptions(t *testing.T) { } func TestPing(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() // TODO: hangs forever? pqtest.SkipPgpool(t) @@ -2020,6 +2147,7 @@ func TestPing(t *testing.T) { if err != nil { t.Fatal(err) } + defer conn.Close() // start a transaction and read backend pid of our connection tx, err := conn.BeginTx(ctx, &sql.TxOptions{ @@ -2029,6 +2157,7 @@ func TestPing(t *testing.T) { if err != nil { t.Fatal(err) } + defer tx.Rollback() rows, err := tx.Query("SELECT pg_backend_pid()") if err != nil { @@ -2099,6 +2228,7 @@ func TestAuth(t *testing.T) { t.Parallel() for _, tt := range tests { + tt := tt t.Run("", func(t *testing.T) { t.Run("unsupported auth", func(t *testing.T) { err := (&conn{}).auth(&tt.buf, Config{}) @@ -2112,6 +2242,7 @@ func TestAuth(t *testing.T) { t.Run("end to end", func(t *testing.T) { pqtest.SkipPgbouncer(t) // TODO: need to properly set up auth pqtest.SkipPgpool(t) // TODO: need to properly set up auth + pqtest.SkipSupavisor(t) // TODO: need to properly set up auth tests := []struct { conn, wantErr string @@ -2132,6 +2263,7 @@ func TestAuth(t *testing.T) { } for _, tt := range tests { + tt := tt t.Run(tt.conn, func(t *testing.T) { db := pqtest.MustDB(t, tt.conn) @@ -2145,6 +2277,10 @@ func TestAuth(t *testing.T) { } func TestUint64(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) pqtest.Exec(t, db, `create temp table tbl (n numeric)`) @@ -2154,6 +2290,7 @@ func TestUint64(t *testing.T) { if err != nil { t.Fatal(err) } + defer rows.Close() if rows.Next() { var i uint64 @@ -2169,6 +2306,10 @@ func TestUint64(t *testing.T) { } func TestBytea(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + tests := []struct { in any want string @@ -2198,6 +2339,10 @@ func TestBytea(t *testing.T) { } func TestJSONRawMessage(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) pqtest.Exec(t, db, `create temp table tbl (j json)`) diff --git a/connector_test.go b/connector_test.go index 370f9b21..2264e535 100644 --- a/connector_test.go +++ b/connector_test.go @@ -99,9 +99,16 @@ func TestNewConnector(t *testing.T) { if err != nil { t.Fatal(err) } - want := fmt.Sprintf( - `map[client_encoding:UTF8 connect_timeout:20 datestyle:ISO, MDY dbname:pqgo host:localhost port:%d search_path:foo sslmode:disable sslsni:yes user:pqgo]`, - cfg.Port) + var want string + if pqtest.Supavisor() { + want = fmt.Sprintf( + `map[client_encoding:UTF8 connect_timeout:20 datestyle:ISO, MDY dbname:pqgo host:localhost password:unused port:%d search_path:foo sslmode:disable sslsni:yes user:pqgo.dev_tenant]`, + cfg.Port) + } else { + want = fmt.Sprintf( + `map[client_encoding:UTF8 connect_timeout:20 datestyle:ISO, MDY dbname:pqgo host:localhost port:%d search_path:foo sslmode:disable sslsni:yes user:pqgo]`, + cfg.Port) + } if have := fmt.Sprintf("%v", c.cfg.tomap()); have != want { t.Errorf("\nhave: %s\nwant: %s", have, want) } @@ -115,6 +122,10 @@ func TestNewConnector(t *testing.T) { }) t.Run("database=", func(t *testing.T) { + // Supavisor really doesn't like trying to connect to a database + // other than the one the tenant has access to. + pqtest.SkipSupavisor(t) + want1, want2 := `pq: database "err" does not exist (3D000)`, `pq: database "two" does not exist (3D000)` if pqtest.Pgbouncer() { @@ -191,6 +202,10 @@ func TestParseOpts(t *testing.T) { } func TestRuntimeParameters(t *testing.T) { + // Skipped on Supavisor as the only parameter it uses is `user`: + // https://github.com/supabase/supavisor/blob/6b77121fc697b419e8203bac1c52a69910bb80f3/lib/supavisor/protocol/server.ex#L470-L474 + pqtest.SkipSupavisor(t) + tests := []struct { conninfo string param string @@ -329,6 +344,7 @@ func TestParseURL(t *testing.T) { t.Parallel() for _, tt := range tests { + tt := tt t.Run("", func(t *testing.T) { have, err := ParseURL(tt.in) if !pqtest.ErrorContains(err, tt.wantErr) { diff --git a/copy_test.go b/copy_test.go index 747a0b4b..c462cfcc 100644 --- a/copy_test.go +++ b/copy_test.go @@ -66,6 +66,12 @@ func TestCopyInSchemaStmt(t *testing.T) { } func TestCopyInMultipleValues(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + tests := []struct { cols []string }{ @@ -122,7 +128,10 @@ func TestCopyInMultipleValues(t *testing.T) { } func TestCopyInRaiseStmtTrigger(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests using COPY. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) txn, err := db.Begin() @@ -192,7 +201,10 @@ func TestCopyInRaiseStmtTrigger(t *testing.T) { } func TestCopyInTypes(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests using COPY. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) txn, err := db.Begin() @@ -251,7 +263,10 @@ func TestCopyInTypes(t *testing.T) { } func TestCopyInWrongType(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests using COPY. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) txn, err := db.Begin() @@ -324,7 +339,10 @@ func TestCopyInBinaryError(t *testing.T) { } func TestCopyFromError(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests using COPY. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) txn, err := db.Begin() @@ -374,6 +392,10 @@ func TestCopySyntaxError(t *testing.T) { // Tests for connection errors in copyin.resploop() func TestCopyRespLoopConnectionError(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) diff --git a/encode_test.go b/encode_test.go index 5ad2d429..4b643dc7 100644 --- a/encode_test.go +++ b/encode_test.go @@ -3,6 +3,7 @@ package pq import ( "bytes" "database/sql" + "errors" "fmt" "regexp" "testing" @@ -126,7 +127,10 @@ func TestParseTsErrors(t *testing.T) { // Now test that sending the value into the database and parsing it back // returns the same time.Time value. func TestEncodeAndParseTs(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t, "timezone='Etc/UTC'") for i, tt := range timeTests { @@ -181,7 +185,10 @@ func TestFormatTs(t *testing.T) { } func TestFormatTsBackend(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) var str string @@ -344,6 +351,12 @@ func TestTimestampWithTimeZone(t *testing.T) { } func TestTimestampWithOutTimezone(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -525,7 +538,10 @@ func TestStringWithNul(t *testing.T) { } func TestByteSliceToText(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) b := []byte("hello world") @@ -543,7 +559,10 @@ func TestByteSliceToText(t *testing.T) { } func TestStringToBytea(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) b := "hello world" @@ -561,7 +580,10 @@ func TestStringToBytea(t *testing.T) { } func TestTextByteSliceToUUID(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) b := []byte("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11") @@ -588,7 +610,11 @@ func TestTextByteSliceToUUID(t *testing.T) { } func TestBinaryByteSlicetoUUID(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } + db := pqtest.MustDB(t) b := []byte{'\xa0', '\xee', '\xbc', '\x99', @@ -609,17 +635,22 @@ func TestBinaryByteSlicetoUUID(t *testing.T) { t.Fatalf("expected %v but got %v", b, result) } } else { - pqErr := err.(*Error) - if pqErr == nil { - t.Errorf("Expected to get error") - } else if pqErr.Code != "22021" { - t.Fatalf("Expected to get invalid byte sequence for encoding error (22021), got %s", pqErr.Code) + var pqErr *Error + if errors.As(err, &pqErr) { + if pqErr.Code != "22021" { + t.Fatalf("Expected to get invalid byte sequence for encoding error (22021), got %s", pqErr.Code) + } + } else { + t.Errorf("Expected to get pq.Error, got %v", err) } } } func TestStringToUUID(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) s := "a0eebc99-9c0b-4ef8-bb00-6bb9bd380a11" @@ -637,6 +668,12 @@ func TestStringToUUID(t *testing.T) { } func TestTextByteSliceToInt(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + t.Parallel() db := pqtest.MustDB(t) @@ -664,7 +701,11 @@ func TestTextByteSliceToInt(t *testing.T) { } func TestBinaryByteSliceToInt(t *testing.T) { - t.Parallel() + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } + db := pqtest.MustDB(t) expected := 12345678 @@ -681,11 +722,13 @@ func TestBinaryByteSliceToInt(t *testing.T) { t.Fatalf("expected %v but got %v", expected, result) } } else { - pqErr := err.(*Error) - if pqErr == nil { - t.Errorf("Expected to get error") - } else if pqErr.Code != "22021" { - t.Fatalf("Expected to get invalid byte sequence for encoding error (22021), got %s", pqErr.Code) + var pqErr *Error + if errors.As(err, &pqErr) { + if pqErr.Code != "22021" { + t.Fatalf("Expected to get invalid byte sequence for encoding error (22021), got %s", pqErr.Code) + } + } else { + t.Errorf("Expected to get pq.Error, got %v", err) } } } diff --git a/error_test.go b/error_test.go index c1fbb3ef..12f2a197 100644 --- a/error_test.go +++ b/error_test.go @@ -275,6 +275,7 @@ func TestNetworkError(t *testing.T) { } c.Dialer(failDialer{}) db := sql.OpenDB(c) + defer db.Close() db.SetMaxIdleConns(1) db.SetMaxOpenConns(1) if err := db.Ping(); err != nil { diff --git a/example_test.go b/example_test.go index 7789435d..f0fdf390 100644 --- a/example_test.go +++ b/example_test.go @@ -4,17 +4,28 @@ import ( "crypto/tls" "crypto/x509" "database/sql" - "fmt" - "log" "os" + "testing" "github.com/lib/pq" + "github.com/lib/pq/internal/pqtest" ) -func ExampleNewConnector() { - c, err := pq.NewConnector("host=postgres dbname=pqgo") +func getTestDSN(t *testing.T) string { + t.Helper() + var dsn string + if pqtest.Supavisor() { + dsn = "host=localhost dbname=pqgo sslmode=disable" + } else { + dsn = "host=postgres dbname=pqgo" + } + return dsn +} + +func TestExampleNewConnector(t *testing.T) { + c, err := pq.NewConnector(getTestDSN(t)) if err != nil { - log.Fatalf("could not create connector: %v", err) + t.Fatalf("could not create connector: %v", err) } db := sql.OpenDB(c) @@ -23,16 +34,16 @@ func ExampleNewConnector() { // Use the DB tx, err := db.Begin() if err != nil { - log.Fatalf("could not start transaction: %v", err) + t.Fatalf("could not start transaction: %v", err) } tx.Rollback() // Output: } -func ExampleNewConfig() { - cfg, err := pq.NewConfig("host=postgres dbname=pqgo") +func TestExampleNewConfig(t *testing.T) { + cfg, err := pq.NewConfig(getTestDSN(t)) if err != nil { - log.Fatal(err) + t.Fatal(err) } if cfg.Host == "localhost" { cfg.Host = "127.0.0.1" @@ -40,7 +51,7 @@ func ExampleNewConfig() { c, err := pq.NewConnectorConfig(cfg) if err != nil { - log.Fatal(err) + t.Fatal(err) } db := sql.OpenDB(c) @@ -49,23 +60,23 @@ func ExampleNewConfig() { // Use the DB tx, err := db.Begin() if err != nil { - log.Fatalf("could not start transaction: %v", err) + t.Fatalf("could not start transaction: %v", err) } tx.Rollback() // Output: } -func ExampleConnectorWithNoticeHandler() { +func TestExampleConnectorWithNoticeHandler(t *testing.T) { // Base connector to wrap dsn := "" base, err := pq.NewConnector(dsn) if err != nil { - log.Fatal(err) + t.Fatal(err) } // Wrap the connector to simply print out the message connector := pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) { - fmt.Println("Notice sent: " + notice.Message) + t.Logf("Notice sent: %s", notice.Message) }) db := sql.OpenDB(connector) defer db.Close() @@ -73,16 +84,19 @@ func ExampleConnectorWithNoticeHandler() { // Raise a notice sql := "DO language plpgsql $$ BEGIN RAISE NOTICE 'test notice'; END $$" if _, err := db.Exec(sql); err != nil { - log.Fatal(err) + t.Fatal(err) } // Output: // Notice sent: test notice } -func ExampleRegisterTLSConfig() { +func TestExampleRegisterTLSConfig(t *testing.T) { + // TODO: implement SSL support in Supavisor config + pqtest.SkipSupavisor(t) + pem, err := os.ReadFile("testdata/init/root.crt") if err != nil { - log.Fatal(err) + t.Fatal(err) } root := x509.NewCertPool() @@ -90,7 +104,7 @@ func ExampleRegisterTLSConfig() { certs, err := tls.LoadX509KeyPair("testdata/init/postgresql.crt", "testdata/init/postgresql.key") if err != nil { - log.Fatal(err) + t.Fatal(err) } pq.RegisterTLSConfig("mytls", &tls.Config{ @@ -101,35 +115,43 @@ func ExampleRegisterTLSConfig() { db, err := sql.Open("postgres", "host=postgres dbname=pqgo sslmode=pqgo-mytls") if err != nil { - log.Fatal(err) + t.Fatal(err) } + defer db.Close() err = db.Ping() if err != nil { - log.Fatal(err) + t.Fatal(err) } // Output: } -func ExampleCopyIn() { +func TestExampleCopyIn(t *testing.T) { + // This test won't work with transaction mode connection pooling + // without a transaction. + pqtest.SkipSupavisorTransactionMode(t) + // Connect and create table. - db, err := sql.Open("postgres", "") + db, err := sql.Open("postgres", getTestDSN(t)) if err != nil { - log.Fatal(err) + t.Fatal(err) } + defer db.Close() + _, err = db.Exec(`create temp table users (name text, age int)`) if err != nil { - log.Fatal(err) + t.Fatal(err) } // Need to start transaction and prepare a statement. tx, err := db.Begin() if err != nil { - log.Fatal(err) + t.Fatal(err) } + defer tx.Rollback() stmt, err := tx.Prepare(pq.CopyIn("users", "name", "age")) if err != nil { - log.Fatal(err) + t.Fatal(err) } // Insert rows. @@ -143,26 +165,35 @@ func ExampleCopyIn() { for _, user := range users { _, err = stmt.Exec(user.Name, int64(user.Age)) if err != nil { - log.Fatal(err) + t.Fatal(err) } } // Finalize copy and statement, and commit transaction. if _, err := stmt.Exec(); err != nil { - log.Fatal(err) + t.Fatal(err) } if err := stmt.Close(); err != nil { - log.Fatal(err) + t.Fatal(err) } if err := tx.Commit(); err != nil { - log.Fatal(err) + t.Fatal(err) } // Query rows to verify. rows, err := db.Query(`select * from users order by name`) if err != nil { - log.Fatal(err) + t.Fatal(err) } + defer rows.Close() + tests := []struct { + name string + age int + }{ + {"Donald Duck", 36}, + {"Scrooge McDuck", 86}, + } + i := 0 for rows.Next() { var ( name string @@ -170,9 +201,16 @@ func ExampleCopyIn() { ) err := rows.Scan(&name, &age) if err != nil { - log.Fatal(err) + t.Fatal(err) + } + t.Logf("%s %d", name, age) + if have, want := name, tests[i].name; have != want { + t.Errorf("\nhave: %s\nwant: %s", have, want) + } + if have, want := age, tests[i].age; have != want { + t.Errorf("\nhave: %d\nwant: %d", have, want) } - fmt.Println(name, age) + i = i + 1 } // Output: diff --git a/internal/pqtest/pqtest.go b/internal/pqtest/pqtest.go index 96c9ee84..022799ee 100644 --- a/internal/pqtest/pqtest.go +++ b/internal/pqtest/pqtest.go @@ -13,6 +13,10 @@ import ( func Pgbouncer() bool { return os.Getenv("PGPORT") == "6432" } func Pgpool() bool { return os.Getenv("PGPORT") == "7432" } +func SupavisorSessionMode() bool { return os.Getenv("PGPORT") == "5452" } +func SupavisorTransactionMode() bool { return os.Getenv("PGPORT") == "6543" } +func Supavisor() bool { return SupavisorSessionMode() || SupavisorTransactionMode() } + func SkipPgbouncer(t testing.TB) { t.Helper() if Pgbouncer() { @@ -27,6 +31,27 @@ func SkipPgpool(t testing.TB) { } } +func SkipSupavisorSessionMode(t testing.TB) { + t.Helper() + if SupavisorSessionMode() { + t.Skip("skipped for supavisor session mode (PGPORT=5452)") + } +} + +func SkipSupavisorTransactionMode(t testing.TB) { + t.Helper() + if SupavisorTransactionMode() { + t.Skip("skipped for supavisor transaction mode (PGPORT=6543)") + } +} + +func SkipSupavisor(t testing.TB) { + t.Helper() + if Supavisor() { + t.Skip("skipped for supavisor (PGPORT=5452,6543)") + } +} + func ForceBinaryParameters() bool { v, ok := os.LookupEnv("PQTEST_BINARY_PARAMETERS") if !ok { diff --git a/issues_test.go b/issues_test.go index 33aebc05..81c984af 100644 --- a/issues_test.go +++ b/issues_test.go @@ -11,6 +11,12 @@ import ( ) func TestIssue494(t *testing.T) { + // "It is important to note that although the direct connections and + // Supavisor in session mode support prepared statements, Supavisor in + // transaction mode does not." + // https://supabase.com/docs/guides/troubleshooting/disabling-prepared-statements-qL8lEL + pqtest.SkipSupavisorTransactionMode(t) + db := pqtest.MustDB(t) query := `CREATE TEMP TABLE t (i INT PRIMARY KEY)` @@ -30,7 +36,15 @@ func TestIssue494(t *testing.T) { } func TestIssue1046(t *testing.T) { - t.Parallel() + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // Cannot run this test in parallel when using transaction mode + // connection pooling. + if !pqtest.SupavisorTransactionMode() { + t.Parallel() + } db := pqtest.MustDB(t) @@ -59,7 +73,8 @@ func TestIssue1046(t *testing.T) { } func TestIssue1062(t *testing.T) { - if !pqtest.Pgpool() { + // Transaction mode connection pooling breaks parallel tests. + if !pqtest.Pgpool() && !pqtest.Supavisor() { t.Parallel() } db := pqtest.MustDB(t) @@ -125,7 +140,6 @@ func TestQueryCancelRace(t *testing.T) { // Test cancelling a scan after it is started. This broke with 1.10.4. func TestQueryCancelledReused(t *testing.T) { - t.Parallel() db := pqtest.MustDB(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/notify_test.go b/notify_test.go index 0dfefb63..9a70402b 100644 --- a/notify_test.go +++ b/notify_test.go @@ -74,12 +74,20 @@ func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) { } func TestNewListenerConn(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerConn(t) defer l.Close() } func TestListenerConnListen(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, channel := newTestListenerConn(t) defer l.Close() @@ -103,6 +111,10 @@ func TestListenerConnListen(t *testing.T) { } func TestListenerConnUnlisten(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, channel := newTestListenerConn(t) defer l.Close() @@ -141,6 +153,10 @@ func TestListenerConnUnlisten(t *testing.T) { } func TestListenerConnUnlistenAll(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, channel := newTestListenerConn(t) defer l.Close() @@ -179,6 +195,10 @@ func TestListenerConnUnlistenAll(t *testing.T) { } func TestListenerConnClose(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerConn(t) defer l.Close() @@ -193,6 +213,10 @@ func TestListenerConnClose(t *testing.T) { } func TestListernerConnPing(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerConn(t) defer l.Close() err := l.Ping() @@ -211,6 +235,10 @@ func TestListernerConnPing(t *testing.T) { // Test for deadlock where a query fails while another one is queued func TestListenerConnExecDeadlock(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerConn(t) defer l.Close() @@ -239,6 +267,10 @@ func TestListenerConnExecDeadlock(t *testing.T) { // Test for ListenerConn being closed while a slow query is executing func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerConn(t) defer l.Close() @@ -270,6 +302,10 @@ func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) { } func TestListenerNotifyExtra(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + db := pqtest.MustDB(t) l, channel := newTestListenerConn(t) @@ -311,6 +347,10 @@ func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) { } func TestListenerListen(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListener(t) defer l.Close() @@ -333,6 +373,10 @@ func TestListenerListen(t *testing.T) { } func TestListenerUnlisten(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListener(t) defer l.Close() @@ -370,6 +414,10 @@ func TestListenerUnlisten(t *testing.T) { } func TestListenerUnlistenAll(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListener(t) defer l.Close() @@ -407,6 +455,10 @@ func TestListenerUnlistenAll(t *testing.T) { } func TestListenerFailedQuery(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, eventch := newTestListener(t) defer l.Close() @@ -454,6 +506,10 @@ func TestListenerFailedQuery(t *testing.T) { } func TestListenerReconnect(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour) defer l.Close() @@ -487,6 +543,10 @@ func TestListenerReconnect(t *testing.T) { if !pqtest.ErrorContains(err, "unable to forward message to frontend") { t.Fatalf("unexpected error %T: %[1]s", err) } + } else if pqtest.Supavisor() { + if !pqtest.ErrorContains(err, "{:shutdown, :db_termination}") { + t.Fatalf("unexpected error %T: %[1]s", err) + } } else { if err != io.EOF { t.Fatalf("unexpected error %T: %[1]s", err) @@ -520,6 +580,10 @@ func TestListenerReconnect(t *testing.T) { } func TestListenerClose(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour) defer l.Close() @@ -534,6 +598,10 @@ func TestListenerClose(t *testing.T) { } func TestListenerPing(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour) defer l.Close() @@ -554,6 +622,10 @@ func TestListenerPing(t *testing.T) { } func TestConnectorWithNotificationHandler_Simple(t *testing.T) { + // listen/notify currently broken in Supavisor + // https://github.com/supabase/supavisor/issues/85 + pqtest.SkipSupavisor(t) + b, err := NewConnector("") if err != nil { t.Fatal(err) diff --git a/ssl_test.go b/ssl_test.go index cb497ea0..b972a8fd 100644 --- a/ssl_test.go +++ b/ssl_test.go @@ -30,6 +30,9 @@ func startSSLTest(t *testing.T, user string) { wantErr = "protocol_violation" } else if pqtest.Pgpool() { wantErr = "internal_error" + } else if pqtest.Supavisor() { + // TODO: "Either external_id or sni_hostname must be provided" {:single, "pqgosslcert", nil} + wantErr = "internal_error" } _, err := openSSLConn(t, "sslmode=disable user="+user) pqErr := pqError(t, err) @@ -39,6 +42,9 @@ func startSSLTest(t *testing.T, user string) { } func TestSSLMode(t *testing.T) { + // TODO: need additional config to test SSL w/ Supavisor + pqtest.SkipSupavisor(t) + tests := []struct { connect string wantErr bool @@ -92,6 +98,7 @@ func TestSSLMode(t *testing.T) { func TestSSLClientCertificates(t *testing.T) { pqtest.SkipPgpool(t) // TODO: can't get it to work. pqtest.SkipPgbouncer(t) // TODO: can't get it to work. + pqtest.SkipSupavisor(t) // TODO: need to set SSL up in Supavisor. startSSLTest(t, "pqgosslcert") @@ -147,6 +154,9 @@ func TestSSLClientCertificates(t *testing.T) { // Check that clint sends SNI data when sslsni is not disabled func TestSSLSNI(t *testing.T) { + // TODO: need additional config to test SSL w/ Supavisor + pqtest.SkipSupavisor(t) + startSSLTest(t, "pqgosslcert") tests := []struct {