Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions historic_tx_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package sqlitebitmapstore

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"sync"

"github.com/Arkiv-Network/sqlite-bitmap-store/store"
)

const MaxPoolCount int = 128
const MaxTxCount int = 7

type HistoricTransaction struct {
tx *sql.Tx
pool *historicTransactionAtBlockPool
inUse bool
}

func (h HistoricTransaction) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
if !h.inUse {
return nil, fmt.Errorf("historic transaction has been returned to the pool")
}
return h.tx.ExecContext(ctx, query, args...)
}

func (h HistoricTransaction) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
if !h.inUse {
return nil, fmt.Errorf("historic transaction has been returned to the pool")
}
return h.tx.PrepareContext(ctx, query)
}

func (h HistoricTransaction) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
if !h.inUse {
return nil, fmt.Errorf("historic transaction has been returned to the pool")
}
return h.tx.QueryContext(ctx, query, args...)
}

func (h HistoricTransaction) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
// This calls QueryContext internally, so no need to check explicitly here whether
// the tx has been returned.
// In any case we also cannot construct an sql.Row struct here, since its fields
// are not exported.
return h.tx.QueryRowContext(ctx, query, args...)
}

var _ store.DBTX = HistoricTransaction{}

func (h *HistoricTransaction) rollback() error {
if err := h.tx.Rollback(); err != nil {
return fmt.Errorf("error rolling back historic transaction: %w", err)
}
return nil
}

func (h *HistoricTransaction) Close() error {
return h.pool.returnTx(h)
}

type historicTransactionAtBlockPool struct {
mu sync.Mutex
returned *sync.Cond
txs []*HistoricTransaction
closed bool
log *slog.Logger
}

func (h *historicTransactionAtBlockPool) getTransaction(ctx context.Context) (*HistoricTransaction, error) {
h.returned.L.Lock()
defer h.returned.L.Unlock()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

tx, err := func() (*HistoricTransaction, error) {
h.mu.Lock()
defer h.mu.Unlock()

if h.closed {
return nil, fmt.Errorf("pool is closed")
}

if len(h.txs) > 0 {
tx := h.txs[0]
h.txs = h.txs[1:]
tx.inUse = true
return tx, nil
}

return nil, nil
}()

if err != nil {
return nil, err
}

if tx != nil {
return tx, nil
}

h.returned.Wait()
}
}

func (h *historicTransactionAtBlockPool) returnTx(tx *HistoricTransaction) error {
h.mu.Lock()
defer h.mu.Unlock()

tx.inUse = false

if h.closed {
err := tx.rollback()
if err != nil {
return err
}
} else {
// We always create a new HistoricTransaction so that we never set
// returned = false on a HistoricTransaction that we've ever given away
h.txs = append(h.txs, &HistoricTransaction{
tx: tx.tx,
pool: h,
})
h.returned.Signal()
}

return nil
}

func (h *historicTransactionAtBlockPool) Close() error {
h.mu.Lock()
defer h.mu.Unlock()

errs := []error{}
for _, tx := range h.txs {
err := tx.rollback()
if err != nil {
errs = append(errs, err)
}
}
h.closed = true
return errors.Join(errs...)
}

type HistoricTransactionPool struct {
readPool *sql.DB
// Protect the array below
mu sync.Mutex
pools []*historicTransactionAtBlockPool

log *slog.Logger
}

func NewHistoricTransactionPool(readPool *sql.DB, log *slog.Logger) *HistoricTransactionPool {
pools := make([]*historicTransactionAtBlockPool, 0, MaxPoolCount)

return &HistoricTransactionPool{
readPool: readPool,
pools: pools,
log: log,
}
}

