Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,17 @@ SQLite3 does not support querying `FOR UPDATE`, which is used for row locking wh

- [ ] Implement SQLite connection back off manager

A friend recommended implementing a back off manager. I think the SQLite `busy_timeout` produces a linear back off timeout. When attemping to write a row lock, SQLite will freeze the transaction until the previous one is complete up to the `busy_timeout` duration. This should prevent unneccessary waits due to polling. Perhaps this does not work like I imagine. Also, the ZombieZen variant uses immediate transactions, which may ignore the `busy_timeout`. This requires additional investigation before implementing. Implementation examples in other libraries to consider:
A friend recommended implementing a back off manager. I think the SQLite `busy_timeout` produces a linear back off timeout. When attemping to write a row lock, SQLite will freeze the transaction until the previous one is complete up to the `busy_timeout` duration. This should prevent unneccessary waits due to polling. Perhaps this does not work like I imagine. Also, the ZombieZen variant uses immediate transactions, which may ignore the `busy_timeout`. This requires additional investigation before implementing.

Here is an example attempt: https://github.com/sandpapersoftware/watermillsqlite

The busy waiting loop, that polls the next batches causes a modification to the database (sets the lock)
this causes tools like litestream to write wal files to their replicas every poll interval
this makes a restore incredibly slow and causes a constant drain on the cpu.

I wonder if a rollback in a batch == 0 case, wouldn't be enough to release the lock or you only set the lock, when a batch is > 0? Lock + read operation are in the same transaction; I think I can just cancel the transaction if the batch size is 0 to prevent the write.

Implementation examples in other libraries to consider:

- https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/backoff_manager.go
- https://github.com/ov2b/watermill-sqlite3/blob/main/reset_latch_backoff_manager.go
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ fmt:
(cd test && go fmt ./... && goimports -l -w .)

update_watermill:
(cd test && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy)
(cd wmsqlitemodernc && go get -u github.com/ThreeDotsLabs/watermill-sqlite/test@latest && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy)
(cd wmsqlitezombiezen && go get -u github.com/ThreeDotsLabs/watermill-sqlite/test@latest && go get -u github.com/ThreeDotsLabs/watermill@latest && go mod tidy)

sed -i '|^go 1\.|d' wmsqlitemodernc/go.mod wmsqlitezombiezen/go.mod
sed -i '|^go 1\.|d' test/go.mod wmsqlitemodernc/go.mod wmsqlitezombiezen/go.mod
(cd test && go mod edit -fmt)
(cd wmsqlitemodernc && go mod edit -fmt)
(cd wmsqlitezombiezen && go mod edit -fmt)

Expand Down
4 changes: 1 addition & 3 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -852,6 +851,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
Expand Down Expand Up @@ -1201,7 +1201,6 @@ golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1349,7 +1348,6 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
10 changes: 10 additions & 0 deletions wmsqlitemodernc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ func (s *subscription) NextBatch(ctx context.Context) (batch []rawMessage, err e
}
defer func() {
if err == nil {
if len(batch) == 0 {
// cancel writing the lock to the database
// when no messages were fetched
// to avoid a database write on every poll interval
err = tx.Rollback()
return
}
err = tx.Commit()
}
err = errors.Join(err, tx.Rollback())
Expand Down Expand Up @@ -185,6 +192,9 @@ func (s *subscription) Run(ctx context.Context) {
}
continue
}
if len(batch) == 0 {
continue // the lock is never set on empty batches
}

for _, next := range batch {
if err = s.Send(ctx, next); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions wmsqlitezombiezen/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const (
// This can only happen if there is a mistake in the SQLite query. Can occur if more than one row is returned
// when one was expected or none were expected. You should never see this error.
ErrMoreRowStepsThanExpected

// ErrEmptyBatch is a sentinel error indicating the absence of new topic messages.
// This error is used to cancel the database transaction and the row lock
// when there are no new messages available. This prevents the subscriber
// from writing to the database every poll interval when the topic is idle.
ErrEmptyBatch
)

func (e Error) Error() string {
Expand All @@ -55,7 +61,11 @@ func (e Error) Error() string {
return "consumer group is already locked by another consumer"
case ErrMoreRowStepsThanExpected:
return "more rows returned than expected"
case ErrEmptyBatch:
return "there are no new messages"
default:
return "unknown error"
}
}

var errEmptyBatch error = ErrEmptyBatch // reference container for the error
15 changes: 13 additions & 2 deletions wmsqlitezombiezen/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ func (s *subscription) NextBatch() (batch []rawMessage, err error) {
return nil, err
}
// closeTransaction := sqlitex.Transaction(s.Connection)
defer closeTransaction(&err)
defer func() {
if len(batch) == 0 && err == nil {
// cancel writing the lock to the database
// when no messages were fetched
// to avoid a database write on every poll interval
closeTransaction(&errEmptyBatch)
return
}
closeTransaction(&err)
}()

if err = s.stmtLockConsumerGroup.Reset(); err != nil {
return nil, err
Expand Down Expand Up @@ -154,7 +163,6 @@ func (s *subscription) ReleaseLock() (err error) {
return err
}
if ok {
// return errors.New("acknowledgement returned a result")
return ErrMoreRowStepsThanExpected
}
return nil
Expand Down Expand Up @@ -232,6 +240,9 @@ func (s *subscription) Run(ctx context.Context) {
}
continue
}
if len(batch) == 0 {
continue // the lock is never set on empty batches
}

for _, next := range batch {
if err = s.Send(ctx, next); err != nil {
Expand Down
Loading