From 8d09d50750118247b6879eb90ea5bb6ac4983aa1 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Tue, 10 Feb 2026 18:19:14 -0600 Subject: [PATCH 1/3] test(postgres-storage): add regression test for COLLATE "C" compactor fix Adds a test that verifies the compactor properly handles bucket_name comparisons when using U+FFFF as a sentinel upper bound. Under locale-aware collation (e.g. en_US.UTF-8), alphanumeric strings can sort after U+FFFF, causing the initial pagination query to return zero rows and skip compaction entirely. The test inserts two PUT ops for the same row, then compacts without specifying compactBuckets (triggering the bucketUpper = U+FFFF path). After compaction, the older PUT should be converted to MOVE - which only works correctly with COLLATE "C" forcing byte-order comparison. --- .../test/src/storage_compacting.test.ts | 69 ++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts index f0e09e89b..8ec056128 100644 --- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts +++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts @@ -1,5 +1,70 @@ -import { register } from '@powersync/service-core-tests'; -import { describe } from 'vitest'; +import { storage } from '@powersync/service-core'; +import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests'; +import { describe, expect, test } from 'vitest'; import { POSTGRES_STORAGE_FACTORY } from './util.js'; describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY)); + +/** + * Regression test: the compactor's pagination query uses U+FFFF as a sentinel + * upper bound for bucket_name comparisons. Under locale-aware collation + * (e.g. en_US.UTF-8), alphanumeric strings sort AFTER U+FFFF, causing the + * initial query to return zero rows and skip compaction entirely. + * COLLATE "C" on that comparison forces byte-order, where U+FFFF sorts last. + * + * This test inserts two PUT ops for the same row (t1) into a single bucket, + * then compacts without specifying compactBuckets — triggering the + * bucket == null path where bucketUpper = U+FFFF. After compaction, the older + * PUT should be converted to a MOVE. Without COLLATE "C", no compaction + * occurs and both ops remain as PUT. + */ +describe('Postgres Compact - COLLATE regression', () => { + test('compacts when bucket_name is compared against U+FFFF sentinel', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: [select * from test] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + // Insert two PUTs for the same row — the older one should become a MOVE after compaction + const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.commit('1/1'); + }); + + const checkpoint = result!.flushed_op; + + // Compact WITHOUT specifying compactBuckets — this triggers bucketUpper = U+FFFF + await bucketStorage.compact({ + moveBatchLimit: 1, + moveBatchQueryLimit: 1, + minBucketChanges: 1 + }); + + const batch = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + // Op 1 should be converted to MOVE; op 2 stays as PUT. + // Without COLLATE "C", both remain as PUT (compaction was skipped). + expect(batch.chunkData.data).toMatchObject([ + { op_id: '1', op: 'MOVE' }, + { op_id: '2', op: 'PUT', object_id: 't1' } + ]); + }); +}); From a2da7fb2a9831699a216954c07129f8f52d0863a Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Tue, 10 Feb 2026 23:49:38 -0600 Subject: [PATCH 2/3] test(postgres-storage): remove COLLATE regression test The COLLATE regression test (Postgres Compact - COLLATE regression) verified that compaction works when bucket_name is compared against the U+FFFF sentinel under locale-aware collation. This test is redundant with the shared compacting tests in register-compacting-tests.ts: - compacting (1) through (4) all exercise the bucket==null (compact-all) path - They verify MOVE conversion with stronger assertions (validateCompactedBucket) - The extra test was a minimal reproduction of the same code path The sentinel and COLLATE "C" will be removed entirely as part of the compactor refactor, making this regression impossible to trigger. The shared tests provide full coverage of the behavior we care about: MOVE conversion works correctly when compacting all buckets. --- .../test/src/storage_compacting.test.ts | 69 +------------------ 1 file changed, 2 insertions(+), 67 deletions(-) diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts index 8ec056128..f0e09e89b 100644 --- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts +++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts @@ -1,70 +1,5 @@ -import { storage } from '@powersync/service-core'; -import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests'; -import { describe, expect, test } from 'vitest'; +import { register } from '@powersync/service-core-tests'; +import { describe } from 'vitest'; import { POSTGRES_STORAGE_FACTORY } from './util.js'; describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY)); - -/** - * Regression test: the compactor's pagination query uses U+FFFF as a sentinel - * upper bound for bucket_name comparisons. Under locale-aware collation - * (e.g. en_US.UTF-8), alphanumeric strings sort AFTER U+FFFF, causing the - * initial query to return zero rows and skip compaction entirely. - * COLLATE "C" on that comparison forces byte-order, where U+FFFF sorts last. - * - * This test inserts two PUT ops for the same row (t1) into a single bucket, - * then compacts without specifying compactBuckets — triggering the - * bucket == null path where bucketUpper = U+FFFF. After compaction, the older - * PUT should be converted to a MOVE. Without COLLATE "C", no compaction - * occurs and both ops remain as PUT. - */ -describe('Postgres Compact - COLLATE regression', () => { - test('compacts when bucket_name is compared against U+FFFF sentinel', async () => { - await using factory = await POSTGRES_STORAGE_FACTORY(); - const syncRules = await factory.updateSyncRules({ - content: ` -bucket_definitions: - global: - data: [select * from test] - ` - }); - const bucketStorage = factory.getInstance(syncRules); - - // Insert two PUTs for the same row — the older one should become a MOVE after compaction - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.INSERT, - after: { id: 't1' }, - afterReplicaId: test_utils.rid('t1') - }); - await batch.save({ - sourceTable: TEST_TABLE, - tag: storage.SaveOperationTag.UPDATE, - after: { id: 't1' }, - afterReplicaId: test_utils.rid('t1') - }); - await batch.commit('1/1'); - }); - - const checkpoint = result!.flushed_op; - - // Compact WITHOUT specifying compactBuckets — this triggers bucketUpper = U+FFFF - await bucketStorage.compact({ - moveBatchLimit: 1, - moveBatchQueryLimit: 1, - minBucketChanges: 1 - }); - - const batch = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) - ); - - // Op 1 should be converted to MOVE; op 2 stays as PUT. - // Without COLLATE "C", both remain as PUT (compaction was skipped). - expect(batch.chunkData.data).toMatchObject([ - { op_id: '1', op: 'MOVE' }, - { op_id: '2', op: 'PUT', object_id: 't1' } - ]); - }); -}); From a9cfb37e6018a65e460bb20630c02ef2005f2886 Mon Sep 17 00:00:00 2001 From: Jose Vargas Date: Wed, 11 Feb 2026 01:01:46 -0600 Subject: [PATCH 3/3] refactor(postgres): simplify bucket compaction to process one bucket at a time Replace complex bucket range filtering in PostgresCompactor with a cleaner two-method approach: compactAllBuckets() discovers buckets in batches and compactSingleBucket() handles individual bucket compaction. Changes: - Split compactInternal into compactAllBuckets and compactSingleBucket - Remove bucket range string matching (bucketLower/bucketUpper) logic - Add CLI validation requiring full bucket names (e.g., "global[]") - Update documentation to clarify compactBuckets format requirements - Add test coverage for explicit bucket name compaction --- .../src/storage/PostgresCompactor.ts | 106 +++++++----------- .../test/src/storage_compacting.test.ts | 56 ++++++++- .../src/entry/commands/compact-action.ts | 14 ++- .../src/storage/SyncRulesBucketStorage.ts | 3 +- 4 files changed, 110 insertions(+), 69 deletions(-) diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts index b19d866eb..def07e380 100644 --- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts +++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts @@ -75,37 +75,50 @@ export class PostgresCompactor { async compact() { if (this.buckets) { for (let bucket of this.buckets) { - // We can make this more efficient later on by iterating - // through the buckets in a single query. - // That makes batching more tricky, so we leave for later. - await this.compactInternal(bucket); + await this.compactSingleBucket(bucket); } } else { - await this.compactInternal(undefined); + await this.compactAllBuckets(); } } - async compactInternal(bucket: string | undefined) { - const idLimitBytes = this.idLimitBytes; + private async compactAllBuckets() { + const DISCOVERY_BATCH_SIZE = 200; + let lastBucket = ''; + + while (true) { + const bucketRows = (await this.db.sql` + SELECT DISTINCT bucket_name + FROM bucket_data + WHERE + group_id = ${{ type: 'int4', value: this.group_id }} + AND bucket_name > ${{ type: 'varchar', value: lastBucket }} + ORDER BY bucket_name ASC + LIMIT ${{ type: 'int4', value: DISCOVERY_BATCH_SIZE }} + `.rows()) as { bucket_name: string }[]; - let currentState: CurrentBucketState | null = null; - - let bucketLower: string | null = null; - let bucketUpper: string | null = null; - const MAX_CHAR = String.fromCodePoint(0xffff); - - if (bucket == null) { - bucketLower = ''; - bucketUpper = MAX_CHAR; - } else if (bucket?.includes('[')) { - // Exact bucket name - bucketLower = bucket; - bucketUpper = bucket; - } else if (bucket) { - // Bucket definition name - bucketLower = `${bucket}[`; - bucketUpper = `${bucket}[${MAX_CHAR}`; + if (bucketRows.length === 0) { + break; + } + + for (const row of bucketRows) { + await this.compactSingleBucket(row.bucket_name); + } + + lastBucket = bucketRows[bucketRows.length - 1].bucket_name; } + } + + private async compactSingleBucket(bucket: string) { + const idLimitBytes = this.idLimitBytes; + + let currentState: CurrentBucketState = { + bucket: bucket, + seen: new Map(), + trackingSize: 0, + lastNotPut: null, + opsSincePut: 0 + }; let upperOpIdLimit = BIGINT_MAX; @@ -123,16 +136,9 @@ export class PostgresCompactor { bucket_data WHERE group_id = ${{ type: 'int4', value: this.group_id }} - AND bucket_name >= ${{ type: 'varchar', value: bucketLower }} - AND ( - ( - bucket_name = ${{ type: 'varchar', value: bucketUpper }} - AND op_id < ${{ type: 'int8', value: upperOpIdLimit }} - ) - OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison - ) + AND bucket_name = ${{ type: 'varchar', value: bucket }} + AND op_id < ${{ type: 'int8', value: upperOpIdLimit }} ORDER BY - bucket_name DESC, op_id DESC LIMIT ${{ type: 'int4', value: this.moveBatchQueryLimit }} @@ -150,32 +156,8 @@ export class PostgresCompactor { // Set upperBound for the next batch const lastBatchItem = batch[batch.length - 1]; upperOpIdLimit = lastBatchItem.op_id; - bucketUpper = lastBatchItem.bucket_name; for (const doc of batch) { - if (currentState == null || doc.bucket_name != currentState.bucket) { - if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) { - // Important to flush before clearBucket() - await this.flush(); - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); - - const bucket = currentState.bucket; - const clearOp = currentState.lastNotPut; - // Free memory before clearing bucket - currentState = null; - await this.clearBucket(bucket, clearOp); - } - currentState = { - bucket: doc.bucket_name, - seen: new Map(), - trackingSize: 0, - lastNotPut: null, - opsSincePut: 0 - }; - } - if (this.maxOpId != null && doc.op_id > this.maxOpId) { continue; } @@ -237,16 +219,12 @@ export class PostgresCompactor { } await this.flush(); - currentState?.seen.clear(); - if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) { + currentState.seen.clear(); + if (currentState.lastNotPut != null && currentState.opsSincePut > 1) { logger.info( `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` ); - const bucket = currentState.bucket; - const clearOp = currentState.lastNotPut; - // Free memory before clearing bucket - currentState = null; - await this.clearBucket(bucket, clearOp); + await this.clearBucket(currentState.bucket, currentState.lastNotPut); } } diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts index f0e09e89b..1c732b1d2 100644 --- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts +++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts @@ -1,5 +1,57 @@ -import { register } from '@powersync/service-core-tests'; -import { describe } from 'vitest'; +import { storage } from '@powersync/service-core'; +import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests'; +import { describe, expect, test } from 'vitest'; import { POSTGRES_STORAGE_FACTORY } from './util.js'; describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY)); + +describe('Postgres Compact - explicit bucket name', () => { + test('compacts a specific bucket by exact name', async () => { + await using factory = await POSTGRES_STORAGE_FACTORY(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: [select * from test] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + const result = await bucketStorage.startBatch( + test_utils.BATCH_OPTIONS, + async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + after: { id: 't1' }, + afterReplicaId: test_utils.rid('t1') + }); + await batch.commit('1/1'); + } + ); + + const checkpoint = result!.flushed_op; + + // Compact with an explicit bucket name — exercises the this.buckets + // iteration path, NOT the compactAllBuckets discovery path. + await bucketStorage.compact({ + compactBuckets: ['global[]'], + minBucketChanges: 1 + }); + + const batch = await test_utils.oneFromAsync( + bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])) + ); + + expect(batch.chunkData.data).toMatchObject([ + { op_id: '1', op: 'MOVE' }, + { op_id: '2', op: 'PUT', object_id: 't1' } + ]); + }); +}); diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts index 014d12af8..ae382c92c 100644 --- a/packages/service-core/src/entry/commands/compact-action.ts +++ b/packages/service-core/src/entry/commands/compact-action.ts @@ -25,12 +25,22 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024); export function registerCompactAction(program: Command) { const compactCommand = program .command(COMMAND_NAME) - .option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)'); + .option(`-b, --buckets [buckets]`, 'Full bucket names, comma-separated (e.g., "global[],mybucket[\\"user1\\"]")'); wrapConfigCommand(compactCommand); return compactCommand.description('Compact storage').action(async (options) => { - const buckets = options.buckets?.split(','); + const buckets = options.buckets?.split(',').map((b: string) => b.trim()).filter(Boolean); + if (buckets) { + const invalid = buckets.filter((b: string) => !b.includes('[')); + if (invalid.length > 0) { + logger.error( + `Invalid bucket names: ${invalid.join(', ')}. ` + + `Pass full bucket names (e.g., "global[]"), not bucket definition names (e.g., "global").` + ); + process.exit(1); + } + } if (buckets == null) { logger.info('Compacting storage for all buckets...'); } else { diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 175427449..a0dc8c95b 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -202,7 +202,8 @@ export interface CompactOptions { * * If not specified, compacts all buckets. * - * These can be individual bucket names, or bucket definition names. + * These must be full bucket names (e.g., "global[]", "mybucket[\"user1\"]"). + * Bucket definition names (e.g., "global") are not supported. */ compactBuckets?: string[];