From c3ede4651aa774bfaecb46f87501fb11876fea9b Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Tue, 10 Feb 2026 00:28:30 -0800 Subject: [PATCH 1/2] pgconn: fix panic on Pipeline.Close when server sends multiple FATAL errors When the server sends multiple FATAL ErrorResponse messages that are buffered together (as PgBouncer can do when terminating idle-in-transaction connections), Pipeline.Close() would panic with "close of closed channel". The root cause is that PgConn.receiveMessage() unconditionally calls close(cleanupDone) when OnPgError returns false, without checking if the channel was already closed by a previous error on the same connection. When two FATAL errors are in the chunkReader buffer, processing the first FATAL closes cleanupDone, and processing the second FATAL attempts to close it again, causing the panic. The fix: 1. Guard close(cleanupDone) in receiveMessage() and CopyFrom() with a status check to prevent double-close of the channel. 2. Add a nil-results check in Pipeline.Close() to break out of the result-draining loop when no more results are available but ExpectedReadyForQuery > 0 (preventing a potential infinite loop). Fixes #2470 Co-Authored-By: Claude Opus 4.6 --- pgconn/pgconn.go | 28 +++++++++--- pgconn/pgconn_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 7 deletions(-) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index ec27ec34d..4571ca2fb 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -598,9 +598,13 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) { case *pgproto3.ErrorResponse: err := ErrorResponseToPgError(msg) if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) { - pgConn.status = connStatusClosed - pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. - close(pgConn.cleanupDone) + // If the connection is already being closed (e.g. by asyncClose), don't close cleanupDone again to avoid + // a panic from closing an already-closed channel. + if pgConn.status != connStatusClosed { + pgConn.status = connStatusClosed + pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. + close(pgConn.cleanupDone) + } return nil, err } case *pgproto3.NoticeResponse: @@ -1411,9 +1415,11 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co // the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an // error is found then forcibly close the connection without sending the Terminate message. if err := pgConn.bufferingReceiveErr; err != nil { - pgConn.status = connStatusClosed - pgConn.conn.Close() - close(pgConn.cleanupDone) + if pgConn.status != connStatusClosed { + pgConn.status = connStatusClosed + pgConn.conn.Close() + close(pgConn.cleanupDone) + } return CommandTag{}, normalizeTimeoutError(ctx, err) } // peekMessage never returns err in the bufferingReceive mode - it only forwards the bufferingReceive variables. @@ -2821,7 +2827,7 @@ func (p *Pipeline) Close() error { } for p.state.ExpectedReadyForQuery() > 0 { - _, err := p.getResults() + results, err := p.getResults() if err != nil { p.err = err var pgErr *PgError @@ -2829,6 +2835,14 @@ func (p *Pipeline) Close() error { p.conn.asyncClose() break } + } else if results == nil { + // No more results available but we haven't received all expected ReadyForQuery messages. + // This means the connection is in a bad state (e.g. server closed the connection). + p.conn.asyncClose() + if p.err == nil { + p.err = errors.New("pipeline: no more results but expected ReadyForQuery") + } + break } } diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 3fd43a031..981556c57 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -4389,6 +4389,110 @@ func TestFatalErrorReceivedInPipelineMode(t *testing.T) { require.Error(t, err) } +// https://github.com/jackc/pgx/issues/2470 +// When the server sends multiple FATAL errors in a single batch (as PgBouncer can do when +// terminating idle-in-transaction connections), Pipeline.Close() must not panic with +// "close of closed channel" on cleanupDone. The first FATAL triggers OnPgError which closes +// the connection and cleanupDone. The second FATAL, still in the read buffer, must not +// attempt to close cleanupDone again. +// +// This test sends all server responses in a single TCP write to guarantee both FATAL errors +// are in the chunkReader buffer simultaneously. +func TestPipelineCloseDoesNotPanicOnMultipleFatalErrors(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + ln, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer ln.Close() + + serverErrChan := make(chan error, 1) + go func() { + defer close(serverErrChan) + + conn, err := ln.Accept() + if err != nil { + serverErrChan <- err + return + } + defer conn.Close() + + err = conn.SetDeadline(time.Now().Add(59 * time.Second)) + if err != nil { + serverErrChan <- err + return + } + + backend := pgproto3.NewBackend(conn, conn) + + // Handle startup + _, err = backend.ReceiveStartupMessage() + if err != nil { + serverErrChan <- err + return + } + backend.Send(&pgproto3.AuthenticationOk{}) + backend.Send(&pgproto3.BackendKeyData{ProcessID: 0, SecretKey: 0}) + backend.Send(&pgproto3.ReadyForQuery{TxStatus: 'I'}) + err = backend.Flush() + if err != nil { + serverErrChan <- err + return + } + + // Read all client pipeline messages (Parse, Describe, Parse, Describe, Sync) + for i := 0; i < 5; i++ { + _, err = backend.Receive() + if err != nil { + serverErrChan <- err + return + } + } + + // Send ALL responses in a single write so they all end up in the chunkReader buffer. + // This simulates PgBouncer sending a FATAL and then the real PostgreSQL also sending + // a FATAL, both arriving in the same TCP segment. + backend.Send(&pgproto3.ParseComplete{}) + backend.Send(&pgproto3.ParameterDescription{}) + backend.Send(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{ + {Name: []byte("mock")}, + }}) + // Two FATAL errors back-to-back in the same write buffer + backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"}) + backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"}) + err = backend.Flush() + if err != nil { + serverErrChan <- err + return + } + }() + + parts := strings.Split(ln.Addr().String(), ":") + host := parts[0] + port := parts[1] + connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port) + + ctx, cancel = context.WithTimeout(ctx, 59*time.Second) + defer cancel() + conn, err := pgconn.Connect(ctx, connStr) + require.NoError(t, err) + + pipeline := conn.StartPipeline(ctx) + pipeline.SendPrepare("s1", "select 1", nil) + pipeline.SendPrepare("s2", "select 2", nil) + err = pipeline.Sync() + require.NoError(t, err) + + // Do NOT call GetResults. Call Close() directly so it drains results via getResults(). + // The first FATAL closes the connection via OnPgError, including close(cleanupDone). + // The second FATAL is still buffered in chunkReader. Without the fix, processing it + // would attempt to close cleanupDone again, causing a panic. + err = pipeline.Close() + require.Error(t, err) +} + func mustEncode(buf []byte, err error) []byte { if err != nil { panic(err) From f4d3fb02600bb1a5359434a5e0a74cdee3941782 Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Tue, 10 Feb 2026 22:02:12 -0800 Subject: [PATCH 2/2] pgconn: improve comment explaining nil, nil path in Pipeline.Close Expand the comment on the results == nil guard in Pipeline.Close() to explain exactly when and why getResults() returns (nil, nil): When the server sends FATAL errors that consume queued request slots (e.g. Prepare, Sync) without ever sending ReadyForQuery, the request event queue is exhausted while expectedReadyForQueryCount remains > 0. ExtractFrontRequestType() returns pipelineNil, getResults() returns (nil, nil), and without this guard the loop would spin indefinitely. --- pgconn/pgconn.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 4571ca2fb..c94b0c530 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -2836,8 +2836,11 @@ func (p *Pipeline) Close() error { break } } else if results == nil { - // No more results available but we haven't received all expected ReadyForQuery messages. - // This means the connection is in a bad state (e.g. server closed the connection). + // getResults returns (nil, nil) when the request queue is exhausted (pipelineNil) but + // ExpectedReadyForQuery is still > 0. This happens when the server sends FATAL errors that + // consume queued request slots (e.g. Prepare, Sync) without ever sending ReadyForQuery -- + // so expectedReadyForQueryCount is never decremented. Without this check the loop would spin + // indefinitely because there are no more requests to issue and no ReadyForQuery to receive. p.conn.asyncClose() if p.err == nil { p.err = errors.New("pipeline: no more results but expected ReadyForQuery")