Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pgconn/pgconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,13 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
case *pgproto3.ErrorResponse:
err := ErrorResponseToPgError(msg)
if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) {
pgConn.status = connStatusClosed
pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
close(pgConn.cleanupDone)
// If the connection is already being closed (e.g. by asyncClose), don't close cleanupDone again to avoid
// a panic from closing an already-closed channel.
if pgConn.status != connStatusClosed {
pgConn.status = connStatusClosed
pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
close(pgConn.cleanupDone)
}
return nil, err
}
case *pgproto3.NoticeResponse:
Expand Down Expand Up @@ -1411,9 +1415,11 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
// the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an
// error is found then forcibly close the connection without sending the Terminate message.
if err := pgConn.bufferingReceiveErr; err != nil {
pgConn.status = connStatusClosed
pgConn.conn.Close()
close(pgConn.cleanupDone)
if pgConn.status != connStatusClosed {
pgConn.status = connStatusClosed
pgConn.conn.Close()
close(pgConn.cleanupDone)
}
return CommandTag{}, normalizeTimeoutError(ctx, err)
}
// peekMessage never returns err in the bufferingReceive mode - it only forwards the bufferingReceive variables.
Expand Down Expand Up @@ -2821,14 +2827,25 @@ func (p *Pipeline) Close() error {
}

for p.state.ExpectedReadyForQuery() > 0 {
_, err := p.getResults()
results, err := p.getResults()
if err != nil {
p.err = err
var pgErr *PgError
if !errors.As(err, &pgErr) {
p.conn.asyncClose()
break
}
} else if results == nil {
Copy link
Owner

@jackc jackc Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this path be executed? I thought that getResults would always either return results or an error. But this would only happen if it returns nil, nil.

If any change is to happen around here I would expect there to be more error cases that trigger asyncClose() and break.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. getResults() does return (nil, nil) -- it happens at the pipelineNil case (line 2514-2515) when ExtractFrontRequestType() finds the request event queue empty.

Here's the exact sequence in the double-FATAL scenario:

  1. Iteration 1 -- pops Prepare from queue, getResultsPrepare() reads ParseComplete + ParameterDescription + RowDescription, returns (psd, nil). Loop continues.

  2. Iteration 2 -- pops Prepare from queue, getResultsPrepare() calls receiveMessage() which reads the first FATAL. OnPgError returns false, connection is closed, cleanupDone is closed. Pipeline.receiveMessage() returns the error. Back in Close(): it's a PgError, so the loop continues (doesn't break).

  3. Iteration 3 -- pops Sync from queue, getResultsSync() calls receiveMessage() which reads the second FATAL. The guard prevents the double-close of cleanupDone. Pipeline.receiveMessage() returns the error. Back in Close(): it's again a PgError, so the loop continues.

  4. Iteration 4 -- queue is now empty. ExtractFrontRequestType() returns pipelineNil. getResults() returns (nil, nil). But ExpectedReadyForQuery() is still 1 because ReadyForQuery was never received (the server sent FATALs instead). Without the nil check, this spins forever.

The key insight is that FATAL PgErrors don't cause Close() to break (only non-PgError errors do), and FATAL errors consume queued request slots without the server ever sending ReadyForQuery, so expectedReadyForQueryCount is never decremented.

I've updated the comment in the latest push to explain this flow explicitly.

// getResults returns (nil, nil) when the request queue is exhausted (pipelineNil) but
// ExpectedReadyForQuery is still > 0. This happens when the server sends FATAL errors that
// consume queued request slots (e.g. Prepare, Sync) without ever sending ReadyForQuery --
// so expectedReadyForQueryCount is never decremented. Without this check the loop would spin
// indefinitely because there are no more requests to issue and no ReadyForQuery to receive.
p.conn.asyncClose()
if p.err == nil {
p.err = errors.New("pipeline: no more results but expected ReadyForQuery")
}
break
}
}

Expand Down
104 changes: 104 additions & 0 deletions pgconn/pgconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4389,6 +4389,110 @@ func TestFatalErrorReceivedInPipelineMode(t *testing.T) {
require.Error(t, err)
}

