From cec20a1677f069425d54893e49075eec47f04cfc Mon Sep 17 00:00:00 2001 From: r-vdp Date: Fri, 16 Jan 2026 17:55:32 +0100 Subject: [PATCH] Bring back bi-temporality --- historic_tx_pool.go | 283 +++++++++++++++++++++++++++++++++++ pusher/push_iterator_test.go | 46 ++++-- query_rpc.go | 12 +- sqlitestore.go | 184 +++++++++++++---------- sqlitestore_test.go | 62 ++++++-- 5 files changed, 474 insertions(+), 113 deletions(-) create mode 100644 historic_tx_pool.go diff --git a/historic_tx_pool.go b/historic_tx_pool.go new file mode 100644 index 0000000..5723d32 --- /dev/null +++ b/historic_tx_pool.go @@ -0,0 +1,283 @@ +package sqlitebitmapstore + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/Arkiv-Network/sqlite-bitmap-store/store" +) + +const MaxPoolCount int = 128 +const MaxTxCount int = 7 + +type HistoricTransaction struct { + tx *sql.Tx + pool *historicTransactionAtBlockPool + inUse bool +} + +func (h HistoricTransaction) AtBlock() uint64 { + return h.pool.block +} + +func (h HistoricTransaction) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.ExecContext(ctx, query, args...) +} + +func (h HistoricTransaction) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.PrepareContext(ctx, query) +} + +func (h HistoricTransaction) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.QueryContext(ctx, query, args...) +} + +func (h HistoricTransaction) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { + // This calls QueryContext internally, so no need to check explicitly here whether + // the tx has been returned. + // In any case we also cannot construct an sql.Row struct here, since its fields + // are not exported. + return h.tx.QueryRowContext(ctx, query, args...) +} + +var _ store.DBTX = HistoricTransaction{} + +func (h *HistoricTransaction) rollback() error { + if err := h.tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + return fmt.Errorf("error rolling back historic transaction: %w", err) + } + return nil +} + +func (h *HistoricTransaction) Close() error { + return h.pool.returnTx(h) +} + +type historicTransactionAtBlockPool struct { + block uint64 + mu sync.Mutex + returned *sync.Cond + txs []*HistoricTransaction + closed bool + log *slog.Logger +} + +func (h *historicTransactionAtBlockPool) getTransaction(ctx context.Context) (*HistoricTransaction, error) { + h.returned.L.Lock() + defer h.returned.L.Unlock() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + tx, err := func() (*HistoricTransaction, error) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.closed { + return nil, fmt.Errorf("pool is closed") + } + + if len(h.txs) > 0 { + tx := h.txs[0] + h.txs = h.txs[1:] + tx.inUse = true + return tx, nil + } + + return nil, nil + }() + + if err != nil { + return nil, err + } + + if tx != nil { + return tx, nil + } + + h.returned.Wait() + } +} + +func (h *historicTransactionAtBlockPool) returnTx(tx *HistoricTransaction) error { + h.mu.Lock() + defer h.mu.Unlock() + + tx.inUse = false + + if h.closed { + if err := tx.rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + return err + } + } else { + // We always create a new HistoricTransaction so that we never set + // returned = false on a HistoricTransaction that we've ever given away + h.txs = append(h.txs, &HistoricTransaction{ + tx: tx.tx, + pool: h, + }) + h.returned.Signal() + } + + return nil +} + +func (h *historicTransactionAtBlockPool) Close() error { + h.mu.Lock() + defer h.mu.Unlock() + + errs := []error{} + for _, tx := range h.txs { + if err := tx.rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) { + errs = append(errs, err) + } + } + h.closed = true + return errors.Join(errs...) +} + +type HistoricTransactionPool struct { + readPool *sql.DB + + // Protect the array below + mu sync.Mutex + pools []*historicTransactionAtBlockPool + + log *slog.Logger +} + +func NewHistoricTransactionPool(readPool *sql.DB, log *slog.Logger) *HistoricTransactionPool { + pools := make([]*historicTransactionAtBlockPool, 0, MaxPoolCount) + + return &HistoricTransactionPool{ + readPool: readPool, + pools: pools, + log: log, + } +} + +func (h *HistoricTransactionPool) newPoolAtBlock(block uint64) (*historicTransactionAtBlockPool, error) { + txs := make([]*HistoricTransaction, 0, MaxTxCount) + pool := historicTransactionAtBlockPool{ + block: block, + txs: txs, + log: h.log, + returned: sync.NewCond(&sync.Mutex{}), + } + + b := uint64(0) + for range MaxTxCount { + histTx, err := h.readPool.BeginTx(context.Background(), &sql.TxOptions{ + Isolation: sql.LevelSerializable, + ReadOnly: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to start historic read transaction: %w", err) + } + + // We need to run a query in order for the transaction to actually refer to the + // correct end mark in the sqlite WAL. + store := store.New(histTx) + b, err = store.GetLastBlock(context.Background()) + if err != nil { + return nil, errors.Join( + fmt.Errorf("failed to start historic read transaction: %w", err), + histTx.Rollback(), + pool.Close(), + ) + } + if b != block { + return nil, errors.Join( + fmt.Errorf("failed to start tx at right block, got %d, expected %d", b, block), + histTx.Rollback(), + pool.Close(), + ) + } + + pool.txs = append(pool.txs, &HistoricTransaction{ + tx: histTx, + pool: &pool, + }) + } + h.log.Info("created historic read transactions", "count", MaxTxCount, "at_block", b) + + return &pool, nil +} + +func (h *HistoricTransactionPool) GetTransaction(ctx context.Context, atBlock uint64) (*HistoricTransaction, error) { + h.mu.Lock() + defer h.mu.Unlock() + + lastBlock, err := store.New(h.readPool).GetLastBlock(ctx) + if err != nil { + return nil, err + } + + if atBlock > lastBlock { + return nil, fmt.Errorf("block in the future") + } + + numOfPools := len(h.pools) + if numOfPools == 0 { + return nil, fmt.Errorf("the historic transaction pool is empty") + } + + oldestAvailableBlock := lastBlock - (uint64(numOfPools) - 1) + + if atBlock < oldestAvailableBlock { + return nil, fmt.Errorf("requested block is no longer available, requested: %d, oldest available: %d", atBlock, oldestAvailableBlock) + } + + offset := lastBlock - atBlock + txIx := numOfPools - 1 - int(offset) + + return h.pools[txIx].getTransaction(ctx) +} + +// CommitTxAndCreatePoolAtBlock commits the given transaction and creates a new +// pool of read transactions. It's important to do both at once while locking +// the HistoricTransactionPool to ensure that no other go-routine can get +// a transaction from the pool after the transaction was committed but before +// the new pool of read transactions for the new block has been added. +func (h *HistoricTransactionPool) CommitTxAndCreatePoolAtBlock(block uint64, tx *sql.Tx) error { + h.mu.Lock() + defer h.mu.Unlock() + + err := tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + pool, err := h.newPoolAtBlock(block) + if err != nil { + return fmt.Errorf("failed to create historic read transaction pool: %w", err) + } + + for len(h.pools) >= MaxPoolCount { + oldPool := h.pools[0] + h.pools = h.pools[1:] + if err := oldPool.Close(); err != nil { + return fmt.Errorf("failed to close discarded historic read transaction pool: %w", err) + } + } + h.pools = append(h.pools, pool) + + h.log.Info("historic read transaction pool", "size", len(h.pools)) + return nil +} diff --git a/pusher/push_iterator_test.go b/pusher/push_iterator_test.go index 83e9404..4151a4f 100644 --- a/pusher/push_iterator_test.go +++ b/pusher/push_iterator_test.go @@ -98,7 +98,7 @@ var _ = Describe("PushIterator", func() { var payload []byte var contentType string - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -173,7 +173,7 @@ var _ = Describe("PushIterator", func() { Expect(err).NotTo(HaveOccurred()) Expect(lastBlock).To(Equal(uint64(101))) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row1, err := q.GetPayloadForEntityKey(ctx, key1.Bytes()) if err != nil { return err @@ -262,7 +262,10 @@ var _ = Describe("PushIterator", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -340,7 +343,10 @@ var _ = Describe("PushIterator", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(deleteIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).To(HaveOccurred()) return nil @@ -386,8 +392,11 @@ var _ = Describe("PushIterator", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + var originalExpiration uint64 - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -427,7 +436,10 @@ var _ = Describe("PushIterator", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(extendIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err = sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -478,7 +490,10 @@ var _ = Describe("PushIterator", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -517,7 +532,10 @@ var _ = Describe("PushIterator", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(changeOwnerIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err = sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -596,7 +614,7 @@ var _ = Describe("PushIterator", func() { Expect(err).NotTo(HaveOccurred()) Expect(lastBlock).To(Equal(uint64(101))) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row1, err := q.GetPayloadForEntityKey(ctx, key1.Bytes()) if err != nil { return err @@ -685,7 +703,10 @@ var _ = Describe("PushIterator", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(replayIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err @@ -734,7 +755,10 @@ var _ = Describe("PushIterator", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) if err != nil { return err diff --git a/query_rpc.go b/query_rpc.go index 8cab39c..f92f8d1 100644 --- a/query_rpc.go +++ b/query_rpc.go @@ -113,21 +113,19 @@ func (s *SQLiteStore) QueryEntities( options *Options, ) (*QueryResponse, error) { - // TODO: wait for the block height - res := &QueryResponse{ Data: []json.RawMessage{}, - BlockNumber: 0, + BlockNumber: options.GetAtBlock(), Cursor: nil, } { - q := s.NewQueries() + store := store.New(s.readPool) timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() for { - lastBlock, err := q.GetLastBlock(ctx) + lastBlock, err := store.GetLastBlock(ctx) if err != nil { return nil, fmt.Errorf("error getting last block: %w", err) } @@ -136,7 +134,7 @@ func (s *SQLiteStore) QueryEntities( } select { case <-timeoutCtx.Done(): - return nil, fmt.Errorf("context cancelled: %w", ctx.Err()) + return nil, fmt.Errorf("timeout waiting for block %d, found block %d, %w", options.GetAtBlock(), lastBlock, timeoutCtx.Err()) case <-time.After(100 * time.Millisecond): continue } @@ -149,7 +147,7 @@ func (s *SQLiteStore) QueryEntities( return nil, fmt.Errorf("error parsing query: %w", err) } - err = s.ReadTransaction(ctx, func(queries *store.Queries) error { + err = s.ReadTransaction(ctx, options.GetAtBlock(), func(queries *store.Queries) error { bitmap, err := q.Evaluate( ctx, diff --git a/sqlitestore.go b/sqlitestore.go index 8834eaf..3a1e0d5 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -24,9 +24,10 @@ import ( ) type SQLiteStore struct { - writePool *sql.DB - readPool *sql.DB - log *slog.Logger + writePool *sql.DB + readPool *sql.DB + historicTxPool *HistoricTransactionPool + log *slog.Logger } func NewSQLiteStore( @@ -40,20 +41,47 @@ func NewSQLiteStore( return nil, fmt.Errorf("failed to create directory: %w", err) } - writeURL := fmt.Sprintf("file:%s?mode=rwc&_busy_timeout=11000&_journal_mode=WAL&_auto_vacuum=incremental&_foreign_keys=true&_txlock=immediate&_cache_size=65536", dbPath) + writeURL := strings.Join( + []string{ + "file:", + dbPath, + "?mode=rwc", + "&_busy_timeout=11000", + "&_journal_mode=WAL", + "&_wal_autocheckpoint=0", + "&_auto_vacuum=incremental", + "&_foreign_keys=true", + "&_txlock=immediate", + "&_cache_size=65536", + }, + "", + ) writePool, err := sql.Open("sqlite3", writeURL) if err != nil { return nil, fmt.Errorf("failed to open write pool: %w", err) } - readURL := fmt.Sprintf("file:%s?_query_only=true&_busy_timeout=11000&_journal_mode=WAL&_auto_vacuum=incremental&_foreign_keys=true&_txlock=deferred&_cache_size=65536", dbPath) + readURL := strings.Join( + []string{ + "file:", + dbPath, + "?_query_only=true", + "&_busy_timeout=11000", + "&_journal_mode=WAL", + "&_wal_autocheckpoint=0", + "&_auto_vacuum=incremental", + "&_foreign_keys=true", + "&_txlock=deferred", + "&_cache_size=65536", + }, + "", + ) readPool, err := sql.Open("sqlite3", readURL) if err != nil { return nil, fmt.Errorf("failed to open read pool: %w", err) } - readPool.SetMaxOpenConns(numberOfReadThreads) readPool.SetMaxIdleConns(numberOfReadThreads) readPool.SetConnMaxLifetime(0) readPool.SetConnMaxIdleTime(0) @@ -65,7 +93,12 @@ func NewSQLiteStore( return nil, fmt.Errorf("failed to run migrations: %w", err) } - return &SQLiteStore{writePool: writePool, readPool: readPool, log: log}, nil + return &SQLiteStore{ + writePool: writePool, + readPool: readPool, + historicTxPool: NewHistoricTransactionPool(readPool, log), + log: log, + }, nil } func runMigrations(db *sql.DB) error { @@ -96,6 +129,7 @@ func (s *SQLiteStore) Close() error { } func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) { + // TODO shouldn't this use the read pool?? return store.New(s.writePool).GetLastBlock(ctx) } @@ -112,34 +146,44 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat totalExtends := 0 totalOwnerChanges := 0 - err := func() error { + firstBlock := batch.Batch.Blocks[0].Number + lastBlock := batch.Batch.Blocks[len(batch.Batch.Blocks)-1].Number + s.log.Info("new batch", "firstBlock", firstBlock, "lastBlock", lastBlock) - tx, err := s.writePool.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelSerializable, - ReadOnly: false, - }) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer tx.Rollback() - - st := store.New(tx) + lastBlockFromDB, err := s.GetLastBlock(ctx) + if err != nil { + return fmt.Errorf("failed to get last block from database: %w", err) + } - firstBlock := batch.Batch.Blocks[0].Number - lastBlock := batch.Batch.Blocks[len(batch.Batch.Blocks)-1].Number - s.log.Info("new batch", "firstBlock", firstBlock, "lastBlock", lastBlock) + startTime := time.Now() - lastBlockFromDB, err := st.GetLastBlock(ctx) - if err != nil { - return fmt.Errorf("failed to get last block from database: %w", err) + mainLoop: + for _, block := range batch.Batch.Blocks { + if block.Number <= uint64(lastBlockFromDB) { + s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB) + continue mainLoop } - cache := newBitmapCache(st) + err := func() (err error) { + tx, err := s.writePool.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelSerializable, + ReadOnly: false, + }) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer func() { + var e1 error + if e := tx.Rollback(); e != nil && !errors.Is(e, sql.ErrTxDone) { + e1 = e + } + _, e2 := s.writePool.Exec("PRAGMA wal_checkpoint(PASSIVE)") + err = errors.Join(err, e1, e2) + }() - startTime := time.Now() + st := store.New(tx) - mainLoop: - for _, block := range batch.Batch.Blocks { + cache := newBitmapCache(st) updates := 0 deletes := 0 @@ -147,11 +191,6 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat creates := 0 ownerChanges := 0 - if block.Number <= uint64(lastBlockFromDB) { - s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB) - continue mainLoop - } - updatesMap := map[common.Hash][]*events.OPUpdate{} for _, operation := range block.Operations { @@ -162,14 +201,12 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } } - // blockNumber := block.Number operationLoop: for _, operation := range block.Operations { switch { case operation.Create != nil: - // expiresAtBlock := blockNumber + operation.Create.BTL creates++ key := operation.Create.Key @@ -295,8 +332,6 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } } - // TODO: delete entity from the indexes - for k, v := range stringAttributes { err = cache.AddToStringBitmap(ctx, k, v, id) if err != nil { @@ -455,66 +490,51 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat totalDeletes += deletes totalExtends += extends totalOwnerChanges += ownerChanges - } - err = st.UpsertLastBlock(ctx, lastBlock) - if err != nil { - return fmt.Errorf("failed to upsert last block: %w", err) - } + err = cache.Flush(ctx) + if err != nil { + return fmt.Errorf("failed to flush bitmap cache: %w", err) + } - err = cache.Flush(ctx) - if err != nil { - return fmt.Errorf("failed to flush bitmap cache: %w", err) - } + err = st.UpsertLastBlock(ctx, block.Number) + if err != nil { + return fmt.Errorf("failed to upsert last block: %w", err) + } + + if err := s.historicTxPool.CommitTxAndCreatePoolAtBlock(block.Number, tx); err != nil { + return fmt.Errorf("error adding historic read transaction: %w", err) + } + + return + }() - err = tx.Commit() if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to process block: %w", err) } + } - s.log.Info("batch processed", "firstBlock", firstBlock, "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, "updates", totalUpdates, "deletes", totalDeletes, "extends", totalExtends, "ownerChanges", totalOwnerChanges) + s.log.Info("batch processed", "firstBlock", firstBlock, "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, "updates", totalUpdates, "deletes", totalDeletes, "extends", totalExtends, "ownerChanges", totalOwnerChanges) - return nil - }() - if err != nil { - return err - } } return nil } -func (s *SQLiteStore) NewQueries() *store.Queries { - return store.New(s.readPool) -} - -func (s *SQLiteStore) ReadTransaction(ctx context.Context, fn func(q *store.Queries) error) error { - tx, err := s.readPool.BeginTx(ctx, &sql.TxOptions{ - ReadOnly: true, - }) +func (s *SQLiteStore) ReadTransaction(ctx context.Context, atBlock uint64, fn func(q *store.Queries) error) (err error) { + tx, err := s.historicTxPool.GetTransaction(ctx, atBlock) if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to obtain a read transaction: %w", err) + } + if tx.AtBlock() != atBlock { + return fmt.Errorf("failed to obtain a read transaction for the right block, got %d, expected %d", tx.AtBlock(), atBlock) } - defer tx.Rollback() - - st := store.New(tx) - return fn(st) -} + defer func() { + err = errors.Join(err, tx.Close()) + }() -func (s *SQLiteStore) GetNumberOfEntities(ctx context.Context) (numberOfEntities uint64, err error) { - err = s.ReadTransaction(ctx, func(q *store.Queries) error { - ni, err := q.GetNumberOfEntities(ctx) - if err != nil { - return fmt.Errorf("failed to get number of entities: %w", err) - } - numberOfEntities = uint64(ni) - return nil - }) - - if err != nil { - return 0, err - } + st := store.New(tx) - return numberOfEntities, nil + err = fn(st) + return } diff --git a/sqlitestore_test.go b/sqlitestore_test.go index b7e947d..0e936a4 100644 --- a/sqlitestore_test.go +++ b/sqlitestore_test.go @@ -124,7 +124,7 @@ var _ = Describe("SQLiteStore", func() { Expect(err).NotTo(HaveOccurred()) Expect(lastBlock).To(Equal(uint64(101))) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { // Query by string attribute: type = "document" docBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ Name: "type", @@ -305,8 +305,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify the update - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) @@ -383,8 +386,11 @@ var _ = Describe("SQLiteStore", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify entity exists - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) return nil @@ -418,8 +424,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(deleteIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err = sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify entity is deleted - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).To(HaveOccurred()) @@ -504,8 +513,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(expireIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify entity is expired (deleted) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).To(HaveOccurred()) return nil @@ -553,9 +565,12 @@ var _ = Describe("SQLiteStore", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify original expiration var originalExpiration uint64 - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) originalExpiration = row.NumericAttributes.Values["$expiration"] @@ -593,8 +608,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(extendIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err = sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify extended expiration - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) newExpiration := row.NumericAttributes.Values["$expiration"] @@ -662,8 +680,11 @@ var _ = Describe("SQLiteStore", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify original owner - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) Expect(row.StringAttributes.Values["$owner"]).To(Equal(strings.ToLower(originalOwner.Hex()))) @@ -701,8 +722,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(changeOwnerIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err = sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify new owner and creator preserved - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) Expect(row.StringAttributes.Values["$owner"]).To(Equal(strings.ToLower(newOwner.Hex()))) @@ -813,8 +837,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify the update was applied - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) Expect(row.Payload).To(Equal([]byte("final update"))) @@ -923,9 +950,12 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // With `continue operationLoop`, non-last updates are skipped but processing // continues to the next operation. The last update for the key is applied. - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) // The last update (second one) should be applied @@ -1011,8 +1041,11 @@ var _ = Describe("SQLiteStore", func() { err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(replayIterator.Iterator())) Expect(err).NotTo(HaveOccurred()) + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + // Verify original content is preserved - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred()) Expect(row.Payload).To(Equal([]byte("original"))) @@ -1076,7 +1109,10 @@ var _ = Describe("SQLiteStore", func() { err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) Expect(err).NotTo(HaveOccurred()) - err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, lastBlock, func(q *store.Queries) error { row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) Expect(err).NotTo(HaveOccurred())