Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 168 additions & 1 deletion src/tracing/cloud-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ export class CloudTraceSink extends TraceSink {
if (statusCode === 200) {
console.log('✅ [Sentience] Trace uploaded successfully');

// Call /v1/traces/complete to report file sizes (NEW)
// Upload trace index file
await this._uploadIndex();

// Call /v1/traces/complete to report file sizes
await this._completeTrace();

// 4. Delete temp file on success
Expand Down Expand Up @@ -381,6 +384,170 @@ export class CloudTraceSink extends TraceSink {
}
}

/**
* Upload trace index file to cloud storage.
*
* Called after successful trace upload to provide fast timeline rendering.
* The index file enables O(1) step lookups without parsing the entire trace.
*/
private async _uploadIndex(): Promise<void> {
// Construct index file path (same as trace file with .index.json extension)
const indexPath = this.tempFilePath.replace('.jsonl', '.index.json');

try {
// Check if index file exists
await fsPromises.access(indexPath);
} catch {
this.logger?.warn('Index file not found, skipping index upload');
return;
}

try {
// Request index upload URL from API
if (!this.apiKey) {
this.logger?.info('No API key provided, skipping index upload');
return;
}

const uploadUrlResponse = await this._requestIndexUploadUrl();
if (!uploadUrlResponse) {
return;
}

// Read and compress index file
const indexData = await fsPromises.readFile(indexPath);
const compressedIndex = zlib.gzipSync(indexData);
const indexSize = compressedIndex.length;

this.logger?.info(`Index file size: ${(indexSize / 1024).toFixed(2)} KB`);

console.log(`📤 [Sentience] Uploading trace index (${indexSize} bytes)...`);

// Upload index to cloud storage
const statusCode = await this._uploadIndexToCloud(uploadUrlResponse, compressedIndex);

if (statusCode === 200) {
console.log('✅ [Sentience] Trace index uploaded successfully');

// Delete local index file after successful upload
try {
await fsPromises.unlink(indexPath);
} catch {
// Ignore cleanup errors
}
} else {
this.logger?.warn(`Index upload failed: HTTP ${statusCode}`);
console.log(`⚠️ [Sentience] Index upload failed: HTTP ${statusCode}`);
}
} catch (error: any) {
// Non-fatal: log but don't crash
this.logger?.warn(`Error uploading trace index: ${error.message}`);
console.log(`⚠️ [Sentience] Error uploading trace index: ${error.message}`);
}
}

/**
* Request index upload URL from Sentience API
*/
private async _requestIndexUploadUrl(): Promise<string | null> {
return new Promise((resolve) => {
const url = new URL(`${this.apiUrl}/v1/traces/index_upload`);
const protocol = url.protocol === 'https:' ? https : http;

const body = JSON.stringify({ run_id: this.runId });

const options = {
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname + url.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
Authorization: `Bearer ${this.apiKey}`,
},
timeout: 10000,
};

const req = protocol.request(options, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
if (res.statusCode === 200) {
try {
const response = JSON.parse(data);
resolve(response.upload_url || null);
} catch {
this.logger?.warn('Failed to parse index upload URL response');
resolve(null);
}
} else {
this.logger?.warn(`Failed to get index upload URL: HTTP ${res.statusCode}`);
resolve(null);
}
});
});

req.on('error', (error) => {
this.logger?.warn(`Error requesting index upload URL: ${error.message}`);
resolve(null);
});

req.on('timeout', () => {
req.destroy();
this.logger?.warn('Index upload URL request timeout');
resolve(null);
});

req.write(body);
req.end();
});
}

/**
* Upload index data to cloud using pre-signed URL
*/
private async _uploadIndexToCloud(uploadUrl: string, data: Buffer): Promise<number> {
return new Promise((resolve, reject) => {
const url = new URL(uploadUrl);
const protocol = url.protocol === 'https:' ? https : http;

const options = {
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname + url.search,
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Content-Encoding': 'gzip',
'Content-Length': data.length,
},
timeout: 30000, // 30 second timeout
};

const req = protocol.request(options, (res) => {
res.on('data', () => {});
res.on('end', () => {
resolve(res.statusCode || 500);
});
});

req.on('error', (error) => {
reject(error);
});

req.on('timeout', () => {
req.destroy();
reject(new Error('Index upload timeout'));
});

req.write(data);
req.end();
});
}

/**
* Get unique identifier for this sink
*/
Expand Down
Loading