diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go
index ec27ec34d..c94b0c530 100644
--- a/pgconn/pgconn.go
+++ b/pgconn/pgconn.go
@@ -598,9 +598,13 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
 	case *pgproto3.ErrorResponse:
 		err := ErrorResponseToPgError(msg)
 		if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) {
-			pgConn.status = connStatusClosed
-			pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
-			close(pgConn.cleanupDone)
+			// If the connection is already being closed (e.g. by asyncClose), don't close cleanupDone again to avoid
+			// a panic from closing an already-closed channel.
+			if pgConn.status != connStatusClosed {
+				pgConn.status = connStatusClosed
+				pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
+				close(pgConn.cleanupDone)
+			}
 			return nil, err
 		}
 	case *pgproto3.NoticeResponse:
@@ -1411,9 +1415,11 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
 	// the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an
 	// error is found then forcibly close the connection without sending the Terminate message.
 	if err := pgConn.bufferingReceiveErr; err != nil {
-		pgConn.status = connStatusClosed
-		pgConn.conn.Close()
-		close(pgConn.cleanupDone)
+		if pgConn.status != connStatusClosed {
+			pgConn.status = connStatusClosed
+			pgConn.conn.Close()
+			close(pgConn.cleanupDone)
+		}
 		return CommandTag{}, normalizeTimeoutError(ctx, err)
 	}
 	// peekMessage never returns err in the bufferingReceive mode - it only forwards the bufferingReceive variables.
@@ -2821,7 +2827,7 @@ func (p *Pipeline) Close() error {
 	}
 
 	for p.state.ExpectedReadyForQuery() > 0 {
-		_, err := p.getResults()
+		results, err := p.getResults()
 		if err != nil {
 			p.err = err
 			var pgErr *PgError
@@ -2829,6 +2835,17 @@ func (p *Pipeline) Close() error {
 				p.conn.asyncClose()
 				break
 			}
+		} else if results == nil {
+			// getResults returns (nil, nil) when the request queue is exhausted (pipelineNil) but
+			// ExpectedReadyForQuery is still > 0. This happens when the server sends FATAL errors that
+			// consume queued request slots (e.g. Prepare, Sync) without ever sending ReadyForQuery --
+			// so expectedReadyForQueryCount is never decremented. Without this check the loop would spin
+			// indefinitely because there are no more requests to issue and no ReadyForQuery to receive.
+			p.conn.asyncClose()
+			if p.err == nil {
+				p.err = errors.New("pipeline: no more results but expected ReadyForQuery")
+			}
+			break
 		}
 	}
 
diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go
index 3fd43a031..981556c57 100644
--- a/pgconn/pgconn_test.go
+++ b/pgconn/pgconn_test.go
@@ -4389,6 +4389,110 @@ func TestFatalErrorReceivedInPipelineMode(t *testing.T) {
 	require.Error(t, err)
 }
 
+// https://github.com/jackc/pgx/issues/2470
+// When the server sends multiple FATAL errors in a single batch (as PgBouncer can do when
+// terminating idle-in-transaction connections), Pipeline.Close() must not panic with
+// "close of closed channel" on cleanupDone. The first FATAL triggers OnPgError which closes
+// the connection and cleanupDone. The second FATAL, still in the read buffer, must not
+// attempt to close cleanupDone again.
+//
+// This test sends all server responses in a single TCP write to guarantee both FATAL errors
+// are in the chunkReader buffer simultaneously.
+func TestPipelineCloseDoesNotPanicOnMultipleFatalErrors(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+	defer cancel()
+
+	ln, err := net.Listen("tcp", "127.0.0.1:")
+	require.NoError(t, err)
+	defer ln.Close()
+
+	serverErrChan := make(chan error, 1)
+	go func() {
+		defer close(serverErrChan)
+
+		conn, err := ln.Accept()
+		if err != nil {
+			serverErrChan <- err
+			return
+		}
+		defer conn.Close()
+
+		err = conn.SetDeadline(time.Now().Add(59 * time.Second))
+		if err != nil {
+			serverErrChan <- err
+			return
+		}
+
+		backend := pgproto3.NewBackend(conn, conn)
+
+		// Handle startup
+		_, err = backend.ReceiveStartupMessage()
+		if err != nil {
+			serverErrChan <- err
+			return
+		}
+		backend.Send(&pgproto3.AuthenticationOk{})
+		backend.Send(&pgproto3.BackendKeyData{ProcessID: 0, SecretKey: 0})
+		backend.Send(&pgproto3.ReadyForQuery{TxStatus: 'I'})
+		err = backend.Flush()
+		if err != nil {
+			serverErrChan <- err
+			return
+		}
+
+		// Read all client pipeline messages (Parse, Describe, Parse, Describe, Sync)
+		for i := 0; i < 5; i++ {
+			_, err = backend.Receive()
+			if err != nil {
+				serverErrChan <- err
+				return
+			}
+		}
+
+		// Send ALL responses in a single write so they all end up in the chunkReader buffer.
+		// This simulates PgBouncer sending a FATAL and then the real PostgreSQL also sending
+		// a FATAL, both arriving in the same TCP segment.
+		backend.Send(&pgproto3.ParseComplete{})
+		backend.Send(&pgproto3.ParameterDescription{})
+		backend.Send(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
+			{Name: []byte("mock")},
+		}})
+		// Two FATAL errors back-to-back in the same write buffer
+		backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"})
+		backend.Send(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01", Message: "terminating connection due to administrator command"})
+		err = backend.Flush()
+		if err != nil {
+			serverErrChan <- err
+			return
+		}
+	}()
+
+	parts := strings.Split(ln.Addr().String(), ":")
+	host := parts[0]
+	port := parts[1]
+	connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)
+
+	ctx, cancel = context.WithTimeout(ctx, 59*time.Second)
+	defer cancel()
+	conn, err := pgconn.Connect(ctx, connStr)
+	require.NoError(t, err)
+
+	pipeline := conn.StartPipeline(ctx)
+	pipeline.SendPrepare("s1", "select 1", nil)
+	pipeline.SendPrepare("s2", "select 2", nil)
+	err = pipeline.Sync()
+	require.NoError(t, err)
+
+	// Do NOT call GetResults. Call Close() directly so it drains results via getResults().
+	// The first FATAL closes the connection via OnPgError, including close(cleanupDone).
+	// The second FATAL is still buffered in chunkReader. Without the fix, processing it
+	// would attempt to close cleanupDone again, causing a panic.
+	err = pipeline.Close()
+	require.Error(t, err)
+}
+
 func mustEncode(buf []byte, err error) []byte {
 	if err != nil {
 		panic(err)
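
Background on the first failure mode: in Go, close() on an already-closed channel panics unconditionally, so any two code paths that can both reach close(pgConn.cleanupDone) are a latent crash. Below is a minimal standalone sketch of the guard pattern the patch applies in receiveMessage and CopyFrom; the type and field names are illustrative stand-ins, not the real pgconn internals.

package main

import "fmt"

// conn is a hypothetical stand-in for pgconn.PgConn.
type conn struct {
	closed      bool
	cleanupDone chan struct{}
}

// closeOnce mirrors the guarded pattern from the patch: only the first caller
// flips the status flag and closes the channel; later callers are no-ops.
func (c *conn) closeOnce() {
	if c.closed {
		return
	}
	c.closed = true
	close(c.cleanupDone)
}

func main() {
	c := &conn{cleanupDone: make(chan struct{})}
	c.closeOnce()
	c.closeOnce() // without the guard: panic: close of closed channel
	fmt.Println("no panic")
}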
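
The Pipeline.Close change guards a second failure mode: a drain loop whose exit counter can no longer reach zero once the result stream is exhausted. A minimal sketch of that loop shape, using a hypothetical next() producer in place of the real getResults/ReadyForQuery machinery:

package main

import (
	"errors"
	"fmt"
)

// drain models Pipeline.Close's result loop: "expected" plays the role of
// expectedReadyForQueryCount and next() the role of getResults. Both names
// are illustrative.
func drain(next func() (any, error), expected int) error {
	for expected > 0 {
		result, err := next()
		if err != nil {
			return err
		}
		if result == nil {
			// Mirrors the patch: the producer is exhausted but the counter
			// never reached zero, so bail out instead of spinning forever.
			return errors.New("no more results but expected ReadyForQuery")
		}
		expected-- // in pgconn this happens when ReadyForQuery arrives
	}
	return nil
}

func main() {
	exhausted := func() (any, error) { return nil, nil }
	fmt.Println(drain(exhausted, 2)) // returns the error instead of hanging
}

Surfacing a distinct error here (rather than silently returning) matches the patch's choice: the caller learns the pipeline was abandoned mid-drain instead of observing a hang.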