// https://github.com/jackc/pgx/issues/2470
// When the server sends multiple FATAL errors in a single batch (as PgBouncer can do when
// terminating idle-in-transaction connections), Pipeline.Close() must not panic with
// "close of closed channel" on cleanupDone. The first FATAL triggers OnPgError which closes
// the connection and cleanupDone. The second FATAL, still in the read buffer, must not
// attempt to close cleanupDone again.
//
// This test sends all server responses in a single TCP write to guarantee both FATAL errors
// are in the chunkReader buffer simultaneously.
func TestPipelineCloseDoesNotPanicOnMultipleFatalErrors(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:")
	require.NoError(t, err)
	defer ln.Close()

	// The mock server goroutine reports any failure on serverErrChan and closes the
	// channel when it exits, so the main test body can drain it and surface errors.
	serverErrChan := make(chan error, 1)
	go func() {
		defer close(serverErrChan)

		conn, err := ln.Accept()
		if err != nil {
			serverErrChan <- err
			return
		}
		defer conn.Close()

		err = conn.SetDeadline(time.Now().Add(59 * time.Second))
		if err != nil {
			serverErrChan <- err
			return
		}

		backend := pgproto3.NewBackend(conn, conn)

		// Handle startup
		_, err = backend.ReceiveStartupMessage()
		if err != nil {
			serverErrChan <- err
			return
		}
		backend.Send(&pgproto3.AuthenticationOk{})
		backend.Send(&pgproto3.BackendKeyData{ProcessID: 0, SecretKey: 0})
		backend.Send(&pgproto3.ReadyForQuery{TxStatus: 'I'})
		err = backend.Flush()
		if err != nil {
			serverErrChan <- err
			return
		}

		// Read all client pipeline messages (Parse, Describe, Parse, Describe, Sync)
		for i := 0; i < 5; i++ {
			_, err = backend.Receive()
			if err != nil {
				serverErrChan <- err
				return
			}
		}

		// Send ALL responses in a single write so they all end up in the chunkReader buffer.
		// This simulates PgBouncer sending a FATAL and then the real PostgreSQL also sending
		// a FATAL, both arriving in the same TCP segment.
		backend.Send(&pgproto3.ParseComplete{})
		backend.Send(&pgproto3.ParameterDescription{})
		backend.Send(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
			{Name: []byte("mock")},
		}})
		// Two FATAL errors back-to-back in the same write buffer
		backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"})
		backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"})
		err = backend.Flush()
		if err != nil {
			serverErrChan <- err
			return
		}
	}()

	// net.SplitHostPort handles the address form correctly (including IPv6 literals),
	// unlike naive strings.Split on ":".
	host, port, err := net.SplitHostPort(ln.Addr().String())
	require.NoError(t, err)
	connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)

	conn, err := pgconn.Connect(ctx, connStr)
	require.NoError(t, err)
	// The connection is expected to already be broken by the FATALs; Close is a
	// best-effort cleanup so the test doesn't leak the client side on failure paths.
	defer conn.Close(ctx)

	pipeline := conn.StartPipeline(ctx)
	pipeline.SendPrepare("s1", "select 1", nil)
	pipeline.SendPrepare("s2", "select 2", nil)
	err = pipeline.Sync()
	require.NoError(t, err)

	// Do NOT call GetResults. Call Close() directly so it drains results via getResults().
	// The first FATAL closes the connection via OnPgError, including close(cleanupDone).
	// The second FATAL is still buffered in chunkReader. Without the fix, processing it
	// would attempt to close cleanupDone again, causing a panic.
	err = pipeline.Close()
	require.Error(t, err)

	// Surface any error the mock server hit; ranging ends when the goroutine closes the
	// channel, which also guarantees the goroutine has exited before the test returns.
	for serverErr := range serverErrChan {
		require.NoError(t, serverErr)
	}
}

func mustEncode(buf []byte, err error) []byte {
if err != nil {
panic(err)
Expand Down