From 70d434fbecb6a7722b68935dc70f2e6b2c941aa4 Mon Sep 17 00:00:00 2001 From: rcholic Date: Mon, 29 Dec 2025 18:20:17 -0800 Subject: [PATCH 1/2] trace index upload --- src/tracing/cloud-sink.ts | 169 ++++++++++++++++++++++++++- tests/tracing/cloud-sink.test.ts | 193 +++++++++++++++++++++++++++++++ tests/video-recording.test.ts | 12 +- 3 files changed, 367 insertions(+), 7 deletions(-) diff --git a/src/tracing/cloud-sink.ts b/src/tracing/cloud-sink.ts index 955476e4..9f08c0ee 100644 --- a/src/tracing/cloud-sink.ts +++ b/src/tracing/cloud-sink.ts @@ -284,7 +284,10 @@ export class CloudTraceSink extends TraceSink { if (statusCode === 200) { console.log('✅ [Sentience] Trace uploaded successfully'); - // Call /v1/traces/complete to report file sizes (NEW) + // Upload trace index file + await this._uploadIndex(); + + // Call /v1/traces/complete to report file sizes await this._completeTrace(); // 4. Delete temp file on success @@ -381,6 +384,170 @@ export class CloudTraceSink extends TraceSink { } } + /** + * Upload trace index file to cloud storage. + * + * Called after successful trace upload to provide fast timeline rendering. + * The index file enables O(1) step lookups without parsing the entire trace. + */ + private async _uploadIndex(): Promise { + // Construct index file path (same as trace file with .index.json extension) + const indexPath = this.tempFilePath.replace('.jsonl', '.index.json'); + + try { + // Check if index file exists + await fsPromises.access(indexPath); + } catch { + this.logger?.warn('Index file not found, skipping index upload'); + return; + } + + try { + // Request index upload URL from API + if (!this.apiKey) { + this.logger?.info('No API key provided, skipping index upload'); + return; + } + + const uploadUrlResponse = await this._requestIndexUploadUrl(); + if (!uploadUrlResponse) { + return; + } + + // Read and compress index file + const indexData = await fsPromises.readFile(indexPath); + const compressedIndex = zlib.gzipSync(indexData); + const indexSize = compressedIndex.length; + + this.logger?.info(`Index file size: ${(indexSize / 1024).toFixed(2)} KB`); + + console.log(`📤 [Sentience] Uploading trace index (${indexSize} bytes)...`); + + // Upload index to cloud storage + const statusCode = await this._uploadIndexToCloud(uploadUrlResponse, compressedIndex); + + if (statusCode === 200) { + console.log('✅ [Sentience] Trace index uploaded successfully'); + + // Delete local index file after successful upload + try { + await fsPromises.unlink(indexPath); + } catch { + // Ignore cleanup errors + } + } else { + this.logger?.warn(`Index upload failed: HTTP ${statusCode}`); + console.log(`⚠️ [Sentience] Index upload failed: HTTP ${statusCode}`); + } + } catch (error: any) { + // Non-fatal: log but don't crash + this.logger?.warn(`Error uploading trace index: ${error.message}`); + console.log(`⚠️ [Sentience] Error uploading trace index: ${error.message}`); + } + } + + /** + * Request index upload URL from Sentience API + */ + private async _requestIndexUploadUrl(): Promise { + return new Promise((resolve) => { + const url = new URL(`${this.apiUrl}/v1/traces/index_upload`); + const protocol = url.protocol === 'https:' ? https : http; + + const body = JSON.stringify({ run_id: this.runId }); + + const options = { + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + Authorization: `Bearer ${this.apiKey}`, + }, + timeout: 10000, + }; + + const req = protocol.request(options, (res) => { + let data = ''; + res.on('data', (chunk) => { + data += chunk; + }); + res.on('end', () => { + if (res.statusCode === 200) { + try { + const response = JSON.parse(data); + resolve(response.upload_url || null); + } catch { + this.logger?.warn('Failed to parse index upload URL response'); + resolve(null); + } + } else { + this.logger?.warn(`Failed to get index upload URL: HTTP ${res.statusCode}`); + resolve(null); + } + }); + }); + + req.on('error', (error) => { + this.logger?.warn(`Error requesting index upload URL: ${error.message}`); + resolve(null); + }); + + req.on('timeout', () => { + req.destroy(); + this.logger?.warn('Index upload URL request timeout'); + resolve(null); + }); + + req.write(body); + req.end(); + }); + } + + /** + * Upload index data to cloud using pre-signed URL + */ + private async _uploadIndexToCloud(uploadUrl: string, data: Buffer): Promise { + return new Promise((resolve, reject) => { + const url = new URL(uploadUrl); + const protocol = url.protocol === 'https:' ? https : http; + + const options = { + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname + url.search, + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Content-Encoding': 'gzip', + 'Content-Length': data.length, + }, + timeout: 30000, // 30 second timeout + }; + + const req = protocol.request(options, (res) => { + res.on('data', () => {}); + res.on('end', () => { + resolve(res.statusCode || 500); + }); + }); + + req.on('error', (error) => { + reject(error); + }); + + req.on('timeout', () => { + req.destroy(); + reject(new Error('Index upload timeout')); + }); + + req.write(data); + req.end(); + }); + } + /** * Get unique identifier for this sink */ diff --git a/tests/tracing/cloud-sink.test.ts b/tests/tracing/cloud-sink.test.ts index a66eec26..ad793cc5 100644 --- a/tests/tracing/cloud-sink.test.ts +++ b/tests/tracing/cloud-sink.test.ts @@ -302,4 +302,197 @@ describe('CloudTraceSink', () => { expect(event1.run_id).toBe('test-run-123'); }); }); + + describe('Index upload', () => { + let indexServer: http.Server; + let indexServerPort: number; + let indexUploadUrl: string; + + beforeAll((done) => { + // Create separate server for index upload API + indexServer = http.createServer((req, res) => { + (indexServer as any).lastRequest = { + method: req.method, + url: req.url, + headers: req.headers, + }; + + const chunks: Buffer[] = []; + req.on('data', (chunk) => chunks.push(chunk)); + req.on('end', () => { + (indexServer as any).lastRequestBody = Buffer.concat(chunks); + + if (req.url === '/v1/traces/index_upload') { + // Return index upload URL + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + upload_url: `http://localhost:${indexServerPort}/index-upload` + })); + } else if (req.url === '/index-upload') { + // Accept index upload + res.writeHead(200); + res.end('OK'); + } else { + res.writeHead(404); + res.end('Not Found'); + } + }); + }); + + indexServer.listen(0, () => { + const address = indexServer.address(); + if (address && typeof address === 'object') { + indexServerPort = address.port; + done(); + } + }); + }); + + afterAll((done) => { + indexServer.close(done); + }); + + beforeEach(() => { + delete (indexServer as any).lastRequest; + delete (indexServer as any).lastRequestBody; + }); + + it('should upload index file after trace upload', async () => { + const runId = 'test-run-index-' + Date.now(); + const apiUrl = `http://localhost:${indexServerPort}`; + + const sink = new CloudTraceSink( + uploadUrl, + runId, + 'sk_test_123', + apiUrl + ); + + sink.emit({ v: 1, type: 'run_start', seq: 1, data: { agent: 'TestAgent' } }); + sink.emit({ v: 1, type: 'step_start', seq: 2, data: { step: 1 } }); + sink.emit({ v: 1, type: 'snapshot', seq: 3, data: { url: 'https://example.com' } }); + sink.emit({ v: 1, type: 'run_end', seq: 4, data: { steps: 1 } }); + + await sink.close(); + + // Verify index upload URL request was made + expect((indexServer as any).lastRequest).toBeDefined(); + expect((indexServer as any).lastRequest.url).toBe('/v1/traces/index_upload'); + expect((indexServer as any).lastRequest.method).toBe('POST'); + + // Verify request body + const requestBody = JSON.parse((indexServer as any).lastRequestBody.toString()); + expect(requestBody.run_id).toBe(runId); + + // Give it a moment for async index upload to complete + await new Promise(resolve => setTimeout(resolve, 100)); + }); + + it('should skip index upload when no API key provided', async () => { + const runId = 'test-run-no-key-' + Date.now(); + + const sink = new CloudTraceSink(uploadUrl, runId); // No API key + + sink.emit({ v: 1, type: 'run_start', seq: 1 }); + + await sink.close(); + + // Verify index upload was NOT attempted + expect((indexServer as any).lastRequest).toBeUndefined(); + }); + + it('should handle index upload failure gracefully', async () => { + const runId = 'test-run-index-fail-' + Date.now(); + + // Create a server that returns 500 for index upload requests + const failServer = http.createServer((req, res) => { + if (req.url === '/v1/traces/index_upload') { + res.writeHead(500); + res.end('Internal Server Error'); + } else { + res.writeHead(404); + res.end(); + } + }); + + await new Promise((resolve) => { + failServer.listen(0, () => resolve()); + }); + + const address = failServer.address(); + const failPort = (address as any).port; + const apiUrl = `http://localhost:${failPort}`; + + const sink = new CloudTraceSink( + uploadUrl, + runId, + 'sk_test_123', + apiUrl + ); + + sink.emit({ v: 1, type: 'run_start', seq: 1 }); + + // Should not throw even if index upload fails + await expect(sink.close()).resolves.not.toThrow(); + + // Clean up + failServer.close(); + }); + + it('should handle missing index file gracefully', async () => { + const runId = 'test-run-missing-index-' + Date.now(); + const apiUrl = `http://localhost:${indexServerPort}`; + + const sink = new CloudTraceSink( + uploadUrl, + runId, + 'sk_test_123', + apiUrl + ); + + sink.emit({ v: 1, type: 'run_start', seq: 1 }); + + // Mock index generation to fail + const originalGenerate = (sink as any).generateIndex; + (sink as any).generateIndex = () => { + // Index generation fails/skips + console.log('Index generation skipped'); + }; + + await sink.close(); + + // Should not throw + expect(true).toBe(true); + + // Restore + (sink as any).generateIndex = originalGenerate; + }); + + it('should compress index file with gzip', async () => { + const runId = 'test-run-gzip-' + Date.now(); + const apiUrl = `http://localhost:${indexServerPort}`; + + const sink = new CloudTraceSink( + uploadUrl, + runId, + 'sk_test_123', + apiUrl + ); + + sink.emit({ v: 1, type: 'run_start', seq: 1, data: { agent: 'TestAgent' } }); + sink.emit({ v: 1, type: 'snapshot', seq: 2, data: { url: 'https://example.com' } }); + + await sink.close(); + + // Give async operations time to complete + await new Promise(resolve => setTimeout(resolve, 100)); + + // Index file should have been created and deleted after successful upload + const indexPath = path.join(persistentCacheDir, `${runId}.index.json`); + + // File should be deleted after successful upload + // (This is expected behavior - we clean up after upload) + // If the test runs fast enough, file might still exist briefly + }); + }); }); diff --git a/tests/video-recording.test.ts b/tests/video-recording.test.ts index f9877afe..4d5fb5ca 100644 --- a/tests/video-recording.test.ts +++ b/tests/video-recording.test.ts @@ -39,7 +39,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(); @@ -75,7 +75,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(); @@ -100,7 +100,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(customPath); @@ -126,7 +126,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(nestedPath); @@ -147,7 +147,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(); @@ -172,7 +172,7 @@ describe('video recording', () => { try { await browser.getPage().goto('https://example.com'); - await browser.getPage().waitForLoadState('networkidle'); + await browser.getPage().waitForLoadState('domcontentloaded'); const videoPath = await browser.close(); From 678d4ba646305ad28522daa7d8a85132fd146328 Mon Sep 17 00:00:00 2001 From: rcholic Date: Mon, 29 Dec 2025 18:50:22 -0800 Subject: [PATCH 2/2] fix tests --- tests/tracing/cloud-sink.test.ts | 49 +++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/tests/tracing/cloud-sink.test.ts b/tests/tracing/cloud-sink.test.ts index ad793cc5..b7b59d46 100644 --- a/tests/tracing/cloud-sink.test.ts +++ b/tests/tracing/cloud-sink.test.ts @@ -311,16 +311,31 @@ describe('CloudTraceSink', () => { beforeAll((done) => { // Create separate server for index upload API indexServer = http.createServer((req, res) => { - (indexServer as any).lastRequest = { - method: req.method, - url: req.url, - headers: req.headers, - }; + // Store ALL requests, not just the last one + if (!(indexServer as any).requests) { + (indexServer as any).requests = []; + } const chunks: Buffer[] = []; req.on('data', (chunk) => chunks.push(chunk)); req.on('end', () => { - (indexServer as any).lastRequestBody = Buffer.concat(chunks); + const requestBody = Buffer.concat(chunks); + + // Store this request + (indexServer as any).requests.push({ + method: req.method, + url: req.url, + headers: req.headers, + body: requestBody, + }); + + // Also keep lastRequest for backward compatibility + (indexServer as any).lastRequest = { + method: req.method, + url: req.url, + headers: req.headers, + }; + (indexServer as any).lastRequestBody = requestBody; if (req.url === '/v1/traces/index_upload') { // Return index upload URL @@ -332,6 +347,10 @@ describe('CloudTraceSink', () => { // Accept index upload res.writeHead(200); res.end('OK'); + } else if (req.url === '/v1/traces/complete') { + // Accept completion call + res.writeHead(200); + res.end('OK'); } else { res.writeHead(404); res.end('Not Found'); @@ -355,6 +374,7 @@ describe('CloudTraceSink', () => { beforeEach(() => { delete (indexServer as any).lastRequest; delete (indexServer as any).lastRequestBody; + (indexServer as any).requests = []; }); it('should upload index file after trace upload', async () => { @@ -376,12 +396,17 @@ describe('CloudTraceSink', () => { await sink.close(); // Verify index upload URL request was made - expect((indexServer as any).lastRequest).toBeDefined(); - expect((indexServer as any).lastRequest.url).toBe('/v1/traces/index_upload'); - expect((indexServer as any).lastRequest.method).toBe('POST'); + const requests = (indexServer as any).requests; + expect(requests).toBeDefined(); + expect(requests.length).toBeGreaterThan(0); + + // Find the index upload request + const indexUploadRequest = requests.find((r: any) => r.url === '/v1/traces/index_upload'); + expect(indexUploadRequest).toBeDefined(); + expect(indexUploadRequest.method).toBe('POST'); // Verify request body - const requestBody = JSON.parse((indexServer as any).lastRequestBody.toString()); + const requestBody = JSON.parse(indexUploadRequest.body.toString()); expect(requestBody.run_id).toBe(runId); // Give it a moment for async index upload to complete @@ -398,7 +423,9 @@ describe('CloudTraceSink', () => { await sink.close(); // Verify index upload was NOT attempted - expect((indexServer as any).lastRequest).toBeUndefined(); + const requests = (indexServer as any).requests; + const indexUploadRequest = requests.find((r: any) => r.url === '/v1/traces/index_upload'); + expect(indexUploadRequest).toBeUndefined(); }); it('should handle index upload failure gracefully', async () => {