Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/parameter-query-logging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Add detailed logging for parameter query results to improve debugging when limits are exceeded. Checkpoint logs now include the total number of parameter query results (before deduplication), and error messages show a breakdown of the top 10 sync stream definitions contributing to the count.
93 changes: 83 additions & 10 deletions packages/service-core/src/sync/BucketChecksumState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class BucketChecksumState {
const storage = this.bucketStorage;

const update = await this.parameterState.getCheckpointUpdate(next);
const { buckets: allBuckets, updatedBuckets } = update;
const { buckets: allBuckets, updatedBuckets, parameterQueryResultsByDefinition } = update;

/** Set of all buckets in this checkpoint. */
const bucketDescriptionMap = new Map(allBuckets.map((b) => [b.bucket, b]));
Expand Down Expand Up @@ -217,15 +217,30 @@ export class BucketChecksumState {
let message = `Updated checkpoint: ${base.checkpoint} | `;
message += `write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} | `;
if (parameterQueryResultsByDefinition) {
const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce(
(sum, count) => sum + count,
0
);
message += `param_results: ${totalParamResults} | `;
}
message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `;
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`;
this.logger.info(message, {
const logData: any = {
checkpoint: base.checkpoint,
user_id: user_id,
buckets: allBuckets.length,
updated: diff.updatedBuckets.length,
removed: diff.removedBuckets.length
});
};
if (parameterQueryResultsByDefinition) {
const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce(
(sum, count) => sum + count,
0
);
logData.parameter_query_results = totalParamResults;
}
this.logger.info(message, logData);
};

checkpointLine = {
Expand All @@ -239,8 +254,24 @@ export class BucketChecksumState {
} else {
deferredLog = () => {
let message = `New checkpoint: ${base.checkpoint} | write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
message += `buckets: ${allBuckets.length}`;
if (parameterQueryResultsByDefinition) {
const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce(
(sum, count) => sum + count,
0
);
message += ` | param_results: ${totalParamResults}`;
}
message += ` ${limitedBuckets(allBuckets, 20)}`;
const logData: any = { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length };
if (parameterQueryResultsByDefinition) {
const totalParamResults = Array.from(parameterQueryResultsByDefinition.values()).reduce(
(sum, count) => sum + count,
0
);
logData.parameter_query_results = totalParamResults;
}
this.logger.info(message, logData);
Comment on lines +257 to +274
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message code is getting quite long here - can this be moved to a separate function?

};
bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority }));

Expand Down Expand Up @@ -371,6 +402,12 @@ export interface CheckpointUpdate {
* If null, assume that any bucket in `buckets` may have been updated.
*/
updatedBuckets: Set<string> | typeof INVALIDATE_ALL_BUCKETS;

/**
* Number of parameter query results per sync stream definition (before deduplication).
* Map from definition name to count.
*/
parameterQueryResultsByDefinition?: Map<string, number>;
}

export class BucketParameterState {
Expand Down Expand Up @@ -504,11 +541,36 @@ export class BucketParameterState {
ErrorCode.PSYNC_S2305,
`Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})`
);
this.logger.error(error.message, {

// Build breakdown of parameter query results by definition
let errorMessage = error.message;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - can we move the code to generate this message to a separate function?

const logData: any = {
checkpoint: checkpoint,
user_id: this.syncParams.userId,
buckets: update.buckets.length
});
parameter_query_results: update.buckets.length
};

if (update.parameterQueryResultsByDefinition && update.parameterQueryResultsByDefinition.size > 0) {
// Sort definitions by count (descending) and take top 10
const sortedDefinitions = Array.from(update.parameterQueryResultsByDefinition.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, 10);

errorMessage += '\nParameter query results by definition:';
const countsByDefinition: Record<string, number> = {};
for (const [definition, count] of sortedDefinitions) {
errorMessage += `\n ${definition}: ${count}`;
countsByDefinition[definition] = count;
}

if (update.parameterQueryResultsByDefinition.size > 10) {
errorMessage += `\n ... and ${update.parameterQueryResultsByDefinition.size - 10} more`;
}
Comment on lines +566 to +568
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this, can we log the remaining count, instead of or in addition to the number of definitions?


logData.parameter_query_results_by_definition = countsByDefinition;
}

this.logger.error(errorMessage, logData);

throw error;
}
Expand Down Expand Up @@ -565,12 +627,21 @@ export class BucketParameterState {
}

let dynamicBuckets: ResolvedBucket[];
let parameterQueryResultsByDefinition: Map<string, number> | undefined;
if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
getParameterSets(lookups) {
return checkpoint.base.getParameterSets(lookups);
}
});

// Count parameter query results per definition (before deduplication)
parameterQueryResultsByDefinition = new Map<string, number>();
for (const bucket of dynamicBuckets) {
const count = parameterQueryResultsByDefinition.get(bucket.definition) ?? 0;
parameterQueryResultsByDefinition.set(bucket.definition, count + 1);
}

this.cachedDynamicBuckets = dynamicBuckets;
this.cachedDynamicBucketSet = new Set<string>(dynamicBuckets.map((b) => b.bucket));
invalidateDataBuckets = true;
Expand All @@ -592,12 +663,14 @@ export class BucketParameterState {
return {
buckets: allBuckets,
// We cannot track individual bucket updates for dynamic lookups yet
updatedBuckets: INVALIDATE_ALL_BUCKETS
updatedBuckets: INVALIDATE_ALL_BUCKETS,
parameterQueryResultsByDefinition
};
} else {
return {
buckets: allBuckets,
updatedBuckets: updatedBuckets
updatedBuckets: updatedBuckets,
parameterQueryResultsByDefinition
};
}
}
Expand Down
200 changes: 200 additions & 0 deletions packages/service-core/test/src/sync/BucketChecksumState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,206 @@ config:
});
});
});

describe('parameter query result logging', () => {
test('logs parameter query results for dynamic buckets', async () => {
const storage = new MockBucketChecksumStateStorage();
storage.updateTestChecksum({ bucket: 'by_project[1]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'by_project[2]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'by_project[3]', checksum: 1, count: 1 });

const logMessages: string[] = [];
const logData: any[] = [];
const mockLogger = {
info: (message: string, data: any) => {
logMessages.push(message);
logData.push(data);
},
error: () => {},
warn: () => {},
debug: () => {}
};

const state = new BucketChecksumState({
syncContext,
tokenPayload: { sub: 'u1' },
syncRequest,
syncRules: SYNC_RULES_DYNAMIC,
bucketStorage: storage,
logger: mockLogger as any
});

const line = (await state.buildNextCheckpointLine({
base: storage.makeCheckpoint(1n, (lookups) => {
return [{ id: 1 }, { id: 2 }, { id: 3 }];
}),
writeCheckpoint: null,
update: CHECKPOINT_INVALIDATE_ALL
}))!;
line.advance();

expect(logMessages[0]).toContain('param_results: 3');
expect(logData[0].parameter_query_results).toBe(3);
});

test('throws error with breakdown when parameter query limit is exceeded', async () => {
const SYNC_RULES_MULTI = SqlSyncRules.fromYaml(
`
bucket_definitions:
projects:
parameters: select id from projects where user_id = request.user_id()
data: []
tasks:
parameters: select id from tasks where user_id = request.user_id()
data: []
comments:
parameters: select id from comments where user_id = request.user_id()
data: []
`,
{ defaultSchema: 'public' }
).hydrate({ hydrationState: versionedHydrationState(4) });

const storage = new MockBucketChecksumStateStorage();

const errorMessages: string[] = [];
const errorData: any[] = [];
const mockLogger = {
info: () => {},
error: (message: string, data: any) => {
errorMessages.push(message);
errorData.push(data);
},
warn: () => {},
debug: () => {}
};

const smallContext = new SyncContext({
maxBuckets: 100,
maxParameterQueryResults: 50,
maxDataFetchConcurrency: 10
});

const state = new BucketChecksumState({
syncContext: smallContext,
tokenPayload: { sub: 'u1' },
syncRequest,
syncRules: SYNC_RULES_MULTI,
bucketStorage: storage,
logger: mockLogger as any
});

// Create 60 total results: 30 projects + 20 tasks + 10 comments
const projectIds = Array.from({ length: 30 }, (_, i) => ({ id: i + 1 }));
const taskIds = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 }));
const commentIds = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));

for (let i = 1; i <= 30; i++) {
storage.updateTestChecksum({ bucket: `projects[${i}]`, checksum: 1, count: 1 });
}
for (let i = 1; i <= 20; i++) {
storage.updateTestChecksum({ bucket: `tasks[${i}]`, checksum: 1, count: 1 });
}
for (let i = 1; i <= 10; i++) {
storage.updateTestChecksum({ bucket: `comments[${i}]`, checksum: 1, count: 1 });
}

await expect(
state.buildNextCheckpointLine({
base: storage.makeCheckpoint(1n, (lookups) => {
const lookup = lookups[0];
const lookupName = lookup.values[0];
if (lookupName === 'projects') {
return projectIds;
} else if (lookupName === 'tasks') {
return taskIds;
} else {
return commentIds;
}
}),
writeCheckpoint: null,
update: CHECKPOINT_INVALIDATE_ALL
})
).rejects.toThrow('Too many parameter query results: 60 (limit of 50)');

// Verify error log includes breakdown
expect(errorMessages[0]).toContain('Parameter query results by definition:');
expect(errorMessages[0]).toContain('projects: 30');
expect(errorMessages[0]).toContain('tasks: 20');
expect(errorMessages[0]).toContain('comments: 10');

expect(errorData[0].parameter_query_results).toBe(60);
expect(errorData[0].parameter_query_results_by_definition).toEqual({
projects: 30,
tasks: 20,
comments: 10
});
});

test('limits breakdown to top 10 definitions', async () => {
// Create sync rules with 15 different definitions with dynamic parameters
let yamlDefinitions = 'bucket_definitions:\n';
for (let i = 1; i <= 15; i++) {
yamlDefinitions += ` def${i}:\n`;
yamlDefinitions += ` parameters: select id from def${i}_table where user_id = request.user_id()\n`;
yamlDefinitions += ` data: []\n`;
}

const SYNC_RULES_MANY = SqlSyncRules.fromYaml(yamlDefinitions, { defaultSchema: 'public' }).hydrate({
hydrationState: versionedHydrationState(5)
});

const storage = new MockBucketChecksumStateStorage();

const errorMessages: string[] = [];
const mockLogger = {
info: () => {},
error: (message: string) => {
errorMessages.push(message);
},
warn: () => {},
debug: () => {}
};

const smallContext = new SyncContext({
maxBuckets: 100,
maxParameterQueryResults: 10,
maxDataFetchConcurrency: 10
});

const state = new BucketChecksumState({
syncContext: smallContext,
tokenPayload: { sub: 'u1' },
syncRequest,
syncRules: SYNC_RULES_MANY,
bucketStorage: storage,
logger: mockLogger as any
});

// Each definition creates one bucket, total 15 buckets
for (let i = 1; i <= 15; i++) {
storage.updateTestChecksum({ bucket: `def${i}[${i}]`, checksum: 1, count: 1 });
}

await expect(
state.buildNextCheckpointLine({
base: storage.makeCheckpoint(1n, (lookups) => {
// Return one result for each definition
return [{ id: 1 }];
}),
writeCheckpoint: null,
update: CHECKPOINT_INVALIDATE_ALL
})
).rejects.toThrow('Too many parameter query results: 15 (limit of 10)');

// Verify only top 10 are shown
const errorMessage = errorMessages[0];
expect(errorMessage).toContain('... and 5 more');

// Count how many definitions are listed (should be exactly 10)
const defMatches = errorMessage.match(/def\d+:/g);
expect(defMatches?.length).toBe(10);
});
});
});

class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {
Expand Down