-
Notifications
You must be signed in to change notification settings - Fork 40
Enhance logging for parameter query results in BucketChecksumS #462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| '@powersync/service-core': patch | ||
| --- | ||
|
|
||
| Add detailed logging for parameter query results to improve debugging when limits are exceeded. Checkpoint logs now include the total number of parameter query results (before deduplication), and error messages show a breakdown of the top 10 sync stream definitions contributing to the count. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,7 +118,7 @@ export class BucketChecksumState { | |
| const storage = this.bucketStorage; | ||
|
|
||
| const update = await this.parameterState.getCheckpointUpdate(next); | ||
| const { buckets: allBuckets, updatedBuckets } = update; | ||
| const { buckets: allBuckets, updatedBuckets, parameterQueryResultsByDefinition } = update; | ||
|
|
||
| /** Set of all buckets in this checkpoint. */ | ||
| const bucketDescriptionMap = new Map(allBuckets.map((b) => [b.bucket, b])); | ||
|
|
@@ -217,15 +217,30 @@ export class BucketChecksumState { | |
| let message = `Updated checkpoint: ${base.checkpoint} | `; | ||
| message += `write: ${writeCheckpoint} | `; | ||
| message += `buckets: ${allBuckets.length} | `; | ||
| if (parameterQueryResultsByDefinition) { | ||
| const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( | ||
| (sum, count) => sum + count, | ||
| 0 | ||
| ); | ||
| message += `param_results: ${totalParamResults} | `; | ||
| } | ||
| message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; | ||
| message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`; | ||
| this.logger.info(message, { | ||
| const logData: any = { | ||
| checkpoint: base.checkpoint, | ||
| user_id: user_id, | ||
| buckets: allBuckets.length, | ||
| updated: diff.updatedBuckets.length, | ||
| removed: diff.removedBuckets.length | ||
| }); | ||
| }; | ||
| if (parameterQueryResultsByDefinition) { | ||
| const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( | ||
| (sum, count) => sum + count, | ||
| 0 | ||
| ); | ||
| logData.parameter_query_results = totalParamResults; | ||
| } | ||
| this.logger.info(message, logData); | ||
| }; | ||
|
|
||
| checkpointLine = { | ||
|
|
@@ -239,8 +254,24 @@ export class BucketChecksumState { | |
| } else { | ||
| deferredLog = () => { | ||
| let message = `New checkpoint: ${base.checkpoint} | write: ${writeCheckpoint} | `; | ||
| message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; | ||
| this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length }); | ||
| message += `buckets: ${allBuckets.length}`; | ||
| if (parameterQueryResultsByDefinition) { | ||
| const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( | ||
| (sum, count) => sum + count, | ||
| 0 | ||
| ); | ||
| message += ` | param_results: ${totalParamResults}`; | ||
| } | ||
| message += ` ${limitedBuckets(allBuckets, 20)}`; | ||
| const logData: any = { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length }; | ||
| if (parameterQueryResultsByDefinition) { | ||
| const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce( | ||
| (sum, count) => sum + count, | ||
| 0 | ||
| ); | ||
| logData.parameter_query_results = totalParamResults; | ||
| } | ||
| this.logger.info(message, logData); | ||
| }; | ||
| bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority })); | ||
|
|
||
|
|
@@ -371,6 +402,12 @@ export interface CheckpointUpdate { | |
| * If null, assume that any bucket in `buckets` may have been updated. | ||
| */ | ||
| updatedBuckets: Set<string> | typeof INVALIDATE_ALL_BUCKETS; | ||
|
|
||
| /** | ||
| * Number of parameter query results per sync stream definition (before deduplication). | ||
| * Map from definition name to count. | ||
| */ | ||
| parameterQueryResultsByDefinition?: Map<string, number>; | ||
| } | ||
|
|
||
| export class BucketParameterState { | ||
|
|
@@ -504,11 +541,36 @@ export class BucketParameterState { | |
| ErrorCode.PSYNC_S2305, | ||
| `Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})` | ||
| ); | ||
| this.logger.error(error.message, { | ||
|
|
||
| // Build breakdown of parameter query results by definition | ||
| let errorMessage = error.message; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here - can we move the code to generate this message to a separate function? |
||
| const logData: any = { | ||
| checkpoint: checkpoint, | ||
| user_id: this.syncParams.userId, | ||
| buckets: update.buckets.length | ||
| }); | ||
| parameter_query_results: update.buckets.length | ||
| }; | ||
|
|
||
| if (update.parameterQueryResultsByDefinition && update.parameterQueryResultsByDefinition.size > 0) { | ||
| // Sort definitions by count (descending) and take top 10 | ||
| const sortedDefinitions = Array.from(update.parameterQueryResultsByDefinition.entries()) | ||
| .sort((a, b) => b[1] - a[1]) | ||
| .slice(0, 10); | ||
|
|
||
| errorMessage += '\nParameter query results by definition:'; | ||
| const countsByDefinition: Record<string, number> = {}; | ||
| for (const [definition, count] of sortedDefinitions) { | ||
| errorMessage += `\n ${definition}: ${count}`; | ||
| countsByDefinition[definition] = count; | ||
| } | ||
|
|
||
| if (update.parameterQueryResultsByDefinition.size > 10) { | ||
| errorMessage += `\n ... and ${update.parameterQueryResultsByDefinition.size - 10} more`; | ||
| } | ||
|
Comment on lines
+566
to
+568
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this, can we log the remaining count, instead of or in addition to the number of definitions? |
||
|
|
||
| logData.parameter_query_results_by_definition = countsByDefinition; | ||
| } | ||
|
|
||
| this.logger.error(errorMessage, logData); | ||
|
|
||
| throw error; | ||
| } | ||
|
|
@@ -565,12 +627,21 @@ export class BucketParameterState { | |
| } | ||
|
|
||
| let dynamicBuckets: ResolvedBucket[]; | ||
| let parameterQueryResultsByDefinition: Map<string, number> | undefined; | ||
| if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) { | ||
| dynamicBuckets = await querier.queryDynamicBucketDescriptions({ | ||
| getParameterSets(lookups) { | ||
| return checkpoint.base.getParameterSets(lookups); | ||
| } | ||
| }); | ||
|
|
||
| // Count parameter query results per definition (before deduplication) | ||
| parameterQueryResultsByDefinition = new Map<string, number>(); | ||
| for (const bucket of dynamicBuckets) { | ||
| const count = parameterQueryResultsByDefinition.get(bucket.definition) ?? 0; | ||
| parameterQueryResultsByDefinition.set(bucket.definition, count + 1); | ||
| } | ||
|
|
||
| this.cachedDynamicBuckets = dynamicBuckets; | ||
| this.cachedDynamicBucketSet = new Set<string>(dynamicBuckets.map((b) => b.bucket)); | ||
| invalidateDataBuckets = true; | ||
|
|
@@ -592,12 +663,14 @@ export class BucketParameterState { | |
| return { | ||
| buckets: allBuckets, | ||
| // We cannot track individual bucket updates for dynamic lookups yet | ||
| updatedBuckets: INVALIDATE_ALL_BUCKETS | ||
| updatedBuckets: INVALIDATE_ALL_BUCKETS, | ||
| parameterQueryResultsByDefinition | ||
| }; | ||
| } else { | ||
| return { | ||
| buckets: allBuckets, | ||
| updatedBuckets: updatedBuckets | ||
| updatedBuckets: updatedBuckets, | ||
| parameterQueryResultsByDefinition | ||
| }; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log message code is getting quite long here - can this be moved to a separate function?