func (h *HistoricTransactionPool) newPoolAtBlock() (*historicTransactionAtBlockPool, error) {
txs := make([]*HistoricTransaction, 0, MaxTxCount)
pool := historicTransactionAtBlockPool{
txs: txs,
log: h.log,
returned: sync.NewCond(&sync.Mutex{}),
}

block := uint64(0)
for range MaxTxCount {
histTx, err := h.readPool.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
ReadOnly: true,
})
if err != nil {
return nil, fmt.Errorf("failed to start historic read transaction: %w", err)
}

// We need to run a query in order for the transaction to actually refer to the
// correct end mark in the sqlite WAL.
store := store.New(histTx)
block, err = store.GetLastBlock(context.Background())
if err != nil {
histTx.Rollback()
return nil, fmt.Errorf("failed to start historic read transaction: %w", err)
}

pool.txs = append(pool.txs, &HistoricTransaction{
tx: histTx,
pool: &pool,
})
}
h.log.Info("created historic read transactions", "count", MaxTxCount, "at_block", block)

return &pool, nil
}

func (h *HistoricTransactionPool) GetTransaction(ctx context.Context, atBlock uint64) (*HistoricTransaction, error) {
lastBlock, err := store.New(h.readPool).GetLastBlock(ctx)
if err != nil {
return nil, err
}

if atBlock > lastBlock {
return nil, fmt.Errorf("block in the future")
}

h.mu.Lock()
defer h.mu.Unlock()

numOfPools := len(h.pools)
if numOfPools == 0 {
return nil, fmt.Errorf("the historic transaction pool is empty")
}

oldestAvailableBlock := lastBlock - (uint64(numOfPools) - 1)

if atBlock < oldestAvailableBlock {
return nil, fmt.Errorf("requested block is no longer available, requested: %d, oldest available: %d", atBlock, oldestAvailableBlock)
}

offset := lastBlock - atBlock
txIx := numOfPools - 1 - int(offset)

return h.pools[txIx].getTransaction(ctx)
}

func (h *HistoricTransactionPool) AddPoolAtBlock() error {
pool, err := h.newPoolAtBlock()
if err != nil {
return fmt.Errorf("failed to create historic read transaction pool: %w", err)
}

h.mu.Lock()
defer h.mu.Unlock()

for len(h.pools) >= MaxPoolCount {
oldPool := h.pools[0]
h.pools = h.pools[1:]
if err := oldPool.Close(); err != nil {
return fmt.Errorf("failed to close discarded historic read transaction pool: %w", err)
}
}
h.pools = append(h.pools, pool)

h.log.Info("historic read transaction pool", "size", len(h.pools))
return nil
}
12 changes: 5 additions & 7 deletions query_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,19 @@ func (s *SQLiteStore) QueryEntities(
options *Options,
) (*QueryResponse, error) {

// TODO: wait for the block height

res := &QueryResponse{
Data: []json.RawMessage{},
BlockNumber: 0,
BlockNumber: options.GetAtBlock(),
Cursor: nil,
}

{
q := s.NewQueries()
store := store.New(s.readPool)
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

for {
lastBlock, err := q.GetLastBlock(ctx)
lastBlock, err := store.GetLastBlock(ctx)
if err != nil {
return nil, fmt.Errorf("error getting last block: %w", err)
}
Expand All @@ -136,7 +134,7 @@ func (s *SQLiteStore) QueryEntities(
}
select {
case <-timeoutCtx.Done():
return nil, fmt.Errorf("context cancelled: %w", ctx.Err())
return nil, fmt.Errorf("timeout waiting for block %d, found block %d", options.GetAtBlock(), lastBlock)
case <-time.After(100 * time.Millisecond):
continue
}
Expand All @@ -149,7 +147,7 @@ func (s *SQLiteStore) QueryEntities(
return nil, fmt.Errorf("error parsing query: %w", err)
}

err = s.ReadTransaction(ctx, func(queries *store.Queries) error {
err = s.ReadTransaction(ctx, options.GetAtBlock(), func(queries *store.Queries) error {

bitmap, err := q.Evaluate(
ctx,
Expand Down
Loading