From 745ba63f957bda5aadcd9988b66578cd6faee3dd Mon Sep 17 00:00:00 2001 From: Sergey Dobrodey Date: Tue, 10 Feb 2026 16:29:29 +0200 Subject: [PATCH] BGReader: fix race while Stopping --- pgconn/internal/bgreader/bgreader.go | 4 +- pgconn/internal/bgreader/bgreader_test.go | 64 +++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pgconn/internal/bgreader/bgreader.go b/pgconn/internal/bgreader/bgreader.go index e65c2c2bf..1774760a0 100644 --- a/pgconn/internal/bgreader/bgreader.go +++ b/pgconn/internal/bgreader/bgreader.go @@ -95,8 +95,8 @@ func (r *BGReader) Read(p []byte) (int, error) { return r.readFromReadResults(p) } - // There are no unread background read results and the background reader is stopped. - if r.status == StatusStopped { + // There are no unread background read results and the background reader is stopped or stopping. + if r.status == StatusStopped || r.status == StatusStopping { return r.r.Read(p) } diff --git a/pgconn/internal/bgreader/bgreader_test.go b/pgconn/internal/bgreader/bgreader_test.go index 3a8aa00f6..7ee221469 100644 --- a/pgconn/internal/bgreader/bgreader_test.go +++ b/pgconn/internal/bgreader/bgreader_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "math/rand/v2" + "sync" "testing" "time" @@ -138,3 +139,66 @@ func TestBGReaderStress(t *testing.T) { } } } + +type blockingReader struct { + started chan struct{} + allowFirst chan struct{} + mu sync.Mutex + reads int +} + +func newBlockingReader() *blockingReader { + return &blockingReader{ + started: make(chan struct{}), + allowFirst: make(chan struct{}), + } +} + +func (r *blockingReader) Read(p []byte) (int, error) { + r.mu.Lock() + r.reads++ + reads := r.reads + r.mu.Unlock() + + if reads == 1 { + close(r.started) + <-r.allowFirst + return 0, io.EOF + } + + return copy(p, []byte("direct")), nil +} + +func TestBGReaderReadDoesNotWaitWhenStopping(t *testing.T) { + rr := newBlockingReader() + defer close(rr.allowFirst) + + bgr := bgreader.New(rr) + bgr.Start() + + select { + case <-rr.started: + case <-time.After(1 * time.Second): + t.Fatal("background read did not start") + } + + bgr.Stop() + + buf := make([]byte, 6) + done := make(chan struct{}) + var n int + var err error + go func() { + n, err = bgr.Read(buf) + close(done) + }() + + select { + case <-done: + case <-time.After(250 * time.Millisecond): + t.Fatal("Read blocked while status was StatusStopping") + } + + require.NoError(t, err) + require.Equal(t, "direct", string(buf[:n])) +}