diff --git a/apps/api/examples/lease-flow.ts b/apps/api/examples/lease-flow.ts index ade853e1da..13498533cf 100644 --- a/apps/api/examples/lease-flow.ts +++ b/apps/api/examples/lease-flow.ts @@ -2,7 +2,7 @@ * This script demonstrates how to create a deployment with a lease using the API and an API key. * * The script follows these steps: - * 1. Creates a certificate for secure communication + * 1. Checks if API key env var is set * 2. Creates a deployment using the provided SDL file * 3. Waits for and collects bids from providers * 4. Creates a lease using the first received bid @@ -23,6 +23,7 @@ import { config } from "@dotenvx/dotenvx"; import axios from "axios"; import * as fs from "node:fs"; import * as path from "node:path"; +import WebSocket from "ws"; // Load environment variables from .env.local in the script directory const envPath = path.resolve(__dirname, ".env.local"); @@ -68,7 +69,7 @@ async function waitForBids(dseq: string, apiKey: string, maxAttempts = 10): Prom /** * This script is used to create a lease for a deployment using an api key. - * It creates a certificate, creates a deployment, waits for bids, creates a lease, and then closes the deployment. + * It creates a deployment, waits for bids, creates a lease, and then closes the deployment. */ async function main() { try { @@ -78,22 +79,7 @@ async function main() { throw new Error("API_KEY environment variable is required"); } - // 2. Create certificate - console.log("Creating certificate..."); - const certResponse = await api.post( - "/v1/certificates", - {}, - { - headers: { - "x-api-key": apiKey - } - } - ); - - const { certPem, encryptedKey } = certResponse.data.data; - console.log("Certificate created successfully"); - - // 3. Create deployment + // 2. Create deployment console.log("Creating deployment..."); const deployResponse = await api.post( "/v1/deployments", @@ -113,7 +99,7 @@ async function main() { const { dseq, manifest } = deployResponse.data.data; console.log(`Deployment created with dseq: ${dseq}`); - // 4. Wait for and get bids + // 3. Wait for and get bids console.log("Waiting for bids..."); const bids = await waitForBids(dseq, apiKey); console.log(`Received ${bids.length} bids`); @@ -126,23 +112,21 @@ async function main() { throw new Error(`No bid found from provider ${targetProvider}`); } + const { provider, gseq, oseq } = selectedBid.bid.bid_id; + const body = { manifest, - certificate: { - certPem, - keyPem: encryptedKey - }, leases: [ { dseq, - gseq: selectedBid.bid.bid_id.gseq, - oseq: selectedBid.bid.bid_id.oseq, - provider: selectedBid.bid.bid_id.provider + gseq, + oseq, + provider } ] }; - // 5. Create lease and send manifest + // 4. Create lease and send manifest console.log("Creating lease and sending manifest..."); const leaseResponse = await api.post("/v1/leases", body, { headers: { @@ -155,7 +139,7 @@ async function main() { } console.log("Lease created successfully", JSON.stringify(leaseResponse.data.data, null, 2)); - // 6. Deposit into deployment + // 5. Deposit into deployment console.log("Depositing into deployment..."); const depositResponse = await api.post( `/v1/deposit-deployment`, @@ -182,11 +166,7 @@ async function main() { `/v1/deployments/${dseq}`, { data: { - sdl: updatedYml, - certificate: { - certPem, - keyPem: encryptedKey - } + sdl: updatedYml } }, { @@ -201,7 +181,7 @@ async function main() { } console.log("Deployment updated successfully"); - // 7. Get the deployment details + // 6. Get the deployment details console.log("Getting deployment details..."); const deploymentResponse = await api.get(`/v1/deployments/${dseq}`, { headers: { @@ -211,6 +191,39 @@ async function main() { console.log("Deployment details:", JSON.stringify(deploymentResponse.data.data, null, 2)); + // 7. Stream logs from provider + const providerResponse = await api.get(`/v1/providers/${provider}`, { + headers: { + "x-api-key": apiKey + } + }); + const { hostUri } = providerResponse.data; + + const websocket = new WebSocket(`${API_URL}/v1/ws`, { + headers: { + "x-api-key": apiKey + } + }); + + websocket.on("message", message => { + console.log("WebSocket message received:", message.toString()); + }); + + websocket.on("open", () => { + console.log("WebSocket connected, sending message to stream logs"); + websocket.send( + JSON.stringify({ + type: "websocket", + providerAddress: provider, + url: `${hostUri}/lease/${dseq}/${gseq}/${oseq}/logs`, + chainNetwork: "sandbox" + }) + ); + }); + + // wait for 5 seconds before closing the deployment + await new Promise(resolve => setTimeout(resolve, 5000)); + // 8. Close deployment console.log("Closing deployment..."); const closeResponse = await api.delete(`/v1/deployments/${dseq}`, { diff --git a/apps/api/package.json b/apps/api/package.json index 0e4ed6012b..1beb517fce 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -42,6 +42,7 @@ "@akashnetwork/database": "*", "@akashnetwork/env-loader": "*", "@akashnetwork/http-sdk": "*", + "@akashnetwork/jwt": "*", "@akashnetwork/logging": "*", "@akashnetwork/net": "*", "@akashnetwork/react-query-sdk": "*", @@ -57,6 +58,7 @@ "@cosmjs/tendermint-rpc": "^0.32.4", "@dotenvx/dotenvx": "^1.9.0", "@hono/node-server": "1.13.7", + "@hono/node-ws": "^1.2.0", "@hono/otel": "~0.4.0", "@hono/swagger-ui": "0.4.1", "@hono/zod-openapi": "0.18.4", @@ -92,6 +94,7 @@ "markdown-to-txt": "^2.0.1", "memory-cache": "^0.2.0", "murmurhash": "^2.0.1", + "node-forge": "^1.3.1", "pg": "^8.12.0", "pg-boss": "^10.3.2", "pg-hstore": "^2.3.4", @@ -125,9 +128,11 @@ "@types/memory-cache": "^0.2.2", "@types/node": "^22.13.11", "@types/node-fetch": "^2.6.2", + "@types/node-forge": "^1.3.11", "@types/pg": "^8.11.6", "@types/semver": "^7.5.2", "@types/uuid": "^8.3.1", + "@types/ws": "^8.18.1", "@typescript-eslint/eslint-plugin": "^7.12.0", "alias-hq": "^5.1.6", "copy-webpack-plugin": "^12.0.2", @@ -151,6 +156,7 @@ "ts-loader": "^9.5.2", "type-fest": "^4.26.1", "typescript": "~5.8.2", + "wait-for-expect": "^4.0.0", "webpack": "^5.91.0", "webpack-cli": "4.10.0", "webpack-node-externals": "^3.0.0" diff --git a/apps/api/src/app.ts b/apps/api/src/app.ts index c38edc22f5..93643daa40 100644 --- a/apps/api/src/app.ts +++ b/apps/api/src/app.ts @@ -4,11 +4,13 @@ import "./app/providers/jobs.provider"; import { LoggerService } from "@akashnetwork/logging"; import { HttpLoggerIntercepter } from "@akashnetwork/logging/hono"; import { serve } from "@hono/node-server"; +import { createNodeWebSocket } from "@hono/node-ws"; import { otel } from "@hono/otel"; import { swaggerUI } from "@hono/swagger-ui"; import { Hono } from "hono"; import { cors } from "hono/cors"; import once from "lodash/once"; +import type { AddressInfo } from "net"; import { container } from "tsyringe"; import { AuthInterceptor } from "@src/auth/services/auth.interceptor"; @@ -39,6 +41,7 @@ import { userRouter } from "./routers/userRouter"; import { web3IndexRouter } from "./routers/web3indexRouter"; import { env } from "./utils/env"; import { bytesToHumanReadableSize } from "./utils/files"; +import { initLeaseWebsocketRoute } from "./websocket/routes/websocket/websocket.router"; import { addressRouter } from "./address"; import { sendVerificationEmailRouter } from "./auth"; import { @@ -163,6 +166,9 @@ for (const handler of openApiHonoHandlers) { appHono.route("/", handler); } +const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app: appHono }); +initLeaseWebsocketRoute(appHono, upgradeWebSocket); + appHono.route("/", notificationsApiProxy); appHono.route("/", healthzRouter); @@ -199,22 +205,31 @@ const appLogger = LoggerService.forContext("APP"); * Start scheduler * Start server */ -export async function initApp() { +export async function initApp(port: number = Number(PORT)): Promise { try { await Promise.all([initDb(), ...container.resolveAll(APP_INITIALIZER).map(initializer => initializer[ON_APP_START]())]); startScheduler(); - appLogger.info({ event: "SERVER_STARTING", url: `http://localhost:${PORT}`, NODE_OPTIONS: process.env.NODE_OPTIONS }); + appLogger.info({ event: "SERVER_STARTING", url: `http://localhost:${port}`, NODE_OPTIONS: process.env.NODE_OPTIONS }); const server = serve({ fetch: appHono.fetch, - port: typeof PORT === "string" ? parseInt(PORT, 10) : PORT + port: typeof port === "string" ? parseInt(port, 10) : port }); + injectWebSocket(server); const shutdown = once(() => shutdownServer(server, appLogger, container.dispose.bind(container))); process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); + + return { + host: `http://localhost:${(server.address() as AddressInfo).port}`, + async close() { + await shutdown(); + } + }; } catch (error) { appLogger.error({ event: "APP_INIT_ERROR", error }); + throw error; } } @@ -239,3 +254,8 @@ export async function initDb() { } export { appHono as app }; + +export interface AppServer { + host: string; + close(): Promise; +} diff --git a/apps/api/src/billing/lib/wallet/wallet.ts b/apps/api/src/billing/lib/wallet/wallet.ts index b847dcddbd..4774096beb 100644 --- a/apps/api/src/billing/lib/wallet/wallet.ts +++ b/apps/api/src/billing/lib/wallet/wallet.ts @@ -46,4 +46,8 @@ export class Wallet implements OfflineDirectSigner { async getMnemonic() { return (await this.instanceAsPromised).mnemonic; } + + async getInstance() { + return await this.instanceAsPromised; + } } diff --git a/apps/api/src/billing/services/wallet-initializer/wallet-initializer.service.spec.ts b/apps/api/src/billing/services/wallet-initializer/wallet-initializer.service.spec.ts index da6afafa13..8546f23416 100644 --- a/apps/api/src/billing/services/wallet-initializer/wallet-initializer.service.spec.ts +++ b/apps/api/src/billing/services/wallet-initializer/wallet-initializer.service.spec.ts @@ -7,6 +7,7 @@ import { DomainEventsService } from "@src/core/services/domain-events/domain-eve import type { FeatureFlagValue } from "@src/core/services/feature-flags/feature-flags"; import { FeatureFlags } from "@src/core/services/feature-flags/feature-flags"; import { FeatureFlagsService } from "@src/core/services/feature-flags/feature-flags.service"; +import { JwtTokenService } from "@src/provider/services/jwt-token/jwt-token.service"; import { UserWalletRepository } from "../../repositories/user-wallet/user-wallet.repository"; import { ManagedUserWalletService } from "../managed-user-wallet/managed-user-wallet.service"; import { WalletInitializerService } from "./wallet-initializer.service"; @@ -150,6 +151,12 @@ describe(WalletInitializerService.name, () => { isEnabled: jest.fn(flag => !!input?.enabledFeatures?.includes(flag)) }) ); + di.registerInstance( + JwtTokenService, + mock({ + generateJwtToken: jest.fn().mockResolvedValue("mock-jwt-token") + }) + ); container.clearInstances(); diff --git a/apps/api/src/core/lib/telemetry.ts b/apps/api/src/core/lib/telemetry.ts new file mode 100644 index 0000000000..4fde903ef4 --- /dev/null +++ b/apps/api/src/core/lib/telemetry.ts @@ -0,0 +1,12 @@ +import type { Span } from "@opentelemetry/api"; +import { context } from "@opentelemetry/api"; +import { trace } from "@opentelemetry/api"; + +export function traceActiveSpan any>(name: string, callback: T): ReturnType { + return trace.getTracer("default").startActiveSpan(name, callback); +} + +export function propagateTracingContext any>(callback: T): T { + const currentContext = context.active(); + return ((...args) => context.with(currentContext, () => callback(...args))) as T; +} diff --git a/apps/api/src/deployment/http-schemas/deployment.schema.ts b/apps/api/src/deployment/http-schemas/deployment.schema.ts index 8d1f04e06a..c593fe5b95 100644 --- a/apps/api/src/deployment/http-schemas/deployment.schema.ts +++ b/apps/api/src/deployment/http-schemas/deployment.schema.ts @@ -101,11 +101,7 @@ export const DepositDeploymentResponseSchema = z.object({ export const UpdateDeploymentRequestSchema = z.object({ data: z.object({ - sdl: z.string(), - certificate: z.object({ - certPem: z.string(), - keyPem: z.string() - }) + sdl: z.string() }) }); diff --git a/apps/api/src/deployment/http-schemas/lease.schema.ts b/apps/api/src/deployment/http-schemas/lease.schema.ts index 9726680389..ea5c15e8e7 100644 --- a/apps/api/src/deployment/http-schemas/lease.schema.ts +++ b/apps/api/src/deployment/http-schemas/lease.schema.ts @@ -2,10 +2,6 @@ import { z } from "zod"; export const CreateLeaseRequestSchema = z.object({ manifest: z.string(), - certificate: z.object({ - certPem: z.string(), - keyPem: z.string() - }), leases: z.array( z.object({ dseq: z.string(), diff --git a/apps/api/src/deployment/services/deployment-reader/deployment-reader.service.ts b/apps/api/src/deployment/services/deployment-reader/deployment-reader.service.ts index 01b01002f1..f6bb6d9f52 100644 --- a/apps/api/src/deployment/services/deployment-reader/deployment-reader.service.ts +++ b/apps/api/src/deployment/services/deployment-reader/deployment-reader.service.ts @@ -7,6 +7,7 @@ import { InternalServerError } from "http-errors"; import { Op } from "sequelize"; import { singleton } from "tsyringe"; +import { UserWalletRepository } from "@src/billing/repositories"; import { GetDeploymentResponse } from "@src/deployment/http-schemas/deployment.schema"; import { ProviderService } from "@src/provider/services/provider/provider.service"; import { ProviderList } from "@src/types/provider"; @@ -20,14 +21,12 @@ export class DeploymentReaderService { private readonly providerService: ProviderService, private readonly deploymentHttpService: DeploymentHttpService, private readonly leaseHttpService: LeaseHttpService, - private readonly messageService: MessageService + private readonly messageService: MessageService, + private readonly userWalletRepository: UserWalletRepository ) {} - public async findByOwnerAndDseq( - owner: string, - dseq: string, - options?: { certificate?: { certPem: string; keyPem: string } } - ): Promise { + public async findByOwnerAndDseq(owner: string, dseq: string): Promise { + const wallet = await this.getWalletByAddress(owner); const deploymentResponse = await this.deploymentHttpService.findByOwnerAndDseq(owner, dseq); if ("code" in deploymentResponse) { @@ -40,20 +39,13 @@ export class DeploymentReaderService { const leasesWithStatus = await Promise.all( leases.map(async ({ lease }) => { - if (!options?.certificate) { - return { - lease, - status: null - }; - } - try { const leaseStatus = await this.providerService.getLeaseStatus( lease.lease_id.provider, lease.lease_id.dseq, lease.lease_id.gseq, lease.lease_id.oseq, - options.certificate + wallet.id ); return { lease, @@ -91,12 +83,23 @@ export class DeploymentReaderService { .for(deployments) .process(async deployment => this.leaseHttpService.list({ owner, dseq: deployment.deployment.deployment_id.dseq })); - const deploymentsWithLeases = deployments.map((deployment, index) => ({ + const wallet = await this.getWalletByAddress(owner); + const leaseStatuses = await Promise.all( + leaseResults.map(async ({ leases }) => { + return await Promise.all( + leases.map(async ({ lease }) => { + return await this.providerService.getLeaseStatus(lease.lease_id.provider, lease.lease_id.dseq, lease.lease_id.gseq, lease.lease_id.oseq, wallet.id); + }) + ); + }) + ); + + const deploymentsWithLeases = deployments.map((deployment, deploymentIndex) => ({ deployment: deployment.deployment, leases: - leaseResults[index]?.leases?.map(({ lease }) => ({ + leaseResults[deploymentIndex]?.leases?.map(({ lease }, leaseIndex) => ({ ...lease, - status: null as null + status: leaseStatuses[deploymentIndex][leaseIndex] })) ?? [], escrow_account: deployment.escrow_account })); @@ -281,4 +284,13 @@ export class DeploymentReaderService { other: deploymentData }; } + + private async getWalletByAddress(address: string) { + const wallet = await this.userWalletRepository.findOneBy({ address }); + if (!wallet) { + throw new Error(`Wallet not found for address: ${address}`); + } + + return wallet; + } } diff --git a/apps/api/src/deployment/services/deployment-writer/deployment-writer.service.ts b/apps/api/src/deployment/services/deployment-writer/deployment-writer.service.ts index 012cc1320e..5dd52e9e02 100644 --- a/apps/api/src/deployment/services/deployment-writer/deployment-writer.service.ts +++ b/apps/api/src/deployment/services/deployment-writer/deployment-writer.service.ts @@ -92,7 +92,7 @@ export class DeploymentWriterService { } public async update(wallet: UserWalletOutput, dseq: string, input: UpdateDeploymentRequest["data"]): Promise { - const { sdl, certificate } = input; + const { sdl } = input; assert(this.sdlService.validateSdl(sdl), 400, "Invalid SDL"); @@ -101,11 +101,9 @@ export class DeploymentWriterService { const manifest = this.sdlService.getManifest(sdl, "beta3", true) as string; await this.ensureDeploymentIsUpToDate(wallet, dseq, manifestVersion, deployment); - await this.sendManifestToProviders(dseq, manifest, certificate as { certPem: string; keyPem: string }, deployment.leases); + await this.sendManifestToProviders(wallet.id, dseq, manifest, deployment.leases); - return await this.deploymentReaderService.findByOwnerAndDseq(wallet.address!, dseq, { - certificate: { certPem: certificate.certPem, keyPem: certificate.keyPem } - }); + return await this.deploymentReaderService.findByOwnerAndDseq(wallet.address!, dseq); } private async ensureDeploymentIsUpToDate( @@ -125,20 +123,10 @@ export class DeploymentWriterService { } } - private async sendManifestToProviders( - dseq: string, - manifest: string, - certificate: { certPem: string; keyPem: string }, - leases: GetDeploymentResponse["data"]["leases"] - ): Promise { - assert(certificate.certPem && certificate.keyPem, 400, "Certificate must include both certPem and keyPem"); - + private async sendManifestToProviders(walletId: number, dseq: string, manifest: string, leases: GetDeploymentResponse["data"]["leases"]): Promise { const leaseProviders = leases.map(lease => lease.lease_id.provider).filter((v, i, s) => s.indexOf(v) === i); for (const provider of leaseProviders) { - await this.providerService.sendManifest(provider, dseq, manifest, { - certPem: certificate.certPem, - keyPem: certificate.keyPem - }); + await this.providerService.sendManifest({ provider, dseq, manifest, walletId }); } } } diff --git a/apps/api/src/deployment/services/lease/lease.service.ts b/apps/api/src/deployment/services/lease/lease.service.ts index d859cbcbcc..061d272db8 100644 --- a/apps/api/src/deployment/services/lease/lease.service.ts +++ b/apps/api/src/deployment/services/lease/lease.service.ts @@ -30,14 +30,9 @@ export class LeaseService { await this.signerService.executeDecodedTxByUserId(wallet.userId, leaseMessages); for (const lease of input.leases) { - await this.providerService.sendManifest(lease.provider, lease.dseq, input.manifest, { - certPem: input.certificate.certPem, - keyPem: input.certificate.keyPem - }); + await this.providerService.sendManifest({ provider: lease.provider, dseq: lease.dseq, manifest: input.manifest, walletId: wallet.id }); } - return await this.deploymentReaderService.findByOwnerAndDseq(wallet.address!, input.leases[0].dseq, { - certificate: { certPem: input.certificate.certPem, keyPem: input.certificate.keyPem } - }); + return await this.deploymentReaderService.findByOwnerAndDseq(wallet.address!, input.leases[0].dseq); } } diff --git a/apps/api/src/lib/shutdown-server/shutdown-server.ts b/apps/api/src/lib/shutdown-server/shutdown-server.ts new file mode 100644 index 0000000000..80f2e59427 --- /dev/null +++ b/apps/api/src/lib/shutdown-server/shutdown-server.ts @@ -0,0 +1,17 @@ +export async function shutdownServer(server: ClosableServer | undefined): Promise { + if (!server || !server.listening) return; + return new Promise((resolve, reject) => { + server.close(error => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +export interface ClosableServer { + close: (cb?: (error?: Error) => void) => void; + listening: boolean; +} diff --git a/apps/api/src/provider/services/jwt-token/jwt-token.service.spec.ts b/apps/api/src/provider/services/jwt-token/jwt-token.service.spec.ts new file mode 100644 index 0000000000..b3e64227ca --- /dev/null +++ b/apps/api/src/provider/services/jwt-token/jwt-token.service.spec.ts @@ -0,0 +1,181 @@ +import * as JwtModule from "@akashnetwork/jwt"; +import type { JwtTokenPayload } from "@akashnetwork/jwt/src/types"; +import type { DirectSecp256k1HdWallet } from "@cosmjs/proto-signing"; +import { faker } from "@faker-js/faker"; +import type { MockProxy } from "jest-mock-extended"; +import { mock } from "jest-mock-extended"; +import { container } from "tsyringe"; + +import * as WalletModule from "@src/billing/lib/wallet/wallet"; +import type { BillingConfig } from "@src/billing/providers"; +import { JwtTokenService } from "./jwt-token.service"; + +import { createAkashAddress } from "@test/seeders"; + +describe("JwtTokenService", () => { + const mockMnemonic = "test mnemonic phrase for testing purposes only"; + const mockAddress = "akash1testaddress123456789"; + + describe("generateJwtToken", () => { + it("should generate a JWT token successfully", async () => { + const { jwtTokenService, mockWalletId, mockWallet, leases } = setup(); + + const now = Math.floor(Date.now() / 1000); + jest.spyOn(Date, "now").mockReturnValue(now * 1000); + + const result = JSON.parse(await jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases })); + + expect(result.leases).toEqual(leases); + expect(WalletModule.Wallet).toHaveBeenCalledWith(mockMnemonic, mockWalletId); + expect(mockWallet.getInstance).toHaveBeenCalled(); + }); + + it("generates a JWT token with 30 seconds of a TTL by default", async () => { + const { jwtTokenService, mockWalletId, leases } = setup(); + + const result = JSON.parse(await jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases })); + + expect(result.exp).toBe(result.iat + 30); + }); + + it("can generate a JWT token with a custom TTL", async () => { + const { jwtTokenService, mockWalletId, leases } = setup(); + const ttl = 100; + + const result = JSON.parse(await jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases, ttl })); + + expect(result.exp).toBe(result.iat + ttl); + }); + + it("memoizes JWT Token generation", async () => { + const { jwtTokenService, mockWalletId, leases } = setup(); + + await jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases }); + await jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases }); + + expect(WalletModule.Wallet).toHaveBeenCalledTimes(1); + expect(JwtModule.createSignArbitraryAkashWallet).toHaveBeenCalledTimes(1); + expect(JwtModule.JwtToken).toHaveBeenCalledTimes(1); + }); + + it("should generate unique jti for each token", async () => { + const { jwtTokenService, mockWalletId, leases } = setup(); + + const tokens = ( + await Promise.all([ + jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases }), + jwtTokenService.generateJwtToken({ walletId: mockWalletId, leases }) + ]) + ).map(token => JSON.parse(token)); + + expect(tokens[0].jti).not.toBe(tokens[1].jti); + }); + + it("should work with different wallet IDs", async () => { + const { jwtTokenService, leases } = setup(); + + await jwtTokenService.generateJwtToken({ walletId: 1, leases }); + await jwtTokenService.generateJwtToken({ walletId: 999, leases }); + + expect(WalletModule.Wallet).toHaveBeenCalledWith(mockMnemonic, 1); + expect(WalletModule.Wallet).toHaveBeenCalledWith(mockMnemonic, 999); + }); + }); + + describe("getScopedLeases", () => { + it("returns leases for a provider, scoped", () => { + const { jwtTokenService } = setup(); + + const provider = createAkashAddress(); + const result = jwtTokenService.getScopedLeases({ provider, scope: ["status"] }); + + expect(result).toEqual({ + access: "granular", + permissions: [ + { + provider, + access: "scoped", + scope: ["status"] + } + ] + }); + }); + }); + + describe("getFullAccessLeases", () => { + it("returns leases for a provider, with full access", () => { + const { jwtTokenService } = setup(); + + const provider = createAkashAddress(); + const result = jwtTokenService.getFullAccessLeases({ provider }); + + expect(result).toEqual({ + access: "granular", + permissions: [ + { + provider, + access: "full" + } + ] + }); + }); + }); + + function setup(): { + jwtTokenService: JwtTokenService; + mockBillingConfig: MockProxy; + mockWalletId: number; + mockWallet: MockProxy; + leases: JwtTokenPayload["leases"]; + } { + jest.clearAllMocks(); + const mockWalletId = faker.number.int({ min: 1, max: 10000 }); + + const mockWallet = mock(); + const mockDirectSecp256k1HdWallet = mock(); + mockDirectSecp256k1HdWallet.getAccounts.mockResolvedValue([{ address: mockAddress, pubkey: new Uint8Array([1, 2, 3]), algo: "secp256k1" }]); + mockWallet.getInstance.mockResolvedValue(mockDirectSecp256k1HdWallet); + + const mockBillingConfig = mock(); + mockBillingConfig.MASTER_WALLET_MNEMONIC = mockMnemonic; + + jest.spyOn(WalletModule, "Wallet").mockImplementation(() => mockWallet); + + const mockAkashWallet = { + address: mockAddress, + pubkey: new Uint8Array([1, 2, 3]), + signArbitrary: jest.fn().mockResolvedValue({ signature: "test-signature" }) + }; + + jest.spyOn(JwtModule, "createSignArbitraryAkashWallet").mockResolvedValue(mockAkashWallet); + jest.spyOn(JwtModule, "JwtToken").mockImplementation( + () => + ({ + createToken: jest.fn().mockImplementation(args => JSON.stringify(args)) + }) as unknown as JwtModule.JwtToken + ); + + container.clearInstances(); + container.register("BILLING_CONFIG", { useValue: mockBillingConfig }); + + const jwtTokenService = new JwtTokenService(mockBillingConfig); + + const leases: JwtTokenPayload["leases"] = { + access: "granular", + permissions: [ + { + provider: createAkashAddress(), + access: "full" + } + ] + }; + + return { + jwtTokenService, + mockBillingConfig, + mockWalletId, + mockWallet, + leases + }; + } +}); diff --git a/apps/api/src/provider/services/jwt-token/jwt-token.service.ts b/apps/api/src/provider/services/jwt-token/jwt-token.service.ts new file mode 100644 index 0000000000..6b2d4e2440 --- /dev/null +++ b/apps/api/src/provider/services/jwt-token/jwt-token.service.ts @@ -0,0 +1,67 @@ +import { createSignArbitraryAkashWallet, JwtToken } from "@akashnetwork/jwt"; +import type { AccessScope, JwtTokenPayload } from "@akashnetwork/jwt/src/types"; +import { minutesToSeconds } from "date-fns"; +import { singleton } from "tsyringe"; +import * as uuid from "uuid"; + +import { Wallet } from "@src/billing/lib/wallet/wallet"; +import { BillingConfig, InjectBillingConfig } from "@src/billing/providers"; +import { Memoize } from "@src/caching/helpers"; + +const JWT_TOKEN_TTL_IN_SECONDS = 30; + +type JwtTokenWithAddress = { + jwtToken: JwtToken; + address: string; +}; + +type GenerateJwtTokenParams = { + walletId: number; + leases: JwtTokenPayload["leases"]; + ttl?: number; +}; + +@singleton() +export class JwtTokenService { + constructor(@InjectBillingConfig() private readonly config: BillingConfig) {} + + async generateJwtToken({ walletId, leases, ttl = JWT_TOKEN_TTL_IN_SECONDS }: GenerateJwtTokenParams) { + const { jwtToken, address } = await this.getJwtToken(walletId.toString()); + const now = Math.floor(Date.now() / 1000); + + const token = await jwtToken.createToken({ + iss: address, + exp: now + ttl, + nbf: now, + iat: now, + jti: uuid.v4(), + version: "v1", + leases + }); + + return token; + } + + @Memoize({ ttlInSeconds: minutesToSeconds(5) }) + private async getJwtToken(walletId: string): Promise { + const wallet = new Wallet(this.config.MASTER_WALLET_MNEMONIC, Number(walletId)); + const akashWallet = await createSignArbitraryAkashWallet(await wallet.getInstance()); + const jwtToken = new JwtToken(akashWallet); + + return { jwtToken, address: akashWallet.address }; + } + + getScopedLeases({ provider, scope }: { provider: string; scope: AccessScope[] }): JwtTokenPayload["leases"] { + return { + access: "granular", + permissions: [{ provider, access: "scoped", scope }] + }; + } + + getFullAccessLeases({ provider }: { provider: string }): JwtTokenPayload["leases"] { + return { + access: "granular", + permissions: [{ provider, access: "full" }] + }; + } +} diff --git a/apps/api/src/provider/services/provider/provider.service.spec.ts b/apps/api/src/provider/services/provider/provider.service.spec.ts new file mode 100644 index 0000000000..0a4026db97 --- /dev/null +++ b/apps/api/src/provider/services/provider/provider.service.spec.ts @@ -0,0 +1,241 @@ +import type { ProviderHttpService } from "@akashnetwork/http-sdk"; +import type { JwtTokenPayload } from "@akashnetwork/jwt"; +import { mock } from "jest-mock-extended"; + +import type { AuditorService } from "@src/provider/services/auditors/auditors.service"; +import type { JwtTokenService } from "@src/provider/services/jwt-token/jwt-token.service"; +import type { ProviderAttributesSchemaService } from "@src/provider/services/provider-attributes-schema/provider-attributes-schema.service"; +import { ProviderService } from "./provider.service"; + +describe(ProviderService.name, () => { + describe("sendManifest", () => { + it("should send manifest successfully on first attempt", async () => { + const { service, jwtTokenService, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const manifest = '{"quantity":{"val":"1"}}'; + const walletId = 1; + const jwtToken = "jwt-token-123"; + const hostUri = "https://provider.example.com"; + + const mockProviderResponse = { + provider: { + owner: providerAddress, + host_uri: hostUri, + attributes: [], + info: { email: "", website: "" } + } + }; + + const leases: JwtTokenPayload["leases"] = { + access: "granular", + permissions: [{ provider: providerAddress, access: "scoped", scope: ["send-manifest"] }] + }; + + providerHttpService.getProvider.mockResolvedValue(mockProviderResponse); + jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken); + jwtTokenService.getScopedLeases.mockReturnValue(leases); + providerHttpService.sendManifest.mockResolvedValue({ success: true }); + + const result = await service.sendManifest({ provider: providerAddress, dseq, manifest, walletId }); + + expect(providerHttpService.getProvider).toHaveBeenCalledWith(providerAddress); + expect(jwtTokenService.generateJwtToken).toHaveBeenCalledWith({ + walletId, + leases + }); + expect(providerHttpService.sendManifest).toHaveBeenCalledWith({ + hostUri, + dseq, + manifest: '{"size":{"val":"1"}}', + jwtToken + }); + expect(result).toEqual({ success: true }); + }); + + it("should retry on lease not found error and succeed", async () => { + const { service, jwtTokenService, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const manifest = '{"quantity":{"val":"1"}}'; + const walletId = 1; + const jwtToken = "jwt-token-123"; + const hostUri = "https://provider.example.com"; + + const mockProviderResponse = { + provider: { + owner: providerAddress, + host_uri: hostUri, + attributes: [], + info: { email: "", website: "" } + } + }; + + providerHttpService.getProvider.mockResolvedValue(mockProviderResponse); + jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken); + providerHttpService.sendManifest.mockRejectedValueOnce(new Error("no lease for deployment")).mockResolvedValueOnce({ success: true }); + + const result = await service.sendManifest({ provider: providerAddress, dseq, manifest, walletId }); + + expect(providerHttpService.sendManifest).toHaveBeenCalledTimes(2); + expect(result).toEqual({ success: true }); + }, 10000); + + it("should throw error when provider not found", async () => { + const { service, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const manifest = '{"quantity":{"val":"1"}}'; + const walletId = 1; + + providerHttpService.getProvider.mockRejectedValue(new Error(`Provider ${providerAddress} not found`)); + + await expect(service.sendManifest({ provider: providerAddress, dseq, manifest, walletId })).rejects.toThrow(`Provider ${providerAddress} not found`); + }); + + it("should throw error after max retries", async () => { + const { service, jwtTokenService, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const manifest = '{"quantity":{"val":"1"}}'; + const walletId = 1; + const jwtToken = "jwt-token-123"; + const hostUri = "https://provider.example.com"; + + const mockProviderResponse = { + provider: { + owner: providerAddress, + host_uri: hostUri, + attributes: [], + info: { email: "", website: "" } + } + }; + + providerHttpService.getProvider.mockResolvedValue(mockProviderResponse); + jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken); + providerHttpService.sendManifest.mockRejectedValue(new Error("no lease for deployment")); + + await expect(service.sendManifest({ provider: providerAddress, dseq, manifest, walletId })).rejects.toThrow("no lease for deployment"); + + expect(providerHttpService.sendManifest).toHaveBeenCalledTimes(3); + }, 15000); + + it("should throw error immediately for non-lease errors", async () => { + const { service, jwtTokenService, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const manifest = '{"quantity":{"val":"1"}}'; + const walletId = 1; + const jwtToken = "jwt-token-123"; + const hostUri = "https://provider.example.com"; + + const mockProviderResponse = { + provider: { + owner: providerAddress, + host_uri: hostUri, + attributes: [], + info: { email: "", website: "" } + } + }; + + providerHttpService.getProvider.mockResolvedValue(mockProviderResponse); + jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken); + providerHttpService.sendManifest.mockRejectedValue(new Error("network error")); + + await expect(service.sendManifest({ provider: providerAddress, dseq, manifest, walletId })).rejects.toThrow("network error"); + + expect(providerHttpService.sendManifest).toHaveBeenCalledTimes(1); + }); + }); + + describe("getLeaseStatus", () => { + it("should get lease status successfully", async () => { + const { service, jwtTokenService, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const gseq = 1; + const oseq = 1; + const walletId = 1; + const jwtToken = "jwt-token-123"; + const hostUri = "https://provider.example.com"; + + const mockProviderResponse = { + provider: { + owner: providerAddress, + host_uri: hostUri, + attributes: [], + info: { email: "", website: "" } + } + }; + + const mockLeaseStatus = { + forwarded_ports: {}, + ips: {}, + services: {} + }; + + const leases: JwtTokenPayload["leases"] = { + access: "granular", + permissions: [{ provider: providerAddress, access: "scoped", scope: ["status"] }] + }; + + providerHttpService.getProvider.mockResolvedValue(mockProviderResponse); + jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken); + jwtTokenService.getScopedLeases.mockReturnValue(leases); + providerHttpService.getLeaseStatus.mockResolvedValue(mockLeaseStatus); + + const result = await service.getLeaseStatus(providerAddress, dseq, gseq, oseq, walletId); + + expect(providerHttpService.getProvider).toHaveBeenCalledWith(providerAddress); + expect(jwtTokenService.generateJwtToken).toHaveBeenCalledWith({ + walletId, + leases + }); + expect(providerHttpService.getLeaseStatus).toHaveBeenCalledWith({ + hostUri, + dseq, + gseq, + oseq, + jwtToken + }); + expect(result).toEqual(mockLeaseStatus); + }); + + it("should throw error when provider not found", async () => { + const { service, providerHttpService } = setup(); + + const providerAddress = "akash1provider123"; + const dseq = "123456"; + const gseq = 1; + const oseq = 1; + const walletId = 1; + + providerHttpService.getProvider.mockRejectedValue(new Error(`Provider ${providerAddress} not found`)); + + await expect(service.getLeaseStatus(providerAddress, dseq, gseq, oseq, walletId)).rejects.toThrow(`Provider ${providerAddress} not found`); + }); + }); + + function setup() { + const providerHttpService = mock(); + const providerAttributesSchemaService = mock(); + const auditorsService = mock(); + const jwtTokenService = mock(); + + const service = new ProviderService(providerHttpService, providerAttributesSchemaService, auditorsService, jwtTokenService); + + return { + service, + providerHttpService, + providerAttributesSchemaService, + auditorsService, + jwtTokenService + }; + } +}); diff --git a/apps/api/src/provider/services/provider/provider.service.ts b/apps/api/src/provider/services/provider/provider.service.ts index 3ea523575d..2cd2a4c687 100644 --- a/apps/api/src/provider/services/provider/provider.service.ts +++ b/apps/api/src/provider/services/provider/provider.service.ts @@ -1,7 +1,6 @@ import { Provider, ProviderAttribute, ProviderAttributeSignature, ProviderSnapshotNode, ProviderSnapshotNodeGPU } from "@akashnetwork/database/dbSchemas/akash"; import { ProviderSnapshot } from "@akashnetwork/database/dbSchemas/akash/providerSnapshot"; import { ProviderHttpService } from "@akashnetwork/http-sdk"; -import { SupportedChainNetworks } from "@akashnetwork/net"; import { AxiosError } from "axios"; import { add } from "date-fns"; import assert from "http-assert"; @@ -9,34 +8,30 @@ import { Op } from "sequelize"; import { setTimeout as delay } from "timers/promises"; import { singleton } from "tsyringe"; -import { type BillingConfig, InjectBillingConfig } from "@src/billing/providers"; import { AUDITOR, TRIAL_ATTRIBUTE } from "@src/deployment/config/provider.config"; import { LeaseStatusResponse } from "@src/deployment/http-schemas/lease.schema"; -import { ProviderIdentity, ProviderProxyService } from "@src/provider/services/provider/provider-proxy.service"; +import { ProviderIdentity } from "@src/provider/services/provider/provider-proxy.service"; import { ProviderList } from "@src/types/provider"; import { toUTC } from "@src/utils"; import { mapProviderToList } from "@src/utils/map/provider"; import { AuditorService } from "../auditors/auditors.service"; +import { JwtTokenService } from "../jwt-token/jwt-token.service"; import { ProviderAttributesSchemaService } from "../provider-attributes-schema/provider-attributes-schema.service"; @singleton() export class ProviderService { private readonly MANIFEST_SEND_MAX_RETRIES = 3; private readonly MANIFEST_SEND_RETRY_DELAY = 6000; - private readonly chainNetwork: SupportedChainNetworks; constructor( - private readonly providerProxy: ProviderProxyService, private readonly providerHttpService: ProviderHttpService, private readonly providerAttributesSchemaService: ProviderAttributesSchemaService, private readonly auditorsService: AuditorService, - @InjectBillingConfig() private readonly config: BillingConfig - ) { - this.chainNetwork = this.config.NETWORK as SupportedChainNetworks; - } + private readonly jwtTokenService: JwtTokenService + ) {} - async sendManifest(provider: string, dseq: string, manifest: string, options: { certPem: string; keyPem: string }) { - const jsonStr = manifest.replace(/"quantity":{"val/g, '"size":{"val'); + async sendManifest({ provider, dseq, manifest, walletId }: { provider: string; dseq: string; manifest: string; walletId: number }) { + const manifestWithSize = manifest.replace(/"quantity":{"val/g, '"size":{"val'); const providerResponse = await this.providerHttpService.getProvider(provider); @@ -47,55 +42,65 @@ export class ProviderService { hostUri: providerResponse.provider.host_uri }; - return await this.sendManifestToProvider(dseq, jsonStr, options, providerIdentity); + return await this.sendManifestToProvider({ walletId, dseq, manifest: manifestWithSize, providerIdentity }); } - private async sendManifestToProvider(dseq: string, jsonStr: string, options: { certPem: string; keyPem: string }, providerIdentity: ProviderIdentity) { + private async sendManifestToProvider({ + walletId, + dseq, + manifest, + providerIdentity + }: { + walletId: number; + dseq: string; + manifest: string; + providerIdentity: ProviderIdentity; + }) { for (let i = 1; i <= this.MANIFEST_SEND_MAX_RETRIES; i++) { try { - const result = await this.providerProxy.fetchProviderUrl(`/deployment/${dseq}/manifest`, { - method: "PUT", - body: jsonStr, - certPem: options.certPem, - keyPem: options.keyPem, - chainNetwork: this.chainNetwork, - providerIdentity, - timeout: 60000 + const jwtToken = await this.jwtTokenService.generateJwtToken({ + walletId, + leases: this.jwtTokenService.getScopedLeases({ + provider: providerIdentity.owner, + scope: ["send-manifest"] + }) }); + const result = await this.providerHttpService.sendManifest({ hostUri: providerIdentity.hostUri, dseq, manifest, jwtToken }); - if (result) return result; + if (result) { + return result; + } } catch (err: any) { if (err.message?.includes("no lease for deployment") && i < this.MANIFEST_SEND_MAX_RETRIES) { await delay(this.MANIFEST_SEND_RETRY_DELAY); continue; } + const providerError = err instanceof AxiosError && err.response?.data; - assert(!providerError?.toLowerCase()?.includes("invalid manifest"), 400, err?.response?.data); + if (typeof providerError === "string") { + assert(!providerError.toLowerCase().includes("invalid manifest"), 400, err?.response?.data); + } throw new Error(providerError || err); } } } - async getLeaseStatus(provider: string, dseq: string, gseq: number, oseq: number, options: { certPem: string; keyPem: string }): Promise { + async getLeaseStatus(provider: string, dseq: string, gseq: number, oseq: number, walletId: number): Promise { const providerResponse = await this.providerHttpService.getProvider(provider); if (!providerResponse) { throw new Error(`Provider ${provider} not found`); } - const providerIdentity: ProviderIdentity = { - owner: provider, - hostUri: providerResponse.provider.host_uri - }; - - return await this.providerProxy.fetchProviderUrl(`/lease/${dseq}/${gseq}/${oseq}/status`, { - method: "GET", - certPem: options.certPem, - keyPem: options.keyPem, - chainNetwork: this.chainNetwork, - providerIdentity, - timeout: 30000 + const jwtToken = await this.jwtTokenService.generateJwtToken({ + walletId, + leases: this.jwtTokenService.getScopedLeases({ + provider, + scope: ["status"] + }) }); + + return await this.providerHttpService.getLeaseStatus({ hostUri: providerResponse.provider.host_uri, dseq, gseq, oseq, jwtToken }); } async getProviderList({ trial = false }: { trial?: boolean } = {}): Promise { diff --git a/apps/api/src/websocket/controllers/websocket/websocket.controller.ts b/apps/api/src/websocket/controllers/websocket/websocket.controller.ts new file mode 100644 index 0000000000..4661064cca --- /dev/null +++ b/apps/api/src/websocket/controllers/websocket/websocket.controller.ts @@ -0,0 +1,159 @@ +import { LoggerService } from "@akashnetwork/logging"; +import { Attributes, Span } from "@opentelemetry/api"; +import { WSContext } from "hono/ws"; +import { singleton } from "tsyringe"; + +import { AuthService } from "@src/auth/services/auth.service"; +import { traceActiveSpan } from "@src/core/lib/telemetry"; +import { MESSAGE_SCHEMA, WsMessage } from "@src/websocket/http-schemas/websocket.schema"; +import { ProviderWebsocketService } from "@src/websocket/services/provider-websocket/provider-websocket.service"; +import { ClientWebSocketStats, WebSocketUsage } from "@src/websocket/services/websocket-stats/websocket-stats.service"; + +const logger = LoggerService.forContext("LeaseWebsocketController"); + +@singleton() +export class WebsocketController { + constructor( + private readonly authService: AuthService, + private readonly providerWebsocketService: ProviderWebsocketService + ) {} + + async handleOpen(stats: ClientWebSocketStats) { + const { currentUser, ability } = this.authService; + + logger.info({ + event: "WEBSOCKET_CONNECTION_OPENED", + userId: currentUser?.id, + hasAbility: !!ability + }); + + stats.setUserIfExists(currentUser, ability); + } + + async handleMessage(messageStr: string, ws: WSContext, stats: ClientWebSocketStats) { + logger.debug({ + event: "WEBSOCKET_MESSAGE_RECEIVED", + messageLength: messageStr?.length + }); + + const userInfo = stats.getUser(); + if (userInfo) { + logger.debug({ + event: "WEBSOCKET_USER_INFO_RETRIEVED", + userId: userInfo.currentUser?.id + }); + } + + traceActiveSpan("ws.message", async span => { + let message: WsMessage | undefined; + try { + message = typeof messageStr === "string" ? JSON.parse(messageStr) : undefined; + } catch (error) { + logger.error({ + event: "CLIENT_MESSAGE_INVALID_JSON", + message: "Received message is not a JSON string", + messageLength: messageStr?.length, + messageType: typeof messageStr, + messageConstructor: messageStr?.constructor?.name + }); + } + + if (!message) { + return ws.send( + JSON.stringify({ + type: "websocket", + message: "Message is not a JSON string", + error: "Invalid message format" + }) + ); + } + + const parsedMessage = MESSAGE_SCHEMA.safeParse(message); + if (parsedMessage.error) { + logger.error({ + event: "CLIENT_MESSAGE_INVALID_JSON", + message: "Message doesn't match expected schema", + error: parsedMessage.error + }); + return ws.send( + JSON.stringify({ + type: "websocket", + message: "Message doesn't match expected schema", + error: "Invalid message format" + }) + ); + } + + const attributes: Attributes = { + type: message.type + }; + if (message.type === "websocket") { + attributes.providerUrl = message.url; + attributes.providerAddress = message.providerAddress; + attributes.chainNetwork = message.chainNetwork; + attributes.function = getWebSocketUsage(message); + } + + span.setAttributes(attributes); + logger.info({ + event: "NEW_WEBSOCKET_MESSAGE", + attributes + }); + + stats.setUsage(getWebSocketUsage(message)); + + try { + if (message.type === "ping") { + ws.send( + JSON.stringify({ + type: "pong" + }) + ); + } else if (message.type === "websocket") { + await this.providerWebsocketService.proxyMessageToProvider(message, ws, stats); + } + } catch (err) { + logger.error({ + event: "CLIENT_MESSAGE_SEND_ERROR", + error: err + }); + ws.send( + JSON.stringify({ + id: message.id, + error: "Unable to send message to provider socket", + type: message.type + }) + ); + } + + span.end(); + }); + } + + async handleClose(_event: any, stats: ClientWebSocketStats, span?: Span) { + logger.info("Closing socket"); + stats.close(); + + this.providerWebsocketService.closeProviderSocket(stats.id); + + span?.end(); + } + + async handleError(event: Event) { + logger.error({ + event: "CLIENT_WEBSOCKET_ERROR", + error: event + }); + } +} + +function getWebSocketUsage(message: any): WebSocketUsage { + if (message.type === "websocket") { + if (message.url.includes("logs?follow=false&tail=10000000")) return "DownloadLogs"; + if (message.url.includes("logs?follow=true")) return "StreamLogs"; + if (message.url.includes("kubeevents?follow=true")) return "StreamEvents"; + if (message.url.includes("/shell?stdin=")) return "Shell"; + } + + return "Unknown"; +} diff --git a/apps/api/src/websocket/http-schemas/websocket.schema.ts b/apps/api/src/websocket/http-schemas/websocket.schema.ts new file mode 100644 index 0000000000..9b0f981cca --- /dev/null +++ b/apps/api/src/websocket/http-schemas/websocket.schema.ts @@ -0,0 +1,29 @@ +import { netConfig, type SupportedChainNetworks } from "@akashnetwork/net"; +import { z } from "zod"; + +import { isValidBech32Address } from "@src/utils/addresses"; + +const chainNetworkSchema = z.enum(netConfig.getSupportedNetworks() as [SupportedChainNetworks]).describe("Blockchain network"); + +export const providerRequestSchema = z.object({ + url: z.string().url(), + providerAddress: z.string().refine(isValidBech32Address, "is not bech32 address").describe("Bech32 representation of provider wallet address") +}); + +export const MESSAGE_SCHEMA = z.discriminatedUnion("type", [ + z.object({ + type: z.literal("ping") + }), + providerRequestSchema.extend({ + type: z.literal("websocket"), + chainNetwork: chainNetworkSchema, + data: z + .string() + .optional() + .describe( + "Currently it's used only for service shell communication and stores only buffered representation of string in char codes something like this: Array.from(Uint8Array).join(', ')" + ) + }) +]); + +export type WsMessage = z.infer & { id: unknown }; diff --git a/apps/api/src/websocket/routes/websocket/websocket.router.ts b/apps/api/src/websocket/routes/websocket/websocket.router.ts new file mode 100644 index 0000000000..3fb39a0308 --- /dev/null +++ b/apps/api/src/websocket/routes/websocket/websocket.router.ts @@ -0,0 +1,50 @@ +import { trace } from "@opentelemetry/api"; +import type { Hono } from "hono"; +import type { UpgradeWebSocket, WSContext } from "hono/ws"; +import { container } from "tsyringe"; + +import { propagateTracingContext } from "@src/core/lib/telemetry"; +import type { AppEnv } from "@src/core/types/app-context"; +import { WebsocketController } from "@src/websocket/controllers/websocket/websocket.controller"; +import { WebsocketStatsService } from "@src/websocket/services/websocket-stats/websocket-stats.service"; + +export const initLeaseWebsocketRoute = (app: Hono, upgradeWebSocket: UpgradeWebSocket) => { + app.get( + "/v1/ws", + upgradeWebSocket(() => { + const controller = container.resolve(WebsocketController); + const stats = container.resolve(WebsocketStatsService).create(); + const trackingSpan = trace.getActiveSpan(); + trackingSpan?.setAttribute("ws.id", stats.id); + + return { + onOpen: async (_event: Event, wsContext: WSContext) => { + trackingSpan?.setAttribute("ws.url", wsContext.url?.toString() ?? ""); + controller.handleOpen(stats); + }, + + onMessage: async (event, wsContext) => { + propagateTracingContext((message: string) => { + controller.handleMessage(message, wsContext, stats); + })(event.data.toString()); + }, + + onClose: async (event, wsContext) => { + propagateTracingContext(() => { + if (wsContext.url) { + trackingSpan?.setAttribute("ws.url", wsContext.url.toString()); + } + + controller.handleClose(event, stats, trackingSpan); + })(); + }, + + onError: async event => { + propagateTracingContext(() => { + controller.handleError(event); + })(); + } + }; + }) + ); +}; diff --git a/apps/api/src/websocket/services/provider-websocket/provider-websocket.service.ts b/apps/api/src/websocket/services/provider-websocket/provider-websocket.service.ts new file mode 100644 index 0000000000..91933bdfdb --- /dev/null +++ b/apps/api/src/websocket/services/provider-websocket/provider-websocket.service.ts @@ -0,0 +1,233 @@ +import { JwtTokenPayload } from "@akashnetwork/jwt"; +import { LoggerService } from "@akashnetwork/logging"; +import { SupportedChainNetworks } from "@akashnetwork/net"; +import { MongoAbility } from "@casl/ability"; +import { WSContext } from "hono/ws"; +import assert from "http-assert"; +import https from "https"; +import { singleton } from "tsyringe"; +import WebSocket from "ws"; + +import { UserWalletRepository } from "@src/billing/repositories"; +import { propagateTracingContext } from "@src/core/lib/telemetry"; +import { JwtTokenService } from "@src/provider/services/jwt-token/jwt-token.service"; +import { UserOutput } from "@src/user/repositories"; +import { WsMessage } from "@src/websocket/http-schemas/websocket.schema"; +import { ClientWebSocketStats } from "@src/websocket/services/websocket-stats/websocket-stats.service"; + +const logger = LoggerService.forContext("ProviderWebsocketService"); + +type ProxiableWsMessage = Extract; + +@singleton() +export class ProviderWebsocketService { + private readonly openProviderSockets: Record< + string, + { + ws: WebSocket; + waitlist: ProxiableWsMessage[]; + } + > = {}; + + constructor( + private readonly jwtTokenService: JwtTokenService, + private readonly userWalletRepository: UserWalletRepository + ) {} + + async proxyMessageToProvider(message: ProxiableWsMessage, ws: WSContext, stats: ClientWebSocketStats): Promise { + const url = message.url.replace("https://", "wss://"); + + let socketDetails = this.openProviderSockets[stats.id]; + if ( + !socketDetails || + socketDetails.ws.url !== url || + socketDetails.ws.readyState === WebSocket.CLOSED || + socketDetails.ws.readyState === WebSocket.CLOSING + ) { + socketDetails?.ws.terminate(); + const userInfo = stats.getUser(); + if (!userInfo) { + throw new Error("User not found"); + } + const wallet = await this.getWalletByUserId(userInfo.currentUser, userInfo.ability); + socketDetails = await this.createProviderSocket(url, { + wsId: stats.id, + chainNetwork: message.chainNetwork, + walletId: wallet.id, + providerAddress: message.providerAddress + }); + this.linkSockets(socketDetails.ws, ws, stats); + } + + if (!message.data) { + logger.info(`Do not proxy "${message.type}" message because it has no data`); + return; + } + + const data = Buffer.from(message.data.split(",") as any); + const callback = propagateTracingContext((error?: Error) => { + if (error) { + logger.error({ + event: "CLIENT_MESSAGE_SEND_ERROR", + error + }); + } + }); + const proxyMessage = propagateTracingContext(() => { + logger.debug(`Proxying "${message.type}" message`); + socketDetails.ws.send(data, callback); + }); + + if (socketDetails.ws.readyState === WebSocket.OPEN) { + proxyMessage(); + } else { + logger.info(`Provider websocket is not open, adding message to waitlist`); + socketDetails.ws.once("verified", proxyMessage); + socketDetails.waitlist.push(message); + } + } + + closeProviderSocket(statsId: string): void { + if (statsId in this.openProviderSockets) { + this.openProviderSockets[statsId].ws.terminate(); + delete this.openProviderSockets[statsId]; + } + } + + private async createProviderSocket(url: string, options: CreateProviderSocketOptions) { + logger.info(`Initializing new provider websocket connection: ${url}`); + + const jwtToken = await this.jwtTokenService.generateJwtToken({ + walletId: options.walletId, + leases: this.getLeasesForUrl({ + provider: options.providerAddress, + url + }) + }); + const pws = new WebSocket(url, { + agent: new https.Agent({ + sessionTimeout: 0, + rejectUnauthorized: false, + servername: "" + }), + headers: { + Authorization: `Bearer ${jwtToken}` + } + }); + this.openProviderSockets[options.wsId] = { ws: pws, waitlist: [] }; + + return this.openProviderSockets[options.wsId]; + } + + private getLeasesForUrl({ provider, url }: { provider: string; url: string }): JwtTokenPayload["leases"] { + const urlObj = new URL(url); + const pathname = urlObj.pathname; + if (/\/lease\/\d+\/\d+\/\d+\/logs(?:\?.*)?$/.test(pathname)) { + return this.jwtTokenService.getScopedLeases({ + provider, + scope: ["logs"] + }); + } + + if (/\/lease\/\d+\/\d+\/\d+\/kubeevents(?:\?.*)?$/.test(pathname)) { + return this.jwtTokenService.getScopedLeases({ + provider, + scope: ["events"] + }); + } + + logger.warn(`Unknown url: ${url}, returning full access`); + return this.jwtTokenService.getFullAccessLeases({ provider }); + } + + private linkSockets(providerWs: WebSocket, ws: WSContext, stats: ClientWebSocketStats): void { + providerWs.on( + "open", + propagateTracingContext(() => { + logger.info(`Connected to provider websocket: ${providerWs.url}`); + const waitlist = this.openProviderSockets[stats.id].waitlist; + while (waitlist.length > 0) { + const message = waitlist.shift(); + if (message) { + this.proxyMessageToProvider(message, ws, stats); + } + } + }) + ); + + providerWs.on( + "message", + propagateTracingContext(socketMessage => { + if ( + !socketMessage || + (Object.hasOwn(socketMessage, "byteLength") && (socketMessage as Buffer).byteLength === 0) || + (Object.hasOwn(socketMessage, "length") && (socketMessage as string | unknown[]).length === 0) + ) { + logger.info(`Received empty message from provider. Skipping...`); + return; + } + + const data = JSON.stringify({ + type: "websocket", + message: socketMessage.toString() + }); + stats.logDataTransfer(Buffer.from(data).length); + ws.send(data); + }) + ); + + providerWs.on( + "error", + propagateTracingContext(error => { + logger.error({ + event: "PROVIDER_WEBSOCKET_ERROR", + error + }); + const data = JSON.stringify({ + type: "websocket", + message: "Received error from provider websocket", + error: "Received error from provider websocket" + }); + stats.logDataTransfer(Buffer.from(data).length); + ws.send(data); + }) + ); + + providerWs.on( + "close", + propagateTracingContext((code, reason) => { + delete this.openProviderSockets[stats.id]; + logger.info({ + event: "PROVIDER_WEBSOCKET_CLOSED", + code, + reason + }); + const data = JSON.stringify({ + type: "websocket", + message: "", + closed: true, + code, + reason: reason.toString() + }); + stats.logDataTransfer(Buffer.from(data).length); + ws.send(data); + }) + ); + } + + private async getWalletByUserId(currentUser: UserOutput, ability: MongoAbility) { + const userWallet = await this.userWalletRepository.accessibleBy(ability, "sign").findOneByUserId(currentUser.id); + assert(userWallet, 404, "UserWallet Not Found"); + + return userWallet; + } +} + +interface CreateProviderSocketOptions { + wsId: string; + cert?: string; + key?: string; + chainNetwork: SupportedChainNetworks; + providerAddress: string; + walletId: number; +} diff --git a/apps/api/src/websocket/services/websocket-stats/websocket-stats.service.ts b/apps/api/src/websocket/services/websocket-stats/websocket-stats.service.ts new file mode 100644 index 0000000000..a87799cd48 --- /dev/null +++ b/apps/api/src/websocket/services/websocket-stats/websocket-stats.service.ts @@ -0,0 +1,96 @@ +import type { MongoAbility } from "@casl/ability"; +import { singleton } from "tsyringe"; +import { v4 as uuidv4 } from "uuid"; + +import type { UserOutput } from "@src/user/repositories"; + +@singleton() +export class WebsocketStatsService { + private items: ClientWebSocketStats[] = []; + + create(): ClientWebSocketStats { + const item = new ClientWebSocketStats(uuidv4()); + this.items.push(item); + + if (this.items.length > 100_000) { + this.items = this.items.slice(-5000); + } + + return item; + } + + getItems(): ReadonlyArray { + return this.items; + } +} + +export class ClientWebSocketStats { + readonly id: string; + private openedOn: Date; + private closedOn?: Date; + private usage: WebSocketUsage = "Unknown"; + private user?: { currentUser: UserOutput; ability: MongoAbility }; + + private usageStats: Record = { + StreamLogs: { count: 0, data: 0 }, + StreamEvents: { count: 0, data: 0 }, + Shell: { count: 0, data: 0 }, + DownloadLogs: { count: 0, data: 0 }, + Unknown: { count: 0, data: 0 } + }; + + constructor(id: string) { + this.id = id; + this.openedOn = new Date(); + } + + setUserIfExists(currentUser: UserOutput | undefined, ability: MongoAbility) { + if (currentUser) { + this.user = { currentUser, ability }; + } + } + + getUser() { + return this.user; + } + + setUsage(usage: WebSocketUsage) { + this.usage = usage; + + if (usage !== "Unknown") { + this.usageStats[usage].count += 1; + } + } + + logDataTransfer(dataTransferred: number) { + this.usageStats[this.usage].data += dataTransferred; + } + + close() { + this.closedOn = new Date(); + } + + isClosed() { + return !!this.closedOn; + } + + getStats() { + return { + id: this.id, + openedOn: this.openedOn, + closedOn: this.closedOn, + usageStats: this.usageStats, + totalStats: (Object.keys(this.usageStats) as WebSocketUsage[]).reduce( + (s, n) => { + return { + count: s.count + this.usageStats[n].count, + data: s.data + this.usageStats[n].data + }; + }, + { count: 0, data: 0 } + ) + }; + } +} + +export type WebSocketUsage = "StreamLogs" | "StreamEvents" | "Shell" | "DownloadLogs" | "Unknown"; diff --git a/apps/api/test/functional/__snapshots__/docs.spec.ts.snap b/apps/api/test/functional/__snapshots__/docs.spec.ts.snap index 1f15af5f57..b36b8b1ea2 100644 --- a/apps/api/test/functional/__snapshots__/docs.spec.ts.snap +++ b/apps/api/test/functional/__snapshots__/docs.spec.ts.snap @@ -4321,28 +4321,12 @@ exports[`API Docs GET /v1/doc returns docs with all routes expected 1`] = ` "properties": { "data": { "properties": { - "certificate": { - "properties": { - "certPem": { - "type": "string", - }, - "keyPem": { - "type": "string", - }, - }, - "required": [ - "certPem", - "keyPem", - ], - "type": "object", - }, "sdl": { "type": "string", }, }, "required": [ "sdl", - "certificate", ], "type": "object", }, @@ -5542,21 +5526,6 @@ exports[`API Docs GET /v1/doc returns docs with all routes expected 1`] = ` "application/json": { "schema": { "properties": { - "certificate": { - "properties": { - "certPem": { - "type": "string", - }, - "keyPem": { - "type": "string", - }, - }, - "required": [ - "certPem", - "keyPem", - ], - "type": "object", - }, "leases": { "items": { "properties": { @@ -5589,7 +5558,6 @@ exports[`API Docs GET /v1/doc returns docs with all routes expected 1`] = ` }, "required": [ "manifest", - "certificate", "leases", ], "type": "object", diff --git a/apps/api/test/functional/balances.spec.ts b/apps/api/test/functional/balances.spec.ts index d81fdd620f..9a0fbdc96c 100644 --- a/apps/api/test/functional/balances.spec.ts +++ b/apps/api/test/functional/balances.spec.ts @@ -1,27 +1,13 @@ import { faker } from "@faker-js/faker"; import nock from "nock"; -import { container } from "tsyringe"; import { app } from "@src/app"; -import { ApiKeyRepository } from "@src/auth/repositories/api-key/api-key.repository"; -import { ApiKeyGeneratorService } from "@src/auth/services/api-key/api-key-generator.service"; -import { UserWalletRepository } from "@src/billing/repositories"; -import type { CoreConfigService } from "@src/core/services/core-config/core-config.service"; -import { UserRepository } from "@src/user/repositories"; -import { apiNodeUrl } from "@src/utils/constants"; - -import { DeploymentGrantResponseSeeder } from "@test/seeders/deployment-grant-response.seeder"; -import { DeploymentListResponseSeeder } from "@test/seeders/deployment-list-response.seeder"; -import { FeeAllowanceResponseSeeder } from "@test/seeders/fee-allowance-response.seeder"; -import { stub } from "@test/services/stub"; -import { WalletTestingService } from "@test/services/wallet-testing.service"; + +import { setupUser, type SetupUserOptions } from "@test/setup/setup-user"; jest.setTimeout(20000); describe("Balances", () => { - const mockMasterWalletAddress = "akash1testmasterwalletaddress"; - const mockDeploymentGrantDenom = "uakt"; - afterEach(() => { jest.restoreAllMocks(); nock.cleanAll(); @@ -103,123 +89,7 @@ describe("Balances", () => { expect(response.status).toBe(404); }); - async function setup(options?: SetupOptions) { - const di = container.createChildContainer(); - const userRepository = di.resolve(UserRepository); - const apiKeyRepository = di.resolve(ApiKeyRepository); - const userWalletRepository = di.resolve(UserWalletRepository); - const walletService = new WalletTestingService(app); - - let apiKeyGenerator: ApiKeyGeneratorService; - let config: jest.Mocked; - - async function createTestUser() { - const { user, wallet } = await walletService.createUserAndWallet(); - const userWithId = { ...user, userId: faker.string.uuid() }; - config = stub({ get: jest.fn() }); - config.get.mockReturnValue("test"); - apiKeyGenerator = new ApiKeyGeneratorService(config); - const apiKey = apiKeyGenerator.generateApiKey(); - - jest.spyOn(userRepository, "findById").mockImplementation(async id => { - if (id === userWithId.id) { - return { - ...userWithId, - trial: false, - userWallets: { isTrialing: false } - }; - } - return undefined; - }); - - jest.spyOn(apiKeyRepository, "find").mockImplementation(async () => { - const now = new Date().toISOString(); - return [ - { - id: faker.string.uuid(), - userId: userWithId.id, - key: apiKey, - hashedKey: await apiKeyGenerator.hashApiKey(apiKey), - keyFormat: "sk", - name: "test", - createdAt: now, - updatedAt: now, - expiresAt: null, - lastUsedAt: null - } - ]; - }); - - const findOneByUserIdMock = jest.fn().mockImplementation(async (id: string) => { - if (id === userWithId.id) { - return options?.walletNotFound ? undefined : wallet; - } - return undefined; - }); - - jest.spyOn(userWalletRepository, "accessibleBy").mockReturnValue(userWalletRepository); - jest.spyOn(userWalletRepository, "findOneByUserId").mockImplementation(findOneByUserIdMock); - - nock(apiNodeUrl) - .persist() - .get(/\/cosmos\/feegrant\/v1beta1\/allowance\/.*\/.*/) - .reply( - 200, - FeeAllowanceResponseSeeder.create({ - granter: mockMasterWalletAddress, - grantee: wallet.address, - amount: "1000000" - }) - ); - - nock(apiNodeUrl) - .persist() - .get(/\/cosmos\/authz\/v1beta1\/grants\?.*/) - .reply( - 200, - DeploymentGrantResponseSeeder.create({ - granter: mockMasterWalletAddress, - grantee: wallet.address, - amount: "5000000" - }) - ); - - nock(apiNodeUrl) - .persist() - .get(/\/akash\/deployment\/v1beta3\/deployments\/list\?.*/) - .reply( - 200, - DeploymentListResponseSeeder.create({ - owner: wallet.address, - amount: "1000000" - }) - ); - - return { user: userWithId, apiKey, wallet }; - } - - const mockMasterWallet = { - getFirstAddress: jest.fn().mockResolvedValue(mockMasterWalletAddress) - }; - - const mockBillingConfig = { - get: jest.fn().mockReturnValue(mockDeploymentGrantDenom) - }; - - di.registerInstance("MANAGED", mockMasterWallet); - di.registerInstance("BillingConfig", mockBillingConfig); - - const { user, apiKey, wallet } = await createTestUser(); - - return { - di, - user, - apiKey, - wallet - }; - } - - interface SetupOptions { - walletNotFound?: boolean; + async function setup(options?: SetupUserOptions) { + return await setupUser(options); } }); diff --git a/apps/api/test/functional/deployments.spec.ts b/apps/api/test/functional/deployments.spec.ts index 6ec8038fa2..f00bb662fc 100644 --- a/apps/api/test/functional/deployments.spec.ts +++ b/apps/api/test/functional/deployments.spec.ts @@ -40,12 +40,14 @@ describe("Deployments API", () => { let knownUsers: Record; let knownApiKeys: Record; let knownWallets: Record; + let allWallets: UserWalletOutput[]; let currentHeight: number; beforeEach(() => { knownUsers = {}; knownApiKeys = {}; knownWallets = {}; + allWallets = []; currentHeight = faker.number.int({ min: 1000000, max: 10000000 }); jest.spyOn(userRepository, "findById").mockImplementation(async (id: string) => { @@ -66,6 +68,10 @@ describe("Deployments API", () => { jest.spyOn(blockHttpService, "getCurrentHeight").mockResolvedValue(currentHeight); + jest.spyOn(userWalletRepository, "findOneBy").mockImplementation(async (query: Partial | undefined) => { + return Promise.resolve(allWallets.find(wallet => wallet.address === query?.address)); + }); + const fakeWalletRepository = { findByUserId: async (id: string) => { return Promise.resolve(knownWallets[id]); @@ -108,6 +114,7 @@ describe("Deployments API", () => { knownUsers[userId] = user; knownApiKeys[userApiKeySecret] = apiKey; knownWallets[user.id] = wallets; + allWallets.push(...wallets); return { user, userApiKeySecret, wallets }; } @@ -684,6 +691,41 @@ describe("Deployments API", () => { const yml = fs.readFileSync(path.resolve(__dirname, "../mocks/hello-world-sdl.yml"), "utf8"); + const response = await app.request(`/v1/deployments/${dseq}`, { + method: "PUT", + body: JSON.stringify({ + data: { + sdl: yml + } + }), + headers: new Headers({ "Content-Type": "application/json", "x-api-key": userApiKeySecret }) + }); + + expect(response.status).toBe(200); + const result = (await response.json()) as { data: unknown }; + expect(result.data).toEqual({ + deployment: expect.any(Object), + escrow_account: expect.any(Object), + leases: expect.arrayContaining([expect.any(Object)]) + }); + }); + + it("should update a deployment successfully with a certificate provided", async () => { + const { userApiKeySecret, wallets } = await mockUser(); + const dseq = "1234"; + setupDeploymentInfoMock(wallets, dseq); + + const mockTxResult = { + code: 0, + hash: "test-hash", + transactionHash: "test-hash", + rawLog: "success" + }; + + jest.spyOn(signerService, "executeDecodedTxByUserId").mockResolvedValueOnce(mockTxResult); + + const yml = fs.readFileSync(path.resolve(__dirname, "../mocks/hello-world-sdl.yml"), "utf8"); + const response = await app.request(`/v1/deployments/${dseq}`, { method: "PUT", body: JSON.stringify({ @@ -719,11 +761,7 @@ describe("Deployments API", () => { method: "PUT", body: JSON.stringify({ data: { - sdl: yml, - certificate: { - certPem: "test-cert-pem", - keyPem: "test-key-pem" - } + sdl: yml } }), headers: new Headers({ "Content-Type": "application/json", "x-api-key": userApiKeySecret }) @@ -744,11 +782,7 @@ describe("Deployments API", () => { method: "PUT", body: JSON.stringify({ data: { - sdl: "test-sdl", - certificate: { - certPem: "test-cert-pem", - keyPem: "test-key-pem" - } + sdl: "test-sdl" } }), headers: new Headers({ "Content-Type": "application/json" }) @@ -772,11 +806,7 @@ describe("Deployments API", () => { method: "PUT", body: JSON.stringify({ data: { - sdl: "invalid-sdl", - certificate: { - certPem: "test-cert-pem", - keyPem: "test-key-pem" - } + sdl: "invalid-sdl" } }), headers: new Headers({ "Content-Type": "application/json", "x-api-key": userApiKeySecret }) @@ -786,45 +816,6 @@ describe("Deployments API", () => { const result = (await response.json()) as { message: string }; expect(result.message).toContain("Invalid SDL"); }); - - it("should return 400 if certificate is missing required fields", async () => { - const { userApiKeySecret, wallets } = await mockUser(); - const dseq = "1234"; - setupDeploymentInfoMock(wallets, dseq); - - const yml = fs.readFileSync(path.resolve(__dirname, "../mocks/hello-world-sdl.yml"), "utf8"); - - const response = await app.request(`/v1/deployments/${dseq}`, { - method: "PUT", - body: JSON.stringify({ - data: { - sdl: yml, - certificate: { - certPem: "test-cert-pem" - } - } - }), - headers: new Headers({ "Content-Type": "application/json", "x-api-key": userApiKeySecret }) - }); - - expect(response.status).toBe(400); - const result = await response.json(); - expect(result).toEqual({ - data: [ - { - code: "invalid_type", - expected: "string", - message: "Required", - path: ["data", "certificate", "keyPem"], - received: "undefined" - } - ], - error: "BadRequestError", - message: "Validation error", - code: "validation_error", - type: "validation_error" - }); - }); }); describe("GET /v1/addresses/{address}/deployments/{skip}/{limit}", () => { diff --git a/apps/api/test/functional/lease-flow.spec.ts b/apps/api/test/functional/lease-flow.spec.ts index 146d27b4a6..0c5fbe3980 100644 --- a/apps/api/test/functional/lease-flow.spec.ts +++ b/apps/api/test/functional/lease-flow.spec.ts @@ -115,7 +115,22 @@ describe("Lease Flow", () => { throw new Error("No bids received after maximum attempts"); } - it("should execute complete lease lifecycle", async () => { + [ + { + name: "should execute complete lease lifecycle without a certificate provided", + includeCertificate: false + }, + { + name: "should execute complete lease lifecycle with a certificate provided", + includeCertificate: true + } + ].forEach(({ name, includeCertificate }) => { + it(name, async () => { + await runLifecycle(includeCertificate); + }); + }); + + const runLifecycle = async (includeCertificate: boolean) => { // 1. Setup user and get authentication const { apiKey, wallet } = await createTestUser(); @@ -182,10 +197,12 @@ describe("Lease Flow", () => { const body = { manifest, - certificate: { - certPem, - keyPem: encryptedKey - }, + certificate: includeCertificate + ? { + certPem, + keyPem: encryptedKey + } + : undefined, leases: [ { dseq, @@ -277,5 +294,5 @@ describe("Lease Flow", () => { expect(finalBalances.deployments).toBeLessThan(afterDepositBalances.deployments); // Total should be less than initial total due to fees expect(finalBalances.total).toBeLessThan(initialTotal); - }); + }; }); diff --git a/apps/api/test/functional/websocket.spec.ts b/apps/api/test/functional/websocket.spec.ts new file mode 100644 index 0000000000..35089bfbae --- /dev/null +++ b/apps/api/test/functional/websocket.spec.ts @@ -0,0 +1,194 @@ +import { setTimeout as delay } from "timers/promises"; +import waitForExpect from "wait-for-expect"; +import WebSocket from "ws"; + +import { createAkashAddress } from "@test/seeders"; +import { createX509CertPair } from "@test/seeders/x509-cert-pair"; +import { startProviderServer, stopProviderServer } from "@test/setup/provider-server"; +import { startServer } from "@test/setup/server"; +import { setupUser, teardownUser } from "@test/setup/setup-user"; + +jest.setTimeout(20000); + +describe("Provider WebSocket", () => { + afterEach(() => { + stopProviderServer(); + teardownUser(); + }); + + it("proxies provider websocket messages", async () => { + const { providerAddress, providerUrl, ws } = await setup({ + onConnection: providerWs => { + providerWs.send("connected"); + const messages: string[] = []; + providerWs.on("message", (data: Buffer) => { + messages.push(data.toString()); + console.log("messages", messages); + if (data.toString() === "flush") { + providerWs.send(JSON.stringify(messages)); + } + }); + } + }); + + await new Promise(resolve => ws.once("open", resolve)); + ws.send(JSON.stringify(ourMessage("hello", providerUrl, { providerAddress }))); + expect(await waitForMessage(ws)).toEqual(providerMessage("connected")); + + ws.send(JSON.stringify(ourMessage("test", providerUrl, { providerAddress }))); + ws.send(JSON.stringify(ourMessage("flush", providerUrl, { providerAddress }))); + expect(await waitForMessage(ws)).toEqual(providerMessage(JSON.stringify(["hello", "test", "flush"]))); + }); + + it("responds to ping messages", async () => { + const { ws } = await setup({ + onConnection: providerWs => { + providerWs.on("message", (data: Buffer) => { + if (data.toString() === "ping") { + providerWs.send("pong"); + } + }); + } + }); + + await new Promise(resolve => ws.once("open", resolve)); + ws.send(JSON.stringify({ type: "ping" })); + + expect(await waitForMessage(ws)).toEqual({ type: "pong" }); + }); + + it("does not connect to provider socket until 1st message is sent", async () => { + const { ws, providerAddress, providerUrl } = await setup({ + onConnection: providerWs => providerWs.send("connected") + }); + + const [providerMessageOnConnect] = await Promise.all([ + Promise.race([waitForMessage(ws), delay(200, null)]), + new Promise(resolve => ws.once("open", resolve)) + ]); + expect(providerMessageOnConnect).toBe(null); + + ws.send(JSON.stringify(ourMessage("hello", providerUrl, { providerAddress }))); + + expect(await waitForMessage(ws)).toEqual(providerMessage("connected")); + }); + + it('does not send message to provider socket if "data" property is empty', async () => { + const { ws, providerAddress, providerUrl } = await setup({ + onConnection: providerWs => + providerWs.on("message", () => { + providerWs.send("received"); + }) + }); + + await new Promise(resolve => ws.once("open", resolve)); + ws.send(JSON.stringify(ourMessage("", providerUrl, { providerAddress }))); + + const receivedProviderMessage = await Promise.race([waitForMessage(ws), delay(200, null)]); + + expect(receivedProviderMessage).toBe(null); + }); + + it("closes provider websocket when client websocket is closed", async () => { + let isProviderWebsocketOpen = false; + const onProviderWsClose = jest.fn(); + const { ws, providerAddress, providerUrl } = await setup({ + onConnection: providerWs => { + isProviderWebsocketOpen = true; + providerWs.on("close", onProviderWsClose); + } + }); + + await new Promise(resolve => ws.once("open", resolve)); + ws.send(JSON.stringify(ourMessage("hello", providerUrl, { providerAddress }))); + await waitForExpect(() => { + expect(isProviderWebsocketOpen).toBe(true); + }, 5000); + + ws.close(); + + await waitForExpect(() => { + expect(onProviderWsClose).toHaveBeenCalled(); + }); + }); + + it("sends close message if provider socket has been closed", async () => { + const { ws, providerAddress, providerUrl } = await setup({ + onConnection: providerWs => + providerWs.on("message", data => { + if (data.toString() === "please_close") { + providerWs.close(1000); + } + }) + }); + + await new Promise(resolve => ws.once("open", resolve)); + ws.send(JSON.stringify(ourMessage("please_close", providerUrl, { providerAddress }))); + + expect(await waitForMessage(ws)).toEqual( + providerMessage("", { + closed: true, + code: 1000, + reason: "" + }) + ); + }); + + function providerMessage(message: T, extra?: Record) { + return { + ...extra, + type: "websocket", + message + }; + } + + function ourMessage(message: string, url: string, extra?: Record) { + return { + ...extra, + type: "websocket", + data: message + .split("") + .map(char => char.charCodeAt(0)) + .join(","), + url: `${url}/test`, + providerAddress: extra?.providerAddress || createAkashAddress(), + chainNetwork: extra?.chainNetwork || "sandbox" + }; + } + + function waitForMessage(ws: WebSocket) { + return new Promise(resolve => { + ws.once("message", data => resolve(JSON.parse(data.toString()))); + }); + } + + type SetupOptions = { + onConnection?: (providerWs: WebSocket) => void; + }; + + async function setup(options: SetupOptions) { + const { apiKey } = await setupUser(); + + const serverUrl = await startServer(); + const providerAddress = createAkashAddress(); + const certPair = createX509CertPair({ commonName: providerAddress }); + const { providerUrl } = await startProviderServer({ + certPair, + websocketServer: { + enable: true, + onConnection: options.onConnection + } + }); + const ws = new WebSocket(`${serverUrl}/v1/ws`, { + headers: { + "x-api-key": apiKey + } + }); + + return { + providerAddress, + providerUrl, + ws + }; + } +}); diff --git a/apps/api/test/seeders/x509-cert-pair.ts b/apps/api/test/seeders/x509-cert-pair.ts new file mode 100644 index 0000000000..9e7a38ef58 --- /dev/null +++ b/apps/api/test/seeders/x509-cert-pair.ts @@ -0,0 +1,44 @@ +import { X509Certificate } from "crypto"; +import { pki } from "node-forge"; + +export function createX509CertPair(options: CertificateOptions = {}): CertPair { + const keys = pki.rsa.generateKeyPair(2048); + const cert = pki.createCertificate(); + + cert.publicKey = keys.publicKey; + cert.serialNumber = options.serialNumber ?? "177831BE7F249E66"; + cert.validity.notBefore = options.validFrom || new Date(); + cert.validity.notAfter = options.validTo || nextDay(cert.validity.notBefore); + + const attrs = [ + { name: "commonName", value: options?.commonName ?? "example.org" }, + { name: "countryName", value: "US" }, + { shortName: "ST", value: "Virginia" } + ]; + cert.setSubject(attrs); + cert.setIssuer(attrs); + cert.sign(keys.privateKey); + + return { + cert: new X509Certificate(pki.certificateToPem(cert)), + key: pki.privateKeyToPem(keys.privateKey) + }; +} + +export interface CertPair { + key: string; + cert: X509Certificate; +} + +export interface CertificateOptions { + validFrom?: Date; + validTo?: Date; + serialNumber?: string; + commonName?: string; +} + +function nextDay(from: Date) { + const date = new Date(from.getTime()); + date.setDate(date.getDate() + 1); + return date; +} diff --git a/apps/api/test/setup/provider-server.ts b/apps/api/test/setup/provider-server.ts new file mode 100644 index 0000000000..3402bbb2a9 --- /dev/null +++ b/apps/api/test/setup/provider-server.ts @@ -0,0 +1,84 @@ +import type { IncomingMessage, ServerResponse } from "http"; +import type { ServerOptions } from "https"; +import https from "https"; +import type { AddressInfo } from "net"; +import WebSocket from "ws"; + +import { shutdownServer } from "@src/lib/shutdown-server/shutdown-server"; + +import { createAkashAddress } from "@test/seeders"; +import { type CertPair, createX509CertPair } from "@test/seeders/x509-cert-pair"; + +let runningServer: https.Server | undefined; + +export function startProviderServer(options: ProviderServerOptions): Promise { + return new Promise(resolve => { + const certPair = options.certPair || createX509CertPair({ commonName: createAkashAddress() }); + const httpServerOptions: ServerOptions = { + key: certPair.key, + cert: certPair.cert.toJSON() + }; + + let cleanupHandlers = new Set<() => void>(); + const handlers: RequestHandlers = { + "/200.txt"(_, res) { + res.writeHead(200, "OK", { "Content-Type": "text/plain" }); + res.end("Hello, World!"); + }, + "/headers.json"(_, res) { + res.writeHead(200, "OK", { + "Content-Type": "application/json", + "X-Custom-Header": "test" + }); + res.end(JSON.stringify({ ok: true })); + }, + ...options.handlers + }; + + const server = https.createServer(httpServerOptions, (req, res) => { + if (req.url && Object.hasOwn(handlers, req.url)) { + const cleanup = handlers[req.url](req, res); + if (cleanup) cleanupHandlers.add(cleanup); + } else { + res.writeHead(404, "Not found", { "Content-Type": "text/plain" }); + res.end("Not Found"); + } + }); + + server.on("close", () => { + cleanupHandlers.forEach(handler => handler()); + cleanupHandlers = new Set(); + }); + + server.listen(0, () => { + runningServer = server; + resolve({ providerUrl: `https://localhost:${(server.address() as AddressInfo).port}` }); + }); + + if (options.websocketServer?.enable) { + const wss = new WebSocket.Server({ server }); + server.on("close", () => wss.close()); + if (options.websocketServer.onConnection) { + wss.on("connection", options.websocketServer.onConnection); + } + } + }); +} + +export function stopProviderServer(): Promise { + return shutdownServer(runningServer); +} + +type RequestHandlers = Record (() => void) | undefined | void>; +export interface ProviderServerOptions { + certPair?: CertPair; + handlers?: RequestHandlers; + websocketServer?: { + enable: boolean; + onConnection?(ws: WebSocket): void; + }; +} + +interface ProviderServerResult { + providerUrl: string; +} diff --git a/apps/api/test/setup/server.ts b/apps/api/test/setup/server.ts new file mode 100644 index 0000000000..03dae58084 --- /dev/null +++ b/apps/api/test/setup/server.ts @@ -0,0 +1,14 @@ +import type { AppServer } from "../../src/app"; +import { initApp } from "../../src/app"; + +let server: AppServer | undefined; + +export async function startServer(): Promise { + server = await initApp(0); + + return server!.host; +} + +export async function stopServer(): Promise { + await server?.close(); +} diff --git a/apps/api/test/setup/setup-user.ts b/apps/api/test/setup/setup-user.ts new file mode 100644 index 0000000000..42b40bfc29 --- /dev/null +++ b/apps/api/test/setup/setup-user.ts @@ -0,0 +1,145 @@ +import { faker } from "@faker-js/faker"; +import nock from "nock"; +import { container } from "tsyringe"; + +import { app } from "@src/app"; +import { ApiKeyRepository } from "@src/auth/repositories/api-key/api-key.repository"; +import { ApiKeyGeneratorService } from "@src/auth/services/api-key/api-key-generator.service"; +import { UserWalletRepository } from "@src/billing/repositories"; +import type { CoreConfigService } from "@src/core/services/core-config/core-config.service"; +import { UserRepository } from "@src/user/repositories"; +import { apiNodeUrl } from "@src/utils/constants"; + +import { DeploymentGrantResponseSeeder } from "@test/seeders/deployment-grant-response.seeder"; +import { DeploymentListResponseSeeder } from "@test/seeders/deployment-list-response.seeder"; +import { FeeAllowanceResponseSeeder } from "@test/seeders/fee-allowance-response.seeder"; +import { stub } from "@test/services/stub"; +import { WalletTestingService } from "@test/services/wallet-testing.service"; + +export type SetupUserOptions = { + walletNotFound?: boolean; +}; + +const mockMasterWalletAddress = "akash1testmasterwalletaddress"; +const mockDeploymentGrantDenom = "uakt"; + +export const setupUser = async (options?: SetupUserOptions) => { + const di = container.createChildContainer(); + const userRepository = di.resolve(UserRepository); + const apiKeyRepository = di.resolve(ApiKeyRepository); + const userWalletRepository = di.resolve(UserWalletRepository); + const walletService = new WalletTestingService(app); + + let apiKeyGenerator: ApiKeyGeneratorService; + let config: jest.Mocked; + + async function createTestUser() { + const { user, wallet } = await walletService.createUserAndWallet(); + const userWithId = { ...user, userId: faker.string.uuid() }; + config = stub({ get: jest.fn() }); + config.get.mockReturnValue("test"); + apiKeyGenerator = new ApiKeyGeneratorService(config); + const apiKey = apiKeyGenerator.generateApiKey(); + + jest.spyOn(userRepository, "findById").mockImplementation(async id => { + if (id === userWithId.id) { + return { + ...userWithId, + trial: false, + userWallets: { isTrialing: false } + }; + } + return undefined; + }); + + jest.spyOn(apiKeyRepository, "find").mockImplementation(async () => { + const now = new Date().toISOString(); + return [ + { + id: faker.string.uuid(), + userId: userWithId.id, + key: apiKey, + hashedKey: await apiKeyGenerator.hashApiKey(apiKey), + keyFormat: "sk", + name: "test", + createdAt: now, + updatedAt: now, + expiresAt: null, + lastUsedAt: null + } + ]; + }); + + const findOneByUserIdMock = jest.fn().mockImplementation(async (id: string) => { + if (id === userWithId.id) { + return options?.walletNotFound ? undefined : wallet; + } + return undefined; + }); + + jest.spyOn(userWalletRepository, "accessibleBy").mockReturnValue(userWalletRepository); + jest.spyOn(userWalletRepository, "findOneByUserId").mockImplementation(findOneByUserIdMock); + + nock(apiNodeUrl) + .persist() + .get(/\/cosmos\/feegrant\/v1beta1\/allowance\/.*\/.*/) + .reply( + 200, + FeeAllowanceResponseSeeder.create({ + granter: mockMasterWalletAddress, + grantee: wallet.address, + amount: "1000000" + }) + ); + + nock(apiNodeUrl) + .persist() + .get(/\/cosmos\/authz\/v1beta1\/grants\?.*/) + .reply( + 200, + DeploymentGrantResponseSeeder.create({ + granter: mockMasterWalletAddress, + grantee: wallet.address, + amount: "5000000" + }) + ); + + nock(apiNodeUrl) + .persist() + .get(/\/akash\/deployment\/v1beta3\/deployments\/list\?.*/) + .reply( + 200, + DeploymentListResponseSeeder.create({ + owner: wallet.address, + amount: "1000000" + }) + ); + + return { user: userWithId, apiKey, wallet }; + } + + const mockMasterWallet = { + getFirstAddress: jest.fn().mockResolvedValue(mockMasterWalletAddress) + }; + + const mockBillingConfig = { + get: jest.fn().mockReturnValue(mockDeploymentGrantDenom) + }; + + di.registerInstance("MANAGED", mockMasterWallet); + di.registerInstance("BillingConfig", mockBillingConfig); + + const { user, apiKey, wallet } = await createTestUser(); + + return { + di, + user, + apiKey, + wallet + }; +}; + +export const teardownUser = () => { + jest.restoreAllMocks(); + nock.cleanAll(); +}; diff --git a/package-lock.json b/package-lock.json index 9b51dd489d..e3aa0e00ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43,6 +43,7 @@ "@akashnetwork/database": "*", "@akashnetwork/env-loader": "*", "@akashnetwork/http-sdk": "*", + "@akashnetwork/jwt": "*", "@akashnetwork/logging": "*", "@akashnetwork/net": "*", "@akashnetwork/react-query-sdk": "*", @@ -58,6 +59,7 @@ "@cosmjs/tendermint-rpc": "^0.32.4", "@dotenvx/dotenvx": "^1.9.0", "@hono/node-server": "1.13.7", + "@hono/node-ws": "^1.2.0", "@hono/otel": "~0.4.0", "@hono/swagger-ui": "0.4.1", "@hono/zod-openapi": "0.18.4", @@ -93,6 +95,7 @@ "markdown-to-txt": "^2.0.1", "memory-cache": "^0.2.0", "murmurhash": "^2.0.1", + "node-forge": "^1.3.1", "pg": "^8.12.0", "pg-boss": "^10.3.2", "pg-hstore": "^2.3.4", @@ -126,9 +129,11 @@ "@types/memory-cache": "^0.2.2", "@types/node": "^22.13.11", "@types/node-fetch": "^2.6.2", + "@types/node-forge": "^1.3.11", "@types/pg": "^8.11.6", "@types/semver": "^7.5.2", "@types/uuid": "^8.3.1", + "@types/ws": "^8.18.1", "@typescript-eslint/eslint-plugin": "^7.12.0", "alias-hq": "^5.1.6", "copy-webpack-plugin": "^12.0.2", @@ -152,6 +157,7 @@ "ts-loader": "^9.5.2", "type-fest": "^4.26.1", "typescript": "~5.8.2", + "wait-for-expect": "^4.0.0", "webpack": "^5.91.0", "webpack-cli": "4.10.0", "webpack-node-externals": "^3.0.0" @@ -11283,6 +11289,22 @@ "hono": "^4" } }, + "node_modules/@hono/node-ws": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@hono/node-ws/-/node-ws-1.2.0.tgz", + "integrity": "sha512-OBPQ8OSHBw29mj00wT/xGYtB6HY54j0fNSdVZ7gZM3TUeq0So11GXaWtFf1xWxQNfumKIsj0wRuLKWfVsO5GgQ==", + "license": "MIT", + "dependencies": { + "ws": "^8.17.0" + }, + "engines": { + "node": ">=18.14.1" + }, + "peerDependencies": { + "@hono/node-server": "^1.11.1", + "hono": "^4.6.0" + } + }, "node_modules/@hono/otel": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/@hono/otel/-/otel-0.4.0.tgz", @@ -25604,10 +25626,11 @@ "license": "MIT" }, "node_modules/@types/ws": { - "version": "8.5.13", - "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.13.tgz", - "integrity": "sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA==", + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", "dev": true, + "license": "MIT", "dependencies": { "@types/node": "*" } @@ -52871,6 +52894,13 @@ "node": ">=14" } }, + "node_modules/wait-for-expect": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/wait-for-expect/-/wait-for-expect-4.0.0.tgz", + "integrity": "sha512-mcH2HYUUHhdFGHVJkgwkBxRihZO4VSuPyh6xhYHz7LEnYkcaLbTAEEsTpYiFw4UY45XdTZYYIaquuMucw9wWMw==", + "dev": true, + "license": "MIT" + }, "node_modules/walker": { "version": "1.0.8", "license": "Apache-2.0", diff --git a/packages/http-sdk/src/provider/provider-http.service.ts b/packages/http-sdk/src/provider/provider-http.service.ts index 17eb57cac1..c262243b53 100644 --- a/packages/http-sdk/src/provider/provider-http.service.ts +++ b/packages/http-sdk/src/provider/provider-http.service.ts @@ -11,4 +11,32 @@ export class ProviderHttpService extends HttpService { async getProvider(address: string): Promise { return this.extractData(await this.get(`/akash/provider/v1beta3/providers/${address}`)); } + + async sendManifest({ hostUri, dseq, manifest, jwtToken }: { hostUri: string; dseq: string; manifest: string; jwtToken: string }) { + return this.extractData( + await this.put(`/deployment/${dseq}/manifest`, { + baseURL: hostUri, + body: manifest, + headers: this.getJwtTokenHeaders(jwtToken), + timeout: 60000 + }) + ); + } + + async getLeaseStatus({ hostUri, dseq, gseq, oseq, jwtToken }: { hostUri: string; dseq: string; gseq: number; oseq: number; jwtToken: string }) { + return this.extractData( + await this.get(`/lease/${dseq}/${gseq}/${oseq}/status`, { + baseURL: hostUri, + headers: this.getJwtTokenHeaders(jwtToken), + timeout: 30000 + }) + ); + } + + private getJwtTokenHeaders(jwtToken: string) { + return { + Authorization: `Bearer ${jwtToken}`, + "Content-Type": "application/json" + }; + } }