From 9ef8b9855de146bac82247303129b1c31748afbe Mon Sep 17 00:00:00 2001 From: Ibrahim hamzat Date: Tue, 13 Jan 2026 17:49:27 +0100 Subject: [PATCH] chore: Enhance logging for parameter query results in BucketChecksumState --- .changeset/parameter-query-logging.md | 5 + .../src/sync/BucketChecksumState.ts | 93 +++++++- .../test/src/sync/BucketChecksumState.test.ts | 200 ++++++++++++++++++ 3 files changed, 288 insertions(+), 10 deletions(-) create mode 100644 .changeset/parameter-query-logging.md diff --git a/.changeset/parameter-query-logging.md b/.changeset/parameter-query-logging.md new file mode 100644 index 000000000..f0beecf93 --- /dev/null +++ b/.changeset/parameter-query-logging.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Add detailed logging for parameter query results to improve debugging when limits are exceeded. Checkpoint logs now include the total number of parameter query results (before deduplication), and error messages show a breakdown of the top 10 sync stream definitions contributing to the count. diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index d9afa0efb..6aa3512aa 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -118,7 +118,7 @@ export class BucketChecksumState { const storage = this.bucketStorage; const update = await this.parameterState.getCheckpointUpdate(next); - const { buckets: allBuckets, updatedBuckets } = update; + const { buckets: allBuckets, updatedBuckets, parameterQueryResultsByDefinition } = update; /** Set of all buckets in this checkpoint. */ const bucketDescriptionMap = new Map(allBuckets.map((b) => [b.bucket, b])); @@ -217,15 +217,30 @@ export class BucketChecksumState { let message = `Updated checkpoint: ${base.checkpoint} | `; message += `write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} | `; + if (parameterQueryResultsByDefinition) { + const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( + (sum, count) => sum + count, + 0 + ); + message += `param_results: ${totalParamResults} | `; + } message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`; - this.logger.info(message, { + const logData: any = { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length, updated: diff.updatedBuckets.length, removed: diff.removedBuckets.length - }); + }; + if (parameterQueryResultsByDefinition) { + const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( + (sum, count) => sum + count, + 0 + ); + logData.parameter_query_results = totalParamResults; + } + this.logger.info(message, logData); }; checkpointLine = { @@ -239,8 +254,24 @@ export class BucketChecksumState { } else { deferredLog = () => { let message = `New checkpoint: ${base.checkpoint} | write: ${writeCheckpoint} | `; - message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; - this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length }); + message += `buckets: ${allBuckets.length}`; + if (parameterQueryResultsByDefinition) { + const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( + (sum, count) => sum + count, + 0 + ); + message += ` | param_results: ${totalParamResults}`; + } + message += ` ${limitedBuckets(allBuckets, 20)}`; + const logData: any = { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length }; + if (parameterQueryResultsByDefinition) { + const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( + (sum, count) => sum + count, + 0 + ); + logData.parameter_query_results = totalParamResults; + } + this.logger.info(message, logData); }; bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority })); @@ -371,6 +402,12 @@ export interface CheckpointUpdate { * If null, assume that any bucket in `buckets` may have been updated. */ updatedBuckets: Set | typeof INVALIDATE_ALL_BUCKETS; + + /** + * Number of parameter query results per sync stream definition (before deduplication). + * Map from definition name to count. + */ + parameterQueryResultsByDefinition?: Map; } export class BucketParameterState { @@ -504,11 +541,36 @@ export class BucketParameterState { ErrorCode.PSYNC_S2305, `Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})` ); - this.logger.error(error.message, { + + // Build breakdown of parameter query results by definition + let errorMessage = error.message; + const logData: any = { checkpoint: checkpoint, user_id: this.syncParams.userId, - buckets: update.buckets.length - }); + parameter_query_results: update.buckets.length + }; + + if (update.parameterQueryResultsByDefinition && update.parameterQueryResultsByDefinition.size > 0) { + // Sort definitions by count (descending) and take top 10 + const sortedDefinitions = Array.from(update.parameterQueryResultsByDefinition.entries()) + .sort((a, b) => b[1] - a[1]) + .slice(0, 10); + + errorMessage += '\nParameter query results by definition:'; + const countsByDefinition: Record = {}; + for (const [definition, count] of sortedDefinitions) { + errorMessage += `\n ${definition}: ${count}`; + countsByDefinition[definition] = count; + } + + if (update.parameterQueryResultsByDefinition.size > 10) { + errorMessage += `\n ... and ${update.parameterQueryResultsByDefinition.size - 10} more`; + } + + logData.parameter_query_results_by_definition = countsByDefinition; + } + + this.logger.error(errorMessage, logData); throw error; } @@ -565,12 +627,21 @@ export class BucketParameterState { } let dynamicBuckets: ResolvedBucket[]; + let parameterQueryResultsByDefinition: Map | undefined; if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) { dynamicBuckets = await querier.queryDynamicBucketDescriptions({ getParameterSets(lookups) { return checkpoint.base.getParameterSets(lookups); } }); + + // Count parameter query results per definition (before deduplication) + parameterQueryResultsByDefinition = new Map(); + for (const bucket of dynamicBuckets) { + const count = parameterQueryResultsByDefinition.get(bucket.definition) ?? 0; + parameterQueryResultsByDefinition.set(bucket.definition, count + 1); + } + this.cachedDynamicBuckets = dynamicBuckets; this.cachedDynamicBucketSet = new Set(dynamicBuckets.map((b) => b.bucket)); invalidateDataBuckets = true; @@ -592,12 +663,14 @@ export class BucketParameterState { return { buckets: allBuckets, // We cannot track individual bucket updates for dynamic lookups yet - updatedBuckets: INVALIDATE_ALL_BUCKETS + updatedBuckets: INVALIDATE_ALL_BUCKETS, + parameterQueryResultsByDefinition }; } else { return { buckets: allBuckets, - updatedBuckets: updatedBuckets + updatedBuckets: updatedBuckets, + parameterQueryResultsByDefinition }; } } diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index 26c986521..c3a8075e7 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -819,6 +819,206 @@ config: }); }); }); + + describe('parameter query result logging', () => { + test('logs parameter query results for dynamic buckets', async () => { + const storage = new MockBucketChecksumStateStorage(); + storage.updateTestChecksum({ bucket: 'by_project[1]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'by_project[2]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'by_project[3]', checksum: 1, count: 1 }); + + const logMessages: string[] = []; + const logData: any[] = []; + const mockLogger = { + info: (message: string, data: any) => { + logMessages.push(message); + logData.push(data); + }, + error: () => {}, + warn: () => {}, + debug: () => {} + }; + + const state = new BucketChecksumState({ + syncContext, + tokenPayload: { sub: 'u1' }, + syncRequest, + syncRules: SYNC_RULES_DYNAMIC, + bucketStorage: storage, + logger: mockLogger as any + }); + + const line = (await state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n, (lookups) => { + return [{ id: 1 }, { id: 2 }, { id: 3 }]; + }), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + }))!; + line.advance(); + + expect(logMessages[0]).toContain('param_results: 3'); + expect(logData[0].parameter_query_results).toBe(3); + }); + + test('throws error with breakdown when parameter query limit is exceeded', async () => { + const SYNC_RULES_MULTI = SqlSyncRules.fromYaml( + ` +bucket_definitions: + projects: + parameters: select id from projects where user_id = request.user_id() + data: [] + tasks: + parameters: select id from tasks where user_id = request.user_id() + data: [] + comments: + parameters: select id from comments where user_id = request.user_id() + data: [] + `, + { defaultSchema: 'public' } + ).hydrate({ hydrationState: versionedHydrationState(4) }); + + const storage = new MockBucketChecksumStateStorage(); + + const errorMessages: string[] = []; + const errorData: any[] = []; + const mockLogger = { + info: () => {}, + error: (message: string, data: any) => { + errorMessages.push(message); + errorData.push(data); + }, + warn: () => {}, + debug: () => {} + }; + + const smallContext = new SyncContext({ + maxBuckets: 100, + maxParameterQueryResults: 50, + maxDataFetchConcurrency: 10 + }); + + const state = new BucketChecksumState({ + syncContext: smallContext, + tokenPayload: { sub: 'u1' }, + syncRequest, + syncRules: SYNC_RULES_MULTI, + bucketStorage: storage, + logger: mockLogger as any + }); + + // Create 60 total results: 30 projects + 20 tasks + 10 comments + const projectIds = Array.from({ length: 30 }, (_, i) => ({ id: i + 1 })); + const taskIds = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 })); + const commentIds = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 })); + + for (let i = 1; i <= 30; i++) { + storage.updateTestChecksum({ bucket: `projects[${i}]`, checksum: 1, count: 1 }); + } + for (let i = 1; i <= 20; i++) { + storage.updateTestChecksum({ bucket: `tasks[${i}]`, checksum: 1, count: 1 }); + } + for (let i = 1; i <= 10; i++) { + storage.updateTestChecksum({ bucket: `comments[${i}]`, checksum: 1, count: 1 }); + } + + await expect( + state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n, (lookups) => { + const lookup = lookups[0]; + const lookupName = lookup.values[0]; + if (lookupName === 'projects') { + return projectIds; + } else if (lookupName === 'tasks') { + return taskIds; + } else { + return commentIds; + } + }), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + }) + ).rejects.toThrow('Too many parameter query results: 60 (limit of 50)'); + + // Verify error log includes breakdown + expect(errorMessages[0]).toContain('Parameter query results by definition:'); + expect(errorMessages[0]).toContain('projects: 30'); + expect(errorMessages[0]).toContain('tasks: 20'); + expect(errorMessages[0]).toContain('comments: 10'); + + expect(errorData[0].parameter_query_results).toBe(60); + expect(errorData[0].parameter_query_results_by_definition).toEqual({ + projects: 30, + tasks: 20, + comments: 10 + }); + }); + + test('limits breakdown to top 10 definitions', async () => { + // Create sync rules with 15 different definitions with dynamic parameters + let yamlDefinitions = 'bucket_definitions:\n'; + for (let i = 1; i <= 15; i++) { + yamlDefinitions += ` def${i}:\n`; + yamlDefinitions += ` parameters: select id from def${i}_table where user_id = request.user_id()\n`; + yamlDefinitions += ` data: []\n`; + } + + const SYNC_RULES_MANY = SqlSyncRules.fromYaml(yamlDefinitions, { defaultSchema: 'public' }).hydrate({ + hydrationState: versionedHydrationState(5) + }); + + const storage = new MockBucketChecksumStateStorage(); + + const errorMessages: string[] = []; + const mockLogger = { + info: () => {}, + error: (message: string) => { + errorMessages.push(message); + }, + warn: () => {}, + debug: () => {} + }; + + const smallContext = new SyncContext({ + maxBuckets: 100, + maxParameterQueryResults: 10, + maxDataFetchConcurrency: 10 + }); + + const state = new BucketChecksumState({ + syncContext: smallContext, + tokenPayload: { sub: 'u1' }, + syncRequest, + syncRules: SYNC_RULES_MANY, + bucketStorage: storage, + logger: mockLogger as any + }); + + // Each definition creates one bucket, total 15 buckets + for (let i = 1; i <= 15; i++) { + storage.updateTestChecksum({ bucket: `def${i}[${i}]`, checksum: 1, count: 1 }); + } + + await expect( + state.buildNextCheckpointLine({ + base: storage.makeCheckpoint(1n, (lookups) => { + // Return one result for each definition + return [{ id: 1 }]; + }), + writeCheckpoint: null, + update: CHECKPOINT_INVALIDATE_ALL + }) + ).rejects.toThrow('Too many parameter query results: 15 (limit of 10)'); + + // Verify only top 10 are shown + const errorMessage = errorMessages[0]; + expect(errorMessage).toContain('... and 5 more'); + + // Count how many definitions are listed (should be exactly 10) + const defMatches = errorMessage.match(/def\d+:/g); + expect(defMatches?.length).toBe(10); + }); + }); }); class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {