diff --git a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
index b19d866eb..def07e380 100644
--- a/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresCompactor.ts
@@ -75,37 +75,50 @@ export class PostgresCompactor {
   async compact() {
     if (this.buckets) {
       for (let bucket of this.buckets) {
-        // We can make this more efficient later on by iterating
-        // through the buckets in a single query.
-        // That makes batching more tricky, so we leave for later.
-        await this.compactInternal(bucket);
+        await this.compactSingleBucket(bucket);
       }
     } else {
-      await this.compactInternal(undefined);
+      await this.compactAllBuckets();
     }
   }
 
-  async compactInternal(bucket: string | undefined) {
-    const idLimitBytes = this.idLimitBytes;
+  private async compactAllBuckets() {
+    const DISCOVERY_BATCH_SIZE = 200;
+    let lastBucket = '';
+
+    while (true) {
+      const bucketRows = (await this.db.sql`
+        SELECT DISTINCT bucket_name
+        FROM bucket_data
+        WHERE
+          group_id = ${{ type: 'int4', value: this.group_id }}
+          AND bucket_name > ${{ type: 'varchar', value: lastBucket }}
+        ORDER BY bucket_name ASC
+        LIMIT ${{ type: 'int4', value: DISCOVERY_BATCH_SIZE }}
+      `.rows()) as { bucket_name: string }[];
 
-    let currentState: CurrentBucketState | null = null;
-
-    let bucketLower: string | null = null;
-    let bucketUpper: string | null = null;
-    const MAX_CHAR = String.fromCodePoint(0xffff);
-
-    if (bucket == null) {
-      bucketLower = '';
-      bucketUpper = MAX_CHAR;
-    } else if (bucket?.includes('[')) {
-      // Exact bucket name
-      bucketLower = bucket;
-      bucketUpper = bucket;
-    } else if (bucket) {
-      // Bucket definition name
-      bucketLower = `${bucket}[`;
-      bucketUpper = `${bucket}[${MAX_CHAR}`;
+      if (bucketRows.length === 0) {
+        break;
+      }
+
+      for (const row of bucketRows) {
+        await this.compactSingleBucket(row.bucket_name);
+      }
+
+      lastBucket = bucketRows[bucketRows.length - 1].bucket_name;
     }
+  }
+
+  private async compactSingleBucket(bucket: string) {
+    const idLimitBytes = this.idLimitBytes;
+
+    let currentState: CurrentBucketState = {
+      bucket: bucket,
+      seen: new Map(),
+      trackingSize: 0,
+      lastNotPut: null,
+      opsSincePut: 0
+    };
 
     let upperOpIdLimit = BIGINT_MAX;
 
@@ -123,16 +136,9 @@ export class PostgresCompactor {
           bucket_data
         WHERE
           group_id = ${{ type: 'int4', value: this.group_id }}
-          AND bucket_name >= ${{ type: 'varchar', value: bucketLower }}
-          AND (
-            (
-              bucket_name = ${{ type: 'varchar', value: bucketUpper }}
-              AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
-            )
-            OR bucket_name < ${{ type: 'varchar', value: bucketUpper }} COLLATE "C" -- Use binary comparison
-          )
+          AND bucket_name = ${{ type: 'varchar', value: bucket }}
+          AND op_id < ${{ type: 'int8', value: upperOpIdLimit }}
         ORDER BY
-          bucket_name DESC,
           op_id DESC
         LIMIT
           ${{ type: 'int4', value: this.moveBatchQueryLimit }}
@@ -150,32 +156,8 @@
       // Set upperBound for the next batch
       const lastBatchItem = batch[batch.length - 1];
       upperOpIdLimit = lastBatchItem.op_id;
-      bucketUpper = lastBatchItem.bucket_name;
 
       for (const doc of batch) {
-        if (currentState == null || doc.bucket_name != currentState.bucket) {
-          if (currentState != null && currentState.lastNotPut != null && currentState.opsSincePut >= 1) {
-            // Important to flush before clearBucket()
-            await this.flush();
-            logger.info(
-              `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
-            );
-
-            const bucket = currentState.bucket;
-            const clearOp = currentState.lastNotPut;
-            // Free memory before clearing bucket
-            currentState = null;
-            await this.clearBucket(bucket, clearOp);
-          }
-          currentState = {
-            bucket: doc.bucket_name,
-            seen: new Map(),
-            trackingSize: 0,
-            lastNotPut: null,
-            opsSincePut: 0
-          };
-        }
-
         if (this.maxOpId != null && doc.op_id > this.maxOpId) {
           continue;
         }
@@ -237,16 +219,12 @@
     }
 
     await this.flush();
-    currentState?.seen.clear();
-    if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) {
+    currentState.seen.clear();
+    if (currentState.lastNotPut != null && currentState.opsSincePut > 1) {
       logger.info(
         `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations`
       );
-      const bucket = currentState.bucket;
-      const clearOp = currentState.lastNotPut;
-      // Free memory before clearing bucket
-      currentState = null;
-      await this.clearBucket(bucket, clearOp);
+      await this.clearBucket(currentState.bucket, currentState.lastNotPut);
     }
   }
 
diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts
index f0e09e89b..1c732b1d2 100644
--- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts
+++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts
@@ -1,5 +1,57 @@
-import { register } from '@powersync/service-core-tests';
-import { describe } from 'vitest';
+import { storage } from '@powersync/service-core';
+import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests';
+import { describe, expect, test } from 'vitest';
 import { POSTGRES_STORAGE_FACTORY } from './util.js';
 
 describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));
+
+describe('Postgres Compact - explicit bucket name', () => {
+  test('compacts a specific bucket by exact name', async () => {
+    await using factory = await POSTGRES_STORAGE_FACTORY();
+    const syncRules = await factory.updateSyncRules({
+      content: `
+bucket_definitions:
+  global:
+    data: [select * from test]
+`
+    });
+    const bucketStorage = factory.getInstance(syncRules);
+
+    const result = await bucketStorage.startBatch(
+      test_utils.BATCH_OPTIONS,
+      async (batch) => {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: { id: 't1' },
+          afterReplicaId: test_utils.rid('t1')
+        });
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: { id: 't1' },
+          afterReplicaId: test_utils.rid('t1')
+        });
+        await batch.commit('1/1');
+      }
+    );
+
+    const checkpoint = result!.flushed_op;
+
+    // Compact with an explicit bucket name. This exercises the this.buckets
+    // iteration path, not the compactAllBuckets discovery path.
+    await bucketStorage.compact({
+      compactBuckets: ['global[]'],
+      minBucketChanges: 1
+    });
+
+    const batch = await test_utils.oneFromAsync(
+      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
+    );
+
+    expect(batch.chunkData.data).toMatchObject([
+      { op_id: '1', op: 'MOVE' },
+      { op_id: '2', op: 'PUT', object_id: 't1' }
+    ]);
+  });
+});
diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts
index 014d12af8..ae382c92c 100644
--- a/packages/service-core/src/entry/commands/compact-action.ts
+++ b/packages/service-core/src/entry/commands/compact-action.ts
@@ -25,12 +25,22 @@ const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024);
 export function registerCompactAction(program: Command) {
   const compactCommand = program
     .command(COMMAND_NAME)
-    .option(`-b, --buckets [buckets]`, 'Bucket name (optional, comma-separate multiple names)');
+    .option(`-b, --buckets [buckets]`, 'Full bucket names, comma-separated (e.g., "global[],mybucket[\\"user1\\"]")');
 
   wrapConfigCommand(compactCommand);
 
   return compactCommand.description('Compact storage').action(async (options) => {
-    const buckets = options.buckets?.split(',');
+    const buckets = options.buckets?.split(',').map((b: string) => b.trim()).filter(Boolean);
+    if (buckets) {
+      const invalid = buckets.filter((b: string) => !b.includes('['));
+      if (invalid.length > 0) {
+        logger.error(
+          `Invalid bucket names: ${invalid.join(', ')}. ` +
+            `Pass full bucket names (e.g., "global[]"), not bucket definition names (e.g., "global").`
+        );
+        process.exit(1);
+      }
+    }
     if (buckets == null) {
       logger.info('Compacting storage for all buckets...');
     } else {
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 175427449..a0dc8c95b 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -202,7 +202,8 @@ export interface CompactOptions {
    *
    * If not specified, compacts all buckets.
    *
-   * These can be individual bucket names, or bucket definition names.
+   * These must be full bucket names (e.g., "global[]", "mybucket[\"user1\"]").
+   * Bucket definition names (e.g., "global") are not supported.
    */
   compactBuckets?: string[];
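Reviewer note (not part of the patch): compactAllBuckets replaces the old range scan over [bucketLower, bucketUpper) with keyset pagination over distinct bucket names. The sketch below shows that pattern in isolation; the db.query client, the forEachBucket name, and the positional-parameter style are illustrative assumptions, not the service's actual API.

// Minimal sketch of the keyset-pagination discovery loop, assuming a
// hypothetical client exposing query(sql, params): Promise<rows>.
interface BucketRow {
  bucket_name: string;
}

async function forEachBucket(
  db: { query(sql: string, params: unknown[]): Promise<BucketRow[]> },
  groupId: number,
  visit: (bucket: string) => Promise<void>,
  batchSize = 200
): Promise<void> {
  let lastBucket = '';
  while (true) {
    // Resume strictly after the last bucket seen, so each query touches at
    // most batchSize names regardless of the total number of buckets.
    const rows = await db.query(
      `SELECT DISTINCT bucket_name FROM bucket_data
       WHERE group_id = $1 AND bucket_name > $2
       ORDER BY bucket_name ASC LIMIT $3`,
      [groupId, lastBucket, batchSize]
    );
    if (rows.length === 0) {
      break; // All buckets visited.
    }
    for (const row of rows) {
      await visit(row.bucket_name);
    }
    lastBucket = rows[rows.length - 1].bucket_name;
  }
}

A side effect of this shape is that the per-bucket query can compare bucket_name with plain equality, which removes the need for the COLLATE "C" range comparison the old code used to get binary ordering.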