Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
build
.envrc

*.txt
*.txt
.env
36 changes: 36 additions & 0 deletions internal/data/metrics.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 48 additions & 1 deletion internal/data/models.sqlc.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions internal/data/sql/queries/metrics.sql
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,14 @@ WITH upserted AS (
)
SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert;

-- name: CreateSlotStatus :exec
INSERT INTO slot_status (
slot,
status
)
VALUES ($1, $2);

-- name: QueryLatestSlotStatus :one
SELECT slot, status FROM slot_status
ORDER BY slot DESC
LIMIT 1;
36 changes: 36 additions & 0 deletions internal/metrics/tx_mapper_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,42 @@ func (tm *TxMapperDB) AddBlock(
return err
}

func (tm *TxMapperDB) AddSlotStatus(
ctx context.Context,
slot int64,
status data.SlotStatusVal,
) error {
latest, err := tm.dbQuery.QueryLatestSlotStatus(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{
Slot: slot,
Status: status,
})
}
return err
}

if slot <= latest.Slot {
return nil
}

for s := latest.Slot + 1; s < slot; s++ {
err := tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{
Slot: s,
Status: data.SlotStatusValMissed,
})
if err != nil {
return err
}
}

return tm.dbQuery.CreateSlotStatus(ctx, data.CreateSlotStatusParams{
Slot: slot,
Status: status,
})
}

func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) {
data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type TxMapper interface {
ctx context.Context,
b *data.Block,
) error
AddSlotStatus(ctx context.Context, slot int64, status data.SlotStatusVal) error
QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error)
AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error
UpdateValidatorStatus(ctx context.Context) error
Expand Down
13 changes: 11 additions & 2 deletions internal/watcher/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,25 @@ func (bw *BlocksWatcher) insertBlock(ctx context.Context, header *types.Header)
bw.mostRecentBlock = header.Number.Uint64()
}

slot := int64(utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration))

err := bw.txMapper.AddBlock(ctx, &data.Block{
BlockHash: header.Hash().Bytes(),
BlockNumber: header.Number.Int64(),
BlockTimestamp: int64(header.Time),
Slot: int64(utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration)),
Slot: slot,
})
if err != nil {
log.Err(err).Msg("err adding block")
return err
}
return err

err = bw.txMapper.AddSlotStatus(ctx, slot, data.SlotStatusValProposed)
if err != nil {
log.Err(err).Msg("err adding slot status")
return err
}
return nil
}

func (bw *BlocksWatcher) clearOldBlocks(latestHeader *types.Header) {
Expand Down
4 changes: 4 additions & 0 deletions internal/watcher/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (m *mockTxMapper) AddBlock(ctx context.Context, b *data.Block) error {
panic("unexpected call to AddBlock")
}

func (m *mockTxMapper) AddSlotStatus(ctx context.Context, slot int64, status data.SlotStatusVal) error {
panic("unexpected call to AddSlotStatus")
}

func (m *mockTxMapper) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) {
panic("unexpected call to QueryBlockNumberFromValidatorRegistryEventsSyncedUntil")
}
Expand Down
21 changes: 21 additions & 0 deletions migrations/20260116055945_add_slot_status_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- +goose Up
-- +goose StatementBegin
CREATE TYPE slot_status_val AS ENUM
(
'proposed',
'missed'
);

CREATE TABLE IF NOT EXISTS slot_status
(
slot BIGINT PRIMARY KEY,
status slot_status_val NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL
);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE slot_status;
DROP TYPE slot_status_val;
-- +goose StatementEnd