From 3acb4d50bc5f56fd27edd1410c5b0d44b6f623f8 Mon Sep 17 00:00:00 2001 From: Evgeny Danienko <6655321@bk.ru> Date: Fri, 21 Nov 2025 19:11:47 +0100 Subject: [PATCH 1/3] initial --- .gitignore | 1 + Makefile | 6 + application/buckets.go | 6 +- as/asconfig.json | 22 ++ as/package-lock.json | 53 ++++ as/package.json | 11 + as/src/sdk.ts | 100 +++++++ as/src/strategy.ts | 62 ++++ build/strategy.wasm | Bin 0 -> 2360 bytes cmd/main.go | 59 +++- cmd/main_test.go | 20 +- go.mod | 1 + wasmstrategy/addressbook.go | 11 + wasmstrategy/host.go | 308 ++++++++++++++++++++ wasmstrategy/host_test.go | 148 ++++++++++ wasmstrategy/manager.go | 151 ++++++++++ wasmstrategy/testdata/uniswap_strategy.wasm | Bin 0 -> 2360 bytes wasmstrategy/types.go | 41 +++ 18 files changed, 979 insertions(+), 21 deletions(-) create mode 100644 as/asconfig.json create mode 100644 as/package-lock.json create mode 100644 as/package.json create mode 100644 as/src/sdk.ts create mode 100644 as/src/strategy.ts create mode 100644 build/strategy.wasm create mode 100644 wasmstrategy/addressbook.go create mode 100644 wasmstrategy/host.go create mode 100644 wasmstrategy/host_test.go create mode 100644 wasmstrategy/manager.go create mode 100644 wasmstrategy/testdata/uniswap_strategy.wasm create mode 100644 wasmstrategy/types.go diff --git a/.gitignore b/.gitignore index e374580..b1455a1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ pelacli_data/ test_consensus/ test_consensus_app/ multichain/ +as/node_modules/ diff --git a/Makefile b/Makefile index 45fb0aa..9e96398 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,12 @@ clean: tidy: go mod tidy +wasm-deps: + npm --prefix as install + +wasm-build: wasm-deps + npm --prefix as run build + tests: go test -short -timeout 20m -failfast -shuffle=on -v ./... $(params) diff --git a/application/buckets.go b/application/buckets.go index 18a439b..9eb554e 100644 --- a/application/buckets.go +++ b/application/buckets.go @@ -3,11 +3,13 @@ package application import "github.com/ledgerwatch/erigon-lib/kv" const ( - AccountsBucket = "appaccounts" // token+account -> value + AccountsBucket = "appaccounts" // token+account -> value + StrategyStateBucket = "strategykv" ) func Tables() kv.TableCfg { return kv.TableCfg{ - AccountsBucket: {}, + AccountsBucket: {}, + StrategyStateBucket: {}, } } diff --git a/as/asconfig.json b/as/asconfig.json new file mode 100644 index 0000000..c08b386 --- /dev/null +++ b/as/asconfig.json @@ -0,0 +1,22 @@ +{ + "targets": { + "debug": { + "optimizeLevel": 0, + "shrinkLevel": 0, + "debug": true, + "outFile": "../build/strategy.debug.wasm" + }, + "release": { + "optimizeLevel": 3, + "shrinkLevel": 2, + "noAssert": true, + "converge": false, + "outFile": "../build/strategy.wasm" + } + }, + "options": { + "runtime": "stub", + "exportTable": true, + "exportMemory": true + } +} diff --git a/as/package-lock.json b/as/package-lock.json new file mode 100644 index 0000000..2fc53a2 --- /dev/null +++ b/as/package-lock.json @@ -0,0 +1,53 @@ +{ + "name": "pelagos-strategy", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "pelagos-strategy", + "version": "0.1.0", + "devDependencies": { + "assemblyscript": "^0.27.0" + } + }, + "node_modules/assemblyscript": { + "version": "0.27.37", + "resolved": "https://registry.npmjs.org/assemblyscript/-/assemblyscript-0.27.37.tgz", + "integrity": "sha512-YtY5k3PiV3SyUQ6gRlR2OCn8dcVRwkpiG/k2T5buoL2ymH/Z/YbaYWbk/f9mO2HTgEtGWjPiAQrIuvA7G/63Gg==", + "dev": true, + "dependencies": { + "binaryen": "116.0.0-nightly.20240114", + "long": "^5.2.4" + }, + "bin": { + "asc": "bin/asc.js", + "asinit": "bin/asinit.js" + }, + "engines": { + "node": ">=18", + "npm": ">=10" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/assemblyscript" + } + }, + "node_modules/binaryen": { + "version": "116.0.0-nightly.20240114", + "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-116.0.0-nightly.20240114.tgz", + "integrity": "sha512-0GZrojJnuhoe+hiwji7QFaL3tBlJoA+KFUN7ouYSDGZLSo9CKM8swQX8n/UcbR0d1VuZKU+nhogNzv423JEu5A==", + "dev": true, + "bin": { + "wasm-opt": "bin/wasm-opt", + "wasm2js": "bin/wasm2js" + } + }, + "node_modules/long": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", + "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", + "dev": true + } + } +} diff --git a/as/package.json b/as/package.json new file mode 100644 index 0000000..9e43c0e --- /dev/null +++ b/as/package.json @@ -0,0 +1,11 @@ +{ + "name": "pelagos-strategy", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "asc src/strategy.ts --target release --config asconfig.json" + }, + "devDependencies": { + "assemblyscript": "^0.27.0" + } +} diff --git a/as/src/sdk.ts b/as/src/sdk.ts new file mode 100644 index 0000000..5dccf7b --- /dev/null +++ b/as/src/sdk.ts @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: MIT +// Pelagos AssemblyScript SDK shim exposing host functions provided by the Go runtime. + +// EventKind enumerates the event payloads that the Go host exposes to strategies. +export enum EventKind { + Unknown = 0, + ERC20Transfer = 1, +} + +// AddressId groups frequently used contracts so strategies can branch on a small enum. +export enum AddressId { + Unknown = 0, + UniswapV2Pair = 1, + UniswapV3Pool = 2, +} + +// DbSlot identifiers backed by the runtime key/value store. +export const SLOT_LAST_UNI_TRANSFER_BLOCK: i32 = 1; + +// LogLevel mirrors the numeric contract expected by env.log. +export enum LogLevel { + Debug = 10, + Info = 20, + Warn = 30, + Error = 40, +} + +@external("env", "get_block_number") +declare function hostGetBlockNumber(): i64; + +@external("env", "get_chain_id") +declare function hostGetChainId(): i64; + +@external("env", "get_event_count") +declare function hostGetEventCount(): i32; + +@external("env", "get_event_kind") +declare function hostGetEventKind(index: i32): i32; + +@external("env", "get_event_address_id") +declare function hostGetEventAddressId(index: i32): i32; + +@external("env", "db_get_u64") +declare function hostDbGet(slot: i32): i64; + +@external("env", "db_put_u64") +declare function hostDbPut(slot: i32, value: i64): void; + +@external("env", "log") +declare function hostLog(level: i32, ptr: usize, len: i32): void; + +// getBlockNumber returns the current Pelagos block number. +export function getBlockNumber(): i64 { + return hostGetBlockNumber(); +} + +// getChainId returns the appchain identifier. +export function getChainId(): i64 { + return hostGetChainId(); +} + +// getEventCount returns how many events are available for this block. +export function getEventCount(): i32 { + return hostGetEventCount(); +} + +// getEventKind exposes the strongly typed EventKind for the event at index. +export function getEventKind(index: i32): EventKind { + return changetype(hostGetEventKind(index)); +} + +// getEventAddressId returns the AddressId classification for the event at index. +export function getEventAddressId(index: i32): AddressId { + return changetype(hostGetEventAddressId(index)); +} + +// dbGetU64 reads a 64-bit slot from the strategy KV store. +export function dbGetU64(slot: i32): i64 { + return hostDbGet(slot); +} + +// dbPutU64 writes a 64-bit slot to the strategy KV store. +export function dbPutU64(slot: i32, value: i64): void { + hostDbPut(slot, value); +} + +function writeLog(level: LogLevel, message: string): void { + const encoded = String.UTF8.encode(message); + hostLog(level, changetype(encoded), encoded.byteLength); +} + +// logDebug emits a debugging log line. +export function logDebug(message: string): void { + writeLog(LogLevel.Debug, message); +} + +// logInfo emits an informational log line. +export function logInfo(message: string): void { + writeLog(LogLevel.Info, message); +} diff --git a/as/src/strategy.ts b/as/src/strategy.ts new file mode 100644 index 0000000..cfe50a1 --- /dev/null +++ b/as/src/strategy.ts @@ -0,0 +1,62 @@ +import { + AddressId, + EventKind, + SLOT_LAST_UNI_TRANSFER_BLOCK, + dbGetU64, + dbPutU64, + getBlockNumber, + getEventAddressId, + getEventCount, + getEventKind, + logInfo, +} from "./sdk"; + +const BLOCK_STALE_THRESHOLD: i64 = 6000; + +function hasUniswapTransfer(events: i32): bool { + for (let index = 0; index < events; index++) { + if (getEventKind(index) != EventKind.ERC20Transfer) { + continue; + } + + const addressId = getEventAddressId(index); + if ( + addressId == AddressId.UniswapV2Pair || + addressId == AddressId.UniswapV3Pool + ) { + return true; + } + } + + return false; +} + +// on_block is invoked by the Go runtime every Pelagos block. +export function on_block(): void { + const blockNumber = getBlockNumber(); + const eventCount = getEventCount(); + + if (hasUniswapTransfer(eventCount)) { + dbPutU64(SLOT_LAST_UNI_TRANSFER_BLOCK, blockNumber); + logInfo( + "Uniswap ERC20 transfer detected at block " + blockNumber.toString() + ); + + return; + } + + const lastSeen = dbGetU64(SLOT_LAST_UNI_TRANSFER_BLOCK); + if (lastSeen <= 0) { + return; + } + + const delta = blockNumber - lastSeen; + if (delta > BLOCK_STALE_THRESHOLD) { + logInfo( + "No Uniswap transfers observed since block " + + lastSeen.toString() + + ", delta=" + + delta.toString() + ); + } +} diff --git a/build/strategy.wasm b/build/strategy.wasm new file mode 100644 index 0000000000000000000000000000000000000000..7178724c0e831d46b179fdda4b407dd20b4971a4 GIT binary patch literal 2360 zcmZ8iQEXI26umR^-tP9zZu>-{!~%J{fWZi~3Km5Pof;dei8TmOt4Z7KF4))Y?)L3# z0R!EQq9G~?AI2EsM-wBE7y|f3lnPsa8j3rNYp(tw-y!_)H7DMnRm=N zZLW}6?b@VqC)xjk9;KKo0<1MKDKIiJBK%p{C)D;k9Q3~gXlQ(&FuBr8*kSK%(B>V z{PWMQoSDxwOP;><=Gkx0i)QYLlfRz5fdj-?LSBJ9RA%yg$EBRtPc_-YZa_`ig$A@I z`FYJow52^uXP~)RKDJ)EMoj5$Vb#rCIx9VL9n{I8W^7DiGtny@5fEl9u_!T<@<)mqdKC6x-xeWyn=-U{HhTC+S#FV)t0=RZe zAT%(kFo>T(#(@`9A1F&7H77W6ULo$B>Ya!YBxbqsC;Kj_Ubui`7LpUj2{Fi)kJfPt zK%-fH1*U*8n@2Zf==8s5pyRx3%FK%(;PDpt3S|WtV577RK^XxF*CW!4mG1DyZCraB zNn|Tu>nI;1M`?jC&nE(^dxVFJ-oX*979zSiCgGrN`o6zqFDW_H4r7RxW z6%kVm0skOK7s-&^KoD1fv+M;q(HmsceNkb+GX{S0q9EZ~W+nxU7FWBOHVxz?kLe3m zcN?ophjXH5e$yTVU(bLJ!pvN&i>o+8>m+^)Ti0T!|FTz*c{im+iOdHn{gNc2U08dl zOrunz{WMHfwDc6bPtwEG2|J_!-ZF0YQW3UC1?(Z#Nc35NXBd`8c`8u_-Z9wbHc$?c zIohJf7zKY75L2P&@fiTCO#5g(J%C8AUV!I7R40IclEzWJ9Mv4hsn)Dsj4&8`Kkhlg z14l+Lqk>19sSCI*T08+n-C@~9HfCU|RPE;?GQBs1&i z7S80=N#-iNsCU!R9qd_;Wl$mRl=~h<9A~{8|k!k=9bP`4O&*vXAP(U`nbc0 gXJ-VbCG&Q%HR3hfs_$hS7i$AzSfdhH4b%320B4`%1ONa4 literal 0 HcmV?d00001 diff --git a/cmd/main.go b/cmd/main.go index 25d1e19..1be02e5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,9 +7,11 @@ import ( "flag" "os" "os/signal" + "path/filepath" "syscall" "github.com/0xAtelerix/sdk/gosdk" + "github.com/0xAtelerix/sdk/gosdk/apptypes" "github.com/0xAtelerix/sdk/gosdk/rpc" "github.com/0xAtelerix/sdk/gosdk/txpool" "github.com/fxamacker/cbor/v2" @@ -21,6 +23,7 @@ import ( "github.com/0xAtelerix/example/application" "github.com/0xAtelerix/example/application/api" + "github.com/0xAtelerix/example/wasmstrategy" ) const ChainID = 42 @@ -192,16 +195,52 @@ func Run(ctx context.Context, args RuntimeArgs, _ chan<- int) { log.Info().Msg("Starting appchain...") - appchainExample := gosdk.NewAppchain( - stateTransition, - application.BlockConstructor, - txPool, - config, - appchainDB, - subs, - msa, - txBatchDB, - ) + var strategyManager *wasmstrategy.Manager + strategyPath := filepath.Join("build", "strategy.wasm") + if info, errStat := os.Stat(strategyPath); errStat == nil && !info.IsDir() { + manager, err := wasmstrategy.NewManager(ctx, wasmstrategy.ManagerConfig{ + Logger: &log.Logger, + DB: appchainDB, + Multichain: msa, + WasmPath: strategyPath, + AddressBook: wasmstrategy.DefaultAddressBook(), + ChainID: apptypes.ChainType(ChainID), + }) + if err != nil { + log.Warn().Err(err).Msg("Failed to initialize WASM strategy. continuing without it") + } else { + strategyManager = manager + defer strategyManager.Close(ctx) + } + } else { + log.Warn().Err(errStat).Str("path", strategyPath).Msg("No WASM strategy found, skipping runtime") + } + + var appchainExample gosdk.Appchain[*gosdk.BatchProcesser[application.Transaction[application.Receipt], application.Receipt], application.Transaction[application.Receipt], application.Receipt, *application.Block] + if strategyManager != nil { + appchainExample = gosdk.NewAppchain( + stateTransition, + application.BlockConstructor, + txPool, + config, + appchainDB, + subs, + msa, + txBatchDB, + gosdk.WithBlockObservers[*gosdk.BatchProcesser[application.Transaction[application.Receipt], application.Receipt], application.Transaction[application.Receipt], application.Receipt, *application.Block](strategyManager), + ) + } else { + appchainExample = gosdk.NewAppchain( + stateTransition, + application.BlockConstructor, + txPool, + config, + appchainDB, + subs, + msa, + txBatchDB, + ) + } if err != nil { log.Fatal().Err(err).Msg("Failed to start appchain") diff --git a/cmd/main_test.go b/cmd/main_test.go index fe0aa9c..32e9f4a 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -80,17 +80,19 @@ func TestEndToEnd(t *testing.T) { defer cancel() if err = waitUntil(ctx, func() bool { - // GET is fine; we only care the port is bound. - var req *http.Request - req, err = http.NewRequestWithContext(ctx, http.MethodGet, rpcURL, nil) - require.NoError(t, err, "GET req /rpc") + req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, rpcURL, nil) + if reqErr != nil { + t.Fatalf("GET req /rpc: %v", reqErr) + } - var resp *http.Response - resp, err = http.DefaultClient.Do(req) - require.NoError(t, err, "GET res /rpc") + resp, respErr := http.DefaultClient.Do(req) + if respErr != nil { + return false + } - err = resp.Body.Close() - require.NoError(t, err) + if closeErr := resp.Body.Close(); closeErr != nil { + t.Fatalf("close resp: %v", closeErr) + } return true }); err != nil { diff --git a/go.mod b/go.mod index 532d499..6db47ff 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/ledgerwatch/log/v3 v3.9.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 + github.com/tetratelabs/wazero v1.8.1 ) require ( diff --git a/wasmstrategy/addressbook.go b/wasmstrategy/addressbook.go new file mode 100644 index 0000000..3558f1a --- /dev/null +++ b/wasmstrategy/addressbook.go @@ -0,0 +1,11 @@ +package wasmstrategy + +import "github.com/0xAtelerix/example/application" + +// DefaultAddressBook seeds the runtime with known pools so the example strategy can react. +func DefaultAddressBook() map[string]AddressID { + return map[string]AddressID{ + normalizeAddress(application.ExampleContractAddress): AddressIDUniswapV2Pair, + "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8": AddressIDUniswapV3Pool, // Uniswap v3 USDC/ETH pool + } +} diff --git a/wasmstrategy/host.go b/wasmstrategy/host.go new file mode 100644 index 0000000..9e9d6db --- /dev/null +++ b/wasmstrategy/host.go @@ -0,0 +1,308 @@ +package wasmstrategy + +import ( + "context" + "encoding/binary" + "fmt" + "strings" + "sync" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/rs/zerolog" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + + "github.com/0xAtelerix/example/application" +) + +// hostEnv backs the env.* host functions consumed by AssemblyScript strategies. +type hostEnv struct { + logger *zerolog.Logger + db kv.RwDB + + mu sync.RWMutex + blockCtx BlockContext +} + +func newHostEnv(logger *zerolog.Logger, db kv.RwDB) *hostEnv { + return &hostEnv{logger: logger, db: db} +} + +func (h *hostEnv) setContext(ctx BlockContext) { + h.mu.Lock() + h.blockCtx = ctx + h.mu.Unlock() +} + +func (h *hostEnv) register(ctx context.Context, runtime wazero.Runtime) error { + builder := runtime.NewHostModuleBuilder("env") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getBlockNumber), []api.ValueType{}, []api.ValueType{api.ValueTypeI64}). + Export("get_block_number") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getChainID), []api.ValueType{}, []api.ValueType{api.ValueTypeI64}). + Export("get_chain_id") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventCount), []api.ValueType{}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_count") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventKind), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_kind") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventAddressID), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_address_id") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.dbGetU64), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI64}). + Export("db_get_u64") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.dbPutU64), []api.ValueType{api.ValueTypeI32, api.ValueTypeI64}, []api.ValueType{}). + Export("db_put_u64") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.log), []api.ValueType{api.ValueTypeI32, api.ValueTypeI32, api.ValueTypeI32}, []api.ValueType{}). + Export("log") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.abort), []api.ValueType{ + api.ValueTypeI32, + api.ValueTypeI32, + api.ValueTypeI32, + api.ValueTypeI32, + }, []api.ValueType{}). + Export("abort") + + _, err := builder.Instantiate(ctx) + + return err +} + +func (h *hostEnv) snapshot() BlockContext { + h.mu.RLock() + defer h.mu.RUnlock() + + return h.blockCtx +} + +func (h *hostEnv) getBlockNumber(_ context.Context, _ api.Module, stack []uint64) { + stack[0] = h.snapshot().BlockNumber +} + +func (h *hostEnv) getChainID(_ context.Context, _ api.Module, stack []uint64) { + stack[0] = uint64(h.snapshot().ChainID) +} + +func (h *hostEnv) getEventCount(_ context.Context, _ api.Module, stack []uint64) { + stack[0] = uint64(uint32(len(h.snapshot().Events))) +} + +func (h *hostEnv) getEventKind(_ context.Context, _ api.Module, stack []uint64) { + idx := int32(int64(stack[0])) + events := h.snapshot().Events + if idx < 0 || int(idx) >= len(events) { + stack[0] = 0 + + return + } + + stack[0] = uint64(uint32(events[idx].Kind)) +} + +func (h *hostEnv) getEventAddressID(_ context.Context, _ api.Module, stack []uint64) { + idx := int32(int64(stack[0])) + events := h.snapshot().Events + if idx < 0 || int(idx) >= len(events) { + stack[0] = 0 + + return + } + + stack[0] = uint64(uint32(events[idx].Target)) +} + +func (h *hostEnv) slotKey(slot int32) []byte { + var k [4]byte + binary.BigEndian.PutUint32(k[:], uint32(slot)) + + return k[:] +} + +func (h *hostEnv) dbGetU64(ctx context.Context, _ api.Module, stack []uint64) { + slot := int32(int64(stack[0])) + key := h.slotKey(slot) + + var val uint64 + err := h.db.View(ctx, func(tx kv.Tx) error { + v, err := tx.GetOne(application.StrategyStateBucket, key) + if err != nil { + return err + } + + if len(v) == 8 { + val = binary.BigEndian.Uint64(v) + } + + return nil + }) + if err != nil { + h.logger.Warn().Err(err).Msg("db_get_u64 failed") + stack[0] = 0 + + return + } + + stack[0] = val +} + +func (h *hostEnv) dbPutU64(ctx context.Context, _ api.Module, stack []uint64) { + slot := int32(int64(stack[0])) + value := stack[1] + key := h.slotKey(slot) + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], value) + + err := h.db.Update(ctx, func(tx kv.RwTx) error { + return tx.Put(application.StrategyStateBucket, key, buf[:]) + }) + if err != nil { + h.logger.Warn().Err(err).Msg("db_put_u64 failed") + } +} + +func (h *hostEnv) log(_ context.Context, mod api.Module, stack []uint64) { + level := int32(int64(stack[0])) + ptr := uint32(stack[1]) + length := uint32(stack[2]) + + memory := mod.Memory() + if memory == nil { + return + } + + msg := "" + if length > 0 { + if data, ok := memory.Read(ptr, length); ok { + msg = string(data) + } else { + h.logger.Warn().Uint32("ptr", ptr).Uint32("len", length).Msg("log read failed") + } + } + + switch level { + case 10: + h.logger.Debug().Msg(msg) + case 20: + h.logger.Info().Msg(msg) + case 30: + h.logger.Warn().Msg(msg) + case 40: + h.logger.Error().Msg(msg) + default: + h.logger.Info().Int32("level", level).Msg(msg) + } +} + +func (h *hostEnv) abort(_ context.Context, mod api.Module, stack []uint64) { + messagePtr := uint32(stack[0]) + filePtr := uint32(stack[1]) + line := uint32(stack[2]) + column := uint32(stack[3]) + + h.logger.Error(). + Uint32("message_ptr", messagePtr). + Uint32("file_ptr", filePtr). + Uint32("line", line). + Uint32("column", column). + Msg("wasm abort invoked") + + panic(fmt.Sprintf("wasm abort (%d:%d)", line, column)) +} + +// StrategyModule wraps a single WASM module instance plus host wiring. +type StrategyModule struct { + id string + runtime wazero.Runtime + module api.Module + host *hostEnv +} + +// ModuleConfig configures a StrategyModule instance. +type ModuleConfig struct { + ID string + Wasm []byte + DB kv.RwDB + Logger *zerolog.Logger +} + +// NewStrategyModule compiles and instantiates the WASM strategy and host functions. +func NewStrategyModule(ctx context.Context, cfg ModuleConfig) (*StrategyModule, error) { + if len(cfg.Wasm) == 0 { + return nil, fmt.Errorf("strategy wasm is empty") + } + + logger := cfg.Logger + if logger == nil { + logger = zerolog.Ctx(ctx) + } + + runtime := wazero.NewRuntime(ctx) + host := newHostEnv(logger, cfg.DB) + + if err := host.register(ctx, runtime); err != nil { + return nil, fmt.Errorf("register host env: %w", err) + } + + compiled, err := runtime.CompileModule(ctx, cfg.Wasm) + if err != nil { + return nil, fmt.Errorf("compile strategy wasm: %w", err) + } + + module, err := runtime.InstantiateModule(ctx, compiled, wazero.NewModuleConfig()) + if err != nil { + return nil, fmt.Errorf("instantiate strategy wasm: %w", err) + } + + return &StrategyModule{ + id: cfg.ID, + runtime: runtime, + module: module, + host: host, + }, nil +} + +// Close releases the underlying runtime resources. +func (s *StrategyModule) Close(ctx context.Context) error { + if s.module != nil { + _ = s.module.Close(ctx) + } + + if s.runtime != nil { + return s.runtime.Close(ctx) + } + + return nil +} + +// OnBlock invokes the exported on_block handler with the provided context. +func (s *StrategyModule) OnBlock(ctx context.Context, block BlockContext) error { + s.host.setContext(block) + + exported := s.module.ExportedFunction("on_block") + if exported == nil { + return fmt.Errorf("strategy %s does not export on_block", s.id) + } + + _, err := exported.Call(ctx) + + return err +} + +func normalizeAddress(addr string) string { + return strings.ToLower(addr) +} diff --git a/wasmstrategy/host_test.go b/wasmstrategy/host_test.go new file mode 100644 index 0000000..2a7c442 --- /dev/null +++ b/wasmstrategy/host_test.go @@ -0,0 +1,148 @@ +package wasmstrategy + +import ( + "bytes" + "context" + _ "embed" + "encoding/binary" + "fmt" + "path/filepath" + "strings" + "testing" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + mdbxlog "github.com/ledgerwatch/log/v3" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/0xAtelerix/example/application" +) + +//go:embed testdata/uniswap_strategy.wasm +var testStrategyWasm []byte + +func newTestDB(t *testing.T) kv.RwDB { + t.Helper() + + dbPath := filepath.Join(t.TempDir(), "strategy.mdbx") + + db, err := mdbx.NewMDBX(mdbxlog.New()). + Path(dbPath). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + application.StrategyStateBucket: {}, + } + }).Open() + require.NoError(t, err) + + t.Cleanup(func() { + db.Close() + }) + + return db +} + +func readSlot(t *testing.T, ctx context.Context, db kv.RwDB, slot int32) uint64 { + t.Helper() + + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(slot)) + + var out uint64 + + require.NoError(t, db.View(ctx, func(tx kv.Tx) error { + data, err := tx.GetOne(application.StrategyStateBucket, key) + if err != nil { + return err + } + + if len(data) == 8 { + out = binary.BigEndian.Uint64(data) + } + + return nil + })) + + return out +} + +func writeSlot(t *testing.T, ctx context.Context, db kv.RwDB, slot int32, value uint64) { + t.Helper() + + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(slot)) + + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], value) + + require.NoError(t, db.Update(ctx, func(tx kv.RwTx) error { + return tx.Put(application.StrategyStateBucket, key, buf[:]) + })) +} + +func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *bytes.Buffer) *StrategyModule { + t.Helper() + + logger := zerolog.New(sink).Level(zerolog.DebugLevel) + + module, err := NewStrategyModule(ctx, ModuleConfig{ + ID: "test-strategy", + Wasm: testStrategyWasm, + DB: db, + Logger: &logger, + }) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, module.Close(ctx)) + }) + + return module +} + +func TestStrategyModuleWritesSlotOnEvent(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + + blockNumber := uint64(42_000) + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: blockNumber, + ChainID: 42, + Events: []StrategyEvent{ + {Kind: EventKindERC20Transfer, Target: AddressIDUniswapV2Pair}, + }, + }) + require.NoError(t, err) + + value := readSlot(t, ctx, db, SlotLastUniTransferBlock) + require.Equal(t, blockNumber, value, "strategy should persist last observed block number") + require.Contains(t, logs.String(), "Uniswap ERC20 transfer detected", "expected info log from WASM strategy") +} + +func TestStrategyModuleLogsStaleTransferWarning(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + + lastSeen := uint64(1) + writeSlot(t, ctx, db, SlotLastUniTransferBlock, lastSeen) + + blockNumber := lastSeen + 7000 + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: blockNumber, + ChainID: 42, + }) + require.NoError(t, err) + + value := readSlot(t, ctx, db, SlotLastUniTransferBlock) + require.Equal(t, lastSeen, value, "slot should remain unchanged when no transfer is seen") + + expected := fmt.Sprintf("No Uniswap transfers observed since block %d", lastSeen) + require.Truef(t, strings.Contains(logs.String(), expected), "expected stale warning log containing %q", expected) +} diff --git a/wasmstrategy/manager.go b/wasmstrategy/manager.go new file mode 100644 index 0000000..f70ed7e --- /dev/null +++ b/wasmstrategy/manager.go @@ -0,0 +1,151 @@ +package wasmstrategy + +import ( + "context" + "fmt" + "os" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/rs/zerolog" + + "github.com/0xAtelerix/example/application" + "github.com/0xAtelerix/sdk/gosdk" + "github.com/0xAtelerix/sdk/gosdk/apptypes" +) + +// ManagerConfig wires the WASM runtime into the Pelagos example node. +type ManagerConfig struct { + Logger *zerolog.Logger + DB kv.RwDB + Multichain *gosdk.MultichainStateAccess + WasmPath string + AddressBook map[string]AddressID + ChainID apptypes.ChainType +} + +// Manager owns the wasm modules and implements gosdk.BlockObserver. +type Manager struct { + logger *zerolog.Logger + modules []*StrategyModule + multichain *gosdk.MultichainStateAccess + addressBook map[string]AddressID + chainID apptypes.ChainType +} + +// NewManager loads the AssemblyScript-built WASM artifact from disk. +func NewManager(ctx context.Context, cfg ManagerConfig) (*Manager, error) { + if cfg.Multichain == nil { + return nil, fmt.Errorf("multichain state access is required") + } + + if cfg.DB == nil { + return nil, fmt.Errorf("database handle is required") + } + + logger := cfg.Logger + if logger == nil { + logger = zerolog.Ctx(ctx) + } + + wasmBytes, err := os.ReadFile(cfg.WasmPath) + if err != nil { + return nil, fmt.Errorf("read wasm %q: %w", cfg.WasmPath, err) + } + + module, err := NewStrategyModule(ctx, ModuleConfig{ + ID: "strategy-uniswap-monitor", + Wasm: wasmBytes, + DB: cfg.DB, + Logger: logger, + }) + if err != nil { + return nil, err + } + + book := cfg.AddressBook + if book == nil { + book = make(map[string]AddressID) + } + + return &Manager{ + logger: logger, + modules: []*StrategyModule{module}, + multichain: cfg.Multichain, + addressBook: book, + chainID: cfg.ChainID, + }, nil +} + +// Close frees all wasm runtimes. +func (m *Manager) Close(ctx context.Context) { + for _, module := range m.modules { + _ = module.Close(ctx) + } +} + +// OnBlock wires events and context into all configured WASM strategies. +func (m *Manager) OnBlock(ctx context.Context, meta gosdk.BlockObserverContext[application.Transaction[application.Receipt], application.Receipt]) error { + if len(m.modules) == 0 { + return nil + } + + blockCtx := BlockContext{ + BlockNumber: meta.BlockNumber, + ChainID: m.chainID, + Events: m.collectEvents(ctx, meta.Batch.ExternalBlocks), + } + + for _, module := range m.modules { + if err := module.OnBlock(ctx, blockCtx); err != nil { + m.logger.Error().Err(err).Msg("strategy execution failed") + } + } + + return nil +} + +func (m *Manager) collectEvents(ctx context.Context, blocks []*apptypes.ExternalBlock) []StrategyEvent { + if len(blocks) == 0 { + return nil + } + + if len(m.addressBook) == 0 { + return nil + } + + var events []StrategyEvent + + for _, blk := range blocks { + if blk == nil { + continue + } + + chainID := apptypes.ChainType(blk.ChainID) + if !gosdk.IsEvmChain(chainID) { + continue + } + + receipts, err := m.multichain.EthReceipts(ctx, *blk) + if err != nil { + m.logger.Debug().Err(err).Uint64("chain", blk.ChainID).Uint64("block", blk.BlockNumber).Msg("missing receipts") + continue + } + + for _, receipt := range receipts { + for _, vlog := range receipt.Logs { + if len(vlog.Topics) == 0 || vlog.Topics[0].Hex() != erc20TransferTopic { + continue + } + + addrID, ok := m.addressBook[normalizeAddress(vlog.Address.Hex())] + if !ok { + continue + } + + events = append(events, StrategyEvent{Kind: EventKindERC20Transfer, Target: addrID}) + } + } + } + + return events +} diff --git a/wasmstrategy/testdata/uniswap_strategy.wasm b/wasmstrategy/testdata/uniswap_strategy.wasm new file mode 100644 index 0000000000000000000000000000000000000000..7178724c0e831d46b179fdda4b407dd20b4971a4 GIT binary patch literal 2360 zcmZ8iQEXI26umR^-tP9zZu>-{!~%J{fWZi~3Km5Pof;dei8TmOt4Z7KF4))Y?)L3# z0R!EQq9G~?AI2EsM-wBE7y|f3lnPsa8j3rNYp(tw-y!_)H7DMnRm=N zZLW}6?b@VqC)xjk9;KKo0<1MKDKIiJBK%p{C)D;k9Q3~gXlQ(&FuBr8*kSK%(B>V z{PWMQoSDxwOP;><=Gkx0i)QYLlfRz5fdj-?LSBJ9RA%yg$EBRtPc_-YZa_`ig$A@I z`FYJow52^uXP~)RKDJ)EMoj5$Vb#rCIx9VL9n{I8W^7DiGtny@5fEl9u_!T<@<)mqdKC6x-xeWyn=-U{HhTC+S#FV)t0=RZe zAT%(kFo>T(#(@`9A1F&7H77W6ULo$B>Ya!YBxbqsC;Kj_Ubui`7LpUj2{Fi)kJfPt zK%-fH1*U*8n@2Zf==8s5pyRx3%FK%(;PDpt3S|WtV577RK^XxF*CW!4mG1DyZCraB zNn|Tu>nI;1M`?jC&nE(^dxVFJ-oX*979zSiCgGrN`o6zqFDW_H4r7RxW z6%kVm0skOK7s-&^KoD1fv+M;q(HmsceNkb+GX{S0q9EZ~W+nxU7FWBOHVxz?kLe3m zcN?ophjXH5e$yTVU(bLJ!pvN&i>o+8>m+^)Ti0T!|FTz*c{im+iOdHn{gNc2U08dl zOrunz{WMHfwDc6bPtwEG2|J_!-ZF0YQW3UC1?(Z#Nc35NXBd`8c`8u_-Z9wbHc$?c zIohJf7zKY75L2P&@fiTCO#5g(J%C8AUV!I7R40IclEzWJ9Mv4hsn)Dsj4&8`Kkhlg z14l+Lqk>19sSCI*T08+n-C@~9HfCU|RPE;?GQBs1&i z7S80=N#-iNsCU!R9qd_;Wl$mRl=~h<9A~{8|k!k=9bP`4O&*vXAP(U`nbc0 gXJ-VbCG&Q%HR3hfs_$hS7i$AzSfdhH4b%320B4`%1ONa4 literal 0 HcmV?d00001 diff --git a/wasmstrategy/types.go b/wasmstrategy/types.go new file mode 100644 index 0000000..d57b4f4 --- /dev/null +++ b/wasmstrategy/types.go @@ -0,0 +1,41 @@ +package wasmstrategy + +import "github.com/0xAtelerix/sdk/gosdk/apptypes" + +// EventKind mirrors AssemblyScript's enum for events surfaced to strategies. +type EventKind int32 + +const ( + EventKindUnknown EventKind = 0 + EventKindERC20Transfer EventKind = 1 +) + +// AddressID classifies contracts and venues that emit events. +type AddressID int32 + +const ( + AddressIDUnknown AddressID = 0 + AddressIDUniswapV2Pair AddressID = 1 + AddressIDUniswapV3Pool AddressID = 2 +) + +// StrategyEvent captures the minimal event metadata sent to WASM. +type StrategyEvent struct { + Kind EventKind + Target AddressID +} + +// BlockContext is stored inside the host functions before invoking WASM. +type BlockContext struct { + BlockNumber uint64 + ChainID apptypes.ChainType + Events []StrategyEvent +} + +const ( + // SlotLastUniTransferBlock mirrors SLOT_LAST_UNI_TRANSFER_BLOCK in AssemblyScript. + SlotLastUniTransferBlock int32 = 1 +) + +// erc20TransferTopic is the Keccak hash of Transfer(address,address,uint256). +const erc20TransferTopic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" From f49a040a1924a3df3eb0ae8a776975e2fe489146 Mon Sep 17 00:00:00 2001 From: Evgeny Danienko <6655321@bk.ru> Date: Fri, 21 Nov 2025 21:16:34 +0100 Subject: [PATCH 2/3] initial gas --- Makefile | 18 +- cmd/main.go | 58 +++-- wasmstrategy/host.go | 175 ++++++++++++-- wasmstrategy/host_test.go | 59 ++++- wasmstrategy/manager.go | 241 ++++++++++++++++--- wasmstrategy/testdata/as/forbidden_import.ts | 6 + wasmstrategy/testdata/as/memory_import.ts | 1 + wasmstrategy/testdata/as/uniswap_strategy.ts | 67 ++++++ wasmstrategy/testdata/forbidden_import.wasm | Bin 0 -> 68 bytes wasmstrategy/testdata/memory_import.wasm | Bin 0 -> 65 bytes wasmstrategy/testdata/uniswap_strategy.wasm | Bin 2360 -> 2235 bytes 11 files changed, 551 insertions(+), 74 deletions(-) create mode 100644 wasmstrategy/testdata/as/forbidden_import.ts create mode 100644 wasmstrategy/testdata/as/memory_import.ts create mode 100644 wasmstrategy/testdata/as/uniswap_strategy.ts create mode 100644 wasmstrategy/testdata/forbidden_import.wasm create mode 100644 wasmstrategy/testdata/memory_import.wasm diff --git a/Makefile b/Makefile index 9e96398..678ceb8 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,9 @@ VERSION=v2.4.0 +ASC=npx --prefix as asc +WASM_TESTDATA_DIR=wasmstrategy/testdata +WASM_TESTDATA=$(WASM_TESTDATA_DIR)/uniswap_strategy.wasm \ + $(WASM_TESTDATA_DIR)/forbidden_import.wasm \ + $(WASM_TESTDATA_DIR)/memory_import.wasm run: go run cmd/main.go \ @@ -33,11 +38,22 @@ tidy: go mod tidy wasm-deps: - npm --prefix as install + npm --prefix as ci wasm-build: wasm-deps npm --prefix as run build +wasm-testdata: wasm-deps $(WASM_TESTDATA) + +$(WASM_TESTDATA_DIR)/uniswap_strategy.wasm: $(WASM_TESTDATA_DIR)/as/uniswap_strategy.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/forbidden_import.wasm: $(WASM_TESTDATA_DIR)/as/forbidden_import.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/memory_import.wasm: $(WASM_TESTDATA_DIR)/as/memory_import.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert --importMemory -o $@ + tests: go test -short -timeout 20m -failfast -shuffle=on -v ./... $(params) diff --git a/cmd/main.go b/cmd/main.go index 1be02e5..e9c8aa7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,8 +7,8 @@ import ( "flag" "os" "os/signal" - "path/filepath" "syscall" + "time" "github.com/0xAtelerix/sdk/gosdk" "github.com/0xAtelerix/sdk/gosdk/apptypes" @@ -37,6 +37,11 @@ type RuntimeArgs struct { RPCPort string MutlichainConfig gosdk.MultichainConfig LogLevel zerolog.Level + StrategyDir string + StrategyReload time.Duration + StrategyGasLimit uint64 + StrategyTimeout time.Duration + StrategyParallel int } func main() { @@ -62,6 +67,11 @@ func RunCLI(ctx context.Context) { rpcPort := fs.String("rpc-port", ":8080", "Port for the JSON-RPC server") multichainConfigJSON := fs.String("multichain-config", "", "Multichain config JSON path") logLevel := fs.Int("log-level", int(zerolog.InfoLevel), "Logging level") + strategyDir := fs.String("strategy-dir", "", "Directory containing strategy WASM modules") + strategyReload := fs.Duration("strategy-reload-interval", 5*time.Second, "Interval for rescanning the strategy directory") + strategyGasLimit := fs.Uint64("strategy-gas-limit", 100000, "Per-strategy gas limit when executing on_block") + strategyTimeout := fs.Duration("strategy-timeout", 50*time.Millisecond, "Per-strategy execution timeout") + strategyParallel := fs.Int("strategy-max-parallel", 4, "Maximum number of strategies executed in parallel") if *logLevel > int(zerolog.Disabled) { *logLevel = int(zerolog.DebugLevel) @@ -94,6 +104,11 @@ func RunCLI(ctx context.Context) { RPCPort: *rpcPort, LogLevel: zerolog.Level(*logLevel), MutlichainConfig: mcDbs, + StrategyDir: *strategyDir, + StrategyReload: *strategyReload, + StrategyGasLimit: *strategyGasLimit, + StrategyTimeout: *strategyTimeout, + StrategyParallel: *strategyParallel, } Run(ctx, args, nil) @@ -196,24 +211,33 @@ func Run(ctx context.Context, args RuntimeArgs, _ chan<- int) { log.Info().Msg("Starting appchain...") var strategyManager *wasmstrategy.Manager - strategyPath := filepath.Join("build", "strategy.wasm") - if info, errStat := os.Stat(strategyPath); errStat == nil && !info.IsDir() { - manager, err := wasmstrategy.NewManager(ctx, wasmstrategy.ManagerConfig{ - Logger: &log.Logger, - DB: appchainDB, - Multichain: msa, - WasmPath: strategyPath, - AddressBook: wasmstrategy.DefaultAddressBook(), - ChainID: apptypes.ChainType(ChainID), - }) - if err != nil { - log.Warn().Err(err).Msg("Failed to initialize WASM strategy. continuing without it") - } else { - strategyManager = manager - defer strategyManager.Close(ctx) + if args.StrategyDir != "" { + if info, errStat := os.Stat(args.StrategyDir); errStat == nil && info.IsDir() { + manager, err := wasmstrategy.NewManager(ctx, wasmstrategy.ManagerConfig{ + Logger: &log.Logger, + DB: appchainDB, + Multichain: msa, + StrategyDir: args.StrategyDir, + ReloadInterval: args.StrategyReload, + AddressBook: wasmstrategy.DefaultAddressBook(), + ChainID: apptypes.ChainType(ChainID), + Limits: wasmstrategy.StrategyLimits{ + GasLimit: args.StrategyGasLimit, + Timeout: args.StrategyTimeout, + }, + MaxParallel: args.StrategyParallel, + }) + if err != nil { + log.Warn().Err(err).Msg("Failed to initialize WASM strategy manager") + } else { + strategyManager = manager + defer strategyManager.Close(ctx) + } + } else if errStat != nil { + log.Warn().Err(errStat).Str("path", args.StrategyDir).Msg("Strategy directory unavailable, skipping WASM runtime") } } else { - log.Warn().Err(errStat).Str("path", strategyPath).Msg("No WASM strategy found, skipping runtime") + log.Debug().Msg("No strategy directory configured, skipping WASM runtime") } var appchainExample gosdk.Appchain[*gosdk.BatchProcesser[application.Transaction[application.Receipt], application.Receipt], application.Transaction[application.Receipt], application.Receipt, *application.Block] diff --git a/wasmstrategy/host.go b/wasmstrategy/host.go index 9e9d6db..4104f03 100644 --- a/wasmstrategy/host.go +++ b/wasmstrategy/host.go @@ -3,18 +3,55 @@ package wasmstrategy import ( "context" "encoding/binary" + "errors" "fmt" "strings" "sync" + "time" "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/sys" "github.com/0xAtelerix/example/application" ) +const ( + gasExitCode uint32 = 0xfffffff0 + hostAbortExitCode uint32 = 0xfffffff1 + defaultGasLimit = 50_000 + costCheap uint64 = 1 + costEvent uint64 = 2 + costDbRead uint64 = 15 + costDbWrite uint64 = 40 + costLog uint64 = 5 +) + +var ( + errGasLimitExceeded = errors.New("strategy gas limit exceeded") + errNoEntryPoint = errors.New("strategy does not export on_block") +) + +var allowedEnvImports = map[string]struct{}{ + "get_block_number": {}, + "get_chain_id": {}, + "get_event_count": {}, + "get_event_kind": {}, + "get_event_address_id": {}, + "db_get_u64": {}, + "db_put_u64": {}, + "log": {}, + "abort": {}, +} + +// StrategyLimits configures execution guard rails for a single strategy. +type StrategyLimits struct { + GasLimit uint64 + Timeout time.Duration +} + // hostEnv backs the env.* host functions consumed by AssemblyScript strategies. type hostEnv struct { logger *zerolog.Logger @@ -22,15 +59,19 @@ type hostEnv struct { mu sync.RWMutex blockCtx BlockContext + gasLimit uint64 + gasUsed uint64 } func newHostEnv(logger *zerolog.Logger, db kv.RwDB) *hostEnv { return &hostEnv{logger: logger, db: db} } -func (h *hostEnv) setContext(ctx BlockContext) { +func (h *hostEnv) prepare(ctx BlockContext, gasLimit uint64) { h.mu.Lock() h.blockCtx = ctx + h.gasLimit = gasLimit + h.gasUsed = 0 h.mu.Unlock() } @@ -90,19 +131,48 @@ func (h *hostEnv) snapshot() BlockContext { return h.blockCtx } -func (h *hostEnv) getBlockNumber(_ context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) charge(ctx context.Context, mod api.Module, cost uint64, reason string) { + if cost == 0 { + return + } + + h.mu.Lock() + h.gasUsed += cost + limit := h.gasLimit + used := h.gasUsed + h.mu.Unlock() + + if limit == 0 || used <= limit { + return + } + + h.logger.Warn(). + Uint64("cost", cost). + Uint64("used", used). + Uint64("limit", limit). + Str("reason", reason). + Msg("strategy gas limit exceeded") + + h.trap(ctx, mod, gasExitCode) +} + +func (h *hostEnv) getBlockNumber(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_block_number") stack[0] = h.snapshot().BlockNumber } -func (h *hostEnv) getChainID(_ context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) getChainID(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_chain_id") stack[0] = uint64(h.snapshot().ChainID) } -func (h *hostEnv) getEventCount(_ context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) getEventCount(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_event_count") stack[0] = uint64(uint32(len(h.snapshot().Events))) } -func (h *hostEnv) getEventKind(_ context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) getEventKind(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costEvent, "get_event_kind") idx := int32(int64(stack[0])) events := h.snapshot().Events if idx < 0 || int(idx) >= len(events) { @@ -114,7 +184,8 @@ func (h *hostEnv) getEventKind(_ context.Context, _ api.Module, stack []uint64) stack[0] = uint64(uint32(events[idx].Kind)) } -func (h *hostEnv) getEventAddressID(_ context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) getEventAddressID(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costEvent, "get_event_address_id") idx := int32(int64(stack[0])) events := h.snapshot().Events if idx < 0 || int(idx) >= len(events) { @@ -133,7 +204,8 @@ func (h *hostEnv) slotKey(slot int32) []byte { return k[:] } -func (h *hostEnv) dbGetU64(ctx context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) dbGetU64(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costDbRead, "db_get_u64") slot := int32(int64(stack[0])) key := h.slotKey(slot) @@ -160,7 +232,8 @@ func (h *hostEnv) dbGetU64(ctx context.Context, _ api.Module, stack []uint64) { stack[0] = val } -func (h *hostEnv) dbPutU64(ctx context.Context, _ api.Module, stack []uint64) { +func (h *hostEnv) dbPutU64(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costDbWrite, "db_put_u64") slot := int32(int64(stack[0])) value := stack[1] key := h.slotKey(slot) @@ -175,7 +248,8 @@ func (h *hostEnv) dbPutU64(ctx context.Context, _ api.Module, stack []uint64) { } } -func (h *hostEnv) log(_ context.Context, mod api.Module, stack []uint64) { +func (h *hostEnv) log(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costLog, "log") level := int32(int64(stack[0])) ptr := uint32(stack[1]) length := uint32(stack[2]) @@ -208,7 +282,7 @@ func (h *hostEnv) log(_ context.Context, mod api.Module, stack []uint64) { } } -func (h *hostEnv) abort(_ context.Context, mod api.Module, stack []uint64) { +func (h *hostEnv) abort(ctx context.Context, mod api.Module, stack []uint64) { messagePtr := uint32(stack[0]) filePtr := uint32(stack[1]) line := uint32(stack[2]) @@ -221,7 +295,12 @@ func (h *hostEnv) abort(_ context.Context, mod api.Module, stack []uint64) { Uint32("column", column). Msg("wasm abort invoked") - panic(fmt.Sprintf("wasm abort (%d:%d)", line, column)) + h.trap(ctx, mod, hostAbortExitCode) +} + +func (h *hostEnv) trap(ctx context.Context, mod api.Module, code uint32) { + _ = mod.CloseWithExitCode(ctx, code) + panic(sys.NewExitError(code)) } // StrategyModule wraps a single WASM module instance plus host wiring. @@ -230,6 +309,7 @@ type StrategyModule struct { runtime wazero.Runtime module api.Module host *hostEnv + limits StrategyLimits } // ModuleConfig configures a StrategyModule instance. @@ -238,6 +318,7 @@ type ModuleConfig struct { Wasm []byte DB kv.RwDB Logger *zerolog.Logger + Limits StrategyLimits } // NewStrategyModule compiles and instantiates the WASM strategy and host functions. @@ -263,16 +344,27 @@ func NewStrategyModule(ctx context.Context, cfg ModuleConfig) (*StrategyModule, return nil, fmt.Errorf("compile strategy wasm: %w", err) } - module, err := runtime.InstantiateModule(ctx, compiled, wazero.NewModuleConfig()) + if err := validateModule(compiled); err != nil { + return nil, err + } + + moduleConfig := wazero.NewModuleConfig().WithName(fmt.Sprintf("strategy-%s", cfg.ID)) + module, err := runtime.InstantiateModule(ctx, compiled, moduleConfig) if err != nil { return nil, fmt.Errorf("instantiate strategy wasm: %w", err) } + limits := cfg.Limits + if limits.GasLimit == 0 { + limits.GasLimit = defaultGasLimit + } + return &StrategyModule{ id: cfg.ID, runtime: runtime, module: module, host: host, + limits: limits, }, nil } @@ -291,14 +383,44 @@ func (s *StrategyModule) Close(ctx context.Context) error { // OnBlock invokes the exported on_block handler with the provided context. func (s *StrategyModule) OnBlock(ctx context.Context, block BlockContext) error { - s.host.setContext(block) + s.host.prepare(block, s.limits.GasLimit) + + execCtx := ctx + cancel := func() {} + if s.limits.Timeout > 0 { + execCtx, cancel = context.WithTimeout(ctx, s.limits.Timeout) + } + defer cancel() exported := s.module.ExportedFunction("on_block") if exported == nil { - return fmt.Errorf("strategy %s does not export on_block", s.id) + return fmt.Errorf("%s: %w", s.id, errNoEntryPoint) + } + + _, err := exported.Call(execCtx) + if err == nil { + return nil + } + + var exitErr *sys.ExitError + if errors.As(err, &exitErr) { + switch exitErr.ExitCode() { + case gasExitCode: + return errGasLimitExceeded + case hostAbortExitCode: + return fmt.Errorf("strategy %s aborted execution", s.id) + case sys.ExitCodeDeadlineExceeded: + return fmt.Errorf("strategy %s timed out: %w", s.id, context.DeadlineExceeded) + case sys.ExitCodeContextCanceled: + return fmt.Errorf("strategy %s canceled: %w", s.id, context.Canceled) + default: + return exitErr + } } - _, err := exported.Call(ctx) + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("strategy %s timed out: %w", s.id, err) + } return err } @@ -306,3 +428,26 @@ func (s *StrategyModule) OnBlock(ctx context.Context, block BlockContext) error func normalizeAddress(addr string) string { return strings.ToLower(addr) } + +func validateModule(compiled wazero.CompiledModule) error { + for _, fn := range compiled.ImportedFunctions() { + if _, _, imported := fn.Import(); !imported { + continue + } + + moduleName, funcName, _ := fn.Import() + if moduleName != "env" { + return fmt.Errorf("imports from module %q are not allowed", moduleName) + } + + if _, ok := allowedEnvImports[funcName]; !ok { + return fmt.Errorf("import env.%s is not allowed", funcName) + } + } + + if mems := compiled.ImportedMemories(); len(mems) > 0 { + return fmt.Errorf("imported memories are not supported") + } + + return nil +} diff --git a/wasmstrategy/host_test.go b/wasmstrategy/host_test.go index 2a7c442..1cd7abe 100644 --- a/wasmstrategy/host_test.go +++ b/wasmstrategy/host_test.go @@ -9,12 +9,14 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" mdbxlog "github.com/ledgerwatch/log/v3" "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "github.com/tetratelabs/wazero" "github.com/0xAtelerix/example/application" ) @@ -22,6 +24,12 @@ import ( //go:embed testdata/uniswap_strategy.wasm var testStrategyWasm []byte +//go:embed testdata/forbidden_import.wasm +var testForbiddenImport []byte + +//go:embed testdata/memory_import.wasm +var testMemoryImport []byte + func newTestDB(t *testing.T) kv.RwDB { t.Helper() @@ -91,6 +99,10 @@ func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *byte Wasm: testStrategyWasm, DB: db, Logger: &logger, + Limits: StrategyLimits{ + GasLimit: 5000, + Timeout: time.Second, + }, }) require.NoError(t, err) @@ -120,7 +132,7 @@ func TestStrategyModuleWritesSlotOnEvent(t *testing.T) { value := readSlot(t, ctx, db, SlotLastUniTransferBlock) require.Equal(t, blockNumber, value, "strategy should persist last observed block number") - require.Contains(t, logs.String(), "Uniswap ERC20 transfer detected", "expected info log from WASM strategy") + require.Contains(t, logs.String(), "Uniswap transfer at block", "expected info log from WASM strategy") } func TestStrategyModuleLogsStaleTransferWarning(t *testing.T) { @@ -143,6 +155,49 @@ func TestStrategyModuleLogsStaleTransferWarning(t *testing.T) { value := readSlot(t, ctx, db, SlotLastUniTransferBlock) require.Equal(t, lastSeen, value, "slot should remain unchanged when no transfer is seen") - expected := fmt.Sprintf("No Uniswap transfers observed since block %d", lastSeen) + expected := fmt.Sprintf("No Uniswap transfers since block %d", lastSeen) require.Truef(t, strings.Contains(logs.String(), expected), "expected stale warning log containing %q", expected) } + +func TestStrategyModuleGasLimitExceeded(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + module.limits = StrategyLimits{GasLimit: 1, Timeout: time.Second} + + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: 1, + ChainID: 42, + }) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestValidateModuleRejectsUnknownImports(t *testing.T) { + ctx := context.Background() + rt := wazero.NewRuntime(ctx) + t.Cleanup(func() { + _ = rt.Close(ctx) + }) + + module, err := rt.CompileModule(ctx, testForbiddenImport) + require.NoError(t, err) + + err = validateModule(module) + require.EqualError(t, err, "import env.evil is not allowed") +} + +func TestValidateModuleRejectsMemoryImports(t *testing.T) { + ctx := context.Background() + rt := wazero.NewRuntime(ctx) + t.Cleanup(func() { + _ = rt.Close(ctx) + }) + + module, err := rt.CompileModule(ctx, testMemoryImport) + require.NoError(t, err) + + err = validateModule(module) + require.EqualError(t, err, "imported memories are not supported") +} diff --git a/wasmstrategy/manager.go b/wasmstrategy/manager.go index f70ed7e..0a977ca 100644 --- a/wasmstrategy/manager.go +++ b/wasmstrategy/manager.go @@ -2,8 +2,14 @@ package wasmstrategy import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "os" + "path/filepath" + "strings" + "sync" + "time" "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" @@ -15,24 +21,41 @@ import ( // ManagerConfig wires the WASM runtime into the Pelagos example node. type ManagerConfig struct { - Logger *zerolog.Logger - DB kv.RwDB - Multichain *gosdk.MultichainStateAccess - WasmPath string - AddressBook map[string]AddressID - ChainID apptypes.ChainType + Logger *zerolog.Logger + DB kv.RwDB + Multichain *gosdk.MultichainStateAccess + StrategyDir string + ReloadInterval time.Duration + AddressBook map[string]AddressID + ChainID apptypes.ChainType + Limits StrategyLimits + MaxParallel int +} + +type strategyHandle struct { + module *StrategyModule + hash string } // Manager owns the wasm modules and implements gosdk.BlockObserver. type Manager struct { logger *zerolog.Logger - modules []*StrategyModule multichain *gosdk.MultichainStateAccess addressBook map[string]AddressID chainID apptypes.ChainType + + strategyDir string + reloadInterval time.Duration + limits StrategyLimits + maxParallel int + + mu sync.RWMutex + modules map[string]*strategyHandle + lastReload time.Time + db kv.RwDB } -// NewManager loads the AssemblyScript-built WASM artifact from disk. +// NewManager loads all WASM artifacts available in StrategyDir and keeps them in sync. func NewManager(ctx context.Context, cfg ManagerConfig) (*Manager, error) { if cfg.Multichain == nil { return nil, fmt.Errorf("multichain state access is required") @@ -42,50 +65,73 @@ func NewManager(ctx context.Context, cfg ManagerConfig) (*Manager, error) { return nil, fmt.Errorf("database handle is required") } + if cfg.StrategyDir == "" { + return nil, fmt.Errorf("strategy directory must be set") + } + + if _, err := os.Stat(cfg.StrategyDir); err != nil { + return nil, fmt.Errorf("strategy dir %q: %w", cfg.StrategyDir, err) + } + logger := cfg.Logger if logger == nil { logger = zerolog.Ctx(ctx) } - wasmBytes, err := os.ReadFile(cfg.WasmPath) - if err != nil { - return nil, fmt.Errorf("read wasm %q: %w", cfg.WasmPath, err) + manager := &Manager{ + logger: logger, + multichain: cfg.Multichain, + addressBook: normalizeAddressBook(cfg.AddressBook), + chainID: cfg.ChainID, + strategyDir: cfg.StrategyDir, + reloadInterval: cfg.ReloadInterval, + limits: cfg.Limits, + maxParallel: cfg.MaxParallel, + modules: make(map[string]*strategyHandle), + db: cfg.DB, } - module, err := NewStrategyModule(ctx, ModuleConfig{ - ID: "strategy-uniswap-monitor", - Wasm: wasmBytes, - DB: cfg.DB, - Logger: logger, - }) - if err != nil { - return nil, err + if manager.maxParallel <= 0 { + manager.maxParallel = 4 + } + + if manager.reloadInterval <= 0 { + manager.reloadInterval = 5 * time.Second } - book := cfg.AddressBook - if book == nil { - book = make(map[string]AddressID) + if manager.limits.GasLimit == 0 { + manager.limits.GasLimit = defaultGasLimit } - return &Manager{ - logger: logger, - modules: []*StrategyModule{module}, - multichain: cfg.Multichain, - addressBook: book, - chainID: cfg.ChainID, - }, nil + if err := manager.reload(ctx); err != nil { + return nil, err + } + + return manager, nil } // Close frees all wasm runtimes. func (m *Manager) Close(ctx context.Context) { - for _, module := range m.modules { - _ = module.Close(ctx) + m.mu.Lock() + defer m.mu.Unlock() + + for id, handle := range m.modules { + if handle.module != nil { + _ = handle.module.Close(ctx) + } + + delete(m.modules, id) } } // OnBlock wires events and context into all configured WASM strategies. func (m *Manager) OnBlock(ctx context.Context, meta gosdk.BlockObserverContext[application.Transaction[application.Receipt], application.Receipt]) error { - if len(m.modules) == 0 { + if err := m.maybeReload(ctx); err != nil { + m.logger.Error().Err(err).Msg("failed to reload strategies") + } + + modules := m.snapshotModules() + if len(modules) == 0 { return nil } @@ -95,21 +141,125 @@ func (m *Manager) OnBlock(ctx context.Context, meta gosdk.BlockObserverContext[a Events: m.collectEvents(ctx, meta.Batch.ExternalBlocks), } - for _, module := range m.modules { - if err := module.OnBlock(ctx, blockCtx); err != nil { - m.logger.Error().Err(err).Msg("strategy execution failed") - } + sem := make(chan struct{}, m.maxParallel) + var wg sync.WaitGroup + + for _, handle := range modules { + wg.Add(1) + go func(h *strategyHandle) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + if err := h.module.OnBlock(ctx, blockCtx); err != nil { + m.logger.Error().Err(err).Str("strategy", h.module.id).Msg("strategy execution failed") + } + }(handle) } + wg.Wait() + return nil } -func (m *Manager) collectEvents(ctx context.Context, blocks []*apptypes.ExternalBlock) []StrategyEvent { - if len(blocks) == 0 { +func (m *Manager) maybeReload(ctx context.Context) error { + m.mu.RLock() + last := m.lastReload + m.mu.RUnlock() + + if time.Since(last) < m.reloadInterval { return nil } - if len(m.addressBook) == 0 { + return m.reload(ctx) +} + +func (m *Manager) reload(ctx context.Context) error { + entries, err := os.ReadDir(m.strategyDir) + if err != nil { + return fmt.Errorf("scan strategies dir: %w", err) + } + + newHandles := make(map[string]*strategyHandle, len(entries)) + seen := make(map[string]struct{}, len(entries)) + + for _, entry := range entries { + if entry.IsDir() || filepath.Ext(entry.Name()) != ".wasm" { + continue + } + + id := strings.TrimSuffix(entry.Name(), filepath.Ext(entry.Name())) + path := filepath.Join(m.strategyDir, entry.Name()) + + bytes, readErr := os.ReadFile(path) + if readErr != nil { + m.logger.Error().Err(readErr).Str("strategy", id).Msg("failed to read strategy file") + continue + } + + hash := sha256.Sum256(bytes) + hashHex := hex.EncodeToString(hash[:]) + seen[id] = struct{}{} + + if existing, ok := m.modules[id]; ok && existing.hash == hashHex { + newHandles[id] = existing + continue + } + + module, instErr := NewStrategyModule(ctx, ModuleConfig{ + ID: id, + Wasm: bytes, + DB: m.db, + Logger: m.logger, + Limits: m.limits, + }) + if instErr != nil { + m.logger.Error().Err(instErr).Str("strategy", id).Msg("failed to instantiate strategy") + continue + } + + newHandles[id] = &strategyHandle{ + module: module, + hash: hashHex, + } + + m.logger.Info().Str("strategy", id).Msg("loaded strategy module") + } + + m.mu.Lock() + defer m.mu.Unlock() + + for id, handle := range m.modules { + if _, ok := seen[id]; ok && newHandles[id] == handle { + continue + } + + if handle.module != nil { + _ = handle.module.Close(ctx) + } + } + + m.modules = newHandles + m.lastReload = time.Now() + + return nil +} + +func (m *Manager) snapshotModules() []*strategyHandle { + m.mu.RLock() + defer m.mu.RUnlock() + + out := make([]*strategyHandle, 0, len(m.modules)) + for _, handle := range m.modules { + out = append(out, handle) + } + + return out +} + +func (m *Manager) collectEvents(ctx context.Context, blocks []*apptypes.ExternalBlock) []StrategyEvent { + if len(blocks) == 0 || len(m.addressBook) == 0 { return nil } @@ -149,3 +299,16 @@ func (m *Manager) collectEvents(ctx context.Context, blocks []*apptypes.External return events } + +func normalizeAddressBook(in map[string]AddressID) map[string]AddressID { + if len(in) == 0 { + return in + } + + out := make(map[string]AddressID, len(in)) + for k, v := range in { + out[normalizeAddress(k)] = v + } + + return out +} diff --git a/wasmstrategy/testdata/as/forbidden_import.ts b/wasmstrategy/testdata/as/forbidden_import.ts new file mode 100644 index 0000000..8d45f5a --- /dev/null +++ b/wasmstrategy/testdata/as/forbidden_import.ts @@ -0,0 +1,6 @@ +@external("env", "evil") +declare function evil(): void; + +export function on_block(): void { + evil(); +} diff --git a/wasmstrategy/testdata/as/memory_import.ts b/wasmstrategy/testdata/as/memory_import.ts new file mode 100644 index 0000000..c6296fb --- /dev/null +++ b/wasmstrategy/testdata/as/memory_import.ts @@ -0,0 +1 @@ +export function on_block(): void {} diff --git a/wasmstrategy/testdata/as/uniswap_strategy.ts b/wasmstrategy/testdata/as/uniswap_strategy.ts new file mode 100644 index 0000000..24d114f --- /dev/null +++ b/wasmstrategy/testdata/as/uniswap_strategy.ts @@ -0,0 +1,67 @@ +@external("env", "get_block_number") +declare function hostGetBlockNumber(): i64; + +@external("env", "get_event_count") +declare function hostGetEventCount(): i32; + +@external("env", "get_event_kind") +declare function hostGetEventKind(index: i32): i32; + +@external("env", "get_event_address_id") +declare function hostGetEventAddressId(index: i32): i32; + +@external("env", "db_get_u64") +declare function hostDbGet(slot: i32): i64; + +@external("env", "db_put_u64") +declare function hostDbPut(slot: i32, value: i64): void; + +@external("env", "log") +declare function hostLog(level: i32, ptr: usize, len: i32): void; + +const SLOT_LAST_UNI_TRANSFER_BLOCK: i32 = 1; + +enum EventKind { + Transfer = 1, +} + +enum AddressId { + UniswapV2Pair = 1, + UniswapV3Pool = 2, +} + +function logInfo(message: string): void { + const encoded = String.UTF8.encode(message); + hostLog(20, changetype(encoded), encoded.byteLength); +} + +function hasUniswapTransfer(events: i32): bool { + for (let index = 0; index < events; index++) { + if (hostGetEventKind(index) != EventKind.Transfer) { + continue; + } + + const address = hostGetEventAddressId(index); + if (address == AddressId.UniswapV2Pair || address == AddressId.UniswapV3Pool) { + return true; + } + } + + return false; +} + +export function on_block(): void { + const block = hostGetBlockNumber(); + const events = hostGetEventCount(); + + if (hasUniswapTransfer(events)) { + hostDbPut(SLOT_LAST_UNI_TRANSFER_BLOCK, block); + logInfo("Uniswap transfer at block " + block.toString()); + return; + } + + const lastSeen = hostDbGet(SLOT_LAST_UNI_TRANSFER_BLOCK); + if (lastSeen > 0) { + logInfo("No Uniswap transfers since block " + lastSeen.toString()); + } +} diff --git a/wasmstrategy/testdata/forbidden_import.wasm b/wasmstrategy/testdata/forbidden_import.wasm new file mode 100644 index 0000000000000000000000000000000000000000..705b2e0a9ed76718d4d42083b604506954dfa704 GIT binary patch literal 68 zcmWN`I|_g>5Cy>ZHft2MuU|?oqWMCI%;>gd7Ps+(p&SqeM N%W<(XGBYr80|0)N3)=ty literal 0 HcmV?d00001 diff --git a/wasmstrategy/testdata/uniswap_strategy.wasm b/wasmstrategy/testdata/uniswap_strategy.wasm index 7178724c0e831d46b179fdda4b407dd20b4971a4..bcdc62d17d1bf332d40182a683068fdb3ee53e1d 100644 GIT binary patch delta 433 zcmYk0&nrYx6vxlGcjmqO=FH5Uf18$?Y++?H3E7b#>hZ^qcZuGnc1cM z+#=L?%k#_m!ny*3wQ0 zzsuYb;dG^W^CvTh>0ekx2_#a5CQQzh#B>91yDigc&%!En9nxs!fgyKidfc~Z`T>=D BT&e&7 delta 618 zcmY*VziSjh6#m}K-tOGa{TMAYge1&eQiw!7#ZvJmVi4>kQA9!RawnR*+=k0x5Pxhd zC@8FjNCfRt3O0iG2L!Q5B^D{t2!a;rJ&12NHWqK*d*Amx-uveBp-+cj{)jz~RzW#Q zAm68$9CR=#Rn)uP>_$r+l>C+UO4NTKkQ+3b-8Ml{xj+hLS7Ka*3zS+$|FW;5vo$JGuuw^HV9 zuyABpI2h(6c!*Or6sVfoC~BI8+e|K~W9BP2?DKn3-%J-QB Date: Fri, 21 Nov 2025 23:21:15 +0100 Subject: [PATCH 3/3] gas meter --- Makefile | 12 ++- cmd/main.go | 2 +- go.mod | 2 +- go.sum | 2 + wasmstrategy/host.go | 81 +++++++++------ wasmstrategy/host_test.go | 109 +++++++++++++++++++- wasmstrategy/listener.go | 40 +++++++ wasmstrategy/manager.go | 6 ++ wasmstrategy/testdata/as/instruction_gas.ts | 7 ++ wasmstrategy/testdata/as/memory_grow.ts | 7 ++ wasmstrategy/testdata/instruction_gas.wasm | Bin 0 -> 87 bytes wasmstrategy/testdata/memory_grow.wasm | Bin 0 -> 78 bytes 12 files changed, 228 insertions(+), 40 deletions(-) create mode 100644 wasmstrategy/listener.go create mode 100644 wasmstrategy/testdata/as/instruction_gas.ts create mode 100644 wasmstrategy/testdata/as/memory_grow.ts create mode 100644 wasmstrategy/testdata/instruction_gas.wasm create mode 100644 wasmstrategy/testdata/memory_grow.wasm diff --git a/Makefile b/Makefile index 678ceb8..62b3820 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,9 @@ ASC=npx --prefix as asc WASM_TESTDATA_DIR=wasmstrategy/testdata WASM_TESTDATA=$(WASM_TESTDATA_DIR)/uniswap_strategy.wasm \ $(WASM_TESTDATA_DIR)/forbidden_import.wasm \ - $(WASM_TESTDATA_DIR)/memory_import.wasm + $(WASM_TESTDATA_DIR)/memory_import.wasm \ + $(WASM_TESTDATA_DIR)/instruction_gas.wasm \ + $(WASM_TESTDATA_DIR)/memory_grow.wasm run: go run cmd/main.go \ @@ -52,7 +54,13 @@ $(WASM_TESTDATA_DIR)/forbidden_import.wasm: $(WASM_TESTDATA_DIR)/as/forbidden_im $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ $(WASM_TESTDATA_DIR)/memory_import.wasm: $(WASM_TESTDATA_DIR)/as/memory_import.ts - $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert --importMemory -o $@ + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/instruction_gas.wasm: $(WASM_TESTDATA_DIR)/as/instruction_gas.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/memory_grow.wasm: $(WASM_TESTDATA_DIR)/as/memory_grow.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ tests: go test -short -timeout 20m -failfast -shuffle=on -v ./... $(params) diff --git a/cmd/main.go b/cmd/main.go index e9c8aa7..4db1375 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -67,7 +67,7 @@ func RunCLI(ctx context.Context) { rpcPort := fs.String("rpc-port", ":8080", "Port for the JSON-RPC server") multichainConfigJSON := fs.String("multichain-config", "", "Multichain config JSON path") logLevel := fs.Int("log-level", int(zerolog.InfoLevel), "Logging level") - strategyDir := fs.String("strategy-dir", "", "Directory containing strategy WASM modules") + strategyDir := fs.String("strategy-dir", "./build", "Directory containing strategy WASM modules") strategyReload := fs.Duration("strategy-reload-interval", 5*time.Second, "Interval for rescanning the strategy directory") strategyGasLimit := fs.Uint64("strategy-gas-limit", 100000, "Per-strategy gas limit when executing on_block") strategyTimeout := fs.Duration("strategy-timeout", 50*time.Millisecond, "Per-strategy execution timeout") diff --git a/go.mod b/go.mod index 6db47ff..15ff614 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/ledgerwatch/log/v3 v3.9.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 - github.com/tetratelabs/wazero v1.8.1 + github.com/tetratelabs/wazero v1.10.1 ) require ( diff --git a/go.sum b/go.sum index 735fda3..8a5b64d 100644 --- a/go.sum +++ b/go.sum @@ -144,6 +144,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/supranational/blst v0.3.16 h1:bTDadT+3fK497EvLdWRQEjiGnUtzJ7jjIUMF0jqwYhE= github.com/supranational/blst v0.3.16/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/tetratelabs/wazero v1.10.1 h1:2DugeJf6VVk58KTPszlNfeeN8AhhpwcZqkJj2wwFuH8= +github.com/tetratelabs/wazero v1.10.1/go.mod h1:DRm5twOQ5Gr1AoEdSi0CLjDQF1J9ZAuyqFIjl1KKfQU= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/wasmstrategy/host.go b/wasmstrategy/host.go index 4104f03..eb715ee 100644 --- a/wasmstrategy/host.go +++ b/wasmstrategy/host.go @@ -13,20 +13,22 @@ import ( "github.com/rs/zerolog" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" "github.com/tetratelabs/wazero/sys" "github.com/0xAtelerix/example/application" ) const ( - gasExitCode uint32 = 0xfffffff0 - hostAbortExitCode uint32 = 0xfffffff1 - defaultGasLimit = 50_000 - costCheap uint64 = 1 - costEvent uint64 = 2 - costDbRead uint64 = 15 - costDbWrite uint64 = 40 - costLog uint64 = 5 + gasExitCode uint32 = 0xfffffff0 + hostAbortExitCode uint32 = 0xfffffff1 + defaultGasLimit = 50_000 + defaultFunctionCost = 50 + costCheap uint64 = 1 + costEvent uint64 = 2 + costDbRead uint64 = 15 + costDbWrite uint64 = 40 + costLog uint64 = 5 ) var ( @@ -44,12 +46,14 @@ var allowedEnvImports = map[string]struct{}{ "db_put_u64": {}, "log": {}, "abort": {}, + "gas": {}, } // StrategyLimits configures execution guard rails for a single strategy. type StrategyLimits struct { - GasLimit uint64 - Timeout time.Duration + GasLimit uint64 + Timeout time.Duration + FunctionCallCost uint64 } // hostEnv backs the env.* host functions consumed by AssemblyScript strategies. @@ -57,10 +61,10 @@ type hostEnv struct { logger *zerolog.Logger db kv.RwDB - mu sync.RWMutex - blockCtx BlockContext - gasLimit uint64 - gasUsed uint64 + mu sync.RWMutex + blockCtx BlockContext + gasLimit uint64 + gasRemaining uint64 } func newHostEnv(logger *zerolog.Logger, db kv.RwDB) *hostEnv { @@ -71,7 +75,7 @@ func (h *hostEnv) prepare(ctx BlockContext, gasLimit uint64) { h.mu.Lock() h.blockCtx = ctx h.gasLimit = gasLimit - h.gasUsed = 0 + h.gasRemaining = gasLimit h.mu.Unlock() } @@ -110,6 +114,10 @@ func (h *hostEnv) register(ctx context.Context, runtime wazero.Runtime) error { WithGoModuleFunction(api.GoModuleFunc(h.log), []api.ValueType{api.ValueTypeI32, api.ValueTypeI32, api.ValueTypeI32}, []api.ValueType{}). Export("log") + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.gas), []api.ValueType{api.ValueTypeI64}, []api.ValueType{}). + Export("gas") + builder.NewFunctionBuilder(). WithGoModuleFunction(api.GoModuleFunc(h.abort), []api.ValueType{ api.ValueTypeI32, @@ -132,24 +140,22 @@ func (h *hostEnv) snapshot() BlockContext { } func (h *hostEnv) charge(ctx context.Context, mod api.Module, cost uint64, reason string) { - if cost == 0 { + if cost == 0 || h.gasLimit == 0 { return } h.mu.Lock() - h.gasUsed += cost - limit := h.gasLimit - used := h.gasUsed - h.mu.Unlock() - - if limit == 0 || used <= limit { + if h.gasRemaining >= cost { + h.gasRemaining -= cost + h.mu.Unlock() return } + h.gasRemaining = 0 + h.mu.Unlock() h.logger.Warn(). Uint64("cost", cost). - Uint64("used", used). - Uint64("limit", limit). + Uint64("limit", h.gasLimit). Str("reason", reason). Msg("strategy gas limit exceeded") @@ -303,6 +309,11 @@ func (h *hostEnv) trap(ctx context.Context, mod api.Module, code uint32) { panic(sys.NewExitError(code)) } +func (h *hostEnv) gas(ctx context.Context, mod api.Module, stack []uint64) { + cost := uint64(api.DecodeU32(stack[0])) + h.charge(ctx, mod, cost, "guest") +} + // StrategyModule wraps a single WASM module instance plus host wiring. type StrategyModule struct { id string @@ -339,7 +350,20 @@ func NewStrategyModule(ctx context.Context, cfg ModuleConfig) (*StrategyModule, return nil, fmt.Errorf("register host env: %w", err) } - compiled, err := runtime.CompileModule(ctx, cfg.Wasm) + limits := cfg.Limits + if limits.GasLimit == 0 { + limits.GasLimit = defaultGasLimit + } + if limits.FunctionCallCost == 0 { + limits.FunctionCallCost = defaultFunctionCost + } + + listenerCtx := experimental.WithFunctionListenerFactory(ctx, &gasListenerFactory{ + host: host, + cost: limits.FunctionCallCost, + }) + + compiled, err := runtime.CompileModule(listenerCtx, cfg.Wasm) if err != nil { return nil, fmt.Errorf("compile strategy wasm: %w", err) } @@ -349,16 +373,11 @@ func NewStrategyModule(ctx context.Context, cfg ModuleConfig) (*StrategyModule, } moduleConfig := wazero.NewModuleConfig().WithName(fmt.Sprintf("strategy-%s", cfg.ID)) - module, err := runtime.InstantiateModule(ctx, compiled, moduleConfig) + module, err := runtime.InstantiateModule(listenerCtx, compiled, moduleConfig) if err != nil { return nil, fmt.Errorf("instantiate strategy wasm: %w", err) } - limits := cfg.Limits - if limits.GasLimit == 0 { - limits.GasLimit = defaultGasLimit - } - return &StrategyModule{ id: cfg.ID, runtime: runtime, diff --git a/wasmstrategy/host_test.go b/wasmstrategy/host_test.go index 1cd7abe..49d017b 100644 --- a/wasmstrategy/host_test.go +++ b/wasmstrategy/host_test.go @@ -30,6 +30,12 @@ var testForbiddenImport []byte //go:embed testdata/memory_import.wasm var testMemoryImport []byte +//go:embed testdata/instruction_gas.wasm +var instructionGasTestWasm []byte + +//go:embed testdata/memory_grow.wasm +var memoryGrowGasTestWasm []byte + func newTestDB(t *testing.T) kv.RwDB { t.Helper() @@ -90,8 +96,6 @@ func writeSlot(t *testing.T, ctx context.Context, db kv.RwDB, slot int32, value } func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *bytes.Buffer) *StrategyModule { - t.Helper() - logger := zerolog.New(sink).Level(zerolog.DebugLevel) module, err := NewStrategyModule(ctx, ModuleConfig{ @@ -100,8 +104,9 @@ func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *byte DB: db, Logger: &logger, Limits: StrategyLimits{ - GasLimit: 5000, - Timeout: time.Second, + GasLimit: 5000, + Timeout: time.Second, + FunctionCallCost: defaultFunctionCost, }, }) require.NoError(t, err) @@ -113,6 +118,26 @@ func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *byte return module } +func newStrategyModuleFromBytes(t *testing.T, ctx context.Context, db kv.RwDB, wasmBytes []byte, limits StrategyLimits) *StrategyModule { + var sink bytes.Buffer + logger := zerolog.New(&sink).Level(zerolog.DebugLevel) + + module, err := NewStrategyModule(ctx, ModuleConfig{ + ID: "test-raw", + Wasm: wasmBytes, + DB: db, + Logger: &logger, + Limits: limits, + }) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, module.Close(ctx)) + }) + + return module +} + func TestStrategyModuleWritesSlotOnEvent(t *testing.T) { ctx := context.Background() db := newTestDB(t) @@ -165,7 +190,11 @@ func TestStrategyModuleGasLimitExceeded(t *testing.T) { var logs bytes.Buffer module := newStrategyModule(t, ctx, db, &logs) - module.limits = StrategyLimits{GasLimit: 1, Timeout: time.Second} + module.limits = StrategyLimits{ + GasLimit: 10, + Timeout: time.Second, + FunctionCallCost: 20, + } err := module.OnBlock(ctx, BlockContext{ BlockNumber: 1, @@ -201,3 +230,73 @@ func TestValidateModuleRejectsMemoryImports(t *testing.T) { err = validateModule(module) require.EqualError(t, err, "imported memories are not supported") } + +func TestGasExecutionDeterministic(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: 60, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + block := BlockContext{BlockNumber: 1, ChainID: 42} + + require.NoError(t, module.OnBlock(ctx, block)) + require.NoError(t, module.OnBlock(ctx, block)) +} + +func TestGasLimitExceededOnInstructionCalls(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: 20, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + err := module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42}) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestGasLimitAllowsExactUsage(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: costDbRead*2 + 2, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + require.NoError(t, module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42})) +} + +func TestGasLimitExceededOnMemoryGrowth(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, memoryGrowGasTestWasm, StrategyLimits{ + GasLimit: 20, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + err := module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42}) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestGasLimitMemoryGrowthPass(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, memoryGrowGasTestWasm, StrategyLimits{ + GasLimit: 30, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + require.NoError(t, module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42})) +} diff --git a/wasmstrategy/listener.go b/wasmstrategy/listener.go new file mode 100644 index 0000000..713365b --- /dev/null +++ b/wasmstrategy/listener.go @@ -0,0 +1,40 @@ +package wasmstrategy + +import ( + "context" + + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" +) + +type gasListenerFactory struct { + host *hostEnv + cost uint64 +} + +func (f *gasListenerFactory) NewFunctionListener(definition api.FunctionDefinition) experimental.FunctionListener { + // If no gas accounting is configured, skip installing listeners. + if f.cost == 0 || f.host == nil { + return nil + } + + return &gasFunctionListener{ + host: f.host, + cost: f.cost, + name: definition.DebugName(), + } +} + +type gasFunctionListener struct { + host *hostEnv + cost uint64 + name string +} + +func (l *gasFunctionListener) Before(ctx context.Context, mod api.Module, _ api.FunctionDefinition, _ []uint64, _ experimental.StackIterator) { + l.host.charge(ctx, mod, l.cost, l.name) +} + +func (l *gasFunctionListener) After(context.Context, api.Module, api.FunctionDefinition, []uint64) {} + +func (l *gasFunctionListener) Abort(context.Context, api.Module, api.FunctionDefinition, error) {} diff --git a/wasmstrategy/manager.go b/wasmstrategy/manager.go index 0a977ca..0d5c2ac 100644 --- a/wasmstrategy/manager.go +++ b/wasmstrategy/manager.go @@ -102,6 +102,12 @@ func NewManager(ctx context.Context, cfg ManagerConfig) (*Manager, error) { if manager.limits.GasLimit == 0 { manager.limits.GasLimit = defaultGasLimit } + if manager.limits.FunctionCallCost == 0 { + manager.limits.FunctionCallCost = defaultFunctionCost + } + if manager.limits.FunctionCallCost == 0 { + manager.limits.FunctionCallCost = defaultFunctionCost + } if err := manager.reload(ctx); err != nil { return nil, err diff --git a/wasmstrategy/testdata/as/instruction_gas.ts b/wasmstrategy/testdata/as/instruction_gas.ts new file mode 100644 index 0000000..2aaf918 --- /dev/null +++ b/wasmstrategy/testdata/as/instruction_gas.ts @@ -0,0 +1,7 @@ +@external("env", "db_get_u64") +declare function dbGet(slot: i32): i64; + +export function on_block(): void { + dbGet(0); + dbGet(0); +} diff --git a/wasmstrategy/testdata/as/memory_grow.ts b/wasmstrategy/testdata/as/memory_grow.ts new file mode 100644 index 0000000..291676c --- /dev/null +++ b/wasmstrategy/testdata/as/memory_grow.ts @@ -0,0 +1,7 @@ +@external("env", "gas") +declare function useGas(cost: i64): void; + +export function on_block(): void { + useGas(25); + memory.grow(1); +} diff --git a/wasmstrategy/testdata/instruction_gas.wasm b/wasmstrategy/testdata/instruction_gas.wasm new file mode 100644 index 0000000000000000000000000000000000000000..fe119b95e5cd9125a6bb95ece045fb298278374c GIT binary patch literal 87 zcmZQbEY4+QU|?Y6WJ+MHXRJ$LU|gd7Ps+(p&Sqd_%T3MAFREl>;NoNCVQ^#+V2}b6+yKMU4?zF` literal 0 HcmV?d00001 diff --git a/wasmstrategy/testdata/memory_grow.wasm b/wasmstrategy/testdata/memory_grow.wasm new file mode 100644 index 0000000000000000000000000000000000000000..3dd2d13cfa66e135b4b368110a85c91127f38f66 GIT binary patch literal 78 zcmWN{K@LDL6a~QdeuG9Mv9aJZxQLcC7PL(?k=QspvmUq{2!L8FUsym4m5;aDT?Ukp fh5AF)iB54@&$Jz28GK0BBd8l1%zcZABL?#U=wA$Z literal 0 HcmV?d00001