From 84c15e4f0018886b8debce2ecc46079dfadd108b Mon Sep 17 00:00:00 2001 From: Dmitry Kotik <7944694+dkotik@users.noreply.github.com> Date: Fri, 19 Sep 2025 12:20:28 +0300 Subject: [PATCH] feature: prevent database writes when the topic is idle The busy-waiting loop that polls for the next batch modifies the database (it sets the lock). This causes tools like Litestream to write WAL files to their replicas every poll interval, which makes a restore incredibly slow and causes a constant drain on the CPU. --- DEVELOPMENT.md | 12 +++++++++++- Makefile | 4 +++- go.work.sum | 4 +--- wmsqlitemodernc/subscription.go | 10 ++++++++++ wmsqlitezombiezen/errors.go | 10 ++++++++++ wmsqlitezombiezen/subscription.go | 15 +++++++++++++-- 6 files changed, 48 insertions(+), 7 deletions(-) diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 9933b0b..27c9ba9 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -10,7 +10,17 @@ SQLite3 does not support querying `FOR UPDATE`, which is used for row locking wh - [ ] Implement SQLite connection back off manager - A friend recommended implementing a back off manager. I think the SQLite `busy_timeout` produces a linear back off timeout. When attemping to write a row lock, SQLite will freeze the transaction until the previous one is complete up to the `busy_timeout` duration. This should prevent unneccessary waits due to polling. Perhaps this does not work like I imagine. Also, the ZombieZen variant uses immediate transactions, which may ignore the `busy_timeout`. This requires additional investigation before implementing. Implementation examples in other libraries to consider: + A friend recommended implementing a back off manager. I think the SQLite `busy_timeout` produces a linear back off timeout. When attempting to write a row lock, SQLite will freeze the transaction until the previous one is complete up to the `busy_timeout` duration. This should prevent unnecessary waits due to polling. Perhaps this does not work like I imagine. 
Also, the ZombieZen variant uses immediate transactions, which may ignore the `busy_timeout`. This requires additional investigation before implementing. + + Here is an example attempt: https://github.com/sandpapersoftware/watermillsqlite + + The busy-waiting loop that polls for the next batch modifies the database (it sets the lock). + This causes tools like Litestream to write WAL files to their replicas every poll interval, + which makes a restore incredibly slow and causes a constant drain on the CPU. + + I wonder whether a rollback in the batch == 0 case would be enough to release the lock, or whether the lock should only be set when the batch is > 0. Lock + read operations are in the same transaction; I think I can just cancel the transaction if the batch size is 0 to prevent the write. + + Implementation examples in other libraries to consider: - https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/backoff_manager.go - https://github.com/ov2b/watermill-sqlite3/blob/main/reset_latch_backoff_manager.go diff --git a/Makefile b/Makefile index b744576..d25d298 100644 --- a/Makefile +++ b/Makefile @@ -47,10 +47,12 @@ fmt: (cd test && go fmt ./... && goimports -l -w .) 
update_watermill: + (cd test && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy) (cd wmsqlitemodernc && go get -u github.com/ThreeDotsLabs/watermill-sqlite/test@latest && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy) (cd wmsqlitezombiezen && go get -u github.com/ThreeDotsLabs/watermill-sqlite/test@latest && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy) - sed -i '|^go 1\.|d' wmsqlitemodernc/go.mod wmsqlitezombiezen/go.mod + sed -i '|^go 1\.|d' test/go.mod wmsqlitemodernc/go.mod wmsqlitezombiezen/go.mod + (cd test && go mod edit -fmt) (cd wmsqlitemodernc && go mod edit -fmt) (cd wmsqlitezombiezen && go mod edit -fmt) diff --git a/go.work.sum b/go.work.sum index 9bf94cf..614914f 100644 --- a/go.work.sum +++ b/go.work.sum @@ -764,7 +764,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -852,6 +851,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= @@ -1201,7 +1201,6 @@ golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1349,7 +1348,6 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/wmsqlitemodernc/subscription.go b/wmsqlitemodernc/subscription.go index e1706e4..06ed399 100644 --- a/wmsqlitemodernc/subscription.go +++ b/wmsqlitemodernc/subscription.go @@ -44,6 +44,13 @@ func (s *subscription) NextBatch(ctx context.Context) (batch []rawMessage, err e } defer func() { if err 
== nil { + if len(batch) == 0 { + // cancel writing the lock to the database + // when no messages were fetched + // to avoid a database write on every poll interval + err = tx.Rollback() + return + } err = tx.Commit() } err = errors.Join(err, tx.Rollback()) @@ -185,6 +192,9 @@ func (s *subscription) Run(ctx context.Context) { } continue } + if len(batch) == 0 { + continue // the lock is never set on empty batches + } for _, next := range batch { if err = s.Send(ctx, next); err != nil { diff --git a/wmsqlitezombiezen/errors.go b/wmsqlitezombiezen/errors.go index 485f71c..9a306f8 100644 --- a/wmsqlitezombiezen/errors.go +++ b/wmsqlitezombiezen/errors.go @@ -37,6 +37,12 @@ const ( // This can only happen if there is a mistake in the SQLite query. Can occur if more than one row is returned // when one was expected or none were expected. You should never see this error. ErrMoreRowStepsThanExpected + + // ErrEmptyBatch is a sentinel error indicating the absence of new topic messages. + // This error is used to cancel the database transaction and the row lock + // when there are no new messages available. This prevents the subscriber + // from writing to the database every poll interval when the topic is idle. 
+ ErrEmptyBatch ) func (e Error) Error() string { @@ -55,7 +61,11 @@ func (e Error) Error() string { return "consumer group is already locked by another consumer" case ErrMoreRowStepsThanExpected: return "more rows returned than expected" + case ErrEmptyBatch: + return "there are no new messages" default: return "unknown error" } } + +var errEmptyBatch error = ErrEmptyBatch // reference container for the error diff --git a/wmsqlitezombiezen/subscription.go b/wmsqlitezombiezen/subscription.go index 8477079..c6e45a2 100644 --- a/wmsqlitezombiezen/subscription.go +++ b/wmsqlitezombiezen/subscription.go @@ -52,7 +52,16 @@ func (s *subscription) NextBatch() (batch []rawMessage, err error) { return nil, err } // closeTransaction := sqlitex.Transaction(s.Connection) - defer closeTransaction(&err) + defer func() { + if len(batch) == 0 && err == nil { + // cancel writing the lock to the database + // when no messages were fetched + // to avoid a database write on every poll interval + closeTransaction(&errEmptyBatch) + return + } + closeTransaction(&err) + }() if err = s.stmtLockConsumerGroup.Reset(); err != nil { return nil, err @@ -154,7 +163,6 @@ func (s *subscription) ReleaseLock() (err error) { return err } if ok { - // return errors.New("acknowledgement returned a result") return ErrMoreRowStepsThanExpected } return nil @@ -232,6 +240,9 @@ func (s *subscription) Run(ctx context.Context) { } continue } + if len(batch) == 0 { + continue // the lock is never set on empty batches + } for _, next := range batch { if err = s.Send(ctx, next); err != nil {