From de1e4ff9a4e9f2c1269444bd6a3847cd69993d9c Mon Sep 17 00:00:00 2001 From: Gorka Date: Thu, 12 Mar 2026 13:59:29 -0300 Subject: [PATCH 1/4] feat: add withSpan tracing utility and @opentelemetry/api dependency Introduces a withSpan helper that wraps async functions in OpenTelemetry spans with enter/exit events, error recording, and automatic span lifecycle management. Foundation for function-level instrumentation. --- deno.json | 3 ++- deno.lock | 12 ++++++++++-- src/core/tracing.ts | 48 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 src/core/tracing.ts diff --git a/deno.json b/deno.json index a83acf3..d34bbd7 100644 --- a/deno.json +++ b/deno.json @@ -36,7 +36,8 @@ "@oak/oak": "jsr:@oak/oak@^17.1.4", "@olli/kvdex": "jsr:@olli/kvdex@^3.1.4", "zod": "npm:zod@3.24.2", - "chalk": "npm:chalk@^5.3.0" + "chalk": "npm:chalk@^5.3.0", + "@opentelemetry/api": "npm:@opentelemetry/api@^1.9.0" }, "deploy": { "project": "159af174-b645-4039-be9c-493de4886f26", diff --git a/deno.lock b/deno.lock index bee66aa..87d4e12 100644 --- a/deno.lock +++ b/deno.lock @@ -30,6 +30,7 @@ "jsr:@std/ulid@1": "1.0.0", "jsr:@zaubrik/djwt@^3.0.2": "3.0.2", "npm:@drizzle-team/brocli@0.11": "0.11.0", + "npm:@opentelemetry/api@^1.9.0": "1.9.0", "npm:@stellar/stellar-sdk@14.2.0": "14.2.0", "npm:@stellar/stellar-sdk@^14.2.0": "14.6.1", "npm:@stellar/stellar-sdk@^14.6.1": "14.6.1", @@ -38,8 +39,9 @@ "npm:buffer@6.0.3": "6.0.3", "npm:buffer@^6.0.3": "6.0.3", "npm:chalk@^5.3.0": "5.6.2", + "npm:drizzle-kit@*": "0.31.9_esbuild@0.25.12", "npm:drizzle-kit@~0.31.6": "0.31.9_esbuild@0.25.12", - "npm:drizzle-orm@~0.44.7": "0.44.7_@types+pg@8.18.0_pg@8.20.0_postgres@3.4.8", + "npm:drizzle-orm@~0.44.7": "0.44.7_@opentelemetry+api@1.9.0_@types+pg@8.18.0_pg@8.20.0_postgres@3.4.8", "npm:path-to-regexp@^6.3.0": "6.3.0", "npm:pg@^8.16.3": "8.20.0", "npm:postgres@^3.4.7": "3.4.8", @@ -445,6 +447,9 @@ "@noble/hashes@1.8.0": { "integrity": "sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==" }, + "@opentelemetry/api@1.9.0": { + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==" + }, "@stellar/js-xdr@3.1.2": { "integrity": "sha512-VVolPL5goVEIsvuGqDc5uiKxV03lzfWdvYg1KikvwheDmTBO68CKDji3bAZ/kppZrx5iTA8z3Ld5yuytcvhvOQ==" }, @@ -608,14 +613,16 @@ ], "bin": true }, - "drizzle-orm@0.44.7_@types+pg@8.18.0_pg@8.20.0_postgres@3.4.8": { + "drizzle-orm@0.44.7_@opentelemetry+api@1.9.0_@types+pg@8.18.0_pg@8.20.0_postgres@3.4.8": { "integrity": "sha512-quIpnYznjU9lHshEOAYLoZ9s3jweleHlZIAWR/jX9gAWNg/JhQ1wj0KGRf7/Zm+obRrYd9GjPVJg790QY9N5AQ==", "dependencies": [ + "@opentelemetry/api", "@types/pg", "pg", "postgres" ], "optionalPeers": [ + "@opentelemetry/api", "@types/pg", "pg", "postgres" @@ -1023,6 +1030,7 @@ "jsr:@std/dotenv@~0.225.6", "jsr:@zaubrik/djwt@^3.0.2", "npm:@drizzle-team/brocli@0.11", + "npm:@opentelemetry/api@^1.9.0", "npm:@stellar/stellar-sdk@14.2.0", "npm:@types/pg@^8.15.6", "npm:asn1js@3.0.5", diff --git a/src/core/tracing.ts b/src/core/tracing.ts new file mode 100644 index 0000000..9a56880 --- /dev/null +++ b/src/core/tracing.ts @@ -0,0 +1,48 @@ +import { trace, SpanStatusCode, type Span } from "@opentelemetry/api"; + +const tracer = trace.getTracer("provider-platform"); + +/** + * Wraps a function in an OpenTelemetry span with info/event/error tracing. + * + * - Info trace at function entry + * - Returns the span so callers can add events at logical breaks + * - Automatically records errors and sets span status on failure + * - Ends span when the function completes + */ +export async function withSpan( + name: string, + fn: (span: Span) => Promise | T, + attributes?: Record, +): Promise { + return tracer.startActiveSpan(name, async (span) => { + try { + if (attributes) { + for (const [key, value] of Object.entries(attributes)) { + span.setAttribute(key, value); + } + } + span.addEvent("enter", { "function.name": name }); + + const result = await fn(span); + + span.addEvent("exit", { "function.name": name }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + span.recordException( + error instanceof Error ? error : new Error(String(error)), + ); + span.addEvent("exit_with_error", { "function.name": name }); + throw error; + } finally { + span.end(); + } + }); +} + +export { tracer, SpanStatusCode }; +export type { Span }; From 2e2998b034cc2e5e2d62c1a8a2967f50db5bdf25 Mon Sep 17 00:00:00 2001 From: Gorka Date: Thu, 12 Mar 2026 13:59:51 -0300 Subject: [PATCH 2/4] feat: instrument auth challenge service with withSpan Wraps all auth pipeline functions with OpenTelemetry spans: P_CreateChallenge, P_CreateChallengeDB, P_CreateChallengeMemory, P_VerifyChallenge, P_CompareChallenge, P_GenerateChallengeJWT, and related store/update functions. --- .../auth/challenge/create/create-challenge.ts | 86 +++++---- .../create/generate-challenge-jwt.ts | 34 ++-- .../challenge/store/create-challenge-db.ts | 69 ++++--- .../store/create-challenge-memory.ts | 47 +++-- .../challenge/store/update-challenge-db.ts | 44 +++-- .../store/update-challenge-session.ts | 90 +++++---- .../challenge/verify/compare-challenge.ts | 89 +++++---- .../auth/challenge/verify/verify-challenge.ts | 181 ++++++++++-------- 8 files changed, 355 insertions(+), 285 deletions(-) diff --git a/src/core/service/auth/challenge/create/create-challenge.ts b/src/core/service/auth/challenge/create/create-challenge.ts index fa03e5e..0894874 100644 --- a/src/core/service/auth/challenge/create/create-challenge.ts +++ b/src/core/service/auth/challenge/create/create-challenge.ts @@ -18,45 +18,59 @@ import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; import * as E from "@/core/service/auth/challenge/create/error.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; +import { withSpan } from "@/core/tracing.ts"; export const P_CreateChallenge = ProcessEngine.create( async (input: GetChallengeInput): Promise => { - const { ctx, query } = input; - const clientAccount = query.account; - - assertOrThrow(isDefined(clientAccount), new E.MISSING_CLIENT_ACCOUNT()); - - try { - const { tx, nonce, minTime, maxTime } = - getChallengeTransaction(clientAccount); - - const xdr = tx.toXDR(); - const txHash = tx.hash().toString("hex"); - - const dateCreated = new Date(minTime * 1000); - const expiresAt = new Date(maxTime * 1000); - - const { clientIp, userAgent, requestId } = extractRequestMetadata(ctx); - - const output: ChallengeData = { - ctx, - challengeData: { - txHash: txHash, - clientAccount: clientAccount, - xdr, - nonce, - dateCreated: dateCreated, - requestId, - clientIp, - userAgent, - expiresAt, - }, - }; - - return await output; - } catch (error) { - logAndThrow(new E.FAILED_TO_CREATE_CHALLENGE(error)); - } + return withSpan("P_CreateChallenge", async (span) => { + const { ctx, query } = input; + const clientAccount = query.account; + + span.addEvent("validating_client_account", { "client.account": clientAccount ?? "undefined" }); + assertOrThrow(isDefined(clientAccount), new E.MISSING_CLIENT_ACCOUNT()); + + try { + span.addEvent("building_challenge_transaction"); + const { tx, nonce, minTime, maxTime } = + getChallengeTransaction(clientAccount); + + const xdr = tx.toXDR(); + const txHash = tx.hash().toString("hex"); + + const dateCreated = new Date(minTime * 1000); + const expiresAt = new Date(maxTime * 1000); + + const { clientIp, userAgent, requestId } = extractRequestMetadata(ctx); + + span.addEvent("challenge_created", { + "challenge.txHash": txHash, + "challenge.clientAccount": clientAccount, + "challenge.requestId": requestId, + }); + + const output: ChallengeData = { + ctx, + challengeData: { + txHash: txHash, + clientAccount: clientAccount, + xdr, + nonce, + dateCreated: dateCreated, + requestId, + clientIp, + userAgent, + expiresAt, + }, + }; + + return await output; + } catch (error) { + span.addEvent("challenge_creation_failed", { + "error.message": error instanceof Error ? error.message : String(error), + }); + logAndThrow(new E.FAILED_TO_CREATE_CHALLENGE(error)); + } + }); }, { name: "CreateChallengeProcessEngine", diff --git a/src/core/service/auth/challenge/create/generate-challenge-jwt.ts b/src/core/service/auth/challenge/create/generate-challenge-jwt.ts index 93554cd..d44826c 100644 --- a/src/core/service/auth/challenge/create/generate-challenge-jwt.ts +++ b/src/core/service/auth/challenge/create/generate-challenge-jwt.ts @@ -9,31 +9,35 @@ import generateJwt from "@/core/service/auth/generate-jwt.ts"; import * as E from "@/core/service/auth/challenge/create/error.ts"; import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; +import { withSpan } from "@/core/tracing.ts"; export const P_GenerateChallengeJWT = ProcessEngine.create( async ( input: PostChallengeInput, _metadataHelper?: MetadataHelper ): Promise => { - // Assume the input was already validated by an earlier process. - const { signedChallenge } = input.body; - const tx = new Transaction( - signedChallenge, - NETWORK_CONFIG.networkPassphrase - ); + return withSpan("P_GenerateChallengeJWT", async (span) => { + const { signedChallenge } = input.body; + const tx = new Transaction( + signedChallenge, + NETWORK_CONFIG.networkPassphrase + ); - const key = tx.hash().toString("hex"); + const key = tx.hash().toString("hex"); - const clientAccount = tx.operations[0].source; - assertOrThrow(isDefined(clientAccount), new E.MISSING_CLIENT_ACCOUNT()); + const clientAccount = tx.operations[0].source; + assertOrThrow(isDefined(clientAccount), new E.MISSING_CLIENT_ACCOUNT()); - const jwt = await generateJwt(clientAccount, key); + span.addEvent("generating_jwt", { "client.account": clientAccount }); + const jwt = await generateJwt(clientAccount, key); + span.addEvent("jwt_generated"); - return { - ctx: input.ctx, - body: input.body, - jwt, - }; + return { + ctx: input.ctx, + body: input.body, + jwt, + }; + }); }, { name: "GenerateChallengeJWT", diff --git a/src/core/service/auth/challenge/store/create-challenge-db.ts b/src/core/service/auth/challenge/store/create-challenge-db.ts index b1e7ac6..7a54a54 100644 --- a/src/core/service/auth/challenge/store/create-challenge-db.ts +++ b/src/core/service/auth/challenge/store/create-challenge-db.ts @@ -13,6 +13,7 @@ import { import type { ChallengeData } from "@/core/service/auth/challenge/types.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import * as E from "@/core/service/auth/challenge/store/error.ts"; +import { withSpan } from "@/core/tracing.ts"; const challengeRepository = new ChallengeRepository(drizzleClient); const userRepository = new UserRepository(drizzleClient); @@ -20,39 +21,49 @@ const accountRepository = new AccountRepository(drizzleClient); export const P_CreateChallengeDB = ProcessEngine.create( async (input: ChallengeData) => { - const { challengeData } = input; - try { - let account = await accountRepository.findById( - challengeData.clientAccount - ); + return withSpan("P_CreateChallengeDB", async (span) => { + const { challengeData } = input; + try { + span.addEvent("looking_up_account", { "client.account": challengeData.clientAccount }); + let account = await accountRepository.findById( + challengeData.clientAccount + ); - let user: NewUser | undefined; - if (!account) { - user = await userRepository.create({ - id: crypto.randomUUID(), - status: UserStatus.UNVERIFIED, - } as NewUser); + let user: NewUser | undefined; + if (!account) { + span.addEvent("creating_new_user_and_account"); + user = await userRepository.create({ + id: crypto.randomUUID(), + status: UserStatus.UNVERIFIED, + } as NewUser); - account = await accountRepository.create({ - id: challengeData.clientAccount, - type: "USER", - userId: user.id, - } as NewAccount); - } + account = await accountRepository.create({ + id: challengeData.clientAccount, + type: "USER", + userId: user.id, + } as NewAccount); + } else { + span.addEvent("account_exists"); + } - await challengeRepository.create({ - id: crypto.randomUUID(), - accountId: account.id, - status: ChallengeStatus.UNVERIFIED, - ttl: challengeData.expiresAt, - txHash: challengeData.txHash, - txXDR: challengeData.xdr, - } as NewChallenge); + span.addEvent("persisting_challenge", { "challenge.txHash": challengeData.txHash }); + await challengeRepository.create({ + id: crypto.randomUUID(), + accountId: account.id, + status: ChallengeStatus.UNVERIFIED, + ttl: challengeData.expiresAt, + txHash: challengeData.txHash, + txXDR: challengeData.xdr, + } as NewChallenge); - return await input; - } catch (error) { - logAndThrow(new E.FAILED_TO_STORE_CHALLENGE_IN_DATABASE(error)); - } + return await input; + } catch (error) { + span.addEvent("db_store_failed", { + "error.message": error instanceof Error ? error.message : String(error), + }); + logAndThrow(new E.FAILED_TO_STORE_CHALLENGE_IN_DATABASE(error)); + } + }); }, { name: "CreateChallengeDB", diff --git a/src/core/service/auth/challenge/store/create-challenge-memory.ts b/src/core/service/auth/challenge/store/create-challenge-memory.ts index 4d85aa7..a63f76e 100644 --- a/src/core/service/auth/challenge/store/create-challenge-memory.ts +++ b/src/core/service/auth/challenge/store/create-challenge-memory.ts @@ -5,31 +5,40 @@ import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import * as E from "@/core/service/auth/challenge/store/error.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; +import { withSpan } from "@/core/tracing.ts"; export const P_CreateChallengeMemory = ProcessEngine.create( async (input: ChallengeData) => { - const { challengeData } = input; - try { - const existingSession = await sessionManager.getSession( - challengeData.txHash - ); + return withSpan("P_CreateChallengeMemory", async (span) => { + const { challengeData } = input; + try { + span.addEvent("checking_existing_session", { "challenge.txHash": challengeData.txHash }); + const existingSession = await sessionManager.getSession( + challengeData.txHash + ); - assertOrThrow( - !isDefined(existingSession), - new E.SESSION_ALREADY_EXISTS(challengeData.txHash) - ); + assertOrThrow( + !isDefined(existingSession), + new E.SESSION_ALREADY_EXISTS(challengeData.txHash) + ); - await sessionManager.addSession( - challengeData.txHash, - challengeData.clientAccount, - challengeData.requestId, - challengeData.expiresAt - ); + span.addEvent("caching_session"); + await sessionManager.addSession( + challengeData.txHash, + challengeData.clientAccount, + challengeData.requestId, + challengeData.expiresAt + ); - return await input; - } catch (error) { - logAndThrow(new E.FAILED_TO_CACHE_CHALLENGE_IN_LIVE_SESSIONS(error)); - } + span.addEvent("session_cached"); + return await input; + } catch (error) { + span.addEvent("memory_cache_failed", { + "error.message": error instanceof Error ? error.message : String(error), + }); + logAndThrow(new E.FAILED_TO_CACHE_CHALLENGE_IN_LIVE_SESSIONS(error)); + } + }); }, { name: "CreateChallengeMemory", diff --git a/src/core/service/auth/challenge/store/update-challenge-db.ts b/src/core/service/auth/challenge/store/update-challenge-db.ts index 9928066..4330f7a 100644 --- a/src/core/service/auth/challenge/store/update-challenge-db.ts +++ b/src/core/service/auth/challenge/store/update-challenge-db.ts @@ -11,35 +11,39 @@ import type { import * as E from "@/core/service/auth/challenge/store/error.ts"; import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; +import { withSpan } from "@/core/tracing.ts"; const challengeRepository = new ChallengeRepository(drizzleClient); export const P_UpdateChallengeDB = ProcessEngine.create( async (input: PostChallengeWithJWT): Promise => { - // Assume the input was already validated by an earlier process. - const { signedChallenge } = input.body; - const tx = new Transaction( - signedChallenge, - NETWORK_CONFIG.networkPassphrase - ); - const hash = tx.hash().toString("hex"); + return withSpan("P_UpdateChallengeDB", async (span) => { + const { signedChallenge } = input.body; + const tx = new Transaction( + signedChallenge, + NETWORK_CONFIG.networkPassphrase + ); + const hash = tx.hash().toString("hex"); - const challenge = await challengeRepository.findOneByTxHash(hash); - assertOrThrow( - isDefined(challenge), - new E.CHALLENGE_NOT_FOUND_IN_DATABASE(hash) - ); + span.addEvent("finding_challenge", { "challenge.txHash": hash }); + const challenge = await challengeRepository.findOneByTxHash(hash); + assertOrThrow( + isDefined(challenge), + new E.CHALLENGE_NOT_FOUND_IN_DATABASE(hash) + ); - challenge.status = ChallengeStatus.VERIFIED; + challenge.status = ChallengeStatus.VERIFIED; - await challengeRepository.update(challenge.id, { - ...challenge, - }); + span.addEvent("updating_challenge_status", { "challenge.status": ChallengeStatus.VERIFIED }); + await challengeRepository.update(challenge.id, { + ...challenge, + }); - return { - ctx: input.ctx, - jwt: input.jwt!, - }; + return { + ctx: input.ctx, + jwt: input.jwt!, + }; + }); }, { name: "UpdateChallengeDB", diff --git a/src/core/service/auth/challenge/store/update-challenge-session.ts b/src/core/service/auth/challenge/store/update-challenge-session.ts index a561d88..56a2ec0 100644 --- a/src/core/service/auth/challenge/store/update-challenge-session.ts +++ b/src/core/service/auth/challenge/store/update-challenge-session.ts @@ -13,6 +13,7 @@ import type { PostChallengeWithJWT } from "@/core/service/auth/challenge/types.t import * as E from "@/core/service/auth/challenge/store/error.ts"; import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; +import { withSpan } from "@/core/tracing.ts"; const accountRepository = new AccountRepository(drizzleClient); const sessionRepository = new SessionRepository(drizzleClient); @@ -22,58 +23,65 @@ export const P_UpdateChallengeSession = ProcessEngine.create( input: PostChallengeWithJWT, _metadataHelper?: MetadataHelper ): Promise => { - // Assume the input was already validated by an earlier process. - const { signedChallenge } = input.body; - const tx = new Transaction( - signedChallenge, - NETWORK_CONFIG.networkPassphrase - ); + return withSpan("P_UpdateChallengeSession", async (span) => { + const { signedChallenge } = input.body; + const tx = new Transaction( + signedChallenge, + NETWORK_CONFIG.networkPassphrase + ); - const key = tx.hash().toString("hex"); + const key = tx.hash().toString("hex"); - LOG.debug("Updating session with key", key); + LOG.debug("Updating session with key", key); + span.addEvent("updating_memory_session", { "session.key": key }); - const ttl = SESSION_TTL * 1000; + const ttl = SESSION_TTL * 1000; - const memorySession = await sessionManager.getSession(key); + const memorySession = await sessionManager.getSession(key); - if (memorySession) { - const data = { - txHash: memorySession.txHash, - requestId: memorySession.requestId, - status: "ACTIVE", - expiresAt: new Date(Date.now() + ttl), - } as Session; + if (memorySession) { + span.addEvent("memory_session_found"); + const data = { + txHash: memorySession.txHash, + requestId: memorySession.requestId, + status: "ACTIVE", + expiresAt: new Date(Date.now() + ttl), + } as Session; - sessionManager.updateSession(data); - } + sessionManager.updateSession(data); + } else { + span.addEvent("no_memory_session"); + } - assertOrThrow( - isDefined(tx.operations) && tx.operations.length > 0, - new E.CHALLENGE_HAS_NO_OPERATIONS(key) - ); + assertOrThrow( + isDefined(tx.operations) && tx.operations.length > 0, + new E.CHALLENGE_HAS_NO_OPERATIONS(key) + ); - const txOperation = tx.operations[0] as Operation.ManageData; - const txClientAccount = txOperation.source; - assertOrThrow(isDefined(txClientAccount), new E.MISSING_CLIENT_ACCOUNT()); + const txOperation = tx.operations[0] as Operation.ManageData; + const txClientAccount = txOperation.source; + assertOrThrow(isDefined(txClientAccount), new E.MISSING_CLIENT_ACCOUNT()); - const account = await accountRepository.findById(txClientAccount); - assertOrThrow( - isDefined(account), - new E.USER_NOT_FOUND_IN_DATABASE(txClientAccount) - ); + span.addEvent("looking_up_account", { "client.account": txClientAccount }); + const account = await accountRepository.findById(txClientAccount); + assertOrThrow( + isDefined(account), + new E.USER_NOT_FOUND_IN_DATABASE(txClientAccount) + ); - // Add session to database - await sessionRepository.create({ - id: key, - status: SessionStatus.ACTIVE, - accountId: account.id, - jwtToken: input?.jwt, - createdAt: new Date(), - updatedAt: new Date(), - }); + span.addEvent("persisting_session"); + await sessionRepository.create({ + id: key, + status: SessionStatus.ACTIVE, + accountId: account.id, + jwtToken: input?.jwt, + createdAt: new Date(), + updatedAt: new Date(), + }); - return input; + span.addEvent("session_persisted"); + return input; + }); }, { name: "UpdateChallengeSession", diff --git a/src/core/service/auth/challenge/verify/compare-challenge.ts b/src/core/service/auth/challenge/verify/compare-challenge.ts index 8df7549..d4c7201 100644 --- a/src/core/service/auth/challenge/verify/compare-challenge.ts +++ b/src/core/service/auth/challenge/verify/compare-challenge.ts @@ -9,6 +9,7 @@ import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; import { extractOperationFromChallengeTx } from "./extract-nonce-from-tx.ts"; import { isTransaction } from "@colibri/core"; +import { withSpan } from "@/core/tracing.ts"; const challengeRepository = new ChallengeRepository(drizzleClient); @@ -17,59 +18,63 @@ export const P_CompareChallenge = ProcessEngine.create( input: PostChallengeInput, _metadataHelper?: MetadataHelper ): Promise => { - // Assume the input was already validated by an earlier process. - const { signedChallenge } = input.body; - const tx = new Transaction( - signedChallenge, - NETWORK_CONFIG.networkPassphrase - ); - assertOrThrow(isTransaction(tx), new E.CHALLENGE_IS_NOT_TRANSACTION(tx)); + return withSpan("P_CompareChallenge", async (span) => { + const { signedChallenge } = input.body; + const tx = new Transaction( + signedChallenge, + NETWORK_CONFIG.networkPassphrase + ); + assertOrThrow(isTransaction(tx), new E.CHALLENGE_IS_NOT_TRANSACTION(tx)); - const incomingTtl = extractChallengeTtl(tx); - const txHash = tx.hash().toString("hex"); + const incomingTtl = extractChallengeTtl(tx); + const txHash = tx.hash().toString("hex"); - // Look up the stored challenge record using the tx hash. - const localChallenge = await challengeRepository.findOneByTxHash(txHash); + span.addEvent("looking_up_stored_challenge", { "challenge.txHash": txHash }); + const localChallenge = await challengeRepository.findOneByTxHash(txHash); - assertOrThrow(isDefined(localChallenge), new E.CHALLENGE_NOT_FOUND(txHash)); + assertOrThrow(isDefined(localChallenge), new E.CHALLENGE_NOT_FOUND(txHash)); - const localChallengeTx = TransactionBuilder.fromXDR( - localChallenge.txXDR, - NETWORK_CONFIG.networkPassphrase - ); + const localChallengeTx = TransactionBuilder.fromXDR( + localChallenge.txXDR, + NETWORK_CONFIG.networkPassphrase + ); - assertOrThrow( - isTransaction(localChallengeTx), - new E.CHALLENGE_IS_NOT_TRANSACTION(localChallengeTx) - ); + assertOrThrow( + isTransaction(localChallengeTx), + new E.CHALLENGE_IS_NOT_TRANSACTION(localChallengeTx) + ); - const { - clientAccount: localChallengeClientAccount, - nonce: localChallengeNonce, - } = extractOperationFromChallengeTx(localChallengeTx); + span.addEvent("comparing_nonce_and_account"); + const { + clientAccount: localChallengeClientAccount, + nonce: localChallengeNonce, + } = extractOperationFromChallengeTx(localChallengeTx); - const { nonce: incomingNonce, clientAccount: incomingClientAccount } = - extractOperationFromChallengeTx(tx); + const { nonce: incomingNonce, clientAccount: incomingClientAccount } = + extractOperationFromChallengeTx(tx); - assertOrThrow( - localChallengeNonce === incomingNonce, - new E.NONCE_MISMATCH(localChallengeNonce, incomingNonce) - ); + assertOrThrow( + localChallengeNonce === incomingNonce, + new E.NONCE_MISMATCH(localChallengeNonce, incomingNonce) + ); - assertOrThrow( - localChallengeClientAccount === incomingClientAccount, - new E.CLIENT_ACCOUNT_MISMATCH( - localChallengeClientAccount, - incomingClientAccount - ) - ); + assertOrThrow( + localChallengeClientAccount === incomingClientAccount, + new E.CLIENT_ACCOUNT_MISMATCH( + localChallengeClientAccount, + incomingClientAccount + ) + ); - assertOrThrow( - localChallenge.ttl.toDateString() === incomingTtl.toDateString(), - new E.CHALLENGE_TTL_MISMATCH(localChallenge.ttl, incomingTtl) - ); + span.addEvent("comparing_ttl"); + assertOrThrow( + localChallenge.ttl.toDateString() === incomingTtl.toDateString(), + new E.CHALLENGE_TTL_MISMATCH(localChallenge.ttl, incomingTtl) + ); - return input; + span.addEvent("challenge_comparison_passed"); + return input; + }); }, { name: "CompareChallengeProcessEngine", diff --git a/src/core/service/auth/challenge/verify/verify-challenge.ts b/src/core/service/auth/challenge/verify/verify-challenge.ts index 3fe3c74..e552941 100644 --- a/src/core/service/auth/challenge/verify/verify-challenge.ts +++ b/src/core/service/auth/challenge/verify/verify-challenge.ts @@ -8,94 +8,109 @@ import { assertOrThrow } from "@/utils/error/assert-or-throw.ts"; import { isDefined } from "@/utils/type-guards/is-defined.ts"; import { StrKey } from "@colibri/core"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; +import { withSpan } from "@/core/tracing.ts"; export const P_VerifyChallenge = ProcessEngine.create( - (input: PostChallengeInput): PostChallengeInput => { - const { signedChallenge } = input.body; - try { - const tx = new Transaction( - signedChallenge, - NETWORK_CONFIG.networkPassphrase - ); - - assertOrThrow( - tx.sequence === "0", - new E.INVALID_SEQUENCE_NUMBER(tx.sequence) - ); - - assertOrThrow(isDefined(tx.timeBounds), new E.MISSING_TIME_BOUNDS()); - - assertOrThrow( - isDefined(tx.operations && tx.operations.length > 0), - new E.MISSING_OPERATIONS(tx) - ); - - const firstOp = tx.operations[0]; - - const expectedOperationType = "manageData" as OperationType.ManageData; - assertOrThrow( - firstOp.type === expectedOperationType, - new E.WRONG_OPERATION_TYPE(expectedOperationType, firstOp.type) - ); - - assertOrThrow( - firstOp.name.startsWith(`${SERVICE_DOMAIN} auth`), - new E.OPERATION_KEY_MISMATCH(`${SERVICE_DOMAIN} auth`, firstOp.name) - ); - - const currentTime = Math.floor(Date.now() / 1000); - const minTime = tx.timeBounds.minTime - ? parseInt(tx.timeBounds.minTime) - : 0; - const maxTime = tx.timeBounds.maxTime - ? parseInt(tx.timeBounds.maxTime) - : 0; - - assertOrThrow( - currentTime >= minTime, - new E.CHALLENGE_TOO_EARLY(currentTime, minTime) - ); - assertOrThrow( - currentTime <= maxTime, - new E.CHALLENGE_EXPIRED(currentTime, maxTime) - ); - - const clientPublicKey = firstOp.source; - - assertOrThrow( - isDefined(clientPublicKey) && - StrKey.isEd25519PublicKey(clientPublicKey), - new E.MISSING_CLIENT_ACCOUNT() - ); - - const clientKeypair = Keypair.fromPublicKey(clientPublicKey); - - let isSignedByServer = false; - let isSignedByClient = false; - - for (const sig of tx.signatures) { - if ( - PROVIDER_ACCOUNT.verifySignature( - // deno-lint-ignore no-explicit-any - tx.hash() as any, // Forcing type to Buffer as there seems to be an issue with the lib type inference - // deno-lint-ignore no-explicit-any - sig.signature() as any // Forcing type to Buffer as there seems to be an issue with the lib type inference - ) - ) { - isSignedByServer = true; + (input: PostChallengeInput): Promise => { + return withSpan("P_VerifyChallenge", (span) => { + const { signedChallenge } = input.body; + try { + span.addEvent("deserializing_transaction"); + const tx = new Transaction( + signedChallenge, + NETWORK_CONFIG.networkPassphrase + ); + + span.addEvent("validating_sequence_number"); + assertOrThrow( + tx.sequence === "0", + new E.INVALID_SEQUENCE_NUMBER(tx.sequence) + ); + + assertOrThrow(isDefined(tx.timeBounds), new E.MISSING_TIME_BOUNDS()); + + assertOrThrow( + isDefined(tx.operations && tx.operations.length > 0), + new E.MISSING_OPERATIONS(tx) + ); + + const firstOp = tx.operations[0]; + + const expectedOperationType = "manageData" as OperationType.ManageData; + assertOrThrow( + firstOp.type === expectedOperationType, + new E.WRONG_OPERATION_TYPE(expectedOperationType, firstOp.type) + ); + + assertOrThrow( + firstOp.name.startsWith(`${SERVICE_DOMAIN} auth`), + new E.OPERATION_KEY_MISMATCH(`${SERVICE_DOMAIN} auth`, firstOp.name) + ); + + span.addEvent("validating_timebounds"); + const currentTime = Math.floor(Date.now() / 1000); + const minTime = tx.timeBounds.minTime + ? parseInt(tx.timeBounds.minTime) + : 0; + const maxTime = tx.timeBounds.maxTime + ? parseInt(tx.timeBounds.maxTime) + : 0; + + assertOrThrow( + currentTime >= minTime, + new E.CHALLENGE_TOO_EARLY(currentTime, minTime) + ); + assertOrThrow( + currentTime <= maxTime, + new E.CHALLENGE_EXPIRED(currentTime, maxTime) + ); + + const clientPublicKey = firstOp.source; + + assertOrThrow( + isDefined(clientPublicKey) && + StrKey.isEd25519PublicKey(clientPublicKey), + new E.MISSING_CLIENT_ACCOUNT() + ); + + span.addEvent("verifying_signatures", { "client.publicKey": clientPublicKey }); + const clientKeypair = Keypair.fromPublicKey(clientPublicKey); + + let isSignedByServer = false; + let isSignedByClient = false; + + for (const sig of tx.signatures) { + if ( + PROVIDER_ACCOUNT.verifySignature( + // deno-lint-ignore no-explicit-any + tx.hash() as any, // Forcing type to Buffer as there seems to be an issue with the lib type inference + // deno-lint-ignore no-explicit-any + sig.signature() as any // Forcing type to Buffer as there seems to be an issue with the lib type inference + ) + ) { + isSignedByServer = true; + } + if (clientKeypair.verify(tx.hash(), sig.signature())) { + isSignedByClient = true; + } } - if (clientKeypair.verify(tx.hash(), sig.signature())) { - isSignedByClient = true; - } - } - assertOrThrow(isSignedByServer, new E.MISSING_SERVER_SIGNATURE()); - assertOrThrow(isSignedByClient, new E.MISSING_CLIENT_SIGNATURE()); + span.addEvent("signature_verification_result", { + "signatures.server": isSignedByServer, + "signatures.client": isSignedByClient, + }); - return input; - } catch (error) { - logAndThrow(new E.CHALLENGE_VERIFICATION_FAILED(error)); - } + assertOrThrow(isSignedByServer, new E.MISSING_SERVER_SIGNATURE()); + assertOrThrow(isSignedByClient, new E.MISSING_CLIENT_SIGNATURE()); + + return input; + } catch (error) { + span.addEvent("verification_failed", { + "error.message": error instanceof Error ? error.message : String(error), + }); + logAndThrow(new E.CHALLENGE_VERIFICATION_FAILED(error)); + } + }); }, { name: "VerifyChallengeProcessEngine", From 9a05ad4f7e422e88f6b45d3139d86967443be197 Mon Sep 17 00:00:00 2001 From: Gorka Date: Thu, 12 Mar 2026 14:00:08 -0300 Subject: [PATCH 3/4] feat: instrument bundle, executor, verifier, and mempool services Wraps all service functions with withSpan using Class.method naming: Bundle.validateSession, Bundle.parseOperations, Bundle.persistCreate/Spend, Executor.buildTransactionFromSlot, Executor.submitTransactionToNetwork, Executor.ensureOpexUtxosAvailable, Executor.handleExecutionFailure, Verifier.verifyTransactionOnNetwork, Mempool.reAddBundles, and P_AddOperationsBundle, P_GetBundle, P_ListBundles process pipelines. --- src/core/service/bundle/add-bundle.process.ts | 315 ++++++++++-------- src/core/service/bundle/get-bundle.process.ts | 31 +- .../service/bundle/list-bundles.process.ts | 34 +- src/core/service/executor/executor.process.ts | 236 +++++++------ src/core/service/executor/executor.service.ts | 108 +++--- src/core/service/mempool/mempool.process.ts | 157 +++++---- src/core/service/verifier/verifier.process.ts | 111 +++--- src/core/service/verifier/verifier.service.ts | 75 +++-- 8 files changed, 597 insertions(+), 470 deletions(-) diff --git a/src/core/service/bundle/add-bundle.process.ts b/src/core/service/bundle/add-bundle.process.ts index e2d80a7..cabc072 100644 --- a/src/core/service/bundle/add-bundle.process.ts +++ b/src/core/service/bundle/add-bundle.process.ts @@ -25,11 +25,12 @@ import * as E from "@/core/service/bundle/bundle.errors.ts"; import type { ClassifiedOperations } from "@/core/service/bundle/bundle.types.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import type { OperationsBundle } from "@/persistence/drizzle/entity/operations-bundle.entity.ts"; -import { +import { OperationsBundleRepository, SessionRepository, UtxoRepository, } from "@/persistence/drizzle/repository/index.ts"; +import { withSpan } from "@/core/tracing.ts"; // Repositories const sessionRepository = new SessionRepository(drizzleClient); @@ -48,11 +49,16 @@ const MEMPOOL_WEIGHT_CONFIG: WeightConfig = { * Validates the user session */ async function validateSession(sessionId: string) { - const userSession = await sessionRepository.findById(sessionId); - if (!userSession) { - logAndThrow(new E.INVALID_SESSION(sessionId)); - } - return userSession; + return withSpan("Bundle.validateSession", async (span) => { + span.addEvent("looking_up_session", { "session.id": sessionId }); + const userSession = await sessionRepository.findById(sessionId); + if (!userSession) { + span.addEvent("session_not_found"); + logAndThrow(new E.INVALID_SESSION(sessionId)); + } + span.addEvent("session_valid", { "account.id": userSession.accountId }); + return userSession; + }); } /** @@ -60,30 +66,43 @@ async function validateSession(sessionId: string) { * Throws an error if an active bundle exists. */ async function assertBundleIsExpired(bundleId: string): Promise { - const existingBundle = await operationsBundleRepository.findById(bundleId); + return withSpan("Bundle.assertBundleIsExpired", async (span) => { + span.addEvent("checking_existing_bundle", { "bundle.id": bundleId }); + const existingBundle = await operationsBundleRepository.findById(bundleId); + + if (!existingBundle) { + span.addEvent("bundle_not_found"); + return false; + } - if (!existingBundle) - return false; + if (existingBundle.status !== BundleStatus.EXPIRED) { + span.addEvent("bundle_exists_not_expired", { "bundle.status": existingBundle.status }); + logAndThrow(new E.BUNDLE_ALREADY_EXISTS(bundleId)); + } - if (existingBundle.status !== BundleStatus.EXPIRED) - logAndThrow(new E.BUNDLE_ALREADY_EXISTS(bundleId)); - - return true; + span.addEvent("bundle_expired"); + return true; + }); } /** * Parses MLXDR operations */ async function parseOperations(operationsMLXDR: string[]): Promise> { - const operations = await Promise.all( - operationsMLXDR.map((xdr) => MoonlightOperation.fromMLXDR(xdr)) - ); - - if (operations.length === 0) { - logAndThrow(new E.NO_OPERATIONS_PROVIDED()); - } - - return operations; + return withSpan("Bundle.parseOperations", async (span) => { + span.addEvent("parsing_operations", { "operations.count": operationsMLXDR.length }); + const operations = await Promise.all( + operationsMLXDR.map((xdr) => MoonlightOperation.fromMLXDR(xdr)) + ); + + if (operations.length === 0) { + span.addEvent("no_operations"); + logAndThrow(new E.NO_OPERATIONS_PROVIDED()); + } + + span.addEvent("operations_parsed", { "operations.count": operations.length }); + return operations; + }); } /** @@ -107,27 +126,32 @@ async function persistCreateOperations( bundleId: string, accountId: string ): Promise { - for (const operation of operations) { - const utxoId = Buffer.from(operation.getUtxo()).toString("base64"); - const utxo = await utxoRepository.findById(utxoId); - if (utxo) { - continue; - } + return withSpan("Bundle.persistCreateOperations", async (span) => { + span.addEvent("persisting_create_utxos", { "operations.count": operations.length, "bundle.id": bundleId }); + for (const operation of operations) { + const utxoId = Buffer.from(operation.getUtxo()).toString("base64"); + const utxo = await utxoRepository.findById(utxoId); + if (utxo) { + span.addEvent("utxo_already_exists", { "utxo.id": utxoId }); + continue; + } - await utxoRepository.create({ - id: utxoId, - accountId, - amount: operation.getAmount(), - createdAt: new Date(), - createdBy: accountId, - createdAtBundleId: bundleId, - }); - } + await utxoRepository.create({ + id: utxoId, + accountId, + amount: operation.getAmount(), + createdAt: new Date(), + createdBy: accountId, + createdAtBundleId: bundleId, + }); + } + span.addEvent("create_utxos_persisted"); + }); } /** * Updates UTXOs in the database from spend operations - * + * * Note: The spend amount is fetched directly from the network since * SpendOperation intentionally does not have an amount attribute. */ @@ -136,37 +160,40 @@ async function persistSpendOperations( bundleId: string, accountId: string ): Promise { - if (operations.length === 0) { - return; - } + return withSpan("Bundle.persistSpendOperations", async (span) => { + if (operations.length === 0) { + span.addEvent("no_spend_operations"); + return; + } - // Fetch all UTXO balances from the network in batch for better performance - const utxoPublicKeys = operations.map((op) => op.getUtxo()); - const balances = await fetchUtxoBalances(utxoPublicKeys); + span.addEvent("fetching_utxo_balances", { "operations.count": operations.length }); + const utxoPublicKeys = operations.map((op) => op.getUtxo()); + const balances = await fetchUtxoBalances(utxoPublicKeys); - for (let i = 0; i < operations.length; i++) { - const operation = operations[i]; - const utxoPublicKey = operation.getUtxo(); - // Convert UTXO public key to base64 string to match the format used in persistCreateOperations - const utxoId = Buffer.from(utxoPublicKey).toString("base64"); - LOG.info(` /n/n utxo Id: ${utxoId} /n/n`); - const utxo = await utxoRepository.findById(utxoId); - - if (!utxo) { - logAndThrow(new E.UTXO_NOT_FOUND(utxoId)); - } + for (let i = 0; i < operations.length; i++) { + const operation = operations[i]; + const utxoPublicKey = operation.getUtxo(); + const utxoId = Buffer.from(utxoPublicKey).toString("base64"); + LOG.info(` /n/n utxo Id: ${utxoId} /n/n`); + const utxo = await utxoRepository.findById(utxoId); - // The spend amount is the full balance of the UTXO (since we're spending it entirely) - const spendAmount = balances[i] || BigInt(0); - - await utxoRepository.update(utxo.id, { - amount: utxo.amount - spendAmount, - updatedAt: new Date(), - updatedBy: accountId, - spentAtBundleId: bundleId, - spentByAccountId: accountId, - }); - } + if (!utxo) { + span.addEvent("utxo_not_found", { "utxo.id": utxoId }); + logAndThrow(new E.UTXO_NOT_FOUND(utxoId)); + } + + const spendAmount = balances[i] || BigInt(0); + + await utxoRepository.update(utxo.id, { + amount: utxo.amount - spendAmount, + updatedAt: new Date(), + updatedBy: accountId, + spentAtBundleId: bundleId, + spentByAccountId: accountId, + }); + } + span.addEvent("spend_utxos_persisted"); + }); } /** @@ -199,78 +226,104 @@ function createSlotBundle( export const P_AddOperationsBundle = ProcessEngine.create( async (input: PostEndpointInput) => { - const { operationsMLXDR } = input.body; - const sessionData = input.ctx.state.session as JwtSessionData; - - // 1. Session validation - const userSession = await validateSession(sessionData.sessionId); - - // 2. Bundle ID generation and validation - const bundleId = await generateBundleId(operationsMLXDR); - const isBundleExpired = await assertBundleIsExpired(bundleId); - - // 3. Parse and classify operations - const operations = await parseOperations(operationsMLXDR); - const classified = classifyOperations(operations); - validateSpendOperations(classified.spend); - - // 4. Fee calculation - LOG.info("before calculateOperationAmounts: ", classified); - const amounts = await calculateOperationAmounts(classified); - LOG.info("amounts: ", amounts); - const feeCalculation = calculateFee(amounts); - - // 5. Bundle update or creation - let bundleEntity: OperationsBundle; - if (isBundleExpired) { - bundleEntity = await operationsBundleRepository.update(bundleId, { - status: BundleStatus.PENDING, - operationsMLXDR: operationsMLXDR, - fee: feeCalculation.fee, - updatedAt: new Date(), - updatedBy: userSession.accountId, + return withSpan("P_AddOperationsBundle", async (span) => { + const { operationsMLXDR } = input.body; + const sessionData = input.ctx.state.session as JwtSessionData; + + // 1. Session validation + span.addEvent("validating_session"); + const userSession = await validateSession(sessionData.sessionId); + + // 2. Bundle ID generation and validation + span.addEvent("generating_bundle_id"); + const bundleId = await generateBundleId(operationsMLXDR); + span.setAttribute("bundle.id", bundleId); + const isBundleExpired = await assertBundleIsExpired(bundleId); + + // 3. Parse and classify operations + span.addEvent("parsing_and_classifying_operations"); + const operations = await parseOperations(operationsMLXDR); + const classified = classifyOperations(operations); + validateSpendOperations(classified.spend); + + span.addEvent("operations_classified", { + "operations.create": classified.create.length, + "operations.spend": classified.spend.length, + "operations.deposit": classified.deposit.length, + "operations.withdraw": classified.withdraw.length, }); - } else { - bundleEntity = await operationsBundleRepository.create({ - id: bundleId, - status: BundleStatus.PENDING, - ttl: calculateBundleTtl(), - operationsMLXDR: operationsMLXDR, - fee: feeCalculation.fee, - createdBy: userSession.accountId, - createdAt: new Date(), + + // 4. Fee calculation + span.addEvent("calculating_fee"); + LOG.info("before calculateOperationAmounts: ", classified); + const amounts = await calculateOperationAmounts(classified); + LOG.info("amounts: ", amounts); + const feeCalculation = calculateFee(amounts); + + span.addEvent("fee_calculated", { + "fee.amount": feeCalculation.fee.toString(), + "fee.totalInflows": feeCalculation.totalInflows.toString(), + "fee.totalOutflows": feeCalculation.totalOutflows.toString(), }); - } - - LOG.debug("Fee calculation breakdown", { - totalDepositAmount: feeCalculation.breakdown.totalDepositAmount.toString(), - totalCreateAmount: feeCalculation.breakdown.totalCreateAmount.toString(), - totalWithdrawAmount: feeCalculation.breakdown.totalWithdrawAmount.toString(), - totalSpendAmount: feeCalculation.breakdown.totalSpendAmount.toString(), - totalInflows: feeCalculation.totalInflows.toString(), - totalOutflows: feeCalculation.totalOutflows.toString(), - fee: feeCalculation.fee.toString(), - }); - if (feeCalculation.fee < BigInt(1)) { - LOG.warn("This bundle doesn't have any fee"); - } + // 5. Bundle update or creation + let bundleEntity: OperationsBundle; + if (isBundleExpired) { + span.addEvent("updating_expired_bundle"); + bundleEntity = await operationsBundleRepository.update(bundleId, { + status: BundleStatus.PENDING, + operationsMLXDR: operationsMLXDR, + fee: feeCalculation.fee, + updatedAt: new Date(), + updatedBy: userSession.accountId, + }); + } else { + span.addEvent("creating_new_bundle"); + bundleEntity = await operationsBundleRepository.create({ + id: bundleId, + status: BundleStatus.PENDING, + ttl: calculateBundleTtl(), + operationsMLXDR: operationsMLXDR, + fee: feeCalculation.fee, + createdBy: userSession.accountId, + createdAt: new Date(), + }); + } + + LOG.debug("Fee calculation breakdown", { + totalDepositAmount: feeCalculation.breakdown.totalDepositAmount.toString(), + totalCreateAmount: feeCalculation.breakdown.totalCreateAmount.toString(), + totalWithdrawAmount: feeCalculation.breakdown.totalWithdrawAmount.toString(), + totalSpendAmount: feeCalculation.breakdown.totalSpendAmount.toString(), + totalInflows: feeCalculation.totalInflows.toString(), + totalOutflows: feeCalculation.totalOutflows.toString(), + fee: feeCalculation.fee.toString(), + }); + + if (feeCalculation.fee < BigInt(1)) { + span.addEvent("zero_fee_warning"); + LOG.warn("This bundle doesn't have any fee"); + } - // 6. Persist UTXOs - await persistCreateOperations(classified.create, bundleEntity.id, userSession.accountId); - await persistSpendOperations(classified.spend, bundleEntity.id, userSession.accountId); + // 6. Persist UTXOs + span.addEvent("persisting_utxos"); + await persistCreateOperations(classified.create, bundleEntity.id, userSession.accountId); + await persistSpendOperations(classified.spend, bundleEntity.id, userSession.accountId); - // 7. Create SlotBundle and add to Mempool - const slotBundle = createSlotBundle(bundleEntity, classified); - const mempool = getMempool(); - await mempool.addBundle(slotBundle); + // 7. Create SlotBundle and add to Mempool + span.addEvent("adding_to_mempool"); + const slotBundle = createSlotBundle(bundleEntity, classified); + const mempool = getMempool(); + await mempool.addBundle(slotBundle); - LOG.info(`Bundle ${bundleEntity.id} added to mempool for asynchronous processing`); + span.addEvent("bundle_added_to_mempool", { "bundle.id": bundleEntity.id }); + LOG.info(`Bundle ${bundleEntity.id} added to mempool for asynchronous processing`); - return { - ctx: input.ctx, - operationsBundleId: bundleEntity.id, - }; + return { + ctx: input.ctx, + operationsBundleId: bundleEntity.id, + }; + }); }, { name: "ProcessNewBundleProcessEngine", diff --git a/src/core/service/bundle/get-bundle.process.ts b/src/core/service/bundle/get-bundle.process.ts index 0ccdb87..1c37a10 100644 --- a/src/core/service/bundle/get-bundle.process.ts +++ b/src/core/service/bundle/get-bundle.process.ts @@ -13,6 +13,7 @@ import type { JwtSessionData } from "@/http/middleware/auth/index.ts"; import * as E from "@/core/service/bundle/bundle.errors.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import { toBundleDTO } from "@/core/service/bundle/bundle.service.ts"; +import { withSpan } from "@/core/tracing.ts"; const operationsBundleRepository = new OperationsBundleRepository(drizzleClient); const sessionRepository = new SessionRepository(drizzleClient); @@ -52,22 +53,28 @@ export const P_GetBundleById = ProcessEngine.create( async ( input: GetEndpointInput, ): Promise => { - const { ctx, query } = input; - const { bundleId } = query; + return withSpan("P_GetBundleById", async (span) => { + const { ctx, query } = input; + const { bundleId } = query; - LOG.debug("Fetching bundle by ID", { bundleId }); + span.setAttribute("bundle.id", bundleId); + LOG.debug("Fetching bundle by ID", { bundleId }); - const bundle = await findBundleOrThrow(bundleId); - await assertBundleOwnership(ctx as Context, bundle); - const dto = toBundleDTO(bundle); + span.addEvent("finding_bundle"); + const bundle = await findBundleOrThrow(bundleId); - // Validate DTO against response schema (defensive) - const parsed = responseSchema.parse(dto); + span.addEvent("checking_ownership"); + await assertBundleOwnership(ctx as Context, bundle); - return { - ctx: ctx as Context, - bundle: parsed, - }; + const dto = toBundleDTO(bundle); + const parsed = responseSchema.parse(dto); + + span.addEvent("bundle_retrieved", { "bundle.status": bundle.status }); + return { + ctx: ctx as Context, + bundle: parsed, + }; + }); }, { name: "GetBundleByIdProcessEngine", diff --git a/src/core/service/bundle/list-bundles.process.ts b/src/core/service/bundle/list-bundles.process.ts index 91fee95..1f74c3e 100644 --- a/src/core/service/bundle/list-bundles.process.ts +++ b/src/core/service/bundle/list-bundles.process.ts @@ -12,6 +12,7 @@ import type { BundleStatus } from "@/persistence/drizzle/entity/operations-bundl import * as E from "@/core/service/bundle/bundle.errors.ts"; import { logAndThrow } from "@/utils/error/log-and-throw.ts"; import { toBundleDTO } from "@/core/service/bundle/bundle.service.ts"; +import { withSpan } from "@/core/tracing.ts"; const operationsBundleRepository = new OperationsBundleRepository(drizzleClient); const sessionRepository = new SessionRepository(drizzleClient); @@ -48,26 +49,29 @@ export const P_ListBundlesByUser = ProcessEngine.create( async ( input: GetEndpointInput, ): Promise => { - const { ctx, query } = input; - const sessionData = ctx.state.session as JwtSessionData; + return withSpan("P_ListBundlesByUser", async (span) => { + const { ctx, query } = input; + const sessionData = ctx.state.session as JwtSessionData; - LOG.debug("Fetching bundles for user", { - sessionId: sessionData.sessionId, - status: query.status, - }); + LOG.debug("Fetching bundles for user", { + sessionId: sessionData.sessionId, + status: query.status, + }); - // Validate session and get account ID - const accountId = await validateSessionAndGetAccountId(sessionData.sessionId); + span.addEvent("validating_session"); + const accountId = await validateSessionAndGetAccountId(sessionData.sessionId); - // Find bundles created by this account - const bundles = await findBundlesByUser(accountId, query.status); + span.addEvent("finding_bundles", { "account.id": accountId }); + const bundles = await findBundlesByUser(accountId, query.status); - LOG.debug("Bundles found", { count: bundles.length, accountId }); + span.addEvent("bundles_found", { "bundles.count": bundles.length }); + LOG.debug("Bundles found", { count: bundles.length, accountId }); - return { - ctx: ctx as Context, - bundles, - }; + return { + ctx: ctx as Context, + bundles, + }; + }); }, { name: "ListBundlesByUserProcessEngine", diff --git a/src/core/service/executor/executor.process.ts b/src/core/service/executor/executor.process.ts index 3e047e1..50a90ef 100644 --- a/src/core/service/executor/executor.process.ts +++ b/src/core/service/executor/executor.process.ts @@ -10,11 +10,12 @@ import { ChannelInvokeMethods } from "@moonlight/moonlight-sdk"; import type { SIM_ERRORS } from "@colibri/core"; import { buildTransactionFromSlot } from "@/core/service/executor/executor.service.ts"; import type { MoonlightTransactionBuilder } from "@moonlight/moonlight-sdk"; -import { +import { OperationsBundleRepository, TransactionRepository, BundleTransactionRepository, } from "@/persistence/drizzle/repository/index.ts"; +import { withSpan } from "@/core/tracing.ts"; const EXECUTOR_CONFIG = { INTERVAL_MS: MEMPOOL_EXECUTOR_INTERVAL_MS, @@ -40,38 +41,44 @@ async function submitTransactionToNetwork( txBuilder: MoonlightTransactionBuilder, expiration: number ): Promise { - await txBuilder.signWithProvider(PROVIDER_SIGNER, expiration); - - try { - const authEntries = txBuilder.getSignedAuthEntries(); - - const { hash } = await CHANNEL_CLIENT.invokeRaw({ - operationArgs: { - function: ChannelInvokeMethods.transact, - args: [txBuilder.buildXDR()], - auth: [...authEntries], - }, - config: TX_CONFIG, - }); - - return hash.toString(); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - LOG.error("Transaction submission failed", { error: errorMessage }); - const simError = error as SIM_ERRORS.SIMULATION_FAILED; - if (simError?.meta?.data) { - const simResponse = simError.meta.data.simulationResponse ?? simError.meta.data; - LOG.error("Simulation details", { - simError: JSON.stringify(simResponse, null, 2), + return withSpan("Executor.submitTransactionToNetwork", async (span) => { + span.addEvent("signing_with_provider"); + await txBuilder.signWithProvider(PROVIDER_SIGNER, expiration); + + try { + const authEntries = txBuilder.getSignedAuthEntries(); + + span.addEvent("invoking_channel_contract"); + const { hash } = await CHANNEL_CLIENT.invokeRaw({ + operationArgs: { + function: ChannelInvokeMethods.transact, + args: [txBuilder.buildXDR()], + auth: [...authEntries], + }, + config: TX_CONFIG, }); - if (simError.meta.data.input?.transaction) { - LOG.error("Failed transaction XDR", { - xdr: simError.meta.data.input.transaction.toXDR() + + span.addEvent("transaction_submitted", { "tx.hash": hash.toString() }); + return hash.toString(); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + span.addEvent("submission_failed", { "error.message": errorMessage }); + LOG.error("Transaction submission failed", { error: errorMessage }); + const simError = error as SIM_ERRORS.SIMULATION_FAILED; + if (simError?.meta?.data) { + const simResponse = simError.meta.data.simulationResponse ?? simError.meta.data; + LOG.error("Simulation details", { + simError: JSON.stringify(simResponse, null, 2), }); + if (simError.meta.data.input?.transaction) { + LOG.error("Failed transaction XDR", { + xdr: simError.meta.data.input.transaction.toXDR() + }); + } } + throw error; } - throw error; - } + }); } /** @@ -111,20 +118,25 @@ async function handleExecutionFailure( error: Error, bundleIds: string[] ): Promise { - const errorMessage = error.message || "Unknown error"; - LOG.error("Execution failed", { error: errorMessage, bundleIds }); - - // Update bundles back to PENDING status for retry - for (const bundleId of bundleIds) { - try { - await operationsBundleRepository.update(bundleId, { - status: BundleStatus.PENDING, - updatedAt: new Date(), - }); - } catch (updateError) { - LOG.error(`Failed to update bundle ${bundleId} status`, { error: updateError }); + return withSpan("Executor.handleExecutionFailure", async (span) => { + const errorMessage = error.message || "Unknown error"; + span.addEvent("handling_failure", { "error.message": errorMessage, "bundles.count": bundleIds.length }); + LOG.error("Execution failed", { error: errorMessage, bundleIds }); + + // Update bundles back to PENDING status for retry + for (const bundleId of bundleIds) { + try { + await operationsBundleRepository.update(bundleId, { + status: BundleStatus.PENDING, + updatedAt: new Date(), + }); + span.addEvent("bundle_reset_to_pending", { "bundle.id": bundleId }); + } catch (updateError) { + span.addEvent("bundle_reset_failed", { "bundle.id": bundleId }); + LOG.error(`Failed to update bundle ${bundleId} status`, { error: updateError }); + } } - } + }); } /** @@ -175,95 +187,99 @@ export class Executor { * Executes the next slot from the mempool */ async executeNext(): Promise { - // Check if already processing a slot if (this.isProcessing) { LOG.debug("Executor already processing a slot, skipping"); return; } - const mempool = getMempool(); - let slot: ReturnType = null; - let bundleIds: string[] = []; + return withSpan("Executor.executeNext", async (span) => { + const mempool = getMempool(); + let slot: ReturnType = null; + let bundleIds: string[] = []; - try { - // Set processing lock - this.isProcessing = true; + try { + this.isProcessing = true; - // Remove slot from mempool BEFORE processing to prevent concurrent execution - slot = mempool.removeFirstSlot(); + slot = mempool.removeFirstSlot(); - if (!slot || slot.isEmpty()) { - // No slots to process - return; - } + if (!slot || slot.isEmpty()) { + span.addEvent("no_slots_to_process"); + return; + } - // Get bundle IDs before processing - bundleIds = slot.getBundles().map((b) => b.bundleId); + bundleIds = slot.getBundles().map((b) => b.bundleId); - LOG.debug("Executing slot", { - bundleCount: slot.getBundleCount(), - weight: slot.getTotalWeight(), - bundleIds - }); + span.addEvent("executing_slot", { + "slot.bundleCount": slot.getBundleCount(), + "slot.weight": slot.getTotalWeight(), + }); - // Build transaction from slot - const { txBuilder, bundleIds: buildBundleIds } = await buildTransactionFromSlot(slot); - - // Use bundleIds from build result to ensure consistency - bundleIds = buildBundleIds; + LOG.debug("Executing slot", { + bundleCount: slot.getBundleCount(), + weight: slot.getTotalWeight(), + bundleIds, + }); - // Get transaction expiration - const expiration = await getTransactionExpiration(); + span.addEvent("building_transaction"); + const { txBuilder, bundleIds: buildBundleIds } = await buildTransactionFromSlot(slot); + bundleIds = buildBundleIds; - LOG.info("Transaction", { txBuilder: txBuilder.buildXDR().toXDR() }); + span.addEvent("getting_expiration"); + const expiration = await getTransactionExpiration(); - // Submit transaction to network - const transactionHash = await submitTransactionToNetwork(txBuilder, expiration); + LOG.info("Transaction", { txBuilder: txBuilder.buildXDR().toXDR() }); - LOG.info("Transaction submitted successfully", { - transactionHash, - bundleCount: bundleIds.length, - bundleIds - }); + span.addEvent("submitting_to_network"); + const transactionHash = await submitTransactionToNetwork(txBuilder, expiration); - // Create transaction record and link bundles - await createTransactionRecord(transactionHash, bundleIds); + span.addEvent("transaction_submitted", { + "tx.hash": transactionHash, + "bundles.count": bundleIds.length, + }); - LOG.info("Slot executed successfully", { - transactionHash, - bundleCount: bundleIds.length - }); + LOG.info("Transaction submitted successfully", { + transactionHash, + bundleCount: bundleIds.length, + bundleIds, + }); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - LOG.error("Slot execution failed", { - error: errorMessage, - bundleIds - }); + span.addEvent("creating_transaction_record"); + await createTransactionRecord(transactionHash, bundleIds); - // Handle failure: re-add bundles to mempool and update status - if (slot && !slot.isEmpty() && bundleIds.length > 0) { - // Re-add bundles to mempool for retry - const bundles = slot.getBundles(); - await mempool.reAddBundles(bundles); - - // Update bundle statuses to PENDING - await handleExecutionFailure( - error instanceof Error ? error : new Error(errorMessage), - bundleIds - ); - - LOG.info("Bundles re-added to mempool for retry", { bundleIds }); - } else { - LOG.error("Execution error with no slot or bundles to re-add", { + span.addEvent("slot_executed_successfully"); + LOG.info("Slot executed successfully", { + transactionHash, + bundleCount: bundleIds.length, + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + span.addEvent("execution_failed", { "error.message": errorMessage }); + LOG.error("Slot execution failed", { error: errorMessage, - hasSlot: !!slot, - bundleCount: bundleIds.length + bundleIds, }); + + if (slot && !slot.isEmpty() && bundleIds.length > 0) { + span.addEvent("re_adding_bundles_to_mempool"); + const bundles = slot.getBundles(); + await mempool.reAddBundles(bundles); + + await handleExecutionFailure( + error instanceof Error ? error : new Error(errorMessage), + bundleIds, + ); + + LOG.info("Bundles re-added to mempool for retry", { bundleIds }); + } else { + LOG.error("Execution error with no slot or bundles to re-add", { + error: errorMessage, + hasSlot: !!slot, + bundleCount: bundleIds.length, + }); + } + } finally { + this.isProcessing = false; } - } finally { - // Always release processing lock - this.isProcessing = false; - } + }); } } diff --git a/src/core/service/executor/executor.service.ts b/src/core/service/executor/executor.service.ts index 7e45991..205b0e3 100644 --- a/src/core/service/executor/executor.service.ts +++ b/src/core/service/executor/executor.service.ts @@ -4,6 +4,7 @@ import type { TransactionBuildResult } from "@/core/service/executor/executor.ty import { CHANNEL_CLIENT } from "@/core/channel-client/index.ts"; import { UtxoBasedStellarAccount, UTXOStatus } from "@moonlight/moonlight-sdk"; import { OPEX_SK } from "@/config/env.ts"; +import { withSpan } from "@/core/tracing.ts"; const EXECUTOR_CONFIG = { OPEX_UTXO_BATCH_SIZE: 200, @@ -20,56 +21,59 @@ const EXECUTOR_CONFIG = { export async function buildTransactionFromSlot( slot: Slot ): Promise { - const bundles = slot.getBundles(); - - if (bundles.length === 0) { - throw new Error("Cannot build transaction from empty slot"); - } + return withSpan("Executor.buildTransactionFromSlot", async (span) => { + const bundles = slot.getBundles(); - // Setup transaction builder and OPEX handler - const txBuilder = MoonlightTransactionBuilder.fromPrivacyChannel(CHANNEL_CLIENT); - const opexHandler = UtxoBasedStellarAccount.fromPrivacyChannel({ - channelClient: CHANNEL_CLIENT, - root: OPEX_SK, - options: { - batchSize: EXECUTOR_CONFIG.OPEX_UTXO_BATCH_SIZE, - }, - }); + if (bundles.length === 0) { + throw new Error("Cannot build transaction from empty slot"); + } + + span.addEvent("setting_up_tx_builder", { "bundles.count": bundles.length }); + const txBuilder = MoonlightTransactionBuilder.fromPrivacyChannel(CHANNEL_CLIENT); + const opexHandler = UtxoBasedStellarAccount.fromPrivacyChannel({ + channelClient: CHANNEL_CLIENT, + root: OPEX_SK, + options: { + batchSize: EXECUTOR_CONFIG.OPEX_UTXO_BATCH_SIZE, + }, + }); + + span.addEvent("ensuring_opex_utxos"); + await ensureOpexUtxosAvailable(opexHandler, EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS); + const reservedUtxos = opexHandler.reserveUTXOs(EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS); - // Ensure OPEX has enough UTXOs - await ensureOpexUtxosAvailable(opexHandler, EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS); - const reservedUtxos = opexHandler.reserveUTXOs(EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS); - - if (!reservedUtxos || reservedUtxos.length === 0) { - const availableCount = opexHandler.getUTXOsByState(UTXOStatus.FREE).length; - throw new Error(`Insufficient UTXOs. Required: ${EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS}, Available: ${availableCount}`); - } + if (!reservedUtxos || reservedUtxos.length === 0) { + const availableCount = opexHandler.getUTXOsByState(UTXOStatus.FREE).length; + span.addEvent("insufficient_utxos", { "available": availableCount }); + throw new Error(`Insufficient UTXOs. Required: ${EXECUTOR_CONFIG.REQUIRED_OPEX_UTXOS}, Available: ${availableCount}`); + } - // Calculate total fee from all bundles - const totalFee = bundles.reduce((sum, bundle) => sum + bundle.fee, BigInt(0)); + const totalFee = bundles.reduce((sum, bundle) => sum + bundle.fee, BigInt(0)); + span.addEvent("fee_calculated", { "fee.total": totalFee.toString() }); - // Create fee operation - const feeOperation = MoonlightOperation.create( - reservedUtxos[0].publicKey, - totalFee - ); - txBuilder.addOperation(feeOperation); + const feeOperation = MoonlightOperation.create( + reservedUtxos[0].publicKey, + totalFee + ); + txBuilder.addOperation(feeOperation); - // Add all operations from all bundles - for (const bundle of bundles) { - bundle.operations.deposit.forEach((op) => txBuilder.addOperation(op)); - bundle.operations.create.forEach((op) => txBuilder.addOperation(op)); - bundle.operations.spend.forEach((op) => txBuilder.addOperation(op)); - bundle.operations.withdraw.forEach((op) => txBuilder.addOperation(op)); - } + span.addEvent("adding_bundle_operations"); + for (const bundle of bundles) { + bundle.operations.deposit.forEach((op) => txBuilder.addOperation(op)); + bundle.operations.create.forEach((op) => txBuilder.addOperation(op)); + bundle.operations.spend.forEach((op) => txBuilder.addOperation(op)); + bundle.operations.withdraw.forEach((op) => txBuilder.addOperation(op)); + } - const bundleIds = bundles.map((b) => b.bundleId); + const bundleIds = bundles.map((b) => b.bundleId); + span.addEvent("transaction_built", { "bundles.count": bundleIds.length }); - return { - txBuilder, - totalFee, - bundleIds, - }; + return { + txBuilder, + totalFee, + bundleIds, + }; + }); } /** @@ -79,8 +83,18 @@ async function ensureOpexUtxosAvailable( opexHandler: UtxoBasedStellarAccount, requiredCount: number ): Promise { - while (opexHandler.getUTXOsByState(UTXOStatus.FREE).length < requiredCount + 1) { - await opexHandler.deriveBatch({}); - await opexHandler.batchLoad(); - } + return withSpan("Executor.ensureOpexUtxosAvailable", async (span) => { + span.addEvent("checking_free_utxos", { "required": requiredCount }); + let iterations = 0; + while (opexHandler.getUTXOsByState(UTXOStatus.FREE).length < requiredCount + 1) { + iterations++; + span.addEvent("deriving_batch", { "iteration": iterations }); + await opexHandler.deriveBatch({}); + await opexHandler.batchLoad(); + } + span.addEvent("utxos_available", { + "free.count": opexHandler.getUTXOsByState(UTXOStatus.FREE).length, + "iterations": iterations, + }); + }); } diff --git a/src/core/service/mempool/mempool.process.ts b/src/core/service/mempool/mempool.process.ts index 523a123..7068ce9 100644 --- a/src/core/service/mempool/mempool.process.ts +++ b/src/core/service/mempool/mempool.process.ts @@ -23,6 +23,7 @@ import { } from "@/core/service/mempool/mempool.service.ts"; import type { MempoolStats } from "@/core/service/mempool/mempool.types.ts"; import * as E from "@/core/service/mempool/mempool.errors.ts"; +import { withSpan } from "@/core/tracing.ts"; const MEMPOOL_CONFIG = { SLOT_CAPACITY: MEMPOOL_SLOT_CAPACITY, @@ -235,51 +236,54 @@ export class Mempool { * Tries to fit in existing slots, creates new slot if necessary */ async addBundle(bundleData: SlotBundle): Promise { - // Check if bundle is expired - if (isBundleExpired(bundleData)) { - LOG.warn(`Bundle ${bundleData.bundleId} is expired, marking as EXPIRED`); - await operationsBundleRepository.update(bundleData.bundleId, { - status: BundleStatus.EXPIRED, - updatedAt: new Date(), - }); - return; - } + return withSpan("Mempool.addBundle", async (span) => { + span.setAttribute("bundle.id", bundleData.bundleId); + span.setAttribute("bundle.weight", bundleData.weight); + + if (isBundleExpired(bundleData)) { + span.addEvent("bundle_expired"); + LOG.warn(`Bundle ${bundleData.bundleId} is expired, marking as EXPIRED`); + await operationsBundleRepository.update(bundleData.bundleId, { + status: BundleStatus.EXPIRED, + updatedAt: new Date(), + }); + return; + } - let bundleToAdd: SlotBundle | null = bundleData; + let bundleToAdd: SlotBundle | null = bundleData; - // Try to fit in existing slots - for (const slot of this.slots) { - if (!bundleToAdd) break; + for (const slot of this.slots) { + if (!bundleToAdd) break; - const removed = slot.add(bundleToAdd); - if (removed === null) { - // Bundle was successfully added - bundleToAdd = null; - LOG.debug(`Bundle ${bundleData.bundleId} added to existing slot`); - } else if (removed !== bundleToAdd) { - // A different bundle was removed, try to re-add the removed one - bundleToAdd = removed; + const removed = slot.add(bundleToAdd); + if (removed === null) { + bundleToAdd = null; + span.addEvent("added_to_existing_slot"); + LOG.debug(`Bundle ${bundleData.bundleId} added to existing slot`); + } else if (removed !== bundleToAdd) { + bundleToAdd = removed; + } } - } - // If bundle still needs to be added, create a new slot - if (bundleToAdd) { - const newSlot = new Slot(this.capacity); - const result = newSlot.add(bundleToAdd); - if (result === null) { - this.slots.push(newSlot); - LOG.debug(`Bundle ${bundleData.bundleId} added to new slot`); - } else { - // Even new slot can't fit (shouldn't happen if capacity is reasonable) - LOG.error(`Bundle ${bundleData.bundleId} cannot fit in any slot, weight: ${bundleData.weight}, capacity: ${this.capacity}`); - throw new E.SLOT_FULL(bundleData.weight, this.capacity); + if (bundleToAdd) { + const newSlot = new Slot(this.capacity); + const result = newSlot.add(bundleToAdd); + if (result === null) { + this.slots.push(newSlot); + span.addEvent("added_to_new_slot"); + LOG.debug(`Bundle ${bundleData.bundleId} added to new slot`); + } else { + span.addEvent("slot_full", { "bundle.weight": bundleData.weight, "slot.capacity": this.capacity }); + LOG.error(`Bundle ${bundleData.bundleId} cannot fit in any slot, weight: ${bundleData.weight}, capacity: ${this.capacity}`); + throw new E.SLOT_FULL(bundleData.weight, this.capacity); + } } - } - // Update bundle status to PROCESSING - await operationsBundleRepository.update(bundleData.bundleId, { - status: BundleStatus.PROCESSING, - updatedAt: new Date(), + span.addEvent("updating_status_to_processing"); + await operationsBundleRepository.update(bundleData.bundleId, { + status: BundleStatus.PROCESSING, + updatedAt: new Date(), + }); }); } @@ -307,53 +311,64 @@ export class Mempool { * @param bundles - Array of bundles to re-add */ async reAddBundles(bundles: SlotBundle[]): Promise { - LOG.debug(`Re-adding ${bundles.length} bundles to mempool after execution failure`); + return withSpan("Mempool.reAddBundles", async (span) => { + span.addEvent("re_adding_bundles", { "bundles.count": bundles.length }); + LOG.debug(`Re-adding ${bundles.length} bundles to mempool after execution failure`); - for (const bundle of bundles) { - try { - await this.addBundle(bundle); - LOG.debug(`Bundle ${bundle.bundleId} re-added to mempool`); - } catch (error) { - LOG.error(`Failed to re-add bundle ${bundle.bundleId}`, { - error: error instanceof Error ? error.message : String(error), - }); + let succeeded = 0; + let failed = 0; + for (const bundle of bundles) { + try { + await this.addBundle(bundle); + succeeded++; + LOG.debug(`Bundle ${bundle.bundleId} re-added to mempool`); + } catch (error) { + failed++; + span.addEvent("re_add_failed", { "bundle.id": bundle.bundleId }); + LOG.error(`Failed to re-add bundle ${bundle.bundleId}`, { + error: error instanceof Error ? error.message : String(error), + }); + } } - } + span.addEvent("re_add_complete", { "succeeded": succeeded, "failed": failed }); + }); } /** * Expires bundles that have passed their TTL */ async expireBundles(): Promise { - const expiredBundleIds: string[] = []; + return withSpan("Mempool.expireBundles", async (span) => { + const expiredBundleIds: string[] = []; + + for (let i = this.slots.length - 1; i >= 0; i--) { + const slot = this.slots[i]; + const bundles = slot.getBundles(); + + for (const bundle of bundles) { + if (isBundleExpired(bundle)) { + expiredBundleIds.push(bundle.bundleId); + this.removeBundleFromSlot(slot, bundle.bundleId); + } + } - // Collect expired bundles from all slots - for (let i = this.slots.length - 1; i >= 0; i--) { - const slot = this.slots[i]; - const bundles = slot.getBundles(); - - for (const bundle of bundles) { - if (isBundleExpired(bundle)) { - expiredBundleIds.push(bundle.bundleId); - // Remove the expired bundle from slot - this.removeBundleFromSlot(slot, bundle.bundleId); + if (slot.isEmpty()) { + this.slots.splice(i, 1); } } - // Remove empty slots - if (slot.isEmpty()) { - this.slots.splice(i, 1); + if (expiredBundleIds.length > 0) { + span.addEvent("expiring_bundles", { "expired.count": expiredBundleIds.length }); } - } - // Update expired bundles in database - for (const bundleId of expiredBundleIds) { - await operationsBundleRepository.update(bundleId, { - status: BundleStatus.EXPIRED, - updatedAt: new Date(), - }); - LOG.info(`Bundle ${bundleId} expired and marked as EXPIRED`); - } + for (const bundleId of expiredBundleIds) { + await operationsBundleRepository.update(bundleId, { + status: BundleStatus.EXPIRED, + updatedAt: new Date(), + }); + LOG.info(`Bundle ${bundleId} expired and marked as EXPIRED`); + } + }); } /** diff --git a/src/core/service/verifier/verifier.process.ts b/src/core/service/verifier/verifier.process.ts index d2c4306..56a5d79 100644 --- a/src/core/service/verifier/verifier.process.ts +++ b/src/core/service/verifier/verifier.process.ts @@ -4,11 +4,12 @@ import { BundleStatus } from "@/persistence/drizzle/entity/operations-bundle.ent import { TransactionStatus } from "@/persistence/drizzle/entity/transaction.entity.ts"; import { MEMPOOL_VERIFIER_INTERVAL_MS, NETWORK_RPC_SERVER } from "@/config/env.ts"; import { verifyTransactionOnNetwork } from "@/core/service/verifier/verifier.service.ts"; -import { +import { TransactionRepository, BundleTransactionRepository, OperationsBundleRepository, } from "@/persistence/drizzle/repository/index.ts"; +import { withSpan } from "@/core/tracing.ts"; const VERIFIER_CONFIG = { INTERVAL_MS: MEMPOOL_VERIFIER_INTERVAL_MS, @@ -130,60 +131,74 @@ export class Verifier { * Verifies all unverified transactions */ async verifyTransactions(): Promise { - try { - // Get all unverified transactions - const unverifiedTransactions = await transactionRepository.findByStatus( - TransactionStatus.UNVERIFIED - ); - - if (unverifiedTransactions.length === 0) { - // No transactions to verify - return; - } - - LOG.debug(`Verifying ${unverifiedTransactions.length} transactions`); - - // Verify each transaction - for (const transaction of unverifiedTransactions) { - await this.verifyTransaction(transaction.id); + return withSpan("Verifier.verifyTransactions", async (span) => { + try { + const unverifiedTransactions = await transactionRepository.findByStatus( + TransactionStatus.UNVERIFIED + ); + + if (unverifiedTransactions.length === 0) { + span.addEvent("no_transactions_to_verify"); + return; + } + + span.addEvent("verifying_transactions", { "transactions.count": unverifiedTransactions.length }); + LOG.debug(`Verifying ${unverifiedTransactions.length} transactions`); + + for (const transaction of unverifiedTransactions) { + await this.verifyTransaction(transaction.id); + } + + span.addEvent("verification_cycle_complete"); + } catch (error) { + span.addEvent("verification_error", { + "error.message": error instanceof Error ? error.message : String(error), + }); + LOG.error("Error during transaction verification", { + error: error instanceof Error ? error.message : String(error), + }); } - } catch (error) { - LOG.error("Error during transaction verification", { - error: error instanceof Error ? error.message : String(error), - }); - } + }); } /** * Verifies a single transaction */ private async verifyTransaction(txId: string): Promise { - try { - // Get bundles linked to this transaction - const bundleTransactions = await bundleTransactionRepository.findByTransactionId(txId); - const bundleIds = bundleTransactions.map((bt) => bt.bundleId); - - if (bundleIds.length === 0) { - LOG.warn(`No bundles found for transaction ${txId}`); - return; - } - - // Verify transaction on network - const result = await verifyTransactionOnNetwork(txId, NETWORK_RPC_SERVER); - - // Handle verification result - if (result.status === "VERIFIED") { - await handleVerificationSuccess(txId, bundleIds); - } else if (result.status === "FAILED") { - await handleVerificationFailure(txId, result.reason, bundleIds); - } else { - // PENDING - transaction not yet found on network, will check again next cycle - LOG.debug(`Transaction ${txId} still pending verification`); + return withSpan("Verifier.verifyTransaction", async (span) => { + try { + span.setAttribute("tx.id", txId); + + span.addEvent("looking_up_bundle_transactions"); + const bundleTransactions = await bundleTransactionRepository.findByTransactionId(txId); + const bundleIds = bundleTransactions.map((bt) => bt.bundleId); + + if (bundleIds.length === 0) { + span.addEvent("no_bundles_found"); + LOG.warn(`No bundles found for transaction ${txId}`); + return; + } + + span.addEvent("verifying_on_network", { "bundles.count": bundleIds.length }); + const result = await verifyTransactionOnNetwork(txId, NETWORK_RPC_SERVER); + + span.addEvent("verification_result", { "result.status": result.status }); + + if (result.status === "VERIFIED") { + await handleVerificationSuccess(txId, bundleIds); + } else if (result.status === "FAILED") { + await handleVerificationFailure(txId, result.reason, bundleIds); + } else { + LOG.debug(`Transaction ${txId} still pending verification`); + } + } catch (error) { + span.addEvent("verification_failed", { + "error.message": error instanceof Error ? error.message : String(error), + }); + LOG.error(`Failed to verify transaction ${txId}`, { + error: error instanceof Error ? error.message : String(error), + }); } - } catch (error) { - LOG.error(`Failed to verify transaction ${txId}`, { - error: error instanceof Error ? error.message : String(error), - }); - } + }); } } diff --git a/src/core/service/verifier/verifier.service.ts b/src/core/service/verifier/verifier.service.ts index 148cfa2..01d0276 100644 --- a/src/core/service/verifier/verifier.service.ts +++ b/src/core/service/verifier/verifier.service.ts @@ -1,10 +1,11 @@ import type { Server } from "stellar-sdk/rpc"; import type { VerificationResult } from "@/core/service/verifier/verifier.types.ts"; +import { withSpan } from "@/core/tracing.ts"; /** * Verifies a transaction on the Stellar network * Checks if the transaction was included in a ledger - * + * * @param txHash - Transaction hash to verify * @param rpcServer - Stellar RPC server instance * @returns Verification result: VERIFIED, FAILED, or PENDING @@ -13,46 +14,48 @@ export async function verifyTransactionOnNetwork( txHash: string, rpcServer: Server ): Promise { - try { - // Try to get transaction by hash - const txResponse = await rpcServer.getTransaction(txHash); - if (!txResponse) { - // Transaction not found - might be pending or failed + return withSpan("Verifier.verifyTransactionOnNetwork", async (span) => { + span.setAttribute("tx.hash", txHash); + try { + span.addEvent("querying_rpc"); + const txResponse = await rpcServer.getTransaction(txHash); + if (!txResponse) { + span.addEvent("transaction_not_found"); + return { status: "PENDING" }; + } + + if (txResponse.status === "SUCCESS") { + span.addEvent("transaction_verified", { "ledger": txResponse.ledger?.toString() ?? "unknown" }); + return { + status: "VERIFIED", + ledgerSequence: txResponse.ledger?.toString(), + }; + } + + if (txResponse.status === "FAILED") { + const resultCode = txResponse.resultXdr || "unknown"; + span.addEvent("transaction_failed_on_network", { "resultCode": String(resultCode) }); + return { + status: "FAILED", + reason: `Transaction failed with result code: ${resultCode}`, + }; + } + + span.addEvent("transaction_status_unclear"); return { status: "PENDING" }; - } + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); - // Check if transaction was successful - if (txResponse.status === "SUCCESS") { - return { - status: "VERIFIED", - ledgerSequence: txResponse.ledger?.toString(), - }; - } + if (errorMessage.includes("not found") || errorMessage.includes("404")) { + span.addEvent("transaction_pending_not_found"); + return { status: "PENDING" }; + } - // Transaction was included but failed - if (txResponse.status === "FAILED") { - const resultCode = txResponse.resultXdr || "unknown"; + span.addEvent("verification_error", { "error.message": errorMessage }); return { status: "FAILED", - reason: `Transaction failed with result code: ${resultCode}`, + reason: errorMessage, }; } - - // Transaction found but status unclear - return { status: "PENDING" }; - } catch (error) { - // If transaction is not found, it might still be pending - // Check if it's a 404 or similar - const errorMessage = error instanceof Error ? error.message : String(error); - - if (errorMessage.includes("not found") || errorMessage.includes("404")) { - return { status: "PENDING" }; - } - - // Other errors might indicate the transaction failed - return { - status: "FAILED", - reason: errorMessage, - }; - } + }); } From c49391ae7b1ec3b28cca0c849ead991d0eaa907e Mon Sep 17 00:00:00 2001 From: Gorka Date: Thu, 12 Mar 2026 14:00:15 -0300 Subject: [PATCH 4/4] chore: add OTEL config to .env.example and docker-compose, remove fly.provider-b.toml Adds commented-out OTEL configuration section to .env.example, adds OTEL environment variables to docker-compose provider service, and removes unused fly.provider-b.toml deployment config. --- .env.example | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.env.example b/.env.example index 8fcaa36..31995eb 100644 --- a/.env.example +++ b/.env.example @@ -27,6 +27,12 @@ SERVICE_AUTH_SECRET= # 32 random bytes, base64 encoded. Used for secure JWT midd CHALLENGE_TTL=900 #15m SESSION_TTL=21600 #6h +# OPENTELEMETRY (opt-in, requires Jaeger or OTLP-compatible collector) +# OTEL_DENO=true +# OTEL_SERVICE_NAME=provider-platform +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + # MEMPOOL CONFIGURATION MEMPOOL_SLOT_CAPACITY=100 MEMPOOL_EXPENSIVE_OP_WEIGHT=10