diff --git a/.changeset/famous-cobras-give.md b/.changeset/famous-cobras-give.md
new file mode 100644
index 000000000..a88e51863
--- /dev/null
+++ b/.changeset/famous-cobras-give.md
@@ -0,0 +1,36 @@
+---
+"@powersync/service-sync-rules": minor
+"@powersync/service-core": minor
+"@powersync/service-module-postgres-storage": patch
+"@powersync/service-module-mysql": minor
+---
+
+Add snapshot_filter support for initial table replication
+
+Users can now configure snapshot filters in sync_rules.yaml to apply WHERE clauses during initial table snapshots. This reduces storage and bandwidth for large tables where only a subset of rows match sync rules.
+
+Features:
+- Configure global filters for initial replication
+- CDC changes continue to work normally, affecting only rows already in storage
+- Supports MySQL source with PostgreSQL storage
+
+Example:
+```yaml
+# EXPLICIT: "I only want data from the last 90 days, period"
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"
+
+  logs:
+    sql: "timestamp > NOW() - INTERVAL '7 days'"
+
+bucket_definitions:
+  # This works - queries recent orders
+  recent_orders:
+    data:
+      - SELECT * FROM orders WHERE created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)
+
+  # This will be EMPTY initially - and that's OK for logs
+  all_logs:
+    data:
+      - SELECT * FROM logs
\ No newline at end of file
diff --git a/docs/initial-snapshot-filters.md b/docs/initial-snapshot-filters.md
new file mode 100644
index 000000000..46c878a8c
--- /dev/null
+++ b/docs/initial-snapshot-filters.md
@@ -0,0 +1,412 @@
+# Initial Snapshot Filters
+
+Initial snapshot filters allow you to limit which rows are replicated during the initial snapshot phase. This is useful for large tables where you only need to sync a subset of data.
+
+## Overview
+
+Without snapshot filters, PowerSync replicates **all rows** from source tables during initial replication, even if your sync rules only match a small subset.
+Initial snapshot filters solve this by allowing you to specify which rows to replicate upfront.
+
+**Important**: Filters are configured **globally** at the top level of your sync rules and apply to **all buckets** using that table. This means some buckets may end up empty if their queries don't align with the global filter.
+
+## Syntax
+
+Initial snapshot filters are defined at the **top level** of your `sync_rules.yaml` using an **object format** with database-specific filter syntax:
+
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"      # MySQL, PostgreSQL, SQL Server
+    mongo: {status: 'active'}     # MongoDB (BSON/EJSON format)
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name, status FROM users
+```
+
+You can specify just `sql` or just `mongo`, depending on your database:
+
+```yaml
+initial_snapshot_filters:
+  todos:
+    sql: "archived = false"
+
+bucket_definitions:
+  my_todos:
+    data:
+      - SELECT id, title FROM todos
+```
+
+## Basic Examples
+
+### SQL Databases (MySQL, PostgreSQL, SQL Server)
+
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name, status FROM users
+```
+
+### MongoDB
+
+```yaml
+initial_snapshot_filters:
+  users:
+    mongo: {status: 'active'}
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name, status FROM users
+```
+
+### Multi-Database Support
+
+Specify filters for both SQL and MongoDB sources:
+
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"
+    mongo: {status: 'active'}
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name, status FROM users
+```
+
+## Global Filter Behavior
+
+⚠️ **Critical**: Filters are **global** and apply to **all buckets** using that table. This can result in empty buckets if a bucket's query doesn't match the filter.
+
+### Example: Misaligned Bucket Query
+
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"  # Only active users are replicated
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT * FROM users WHERE status = 'active'  # ✅ Will have data
+
+  admin_users:
+    data:
+      - SELECT * FROM users WHERE is_admin = true  # ⚠️ Will be EMPTY if admin users aren't active
+```
+
+In this example:
+- Only users with `status = 'active'` are replicated (due to the global filter)
+- The `admin_users` bucket will be **empty** unless admin users are also active, since inactive admins are excluded from the initial snapshot
+- This is a **deliberate trade-off** between snapshot performance and bucket completeness
+
+### When to Use Global Filters
+
+✅ **Good use case**: All your buckets can work with a single filter
+```yaml
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"  # Only recent orders
+
+bucket_definitions:
+  my_orders:
+    data:
+      - SELECT * FROM orders WHERE user_id = token_parameters.user_id
+  pending_orders:
+    data:
+      - SELECT * FROM orders WHERE status = 'pending'
+```
+Both buckets work with recent orders only - the filter just improves snapshot performance.
+
+❌ **Bad use case**: Different buckets need different data
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"  # Only active users
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT * FROM users WHERE status = 'active'  # ✅ Works
+  archived_users:
+    data:
+      - SELECT * FROM users WHERE status = 'archived'  # ⚠️ Will be EMPTY!
+```
+The `archived_users` bucket will never have data because archived users aren't included in the snapshot.
+
+## Complex Filters
+
+You can use any valid SQL WHERE clause syntax:
+
+```yaml
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND status != 'cancelled'"
+
+bucket_definitions:
+  my_orders:
+    data:
+      - SELECT * FROM orders WHERE user_id = token_parameters.user_id
+```
+
+## Wildcard Table Names
+
+You can use wildcards (`%`) in table names to match multiple tables:
+
+```yaml
+initial_snapshot_filters:
+  logs_%:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)"
+
+bucket_definitions:
+  recent_logs:
+    data:
+      - SELECT * FROM logs_2024
+      - SELECT * FROM logs_2025
+```
+
+This applies the filter to `logs_2024`, `logs_2025`, etc.
+
+## Schema-Qualified Names
+
+You can specify schema-qualified table names:
+
+```yaml
+initial_snapshot_filters:
+  public.users:
+    sql: "status = 'active'"
+  analytics.events:
+    sql: "timestamp > NOW() - INTERVAL '1 day'"
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name FROM public.users
+  recent_events:
+    data:
+      - SELECT * FROM analytics.events
+```
+
+## How It Works
+
+1. **Initial Snapshot**: Only rows matching the filter are replicated during the initial snapshot phase
+2. **CDC Replication**: After the snapshot, Change Data Capture (CDC) continues to replicate changes to **only the rows that were included in the snapshot**
+3. **Storage**: Only filtered rows are stored in PostgreSQL/MongoDB storage
+4. **Buckets**: All buckets using the filtered table will only see the filtered data
+
+### Changing Filters Later
+
+If you widen a filter after the initial snapshot, CDC will **not** backfill older rows that were previously excluded, because those rows did not change. To include them, run a **targeted resnapshot** of the affected tables or perform a **one-time backfill** job, then let CDC keep the data up to date.
+
+## Database-Specific Syntax
+
+### MySQL
+```yaml
+initial_snapshot_filters:
+  orders:
+    sql: "DATE(created_at) > DATE_SUB(CURDATE(), INTERVAL 30 DAY)"
+```
+
+### PostgreSQL
+```yaml
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > NOW() - INTERVAL '30 days'"
+```
+
+### SQL Server
+```yaml
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATEADD(day, -30, GETDATE())"
+```
+
+### MongoDB
+```yaml
+initial_snapshot_filters:
+  orders:
+    mongo:
+      created_at: {$gt: {$date: "2024-01-01T00:00:00Z"}}
+      status: {$in: ['active', 'pending']}
+```
+
+## Use Cases
+
+### Large Historical Tables
+```yaml
+# Only sync recent orders instead of the entire order history
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"
+
+bucket_definitions:
+  my_orders:
+    data:
+      - SELECT * FROM orders WHERE user_id = token_parameters.user_id
+```
+
+### Active Records Only
+```yaml
+# Skip archived/deleted records
+initial_snapshot_filters:
+  users:
+    sql: "archived = false AND deleted_at IS NULL"
+
+bucket_definitions:
+  all_users:
+    data:
+      - SELECT id, name FROM users
+```
+
+### Tenant/Organization Filtering
+```yaml
+# Multi-tenant app - only sync specific tenants
+initial_snapshot_filters:
+  records:
+    sql: "tenant_id IN ('tenant-a', 'tenant-b')"
+
+bucket_definitions:
+  my_records:
+    data:
+      - SELECT * FROM records WHERE user_id = token_parameters.user_id
+```
+
+### Partitioned Tables
+```yaml
+# Apply the same filter to all partitions
+initial_snapshot_filters:
+  events_%:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)"
+
+bucket_definitions:
+  recent_events:
+    data:
+      - SELECT * FROM events_2024
+      - SELECT * FROM events_2025
+```
+
+## Special Characters and Identifiers
+
+⚠️ **Important**: Filter expressions are embedded directly into SQL/MongoDB queries. You must properly quote identifiers and escape string literals according to your database's syntax rules.
+
+### SQL Identifier Quoting
+
+**PostgreSQL** - Use double quotes for identifiers with spaces or special characters:
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "\"User Status\" = 'active' AND \"created-at\" > NOW() - INTERVAL '30 days'"
+```
+
+**MySQL** - Use backticks for identifiers with spaces or special characters:
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "`User Status` = 'active' AND `created-at` > NOW() - INTERVAL 30 DAY"
+```
+
+**SQL Server** - Use square brackets for identifiers with spaces or special characters:
+```yaml
+initial_snapshot_filters:
+  users:
+    sql: "[User Status] = 'active' AND [created-at] > DATEADD(day, -30, GETDATE())"
+```
+
+### String Literal Escaping
+
+Always use proper escaping for string literals containing quotes:
+
+```yaml
+initial_snapshot_filters:
+  comments:
+    # Single quotes must be escaped as '' in SQL
+    sql: "content NOT LIKE '%can''t%' AND status = 'approved'"
+```
+
+### Complex Expressions with OR Operators
+
+PowerSync automatically wraps your filter in parentheses to prevent operator-precedence issues:
+
+```yaml
+initial_snapshot_filters:
+  users:
+    # This is wrapped as: WHERE (status = 'active' OR status = 'pending')
+    sql: "status = 'active' OR status = 'pending'"
+```
+
+### MongoDB Filters
+
+MongoDB filters use native BSON query syntax, which is safer than string concatenation:
+
+```yaml
+initial_snapshot_filters:
+  users:
+    mongo:
+      $or:
+        - status: 'active'
+        - status: 'pending'
+      "special field": { $exists: true }
+```
+
+### Security Considerations
+
+- Filters are defined in `sync_rules.yaml` by administrators, not by end users
+- Filters are static configuration, not dynamic user input
+- Still follow security best practices:
+  - Avoid including sensitive data in filters
+  - Test filters in development before production
+  - Review filter changes carefully during deployment
+- PowerSync does not parameterize filters since they are arbitrary SQL expressions, similar to bucket query definitions
+
+## Limitations
+
+- Filters are applied **globally** across all buckets using that table
+- CDC changes only affect rows that were **initially snapshotted**
+- Widening filters requires a **resnapshot or backfill** to pick up previously excluded rows
+- Filter syntax must be valid for your **source database**
+- Some buckets may be **empty** if their queries don't align with the global filter
+
+## Best Practices
+
+1. **Design filters carefully** - ensure all buckets can work with the same filter
+2. **Test filters** on your source database before deploying
+3. **Use indexes** on filtered columns for better snapshot performance
+4. **Be conservative** - if unsure whether a row is needed, include it
+5. **Document filters** in your sync_rules.yaml comments
+6. **Monitor snapshot progress** to ensure reasonable replication times
+7. **Verify bucket data** after applying filters to ensure no buckets are unexpectedly empty
+
+## Migration from Per-Bucket Filters
+
+If you're using the old per-bucket `source_tables` configuration, move filters to the top level:
+
+```yaml
+# Old (no longer supported)
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name FROM users
+    source_tables:
+      users:
+        initial_snapshot_filter:
+          sql: "status = 'active'"
+
+# New (global configuration)
+initial_snapshot_filters:
+  users:
+    sql: "status = 'active'"
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name FROM users
+```
+
+**Important**: With the old configuration, if multiple buckets specified different filters for the same table, they were ORed together. With the new configuration, there is **one global filter per table** that applies to all buckets.
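The filter-embedding rules above (parenthesis wrapping, appending `WHERE`/`AND` depending on whether a key-based resume clause is already present) can be sketched in isolation. This is an illustrative sketch only - `buildSnapshotQuery` and its parameters are hypothetical names, not the PowerSync service's actual API:

```typescript
// Hypothetical sketch of how a global snapshot filter composes into a
// chunked snapshot query. Inputs are assumed to be pre-escaped identifiers;
// the filter string is raw SQL from `initial_snapshot_filters`.
function buildSnapshotQuery(options: {
  qualifiedTable: string;   // already-escaped, e.g. "public"."users"
  keyColumn: string;        // already-escaped ordering key
  chunkSize: number;
  afterKey?: boolean;       // true when resuming past a previously-seen key
  snapshotFilter?: string;  // raw SQL filter expression, if configured
}): string {
  const clauses: string[] = [];
  if (options.afterKey) {
    // The real implementation binds the key as a parameter; $1 stands in here.
    clauses.push(`${options.keyColumn} > $1`);
  }
  if (options.snapshotFilter) {
    // Wrapped in parentheses so OR-filters don't break operator precedence.
    clauses.push(`(${options.snapshotFilter})`);
  }
  const where = clauses.length > 0 ? ` WHERE ${clauses.join(' AND ')}` : '';
  return `SELECT * FROM ${options.qualifiedTable}${where} ORDER BY ${options.keyColumn} LIMIT ${options.chunkSize}`;
}
```

For example, resuming a chunked snapshot with an OR-filter produces `... WHERE "id" > $1 AND (status = 'active' OR status = 'pending') ORDER BY "id" LIMIT 100`.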
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 51d66cd2a..b4a90850b 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -250,7 +250,8 @@ export class MongoSyncBucketStorage
         schema: schema,
         name: name,
         replicaIdColumns: replicaIdColumns,
-        snapshotComplete: doc.snapshot_done ?? true
+        snapshotComplete: doc.snapshot_done ?? true,
+        initialSnapshotFilter: options.sync_rules.definition.getInitialSnapshotFilter(connection_tag, schema, name, 'mongo')
       });
       sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
       sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 0347edf3c..a9aba0846 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -483,7 +483,8 @@ export class ChangeStream {
     await using query = new ChunkedSnapshotQuery({
       collection,
       key: table.snapshotStatus?.lastKey,
-      batchSize: this.snapshotChunkLength
+      batchSize: this.snapshotChunkLength,
+      filter: table.initialSnapshotFilter?.mongo
     });
     if (query.lastKey != null) {
       this.logger.info(
@@ -492,6 +493,9 @@ export class ChangeStream {
     } else {
       this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
     }
+    if (table.initialSnapshotFilter?.mongo) {
+      this.logger.info(`Applying initial snapshot filter: ${JSON.stringify(table.initialSnapshotFilter.mongo)}`);
+    }
     let lastBatch = performance.now();
     let nextChunkPromise = query.nextChunk();
diff --git a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts
index 64b64f9b5..e83c15cd1 100644 --- a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts +++ b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts @@ -13,12 +13,19 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { private lastCursor: mongo.FindCursor | null = null; private collection: mongo.Collection; private batchSize: number; + private snapshotFilter: any; - public constructor(options: { collection: mongo.Collection; batchSize: number; key?: Uint8Array | null }) { + public constructor(options: { + collection: mongo.Collection; + batchSize: number; + key?: Uint8Array | null; + filter?: any; + }) { this.lastKey = options.key ? bson.deserialize(options.key, { useBigInt64: true })._id : null; this.lastCursor = null; this.collection = options.collection; this.batchSize = options.batchSize; + this.snapshotFilter = options.filter; } async nextChunk(): Promise<{ docs: mongo.Document[]; lastKey: Uint8Array } | { docs: []; lastKey: null }> { @@ -35,8 +42,23 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { // any parsing as an operator. // Starting in MongoDB 5.0, this filter can use the _id index. Source: // https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements - const filter: mongo.Filter = + + // Build base filter for _id + const idFilter: mongo.Filter = this.lastKey == null ? 
{} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }; + + // Combine with snapshot filter if present + let filter: mongo.Filter; + if (this.snapshotFilter) { + if (this.lastKey == null) { + filter = this.snapshotFilter; + } else { + filter = { $and: [idFilter, this.snapshotFilter] }; + } + } else { + filter = idFilter; + } + cursor = this.collection.find(filter, { readConcern: 'majority', limit: this.batchSize, diff --git a/modules/module-mssql/src/replication/CDCStream.ts b/modules/module-mssql/src/replication/CDCStream.ts index d2b109f62..b2d571df1 100644 --- a/modules/module-mssql/src/replication/CDCStream.ts +++ b/modules/module-mssql/src/replication/CDCStream.ts @@ -356,6 +356,11 @@ export class CDCStream { query = new SimpleSnapshotQuery(transaction, table); replicatedCount = 0; } + + if (table.sourceTable.initialSnapshotFilter?.sql) { + this.logger.info(`Applying initial snapshot filter: ${table.sourceTable.initialSnapshotFilter.sql}`); + } + await query.initialize(); let hasRemainingData = true; diff --git a/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts b/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts index 25ce2f4ca..57ebd3af6 100644 --- a/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts +++ b/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts @@ -44,7 +44,11 @@ export class SimpleSnapshotQuery implements MSSQLSnapshotQuery { const request = this.transaction.request(); const stream = request.toReadableStream(); - request.query(`SELECT * FROM ${this.table.toQualifiedName()}`); + let query = `SELECT * FROM ${this.table.toQualifiedName()}`; + if (this.table.sourceTable.initialSnapshotFilter?.sql) { + query += ` WHERE (${this.table.sourceTable.initialSnapshotFilter.sql})`; + } + request.query(query); // MSSQL only streams one row at a time for await (const row of stream) { @@ -141,17 +145,26 @@ export class BatchedSnapshotQuery implements MSSQLSnapshotQuery { const request = this.transaction.request(); const 
stream = request.toReadableStream(); + const snapshotFilter = this.table.sourceTable.initialSnapshotFilter?.sql; + if (this.lastKey == null) { - request.query(`SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} ORDER BY ${escapedKeyName}`); + let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()}`; + if (snapshotFilter) { + query += ` WHERE (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName}`; + request.query(query); } else { if (this.key.typeId == null) { throw new Error(`typeId required for primary key ${this.key.name}`); } - request - .input('lastKey', this.lastKey) - .query( - `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey ORDER BY ${escapedKeyName}` - ); + request.input('lastKey', this.lastKey); + let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey`; + if (snapshotFilter) { + query += ` AND (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName}`; + request.query(query); } // MSSQL only streams one row at a time diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 13c40062d..374ebfa9a 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -307,12 +307,25 @@ export class BinLogStream { batch: storage.BucketStorageBatch, table: storage.SourceTable ) { - this.logger.info(`Replicating ${qualifiedMySQLTable(table)}`); + const qualifiedMySQLTableName = qualifiedMySQLTable(table); + + this.logger.info(`Replicating ${qualifiedMySQLTableName}`); // TODO count rows and log progress at certain batch sizes // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query - const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`); - const stream = query.stream(); + let query = `SELECT 
/*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTableName}`; + + // Apply snapshot filter if it exists. This allows us to do partial snapshots, + // for example for large tables where we only want to snapshot recent data. + if (table.initialSnapshotFilter?.sql) { + query += ` WHERE (${table.initialSnapshotFilter.sql})`; + this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`); + } else { + this.logger.info(`No initial snapshot filter applied for ${qualifiedMySQLTableName}`); + } + + const queryStream = connection.query(query); + const stream = queryStream.stream(); let columns: Map | undefined = undefined; stream.on('fields', (fields: mysql.FieldPacket[]) => { diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts index 5d35428b7..f02b212c8 100644 --- a/modules/module-mysql/test/src/BinLogStream.test.ts +++ b/modules/module-mysql/test/src/BinLogStream.test.ts @@ -401,4 +401,207 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) { ]); } }); + + test('Snapshot filter - replicate only filtered rows', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "status = 'active'" + + bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM "users"`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))` + ); + + // Insert rows before snapshot + const activeId = uuid(); + const inactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')` + ); + + await context.replicateSnapshot(); + + const data = await 
context.getBucketData('active_users[]'); + + // Should only have the active user, not the inactive one + expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Active User', status: 'active' })]); + expect(data.length).toBe(1); + }); + + test('Snapshot filter - only specified filter applied globally', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "status = 'active'" + + bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM "users" WHERE status = 'active' + + admin_users: + data: + - SELECT id, name, is_admin FROM "users" WHERE is_admin = true`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20), is_admin BOOLEAN)` + ); + + // Insert test data + const activeUserId = uuid(); + const adminUserId = uuid(); + const regularUserId = uuid(); + + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${activeUserId}', 'Active User', 'active', false)` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${adminUserId}', 'Admin User', 'inactive', true)` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${regularUserId}', 'Regular User', 'inactive', false)` + ); + + await context.replicateSnapshot(); + + const activeData = await context.getBucketData('active_users[]'); + const adminData = await context.getBucketData('admin_users[]'); + + // Active bucket should have the active user + expect(activeData).toMatchObject([ + putOp('users', { id: activeUserId, name: 'Active User', status: 'active', is_admin: 0n }) + ]); + + // Admin bucket is empty because global filter only allows active users, not admin users + expect(adminData).toMatchObject([]); + + // Regular user should not be in either bucket (filtered out by 
snapshot filter) + }); + + test('Snapshot filter - CDC changes only affect filtered rows', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "status = 'active'" + + bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM "users" WHERE status = 'active'`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))` + ); + + // Insert an active user before snapshot + const activeId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')` + ); + + await context.replicateSnapshot(); + await context.startStreaming(); + + // Insert an inactive user - should not appear in bucket + const inactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')` + ); + + // Update the active user - should appear in bucket + await connectionManager.query(`UPDATE users SET name = 'Updated Active' WHERE id = '${activeId}'`); + + const data = await context.getBucketData('active_users[]'); + + // Should only have the active user with updated name + expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Updated Active', status: 'active' })]); + expect(data.length).toBe(1); + }); + + test('Snapshot filter - complex WHERE clause', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY) AND status = 'active'" + + bucket_definitions: + recent_active_users: + data: + - SELECT id, name, created_at FROM "users"`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status 
VARCHAR(20), created_at DATETIME)` + ); + + // Insert recent active user + const recentActiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${recentActiveId}', 'Recent Active', 'active', NOW())` + ); + + // Insert old active user + const oldActiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${oldActiveId}', 'Old Active', 'active', DATE_SUB(NOW(), INTERVAL 30 DAY))` + ); + + // Insert recent inactive user + const recentInactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${recentInactiveId}', 'Recent Inactive', 'inactive', NOW())` + ); + + await context.replicateSnapshot(); + + const data = await context.getBucketData('recent_active_users[]'); + + // Should only have the recent active user + expect(data.length).toBe(1); + expect(data[0]).toMatchObject({ + op: 'PUT', + object_type: 'users', + object_id: recentActiveId + }); + }); + + test('Snapshot filter - no filter means all rows replicated', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + all_users: + data: + - SELECT id, name FROM "users"`); + + await connectionManager.query(`CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT)`); + + // Insert multiple users + const user1Id = uuid(); + const user2Id = uuid(); + await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user1Id}', 'User 1')`); + await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user2Id}', 'User 2')`); + + await context.replicateSnapshot(); + + const data = await context.getBucketData('all_users[]'); + + // Should have both users when no filter is specified + expect(data.length).toBe(2); + }); } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts 
b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c7a3c2c29..98a3dc36b 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -249,7 +249,8 @@ export class PostgresSyncRulesStorage schema: schema, name: table, replicaIdColumns: replicaIdColumns, - snapshotComplete: sourceTableRow!.snapshot_done ?? true + snapshotComplete: sourceTableRow!.snapshot_done ?? true, + initialSnapshotFilter: options.sync_rules.definition.getInitialSnapshotFilter(connection_tag, schema, table, 'sql') }); if (!sourceTable.snapshotComplete) { sourceTable.snapshotStatus = {
diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index 039185d60..04005c8e8 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -36,7 +36,11 @@ export class SimpleSnapshotQuery implements SnapshotQuery { ) {} public async initialize(): Promise<void> { - await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.qualifiedName}`); + let query = `SELECT * FROM ${this.table.qualifiedName}`; + if (this.table.initialSnapshotFilter?.sql) { + query += ` WHERE (${this.table.initialSnapshotFilter.sql})`; + } + await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR ${query}`); } public nextChunk(): AsyncIterableIterator { @@ -119,17 +123,27 @@ export class ChunkedSnapshotQuery implements SnapshotQuery { public async *nextChunk(): AsyncIterableIterator { let stream: AsyncIterableIterator; const escapedKeyName = escapeIdentifier(this.key.name); + const snapshotFilter = this.table.initialSnapshotFilter?.sql; + if (this.lastKey == null) { - stream = this.connection.stream( - `SELECT * FROM ${this.table.qualifiedName} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}` - ); + let query = `SELECT * FROM ${this.table.qualifiedName}`; + if (snapshotFilter) { + query += ` WHERE (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`; + stream = this.connection.stream(query); } else { if (this.key.typeId == null) { throw new Error(`typeId required for primary key ${this.key.name}`); } const type = Number(this.key.typeId); + let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1`; + if (snapshotFilter) { + query += ` AND (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`; stream = this.connection.stream({ - statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`, + statement: query, params: [{ value: this.lastKey, type }] }); } @@ -200,8 +214,14 @@ export class IdSnapshotQuery implements SnapshotQuery { if (type == null) { throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`); } + + let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`; + if (this.table.initialSnapshotFilter?.sql) { + query += ` AND (${this.table.initialSnapshotFilter.sql})`; + } + yield* this.connection.stream({ - statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`, + statement: query, params: [ { type: type,
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 1dd2e23be..176ec5cd3 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -558,6 +558,11 @@ WHERE oid = $1::regclass`, q = new SimpleSnapshotQuery(db, table, this.snapshotChunkLength); at = 0; } + + if (table.initialSnapshotFilter?.sql) { + this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`); + } + await q.initialize(); let hasRemainingData = true;
diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts index 8e5951540..2b88cc5bd 100644 --- a/packages/service-core/src/storage/SourceTable.ts +++ b/packages/service-core/src/storage/SourceTable.ts @@ -2,6 +2,17 @@ import { DEFAULT_TAG } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { ColumnDescriptor, SourceEntityDescriptor } from './SourceEntity.js'; +/** + * Filter definition for initial snapshot replication. + * Object with database-specific filters: + * - sql: MySQL, PostgreSQL, SQL Server syntax + * - mongo: MongoDB query syntax (BSON/EJSON format) + */ +export type InitialSnapshotFilter = { + sql?: string; + mongo?: any; +}; + export interface SourceTableOptions { id: any; connectionTag: string; @@ -10,6 +21,7 @@ export interface SourceTableOptions { name: string; replicaIdColumns: ColumnDescriptor[]; snapshotComplete: boolean; + initialSnapshotFilter?: InitialSnapshotFilter; } export interface TableSnapshotStatus { @@ -84,6 +96,10 @@ export class SourceTable implements SourceEntityDescriptor { return this.options.replicaIdColumns; } + get initialSnapshotFilter() { + return this.options.initialSnapshotFilter; + } + /** * Sanitized name of the entity in the format of "{schema}.{entity name}" * Suitable for safe use in Postgres queries.
@@ -107,7 +123,8 @@ export class SourceTable implements SourceEntityDescriptor { schema: this.schema, name: this.name, replicaIdColumns: this.replicaIdColumns, - snapshotComplete: this.snapshotComplete + snapshotComplete: this.snapshotComplete, + initialSnapshotFilter: this.initialSnapshotFilter }); copy.syncData = this.syncData; copy.syncParameters = this.syncParameters;
diff --git a/packages/sync-rules/src/SyncConfig.ts b/packages/sync-rules/src/SyncConfig.ts index 19f42722f..77171168c 100644 --- a/packages/sync-rules/src/SyncConfig.ts +++ b/packages/sync-rules/src/SyncConfig.ts @@ -10,6 +10,17 @@ import { TablePattern } from './TablePattern.js'; import { SqliteInputValue, SqliteRow, SqliteValue } from './types.js'; import { applyRowContext } from './utils.js'; +/** + * Filter definition for initial snapshot replication. + * Object with database-specific filters. + */ +export type InitialSnapshotFilter = { + sql?: string; + mongo?: any; +}; + +export type DatabaseType = 'sql' | 'mongo'; + /** * A class describing how the sync process has been configured (i.e. which buckets and parameters to create and how to * resolve buckets for connections). @@ -21,6 +32,13 @@ export abstract class SyncConfig { compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; eventDescriptors: SqlEventDescriptor[] = []; + /** + * Global initial snapshot filters for source tables. + * Map structure: tableName -> initialSnapshotFilter + * Filters are applied globally during initial snapshot, regardless of bucket definitions. + */ + initialSnapshotFilters: Map<string, InitialSnapshotFilter> = new Map(); + /** * The (YAML-based) source contents from which these sync rules have been derived. */ @@ -120,6 +138,49 @@ export abstract class SyncConfig { debugRepresentation() { return this.bucketSources.map((rules) => rules.debugRepresentation()); } + + /** + * Get the initial snapshot filter for a given table. + * Filters are applied globally and support wildcard matching.
+ * + * @param connectionTag Connection tag for the active source connection + * @param schema Schema name + * @param tableName Concrete table name (wildcards are allowed in the filter patterns, not here) + * @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB) - currently unused; the full filter object is always returned + * @returns Filter object with sql/mongo properties, or undefined if no filter is specified + */ + getInitialSnapshotFilter( + connectionTag: string, + schema: string, + tableName: string, + dbType: DatabaseType = 'sql' + ): InitialSnapshotFilter | undefined { + for (const [pattern, filterDef] of this.initialSnapshotFilters) { + const tablePattern = this.parseTablePattern(connectionTag, schema, pattern); + if (tablePattern.matches({ connectionTag, schema, name: tableName })) { + return filterDef; + } + } + + return undefined; + } + + /** + * Helper to parse a table pattern string into a TablePattern object + */ + private parseTablePattern(connectionTag: string, defaultSchema: string, pattern: string): TablePattern { + const parts = pattern.split('.'); + if (parts.length === 1) { + return new TablePattern(`${connectionTag}.${defaultSchema}`, parts[0]); + } + if (parts.length === 2) { + return new TablePattern(`${connectionTag}.${parts[0]}`, parts[1]); + } + const tag = parts[0]; + const schema = parts[1]; + const tableName = parts.slice(2).join('.'); + return new TablePattern(`${tag}.${schema}`, tableName); + } } export interface SyncConfigWithErrors { config: SyncConfig;
diff --git a/packages/sync-rules/src/from_yaml.ts b/packages/sync-rules/src/from_yaml.ts index c4986dec0..ed8a07ddb 100644 --- a/packages/sync-rules/src/from_yaml.ts +++ b/packages/sync-rules/src/from_yaml.ts @@ -12,7 +12,7 @@ import { validateSyncRulesSchema } from './json_schema.js'; import { SqlEventDescriptor } from './events/SqlEventDescriptor.js'; import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js'; import { QueryParseOptions, SourceSchema, StreamParseOptions } from './types.js'; -import { SyncConfig, SyncConfigWithErrors } from './SyncConfig.js'; +import { InitialSnapshotFilter, SyncConfig, SyncConfigWithErrors } from './SyncConfig.js'; import { ParsingErrorListener, SyncStreamsCompiler } from './compiler/compiler.js'; import { syncStreamFromSql } from './streams/from_sql.js'; import { PrecompiledSyncConfig } from './sync_plan/evaluator/index.js'; @@ -20,6 +20,42 @@ import { javaScriptExpressionEngine } from './sync_plan/engine/javascript.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); +/** + * Parse initial_snapshot_filter from YAML object with sql and/or mongo properties + */ +function parseInitialSnapshotFilter(value: unknown): InitialSnapshotFilter | undefined { + if (value instanceof YAMLMap) { + // Object format with db-specific filters + const result: { sql?: string; mongo?: any } = {}; + + const sqlValue = value.get('sql', true); + if (sqlValue) { + if (sqlValue instanceof Scalar) { + result.sql = sqlValue.value as string; + } else { + // If it's not a scalar, try to convert it + result.sql = String(sqlValue); + } + } + + const mongoValue = value.get('mongo', true) as any; + if (mongoValue instanceof Scalar) { + result.mongo = mongoValue.value; + } else if (mongoValue instanceof YAMLMap || mongoValue instanceof YAMLSeq) { + result.mongo = mongoValue.toJSON(); + } else if (mongoValue !== null && mongoValue !== undefined) { + result.mongo = mongoValue; + } + + // Only return if at least one property is set + if (result.sql !== undefined || result.mongo !== undefined) { + return result; + } + } + + return undefined; +} + /** * Reads `sync_rules.yaml` files containing a sync configuration.
* @@ -88,6 +124,12 @@ export class SyncConfigFromYaml { result = this.#legacyParseBucketDefinitionsAndStreams(bucketMap, streamMap, compatibility); } + // Parse global initial_snapshot_filters + const filtersMap = parsed.get('initial_snapshot_filters') as YAMLMap | null; + if (filtersMap instanceof YAMLMap) { + this.#parseInitialSnapshotFilters(filtersMap, result); + } + const eventDefinitions = this.#parseEventDefinitions(parsed, compatibility); result.eventDescriptors.push(...eventDefinitions); @@ -111,6 +153,45 @@ export class SyncConfigFromYaml { } } + /** + * Parse the global initial_snapshot_filters configuration + */ + #parseInitialSnapshotFilters(filtersMap: YAMLMap, result: SyncConfig) { + for (const entry of filtersMap.items) { + const { key: tableKey, value: filterValue } = entry as { key: Scalar; value: unknown }; + const tableName = tableKey.value as string; + + if (!filterValue) { + this.#errors.push( + this.#tokenError(tableKey, `Initial snapshot filter for table '${tableName}' cannot be empty`) + ); + continue; + } + + if (!(filterValue instanceof YAMLMap)) { + this.#errors.push( + this.#tokenError( + filterValue as any, + `Initial snapshot filter for table '${tableName}' must be an object with 'sql' and/or 'mongo' properties` + ) + ); + continue; + } + + const filter = parseInitialSnapshotFilter(filterValue); + if (filter) { + result.initialSnapshotFilters.set(tableName, filter); + } else { + this.#errors.push( + this.#tokenError( + filterValue as any, + `Initial snapshot filter for table '${tableName}' must have at least 'sql' or 'mongo' property` + ) + ); + } + } + } + /** * Parses the `config` block of a sync configuration. 
* diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index 0050ebbe8..731e54b67 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -7,6 +7,38 @@ const ajv = new Ajv({ allErrors: true, verbose: true }); export const syncRulesSchema: ajvModule.Schema = { type: 'object', properties: { + initial_snapshot_filters: { + type: 'object', + description: 'Global filters applied during initial snapshot replication for specific tables', + examples: [ + { + users: { + sql: "status = 'active'", + mongo: { status: 'active' } + }, + orders: { + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' + } + } + ], + patternProperties: { + '.*': { + type: 'object', + description: 'Database-specific filter syntax for a table', + properties: { + sql: { + type: 'string', + description: 'SQL WHERE clause for MySQL, PostgreSQL, SQL Server' + }, + mongo: { + description: 'MongoDB query filter (BSON/EJSON format) - can be any valid MongoDB query object' + } + }, + additionalProperties: false, + minProperties: 1 + } + } + }, bucket_definitions: { type: 'object', description: 'List of bucket definitions', diff --git a/packages/sync-rules/test/src/snapshot_filter.test.ts b/packages/sync-rules/test/src/snapshot_filter.test.ts new file mode 100644 index 000000000..94945ca1e --- /dev/null +++ b/packages/sync-rules/test/src/snapshot_filter.test.ts @@ -0,0 +1,217 @@ +import { describe, expect, test } from 'vitest'; +import { SqlSyncRules } from '../../src/index.js'; +import { PARSE_OPTIONS } from './util.js'; + +describe('snapshot filter tests', () => { + test('parse initial_snapshot_filter from global config', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(rules.initialSnapshotFilters.size).toBe(1); + 
expect(rules.initialSnapshotFilters.get('users')).toEqual({ + sql: "status = 'active'" + }); + }); +
+ test('parse multiple tables with global filters', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + orders: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users WHERE status = 'active' + + recent_orders: + data: + - SELECT * FROM orders + `, + PARSE_OPTIONS + ); + + expect(rules.initialSnapshotFilters.size).toBe(2); + expect(rules.initialSnapshotFilters.get('users')).toEqual({ sql: "status = 'active'" }); + expect(rules.initialSnapshotFilters.get('orders')).toEqual({ + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' + }); + }); +
+ test('getInitialSnapshotFilter returns undefined when no filters', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +bucket_definitions: + all_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users'); + expect(filter).toBeUndefined(); + }); +
+ test('getInitialSnapshotFilter returns single filter', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(filter).toEqual({ sql: "status = 'active'" }); + }); +
+ test('getInitialSnapshotFilter handles schema-qualified table names', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + public.users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM public.users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(filter).toEqual({ sql: "status = 'active'" }); + }); +
+ test('initial_snapshot_filter with complex WHERE clause', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + todos: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND archived = false" + +bucket_definitions: + recent_todos: + data: + - SELECT * FROM todos + `, + PARSE_OPTIONS + ); + + const filter = rules.initialSnapshotFilters.get('todos'); + expect(filter).toEqual({ + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND archived = false' + }); + }); +
+ test('parse succeeds without errors when using initial_snapshot_filter', () => { + const { config: rules, errors } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(errors).toHaveLength(0); + expect(rules.initialSnapshotFilters.size).toBe(1); + }); +
+ test('parse initial_snapshot_filter with object format', () => { + const { config: rules, errors } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(errors).toHaveLength(0); + expect(rules.initialSnapshotFilters.size).toBe(1); + const filter = rules.initialSnapshotFilters.get('users'); + expect(filter).toEqual({ + sql: "status = 'active'", + mongo: { status: 'active' } + }); + }); +
+ test('getInitialSnapshotFilter with object format - SQL and Mongo', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const sqlFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(sqlFilter?.sql).toBe("status = 'active'"); + + const mongoFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'mongo'); + expect(mongoFilter?.mongo).toEqual({ status: 'active' }); + }); +
+ test('getInitialSnapshotFilter with object format - only sql specified', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "archived = false" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const sqlFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(sqlFilter?.sql).toBe('archived = false'); + + const mongoFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'mongo'); + expect(mongoFilter?.mongo).toBeUndefined(); + }); +});