diff --git a/.gitignore b/.gitignore index fa632b8..add4b09 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build .envrc -*.txt \ No newline at end of file +*.txt +.env diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index a827bc4..9172f2d 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -222,6 +222,24 @@ func (q *Queries) CreateProposerDuties(ctx context.Context, arg CreateProposerDu return err } +const createSlotStatus = `-- name: CreateSlotStatus :exec +INSERT INTO slot_status ( + slot, + status +) +VALUES ($1, $2) +` + +type CreateSlotStatusParams struct { + Slot int64 + Status SlotStatusVal +} + +func (q *Queries) CreateSlotStatus(ctx context.Context, arg CreateSlotStatusParams) error { + _, err := q.db.Exec(ctx, createSlotStatus, arg.Slot, arg.Status) + return err +} + const createTransactionSubmittedEvent = `-- name: CreateTransactionSubmittedEvent :exec INSERT into transaction_submitted_event ( event_block_hash, @@ -476,6 +494,24 @@ func (q *Queries) QueryDecryptionKeysAndMessage(ctx context.Context, slot int64) return items, nil } +const queryLatestSlotStatus = `-- name: QueryLatestSlotStatus :one +SELECT slot, status FROM slot_status +ORDER BY slot DESC +LIMIT 1 +` + +type QueryLatestSlotStatusRow struct { + Slot int64 + Status SlotStatusVal +} + +func (q *Queries) QueryLatestSlotStatus(ctx context.Context) (QueryLatestSlotStatusRow, error) { + row := q.db.QueryRow(ctx, queryLatestSlotStatus) + var i QueryLatestSlotStatusRow + err := row.Scan(&i.Slot, &i.Status) + return i, err +} + const queryTransactionSubmittedEvent = `-- name: QueryTransactionSubmittedEvent :many SELECT id, event_block_hash, event_block_number, event_tx_index, event_log_index, eon, tx_index, identity_prefix, sender, encrypted_transaction, created_at, updated_at, event_tx_hash FROM transaction_submitted_event WHERE eon = $1 AND tx_index >= $2 AND tx_index < $2 + $3 ORDER BY tx_index ASC diff --git a/internal/data/models.sqlc.gen.go b/internal/data/models.sqlc.gen.go index b1c7852..f974c39 100644 --- a/internal/data/models.sqlc.gen.go +++ b/internal/data/models.sqlc.gen.go @@ -11,6 +11,48 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type SlotStatusVal string + +const ( + SlotStatusValProposed SlotStatusVal = "proposed" + SlotStatusValMissed SlotStatusVal = "missed" +) + +func (e *SlotStatusVal) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = SlotStatusVal(s) + case string: + *e = SlotStatusVal(s) + default: + return fmt.Errorf("unsupported scan type for SlotStatusVal: %T", src) + } + return nil +} + +type NullSlotStatusVal struct { + SlotStatusVal SlotStatusVal + Valid bool // Valid is true if SlotStatusVal is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullSlotStatusVal) Scan(value interface{}) error { + if value == nil { + ns.SlotStatusVal, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.SlotStatusVal.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullSlotStatusVal) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.SlotStatusVal), nil +} + type TxStatusVal string const ( @@ -168,6 +210,12 @@ type ProposerDuty struct { UpdatedAt pgtype.Timestamptz } +type SlotStatus struct { + Slot int64 + Status SlotStatusVal + CreatedAt pgtype.Timestamptz +} + type TransactionSubmittedEvent struct { ID int64 EventBlockHash []byte @@ -191,7 +239,6 @@ type TransactionSubmittedEventsSyncedUntil struct { } type ValidatorGraffiti struct { - ID int64 ValidatorIndex pgtype.Int8 Graffiti string BlockNumber int64 diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index c905f24..efb3b37 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql @@ -208,3 +208,14 @@ WITH upserted AS ( ) SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert; +-- name: CreateSlotStatus :exec +INSERT INTO slot_status ( + slot, + status +) +VALUES ($1, $2); + +-- name: QueryLatestSlotStatus :one +SELECT slot, status FROM slot_status +ORDER BY slot DESC +LIMIT 1; diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index d77c4df..94c1b62 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -206,6 +206,42 @@ func (tm *TxMapperDB) AddBlock( return err } +func (tm *TxMapperDB) AddSlotStatus( + ctx context.Context, + slot int64, + status data.SlotStatusVal, +) error { + latest, err := tm.dbQuery.QueryLatestSlotStatus(ctx) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{ + Slot: slot, + Status: status, + }) + } + return err + } + + if slot <= latest.Slot { + return nil + } + + for s := latest.Slot + 1; s < slot; s++ { + err := tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{ + Slot: s, + Status: data.SlotStatusValMissed, + }) + if err != nil { + return err + } + } + + return tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{ + Slot: slot, + Status: status, + }) +} + func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) if err != nil { diff --git a/internal/metrics/types.go b/internal/metrics/types.go index ad94f58..ec0d461 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -61,6 +61,7 @@ type TxMapper interface { ctx context.Context, b *data.Block, ) error + AddSlotStatus(ctx context.Context, slot int64, status data.SlotStatusVal) error QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error UpdateValidatorStatus(ctx context.Context) error diff --git a/internal/watcher/blocks.go b/internal/watcher/blocks.go index 2357239..45dada8 100644 --- a/internal/watcher/blocks.go +++ b/internal/watcher/blocks.go @@ -147,16 +147,25 @@ func (bw *BlocksWatcher) insertBlock(ctx context.Context, header *types.Header) bw.mostRecentBlock = header.Number.Uint64() } + slot := int64(utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration)) + err := bw.txMapper.AddBlock(ctx, &data.Block{ BlockHash: header.Hash().Bytes(), BlockNumber: header.Number.Int64(), BlockTimestamp: int64(header.Time), - Slot: int64(utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration)), + Slot: slot, }) if err != nil { log.Err(err).Msg("err adding block") + return err } - return err + + err = bw.txMapper.AddSlotStatus(ctx, slot, data.SlotStatusValProposed) + if err != nil { + log.Err(err).Msg("err adding slot status") + return err + } + return nil } func (bw *BlocksWatcher) clearOldBlocks(latestHeader *types.Header) { diff --git a/internal/watcher/blocks_test.go b/internal/watcher/blocks_test.go index edd12c0..e59f94b 100644 --- a/internal/watcher/blocks_test.go +++ b/internal/watcher/blocks_test.go @@ -127,6 +127,10 @@ func (m *mockTxMapper) AddBlock(ctx context.Context, b *data.Block) error { panic("unexpected call to AddBlock") } +func (m *mockTxMapper) AddSlotStatus(ctx context.Context, slot int64, status data.SlotStatusVal) error { + panic("unexpected call to AddSlotStatus") +} + func (m *mockTxMapper) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { panic("unexpected call to QueryBlockNumberFromValidatorRegistryEventsSyncedUntil") } diff --git a/migrations/20260116055945_add_slot_status_table.sql b/migrations/20260116055945_add_slot_status_table.sql new file mode 100644 index 0000000..75c2962 --- /dev/null +++ b/migrations/20260116055945_add_slot_status_table.sql @@ -0,0 +1,21 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TYPE slot_status_val AS ENUM +( + 'proposed', + 'missed' +); + +CREATE TABLE IF NOT EXISTS slot_status +( + slot BIGINT PRIMARY KEY, + status slot_status_val NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE slot_status; +DROP TYPE slot_status_val; +-- +goose StatementEnd