diff --git a/historic_tx_pool.go b/historic_tx_pool.go new file mode 100644 index 0000000..4dfc595 --- /dev/null +++ b/historic_tx_pool.go @@ -0,0 +1,258 @@ +package sqlitebitmapstore + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/Arkiv-Network/sqlite-bitmap-store/store" +) + +const MaxPoolCount int = 128 +const MaxTxCount int = 7 + +type HistoricTransaction struct { + tx *sql.Tx + pool *historicTransactionAtBlockPool + inUse bool +} + +func (h HistoricTransaction) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.ExecContext(ctx, query, args...) +} + +func (h HistoricTransaction) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.PrepareContext(ctx, query) +} + +func (h HistoricTransaction) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { + if !h.inUse { + return nil, fmt.Errorf("historic transaction has been returned to the pool") + } + return h.tx.QueryContext(ctx, query, args...) +} + +func (h HistoricTransaction) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { + // This calls QueryContext internally, so no need to check explicitly here whether + // the tx has been returned. + // In any case we also cannot construct an sql.Row struct here, since its fields + // are not exported. + return h.tx.QueryRowContext(ctx, query, args...) +} + +var _ store.DBTX = HistoricTransaction{} + +func (h *HistoricTransaction) rollback() error { + if err := h.tx.Rollback(); err != nil { + return fmt.Errorf("error rolling back historic transaction: %w", err) + } + return nil +} + +func (h *HistoricTransaction) Close() error { + return h.pool.returnTx(h) +} + +type historicTransactionAtBlockPool struct { + mu sync.Mutex + returned *sync.Cond + txs []*HistoricTransaction + closed bool + log *slog.Logger +} + +func (h *historicTransactionAtBlockPool) getTransaction(ctx context.Context) (*HistoricTransaction, error) { + h.returned.L.Lock() + defer h.returned.L.Unlock() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + tx, err := func() (*HistoricTransaction, error) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.closed { + return nil, fmt.Errorf("pool is closed") + } + + if len(h.txs) > 0 { + tx := h.txs[0] + h.txs = h.txs[1:] + tx.inUse = true + return tx, nil + } + + return nil, nil + }() + + if err != nil { + return nil, err + } + + if tx != nil { + return tx, nil + } + + h.returned.Wait() + } +} + +func (h *historicTransactionAtBlockPool) returnTx(tx *HistoricTransaction) error { + h.mu.Lock() + defer h.mu.Unlock() + + tx.inUse = false + + if h.closed { + err := tx.rollback() + if err != nil { + return err + } + } else { + // We always create a new HistoricTransaction so that we never set + // returned = false on a HistoricTransaction that we've ever given away + h.txs = append(h.txs, &HistoricTransaction{ + tx: tx.tx, + pool: h, + }) + h.returned.Signal() + } + + return nil +} + +func (h *historicTransactionAtBlockPool) Close() error { + h.mu.Lock() + defer h.mu.Unlock() + + errs := []error{} + for _, tx := range h.txs { + err := tx.rollback() + if err != nil { + errs = append(errs, err) + } + } + h.closed = true + return errors.Join(errs...) +} + +type HistoricTransactionPool struct { + readPool *sql.DB + // Protect the array below + mu sync.Mutex + pools []*historicTransactionAtBlockPool + + log *slog.Logger +} + +func NewHistoricTransactionPool(readPool *sql.DB, log *slog.Logger) *HistoricTransactionPool { + pools := make([]*historicTransactionAtBlockPool, 0, MaxPoolCount) + + return &HistoricTransactionPool{ + readPool: readPool, + pools: pools, + log: log, + } +} + +func (h *HistoricTransactionPool) newPoolAtBlock() (*historicTransactionAtBlockPool, error) { + txs := make([]*HistoricTransaction, 0, MaxTxCount) + pool := historicTransactionAtBlockPool{ + txs: txs, + log: h.log, + returned: sync.NewCond(&sync.Mutex{}), + } + + block := uint64(0) + for range MaxTxCount { + histTx, err := h.readPool.BeginTx(context.Background(), &sql.TxOptions{ + Isolation: sql.LevelSerializable, + ReadOnly: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to start historic read transaction: %w", err) + } + + // We need to run a query in order for the transaction to actually refer to the + // correct end mark in the sqlite WAL. + store := store.New(histTx) + block, err = store.GetLastBlock(context.Background()) + if err != nil { + histTx.Rollback() + return nil, fmt.Errorf("failed to start historic read transaction: %w", err) + } + + pool.txs = append(pool.txs, &HistoricTransaction{ + tx: histTx, + pool: &pool, + }) + } + h.log.Info("created historic read transactions", "count", MaxTxCount, "at_block", block) + + return &pool, nil +} + +func (h *HistoricTransactionPool) GetTransaction(ctx context.Context, atBlock uint64) (*HistoricTransaction, error) { + lastBlock, err := store.New(h.readPool).GetLastBlock(ctx) + if err != nil { + return nil, err + } + + if atBlock > lastBlock { + return nil, fmt.Errorf("block in the future") + } + + h.mu.Lock() + defer h.mu.Unlock() + + numOfPools := len(h.pools) + if numOfPools == 0 { + return nil, fmt.Errorf("the historic transaction pool is empty") + } + + oldestAvailableBlock := lastBlock - (uint64(numOfPools) - 1) + + if atBlock < oldestAvailableBlock { + return nil, fmt.Errorf("requested block is no longer available, requested: %d, oldest available: %d", atBlock, oldestAvailableBlock) + } + + offset := lastBlock - atBlock + txIx := numOfPools - 1 - int(offset) + + return h.pools[txIx].getTransaction(ctx) +} + +func (h *HistoricTransactionPool) AddPoolAtBlock() error { + pool, err := h.newPoolAtBlock() + if err != nil { + return fmt.Errorf("failed to create historic read transaction pool: %w", err) + } + + h.mu.Lock() + defer h.mu.Unlock() + + for len(h.pools) >= MaxPoolCount { + oldPool := h.pools[0] + h.pools = h.pools[1:] + if err := oldPool.Close(); err != nil { + return fmt.Errorf("failed to close discarded historic read transaction pool: %w", err) + } + } + h.pools = append(h.pools, pool) + + h.log.Info("historic read transaction pool", "size", len(h.pools)) + return nil +} diff --git a/query_rpc.go b/query_rpc.go index 8cab39c..e659f22 100644 --- a/query_rpc.go +++ b/query_rpc.go @@ -113,21 +113,19 @@ func (s *SQLiteStore) QueryEntities( options *Options, ) (*QueryResponse, error) { - // TODO: wait for the block height - res := &QueryResponse{ Data: []json.RawMessage{}, - BlockNumber: 0, + BlockNumber: options.GetAtBlock(), Cursor: nil, } { - q := s.NewQueries() + store := store.New(s.readPool) timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() for { - lastBlock, err := q.GetLastBlock(ctx) + lastBlock, err := store.GetLastBlock(ctx) if err != nil { return nil, fmt.Errorf("error getting last block: %w", err) } @@ -136,7 +134,7 @@ func (s *SQLiteStore) QueryEntities( } select { case <-timeoutCtx.Done(): - return nil, fmt.Errorf("context cancelled: %w", ctx.Err()) + return nil, fmt.Errorf("timeout waiting for block %d, found block %d", options.GetAtBlock(), lastBlock) case <-time.After(100 * time.Millisecond): continue } @@ -149,7 +147,7 @@ func (s *SQLiteStore) QueryEntities( return nil, fmt.Errorf("error parsing query: %w", err) } - err = s.ReadTransaction(ctx, func(queries *store.Queries) error { + err = s.ReadTransaction(ctx, options.GetAtBlock(), func(queries *store.Queries) error { bitmap, err := q.Evaluate( ctx, diff --git a/sqlitestore.go b/sqlitestore.go index 609fee3..ddb8d3a 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -23,9 +23,10 @@ import ( ) type SQLiteStore struct { - writePool *sql.DB - readPool *sql.DB - log *slog.Logger + writePool *sql.DB + readPool *sql.DB + historicTxPool *HistoricTransactionPool + log *slog.Logger } func NewSQLiteStore( @@ -39,20 +40,20 @@ func NewSQLiteStore( return nil, fmt.Errorf("failed to create directory: %w", err) } - writeURL := fmt.Sprintf("file:%s?mode=rwc&_busy_timeout=11000&_journal_mode=WAL&_auto_vacuum=incremental&_foreign_keys=true&_txlock=immediate&_cache_size=65536", dbPath) + writeURL := fmt.Sprintf("file:%s?mode=rwc&_busy_timeout=11000&_journal_mode=WAL&_wal_autocheckpoint=0&_auto_vacuum=incremental&_foreign_keys=true&_txlock=immediate&_cache_size=65536", dbPath) writePool, err := sql.Open("sqlite3", writeURL) if err != nil { return nil, fmt.Errorf("failed to open write pool: %w", err) } - readURL := fmt.Sprintf("file:%s?_query_only=true&_busy_timeout=11000&_journal_mode=WAL&_auto_vacuum=incremental&_foreign_keys=true&_txlock=deferred&_cache_size=65536", dbPath) + readURL := fmt.Sprintf("file:%s?_query_only=true&_busy_timeout=11000&_journal_mode=WAL&_wal_autocheckpoint=0&_auto_vacuum=incremental&_txlock=deferred&_cache_size=65536", dbPath) readPool, err := sql.Open("sqlite3", readURL) if err != nil { return nil, fmt.Errorf("failed to open read pool: %w", err) } - readPool.SetMaxOpenConns(numberOfReadThreads) + //readPool.SetMaxOpenConns(numberOfReadThreads) readPool.SetMaxIdleConns(numberOfReadThreads) readPool.SetConnMaxLifetime(0) readPool.SetConnMaxIdleTime(0) @@ -64,7 +65,12 @@ func NewSQLiteStore( return nil, fmt.Errorf("failed to run migrations: %w", err) } - return &SQLiteStore{writePool: writePool, readPool: readPool, log: log}, nil + return &SQLiteStore{ + writePool: writePool, + readPool: readPool, + historicTxPool: NewHistoricTransactionPool(readPool, log), + log: log, + }, nil } func runMigrations(db *sql.DB) error { @@ -95,6 +101,7 @@ func (s *SQLiteStore) Close() error { } func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) { + // TODO shouldn't this use the read pool?? return store.New(s.writePool).GetLastBlock(ctx) } @@ -105,367 +112,365 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to follow events: %w", batch.Error) } - err := func() error { + firstBlock := batch.Batch.Blocks[0].Number + lastBlock := batch.Batch.Blocks[len(batch.Batch.Blocks)-1].Number + s.log.Info("new batch", "firstBlock", firstBlock, "lastBlock", lastBlock) - tx, err := s.writePool.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelSerializable, - ReadOnly: false, - }) + lastBlockFromDB, err := s.GetLastBlock(ctx) + if err != nil { + return fmt.Errorf("failed to get last block from database: %w", err) + } + + for _, block := range batch.Batch.Blocks { + if block.Number <= uint64(lastBlockFromDB) { + s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB) + continue + } + err := s.processBlock(ctx, &block) if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return err } - defer tx.Rollback() + } + + } + + return nil +} - st := store.New(tx) +func (s *SQLiteStore) processBlock(ctx context.Context, block *events.Block) error { + updates := 0 + deletes := 0 + extends := 0 + creates := 0 + ownerChanges := 0 - firstBlock := batch.Batch.Blocks[0].Number - lastBlock := batch.Batch.Blocks[len(batch.Batch.Blocks)-1].Number - s.log.Info("new batch", "firstBlock", firstBlock, "lastBlock", lastBlock) + updatesMap := map[common.Hash][]*events.OPUpdate{} + + for _, operation := range block.Operations { + if operation.Update != nil { + currentUpdates := updatesMap[operation.Update.Key] + currentUpdates = append(currentUpdates, operation.Update) + updatesMap[operation.Update.Key] = currentUpdates + } + } - lastBlockFromDB, err := st.GetLastBlock(ctx) + tx, err := s.writePool.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelSerializable, + ReadOnly: false, + }) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + defer func() { + if _, err := s.writePool.Exec("PRAGMA wal_checkpoint(PASSIVE)"); err != nil { + s.log.Warn("failed to checkpoint the WAL", "err", err) + } + }() + + st := store.New(tx) + + // blockNumber := block.Number + for _, operation := range block.Operations { + + switch { + + case operation.Create != nil: + // expiresAtBlock := blockNumber + operation.Create.BTL + creates++ + key := operation.Create.Key + + stringAttributes := maps.Clone(operation.Create.StringAttributes) + + stringAttributes["$owner"] = strings.ToLower(operation.Create.Owner.Hex()) + stringAttributes["$creator"] = strings.ToLower(operation.Create.Owner.Hex()) + stringAttributes["$key"] = strings.ToLower(key.Hex()) + + untilBlock := block.Number + operation.Create.BTL + numericAttributes := maps.Clone(operation.Create.NumericAttributes) + numericAttributes["$expiration"] = uint64(untilBlock) + numericAttributes["$createdAtBlock"] = uint64(block.Number) + numericAttributes["$lastModifiedAtBlock"] = uint64(block.Number) + + sequence := block.Number<<32 | operation.TxIndex<<16 | operation.OpIndex + numericAttributes["$sequence"] = sequence + numericAttributes["$txIndex"] = uint64(operation.TxIndex) + numericAttributes["$opIndex"] = uint64(operation.OpIndex) + + id, err := st.UpsertPayload( + ctx, + store.UpsertPayloadParams{ + EntityKey: operation.Create.Key.Bytes(), + Payload: operation.Create.Content, + ContentType: operation.Create.ContentType, + StringAttributes: store.NewStringAttributes(stringAttributes), + NumericAttributes: store.NewNumericAttributes(numericAttributes), + }, + ) if err != nil { - return fmt.Errorf("failed to get last block from database: %w", err) + return fmt.Errorf("failed to insert payload %s at block %d txIndex %d opIndex %d: %w", key.Hex(), block.Number, operation.TxIndex, operation.OpIndex, err) } - mainLoop: - for _, block := range batch.Batch.Blocks { + sbo := newStringBitmapOps(st) + + for k, v := range stringAttributes { + err = sbo.Add(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to add string attribute value bitmap: %w", err) + } + } - updates := 0 - deletes := 0 - extends := 0 - creates := 0 - ownerChanges := 0 + nbo := newNumericBitmapOps(st) - if block.Number <= uint64(lastBlockFromDB) { - s.log.Info("skipping block", "block", block.Number, "lastBlockFromDB", lastBlockFromDB) - continue mainLoop + for k, v := range numericAttributes { + err = nbo.Add(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) } + } + case operation.Update != nil: + updates++ - updatesMap := map[common.Hash][]*events.OPUpdate{} + updates := updatesMap[operation.Update.Key] + lastUpdate := updates[len(updates)-1] - for _, operation := range block.Operations { - if operation.Update != nil { - currentUpdates := updatesMap[operation.Update.Key] - currentUpdates = append(currentUpdates, operation.Update) - updatesMap[operation.Update.Key] = currentUpdates - } + if operation.Update != lastUpdate { + return nil + } + + key := operation.Update.Key.Bytes() + + latestPayload, err := st.GetPayloadForEntityKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to get latest payload: %w", err) + } + + oldStringAttributes := latestPayload.StringAttributes + + oldNumericAttributes := latestPayload.NumericAttributes + + stringAttributes := maps.Clone(operation.Update.StringAttributes) + + stringAttributes["$owner"] = strings.ToLower(operation.Update.Owner.Hex()) + stringAttributes["$creator"] = oldStringAttributes.Values["$creator"] + stringAttributes["$key"] = strings.ToLower(operation.Update.Key.Hex()) + + untilBlock := block.Number + operation.Update.BTL + numericAttributes := maps.Clone(operation.Update.NumericAttributes) + numericAttributes["$expiration"] = uint64(untilBlock) + numericAttributes["$createdAtBlock"] = oldNumericAttributes.Values["$createdAtBlock"] + + numericAttributes["$sequence"] = oldNumericAttributes.Values["$sequence"] + numericAttributes["$txIndex"] = oldNumericAttributes.Values["$txIndex"] + numericAttributes["$opIndex"] = oldNumericAttributes.Values["$opIndex"] + numericAttributes["$lastModifiedAtBlock"] = uint64(block.Number) + + id, err := st.UpsertPayload( + ctx, + store.UpsertPayloadParams{ + EntityKey: key, + Payload: operation.Update.Content, + ContentType: operation.Update.ContentType, + StringAttributes: store.NewStringAttributes(stringAttributes), + NumericAttributes: store.NewNumericAttributes(numericAttributes), + }, + ) + if err != nil { + return fmt.Errorf("failed to insert payload 0x%x at block %d txIndex %d opIndex %d: %w", key, block.Number, operation.TxIndex, operation.OpIndex, err) + } + + sbo := newStringBitmapOps(st) + + for k, v := range oldStringAttributes.Values { + err = sbo.Remove(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) } + } - // blockNumber := block.Number - for _, operation := range block.Operations { - - switch { - - case operation.Create != nil: - // expiresAtBlock := blockNumber + operation.Create.BTL - creates++ - key := operation.Create.Key - - stringAttributes := maps.Clone(operation.Create.StringAttributes) - - stringAttributes["$owner"] = strings.ToLower(operation.Create.Owner.Hex()) - stringAttributes["$creator"] = strings.ToLower(operation.Create.Owner.Hex()) - stringAttributes["$key"] = strings.ToLower(key.Hex()) - - untilBlock := block.Number + operation.Create.BTL - numericAttributes := maps.Clone(operation.Create.NumericAttributes) - numericAttributes["$expiration"] = uint64(untilBlock) - numericAttributes["$createdAtBlock"] = uint64(block.Number) - numericAttributes["$lastModifiedAtBlock"] = uint64(block.Number) - - sequence := block.Number<<32 | operation.TxIndex<<16 | operation.OpIndex - numericAttributes["$sequence"] = sequence - numericAttributes["$txIndex"] = uint64(operation.TxIndex) - numericAttributes["$opIndex"] = uint64(operation.OpIndex) - - id, err := st.UpsertPayload( - ctx, - store.UpsertPayloadParams{ - EntityKey: operation.Create.Key.Bytes(), - Payload: operation.Create.Content, - ContentType: operation.Create.ContentType, - StringAttributes: store.NewStringAttributes(stringAttributes), - NumericAttributes: store.NewNumericAttributes(numericAttributes), - }, - ) - if err != nil { - return fmt.Errorf("failed to insert payload %s at block %d txIndex %d opIndex %d: %w", key.Hex(), block.Number, operation.TxIndex, operation.OpIndex, err) - } - - sbo := newStringBitmapOps(st) - - for k, v := range stringAttributes { - err = sbo.Add(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to add string attribute value bitmap: %w", err) - } - } - - nbo := newNumericBitmapOps(st) - - for k, v := range numericAttributes { - err = nbo.Add(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) - } - } - case operation.Update != nil: - updates++ - - updates := updatesMap[operation.Update.Key] - lastUpdate := updates[len(updates)-1] - - if operation.Update != lastUpdate { - continue mainLoop - } - - key := operation.Update.Key.Bytes() - - latestPayload, err := st.GetPayloadForEntityKey(ctx, key) - if err != nil { - return fmt.Errorf("failed to get latest payload: %w", err) - } - - oldStringAttributes := latestPayload.StringAttributes - - oldNumericAttributes := latestPayload.NumericAttributes - - stringAttributes := maps.Clone(operation.Update.StringAttributes) - - stringAttributes["$owner"] = strings.ToLower(operation.Update.Owner.Hex()) - stringAttributes["$creator"] = oldStringAttributes.Values["$creator"] - stringAttributes["$key"] = strings.ToLower(operation.Update.Key.Hex()) - - untilBlock := block.Number + operation.Update.BTL - numericAttributes := maps.Clone(operation.Update.NumericAttributes) - numericAttributes["$expiration"] = uint64(untilBlock) - numericAttributes["$createdAtBlock"] = oldNumericAttributes.Values["$createdAtBlock"] - - numericAttributes["$sequence"] = oldNumericAttributes.Values["$sequence"] - numericAttributes["$txIndex"] = oldNumericAttributes.Values["$txIndex"] - numericAttributes["$opIndex"] = oldNumericAttributes.Values["$opIndex"] - numericAttributes["$lastModifiedAtBlock"] = uint64(block.Number) - - id, err := st.UpsertPayload( - ctx, - store.UpsertPayloadParams{ - EntityKey: key, - Payload: operation.Update.Content, - ContentType: operation.Update.ContentType, - StringAttributes: store.NewStringAttributes(stringAttributes), - NumericAttributes: store.NewNumericAttributes(numericAttributes), - }, - ) - if err != nil { - return fmt.Errorf("failed to insert payload 0x%x at block %d txIndex %d opIndex %d: %w", key, block.Number, operation.TxIndex, operation.OpIndex, err) - } - - sbo := newStringBitmapOps(st) - - for k, v := range oldStringAttributes.Values { - err = sbo.Remove(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) - } - } - - nbo := newNumericBitmapOps(st) - - for k, v := range oldNumericAttributes.Values { - err = nbo.Remove(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) - } - } - - // TODO: delete entity from the indexes - - for k, v := range stringAttributes { - err = sbo.Add(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to add string attribute value bitmap: %w", err) - } - } - - for k, v := range numericAttributes { - err = nbo.Add(ctx, k, v, id) - if err != nil { - return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) - } - } - - case operation.Delete != nil || operation.Expire != nil: - - deletes++ - var key []byte - if operation.Delete != nil { - key = common.Hash(*operation.Delete).Bytes() - } else { - key = common.Hash(*operation.Expire).Bytes() - } - - latestPayload, err := st.GetPayloadForEntityKey(ctx, key) - if err != nil { - return fmt.Errorf("failed to get latest payload: %w", err) - } - - oldStringAttributes := latestPayload.StringAttributes - - oldNumericAttributes := latestPayload.NumericAttributes - - sbo := newStringBitmapOps(st) - - for k, v := range oldStringAttributes.Values { - err = sbo.Remove(ctx, k, v, latestPayload.ID) - if err != nil { - return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) - } - } - - nbo := newNumericBitmapOps(st) - - for k, v := range oldNumericAttributes.Values { - err = nbo.Remove(ctx, k, v, latestPayload.ID) - if err != nil { - return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) - } - } - - err = st.DeletePayloadForEntityKey(ctx, key) - if err != nil { - return fmt.Errorf("failed to delete payload: %w", err) - } - - case operation.ExtendBTL != nil: - - extends++ - - key := operation.ExtendBTL.Key.Bytes() - - latestPayload, err := st.GetPayloadForEntityKey(ctx, key) - if err != nil { - return fmt.Errorf("failed to get latest payload: %w", err) - } - - oldNumericAttributes := latestPayload.NumericAttributes - - newToBlock := block.Number + operation.ExtendBTL.BTL - - numericAttributes := maps.Clone(oldNumericAttributes.Values) - numericAttributes["$expiration"] = uint64(newToBlock) - - oldExpiration := oldNumericAttributes.Values["$expiration"] - - id, err := st.UpsertPayload(ctx, store.UpsertPayloadParams{ - EntityKey: key, - Payload: latestPayload.Payload, - ContentType: latestPayload.ContentType, - StringAttributes: latestPayload.StringAttributes, - NumericAttributes: store.NewNumericAttributes(numericAttributes), - }) - if err != nil { - return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) - } - - nbo := newNumericBitmapOps(st) + nbo := newNumericBitmapOps(st) - err = nbo.Remove(ctx, "$expiration", oldExpiration, id) - if err != nil { - return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) - } - - err = nbo.Add(ctx, "$expiration", newToBlock, id) - if err != nil { - return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) - } - - case operation.ChangeOwner != nil: - ownerChanges++ - key := operation.ChangeOwner.Key.Bytes() - - latestPayload, err := st.GetPayloadForEntityKey(ctx, key) - if err != nil { - return fmt.Errorf("failed to get latest payload: %w", err) - } + for k, v := range oldNumericAttributes.Values { + err = nbo.Remove(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) + } + } - stringAttributes := latestPayload.StringAttributes + for k, v := range stringAttributes { + err = sbo.Add(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to add string attribute value bitmap: %w", err) + } + } - oldOwner := stringAttributes.Values["$owner"] + for k, v := range numericAttributes { + err = nbo.Add(ctx, k, v, id) + if err != nil { + return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) + } + } + + case operation.Delete != nil || operation.Expire != nil: - newOwner := strings.ToLower(operation.ChangeOwner.Owner.Hex()) + deletes++ + var key []byte + if operation.Delete != nil { + key = common.Hash(*operation.Delete).Bytes() + } else { + key = common.Hash(*operation.Expire).Bytes() + } - stringAttributes.Values["$owner"] = newOwner + latestPayload, err := st.GetPayloadForEntityKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to get latest payload: %w", err) + } - id, err := st.UpsertPayload( - ctx, - store.UpsertPayloadParams{ - EntityKey: key, - Payload: latestPayload.Payload, - ContentType: latestPayload.ContentType, - StringAttributes: stringAttributes, - NumericAttributes: latestPayload.NumericAttributes, - }, - ) - if err != nil { - return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) - } + oldStringAttributes := latestPayload.StringAttributes - sbo := newStringBitmapOps(st) + oldNumericAttributes := latestPayload.NumericAttributes - err = sbo.Remove(ctx, "$owner", oldOwner, id) - if err != nil { - return fmt.Errorf("failed to remove string attribute value bitmap for owner: %w", err) - } + sbo := newStringBitmapOps(st) - err = sbo.Add(ctx, "$owner", newOwner, id) - if err != nil { - return fmt.Errorf("failed to add string attribute value bitmap for owner: %w", err) - } + for k, v := range oldStringAttributes.Values { + err = sbo.Remove(ctx, k, v, latestPayload.ID) + if err != nil { + return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) + } + } - default: - return fmt.Errorf("unknown operation: %v", operation) - } + nbo := newNumericBitmapOps(st) + for k, v := range oldNumericAttributes.Values { + err = nbo.Remove(ctx, k, v, latestPayload.ID) + if err != nil { + return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) } + } + + err = st.DeletePayloadForEntityKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to delete payload: %w", err) + } + + case operation.ExtendBTL != nil: - s.log.Info("block updated", "block", block.Number, "creates", creates, "updates", updates, "deletes", deletes, "extends", extends, "ownerChanges", ownerChanges) + extends++ + key := operation.ExtendBTL.Key.Bytes() + + latestPayload, err := st.GetPayloadForEntityKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to get latest payload: %w", err) } - err = st.UpsertLastBlock(ctx, lastBlock) + oldNumericAttributes := latestPayload.NumericAttributes + + newToBlock := block.Number + operation.ExtendBTL.BTL + + numericAttributes := maps.Clone(oldNumericAttributes.Values) + numericAttributes["$expiration"] = uint64(newToBlock) + + oldExpiration := oldNumericAttributes.Values["$expiration"] + + id, err := st.UpsertPayload(ctx, store.UpsertPayloadParams{ + EntityKey: key, + Payload: latestPayload.Payload, + ContentType: latestPayload.ContentType, + StringAttributes: latestPayload.StringAttributes, + NumericAttributes: store.NewNumericAttributes(numericAttributes), + }) if err != nil { - return fmt.Errorf("failed to upsert last block: %w", err) + return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) } - err = tx.Commit() + nbo := newNumericBitmapOps(st) + + err = nbo.Remove(ctx, "$expiration", oldExpiration, id) if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) } - return nil - }() - if err != nil { - return err + err = nbo.Add(ctx, "$expiration", newToBlock, id) + if err != nil { + return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) + } + + case operation.ChangeOwner != nil: + ownerChanges++ + key := operation.ChangeOwner.Key.Bytes() + + latestPayload, err := st.GetPayloadForEntityKey(ctx, key) + if err != nil { + return fmt.Errorf("failed to get latest payload: %w", err) + } + + stringAttributes := latestPayload.StringAttributes + + oldOwner := stringAttributes.Values["$owner"] + + newOwner := strings.ToLower(operation.ChangeOwner.Owner.Hex()) + + stringAttributes.Values["$owner"] = newOwner + + id, err := st.UpsertPayload( + ctx, + store.UpsertPayloadParams{ + EntityKey: key, + Payload: latestPayload.Payload, + ContentType: latestPayload.ContentType, + StringAttributes: stringAttributes, + NumericAttributes: latestPayload.NumericAttributes, + }, + ) + if err != nil { + return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) + } + + sbo := newStringBitmapOps(st) + + err = sbo.Remove(ctx, "$owner", oldOwner, id) + if err != nil { + return fmt.Errorf("failed to remove string attribute value bitmap for owner: %w", err) + } + + err = sbo.Add(ctx, "$owner", newOwner, id) + if err != nil { + return fmt.Errorf("failed to add string attribute value bitmap for owner: %w", err) + } + + default: + return fmt.Errorf("unknown operation: %v", operation) } + } - return nil -} + s.log.Info("block updated", "block", block.Number, "creates", creates, "updates", updates, "deletes", deletes, "extends", extends, "ownerChanges", ownerChanges) -func (s *SQLiteStore) NewQueries() *store.Queries { - return store.New(s.readPool) -} + err = st.UpsertLastBlock(ctx, block.Number) + if err != nil { + return fmt.Errorf("failed to upsert last block: %w", err) + } -func (s *SQLiteStore) ReadTransaction(ctx context.Context, fn func(q *store.Queries) error) error { - tx, err := s.readPool.BeginTx(ctx, &sql.TxOptions{ - ReadOnly: true, - }) + err = tx.Commit() if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to commit transaction: %w", err) } - defer tx.Rollback() - st := store.New(tx) + if err := s.historicTxPool.AddPoolAtBlock(); err != nil { + return fmt.Errorf("error adding historic read transaction: %w", err) + } + + return nil +} + +func (s *SQLiteStore) ReadTransaction(ctx context.Context, atBlock uint64, fn func(q *store.Queries) error) error { + tx, err := s.historicTxPool.GetTransaction(ctx, atBlock) + if err != nil { + return fmt.Errorf("error obtaining historic read transaction: %w", err) + } + // TODO handle errors + defer tx.Close() - return fn(st) + return fn(store.New(tx)) }