From e875f007a031f2363652920da0fe017d1fa1dc6e Mon Sep 17 00:00:00 2001
From: raffidahmad
Date: Wed, 11 Feb 2026 21:23:41 +0000
Subject: [PATCH 1/4] Adds snapshot filtering for binlog replication

Enables filtering of rows during the initial snapshot phase of binlog
replication, based on a configurable SQL WHERE clause. This allows for
partial snapshots, replicating only a subset of data based on specified
criteria, which is particularly useful for large tables or scenarios
where only recent data is needed.

The commit also includes tests to verify the functionality of snapshot
filtering, including handling of CDC changes and multiple bucket filters.

Currently only supported for the MySQL source with PostgreSQL storage.
---
 .changeset/famous-cobras-give.md                 |  36 ++
 docs/initial-snapshot-filters.md                 | 335 ++++++++++++++++++
 .../src/replication/BinLogStream.ts              |  14 +-
 .../test/src/BinLogStream.test.ts                | 203 +++++++++++
 .../src/storage/PostgresSyncRulesStorage.ts      |   3 +-
 packages/service-core/src/storage/SourceTable.ts |  19 +-
 packages/sync-rules/src/SyncConfig.ts            |  58 +++
 packages/sync-rules/src/from_yaml.ts             |  51 ++-
 packages/sync-rules/src/json_schema.ts           |  31 ++
 .../test/src/snapshot_filter.test.ts             | 224 ++++++++++++
 10 files changed, 969 insertions(+), 5 deletions(-)
 create mode 100644 .changeset/famous-cobras-give.md
 create mode 100644 docs/initial-snapshot-filters.md
 create mode 100644 packages/sync-rules/test/src/snapshot_filter.test.ts

diff --git a/.changeset/famous-cobras-give.md b/.changeset/famous-cobras-give.md
new file mode 100644
index 000000000..a88e51863
--- /dev/null
+++ b/.changeset/famous-cobras-give.md
@@ -0,0 +1,36 @@
+---
+"@powersync/service-sync-rules": minor
+"@powersync/service-core": minor
+"@powersync/service-module-postgres-storage": patch
+"@powersync/service-module-mysql": minor
+---
+
+Add snapshot_filter support for initial table replication
+
+Users can now configure snapshot filters in sync_rules.yaml to apply WHERE clauses during initial table snapshots.
+This reduces storage and bandwidth for large tables where only a subset of rows match sync rules.
+
+Features:
+- Configure global filters for initial replication
+- CDC changes continue to work normally, only affecting rows in storage
+- Supports the MySQL source with PostgreSQL storage
+
+Example:
+```yaml
+# Explicit intent: "I only want data from the last 90 days, period"
+initial_snapshot_filters:
+  orders:
+    sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"
+
+  logs:
+    sql: "timestamp > NOW() - INTERVAL '7 days'"
+
+bucket_definitions:
+  # This works - queries recent orders, a subset of the 90-day snapshot
+  recent_orders:
+    data:
+      - SELECT * FROM orders WHERE created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)
+
+  # This only contains the last 7 days of logs initially - and that's OK for logs
+  all_logs:
+    data:
+      - SELECT * FROM logs
\ No newline at end of file
diff --git a/docs/initial-snapshot-filters.md b/docs/initial-snapshot-filters.md
new file mode 100644
index 000000000..91e9eeab1
--- /dev/null
+++ b/docs/initial-snapshot-filters.md
@@ -0,0 +1,335 @@
+# Initial Snapshot Filters
+
+Initial snapshot filters allow you to limit which rows are replicated during the initial snapshot phase. This is useful for large tables where you only need to sync a subset of data.
+
+## Overview
+
+Without snapshot filters, PowerSync replicates **all rows** from source tables during initial replication, even if your sync rules only match a small subset. Initial snapshot filters solve this by allowing you to specify which rows to replicate upfront.
+
+**Important**: Filters are configured **globally** at the top level of your sync rules and apply to **all buckets** using that table. This means some buckets may end up empty if their queries don't align with the global filter.
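The lookup semantics described above can be sketched in a few lines (a simplified model, not the actual PowerSync API; the `filterFor` helper and the inline map are illustrative only):

```typescript
// Each table name maps to an object holding database-specific filter syntax.
type InitialSnapshotFilter = { sql?: string; mongo?: unknown };

// Global filter map, mirroring the `initial_snapshot_filters` YAML section.
const filters = new Map<string, InitialSnapshotFilter>([
  ['users', { sql: "status = 'active'", mongo: { status: 'active' } }]
]);

// Resolve the filter for a table, picking the syntax for the source database.
function filterFor(table: string, dbType: 'sql' | 'mongo'): string | unknown | undefined {
  return filters.get(table)?.[dbType];
}

console.log(filterFor('users', 'sql')); // "status = 'active'"
console.log(filterFor('orders', 'sql')); // undefined -> table is snapshotted in full
```

A table with no entry in the map simply falls back to a full snapshot, which is why adding filters is an opt-in, per-table optimization.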
+ +## Syntax + +Initial snapshot filters are defined at the **top level** of your `sync_rules.yaml` using an **object format** with database-specific filter syntax: + +```yaml +initial_snapshot_filters: + users: + sql: "status = 'active'" # MySQL, PostgreSQL, SQL Server + mongo: {status: 'active'} # MongoDB (BSON/EJSON format) + +bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM users +``` + +You can specify just `sql` or just `mongo` depending on your database: + +```yaml +initial_snapshot_filters: + todos: + sql: "archived = false" + +bucket_definitions: + my_todos: + data: + - SELECT id, title FROM todos +``` + +## Basic Examples + +### SQL Databases (MySQL, PostgreSQL, SQL Server) + +```yaml +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM users +``` + +### MongoDB + +```yaml +initial_snapshot_filters: + users: + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM users +``` + +### Multi-Database Support + +Specify filters for both SQL and MongoDB sources: + +```yaml +initial_snapshot_filters: + users: + sql: "status = 'active'" + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM users +``` + +## Global Filter Behavior + +⚠️ **Critical**: Filters are **global** and apply to **all buckets** using that table. This can result in empty buckets if the bucket query doesn't match the filter. 
+ +### Example: Misaligned Bucket Query + +```yaml +initial_snapshot_filters: + users: + sql: "status = 'active'" # Only active users are replicated + +bucket_definitions: + active_users: + data: + - SELECT * FROM users WHERE status = 'active' # ✅ Will have data + + admin_users: + data: + - SELECT * FROM users WHERE is_admin = true # ⚠️ Will be EMPTY if admin users aren't active +``` + +In this example: +- Only users with `status = 'active'` are replicated (due to the global filter) +- The `admin_users` bucket will be **empty** because no admin users are included in the initial snapshot +- This is a **deliberate trade-off** between snapshot performance and bucket completeness + +### When to Use Global Filters + +✅ **Good use case**: All your buckets can work with a single filter +```yaml +initial_snapshot_filters: + orders: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)" # Only recent orders + +bucket_definitions: + my_orders: + data: + - SELECT * FROM orders WHERE user_id = token_parameters.user_id + pending_orders: + data: + - SELECT * FROM orders WHERE status = 'pending' +``` +Both buckets work with recent orders only - the filter just improves snapshot performance. + +❌ **Bad use case**: Different buckets need different data +```yaml +initial_snapshot_filters: + users: + sql: "status = 'active'" # Only active users + +bucket_definitions: + active_users: + data: + - SELECT * FROM users WHERE status = 'active' # ✅ Works + archived_users: + data: + - SELECT * FROM users WHERE status = 'archived' # ⚠️ Will be EMPTY! +``` +The `archived_users` bucket will never have data because archived users aren't included in the snapshot. 
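The interaction above can be demonstrated with a small in-memory model (purely illustrative; PowerSync evaluates bucket queries in the service, and the row shapes here are made up):

```typescript
type User = { id: string; status: string; is_admin: boolean };

// Source table contents before the initial snapshot.
const sourceRows: User[] = [
  { id: 'u1', status: 'active', is_admin: false },
  { id: 'u2', status: 'archived', is_admin: true }
];

// Step 1: the global snapshot filter ("status = 'active'") limits what is replicated.
const snapshotted = sourceRows.filter((u) => u.status === 'active');

// Step 2: bucket queries only ever see the snapshotted rows.
const activeUsers = snapshotted.filter((u) => u.status === 'active');
const archivedUsers = snapshotted.filter((u) => u.status === 'archived');

console.log(activeUsers.length); // 1 - filter and bucket query align
console.log(archivedUsers.length); // 0 - bucket stays empty: archived rows never reached storage
```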
+ +## Complex Filters + +You can use any valid SQL WHERE clause syntax: + +```yaml +initial_snapshot_filters: + orders: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND status != 'cancelled'" + +bucket_definitions: + my_orders: + data: + - SELECT * FROM orders WHERE user_id = token_parameters.user_id +``` + +## Wildcard Table Names + +You can use wildcards (%) in table names to match multiple tables: + +```yaml +initial_snapshot_filters: + logs_%: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)" + +bucket_definitions: + recent_logs: + data: + - SELECT * FROM logs_2024 + - SELECT * FROM logs_2025 +``` + +This applies the filter to `logs_2024`, `logs_2025`, etc. + +## Schema-Qualified Names + +You can specify schema-qualified table names: + +```yaml +initial_snapshot_filters: + public.users: + sql: "status = 'active'" + analytics.events: + sql: "timestamp > NOW() - INTERVAL '1 day'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM public.users + recent_events: + data: + - SELECT * FROM analytics.events +``` + +## How It Works + +1. **Initial Snapshot**: Only rows matching the filter are replicated during the initial snapshot phase +2. **CDC Replication**: After the snapshot, Change Data Capture (CDC) continues to replicate changes to **only the rows that were included in the snapshot** +3. **Storage**: Only filtered rows are stored in PostgreSQL/MongoDB storage +4. 
**Buckets**: All buckets using the filtered table will only see the filtered data + +## Database-Specific Syntax + +### MySQL +```yaml +initial_snapshot_filters: + orders: + sql: "DATE(created_at) > DATE_SUB(CURDATE(), INTERVAL 30 DAY)" +``` + +### PostgreSQL +```yaml +initial_snapshot_filters: + orders: + sql: "created_at > NOW() - INTERVAL '30 days'" +``` + +### SQL Server +```yaml +initial_snapshot_filters: + orders: + sql: "created_at > DATEADD(day, -30, GETDATE())" +``` + +### MongoDB +```yaml +initial_snapshot_filters: + orders: + mongo: + created_at: {$gt: {$date: "2024-01-01T00:00:00Z"}} + status: {$in: ['active', 'pending']} +``` + +## Use Cases + +### Large Historical Tables +```yaml +# Only sync recent orders instead of entire order history +initial_snapshot_filters: + orders: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)" + +bucket_definitions: + my_orders: + data: + - SELECT * FROM orders WHERE user_id = token_parameters.user_id +``` + +### Active Records Only +```yaml +# Skip archived/deleted records +initial_snapshot_filters: + users: + sql: "archived = false AND deleted_at IS NULL" + +bucket_definitions: + all_users: + data: + - SELECT id, name FROM users +``` + +### Tenant/Organization Filtering +```yaml +# Multi-tenant app - only sync specific tenants +initial_snapshot_filters: + records: + sql: "tenant_id IN ('tenant-a', 'tenant-b')" + +bucket_definitions: + my_records: + data: + - SELECT * FROM records WHERE user_id = token_parameters.user_id +``` + +### Partitioned Tables +```yaml +# Apply same filter to all partitions +initial_snapshot_filters: + events_%: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)" + +bucket_definitions: + recent_events: + data: + - SELECT * FROM events_2024 + - SELECT * FROM events_2025 +``` + +## Limitations + +- Filters are applied **globally** across all buckets using that table +- CDC changes only affect rows that were **initially snapshotted** +- Changing filters requires a **full re-snapshot** of 
affected tables +- Filter syntax must be valid for your **source database** +- Some buckets may be **empty** if their queries don't align with the global filter + +## Best Practices + +1. **Design filters carefully** - ensure all buckets can work with the same filter +2. **Test filters** on your source database before deploying +3. **Use indexes** on filtered columns for better snapshot performance +4. **Be conservative** - if unsure whether a row is needed, include it +5. **Document filters** in your sync_rules.yaml comments +6. **Monitor snapshot progress** to ensure reasonable replication times +7. **Verify bucket data** after applying filters to ensure no buckets are unexpectedly empty + +## Migration from Per-Bucket Filters + +If you're using the old per-bucket `source_tables` configuration, move filters to the top level: + +```yaml +# Old (no longer supported) +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + source_tables: + users: + initial_snapshot_filter: + sql: "status = 'active'" + +# New (global configuration) +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users +``` + +**Important**: With the old configuration, if multiple buckets specified different filters for the same table, they were ORed together. With the new configuration, there is **one global filter per table** that applies to all buckets. 
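For reference, the way the MySQL connector applies these filters during the snapshot can be sketched roughly as follows (simplified from the actual implementation in this patch; `qualifiedTable` stands in for the real table-name escaping, and the hint comment is omitted):

```typescript
interface InitialSnapshotFilter {
  sql?: string; // raw WHERE clause in the source database's dialect
}

// Build the initial snapshot query, appending the configured filter if present.
function buildSnapshotQuery(qualifiedTable: string, filter?: InitialSnapshotFilter): string {
  let query = `SELECT * FROM ${qualifiedTable}`;
  if (filter?.sql) {
    // The filter string is spliced in verbatim, so it must be valid for the source DB.
    query += ` WHERE ${filter.sql}`;
  }
  return query;
}

console.log(buildSnapshotQuery('`db`.`orders`', { sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' }));
// SELECT * FROM `db`.`orders` WHERE created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)
console.log(buildSnapshotQuery('`db`.`users`'));
// SELECT * FROM `db`.`users`
```

Because the clause is appended verbatim, the filter must be written and tested in the source database's own dialect, as noted under Limitations.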
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 13c40062d..9e11bf766 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -311,8 +311,17 @@
     // TODO count rows and log progress at certain batch sizes
     // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
-    const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`);
-    const stream = query.stream();
+    let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`;
+
+    // Apply the snapshot filter if one is configured. This allows partial snapshots,
+    // for example for large tables where we only want to snapshot recent data.
+    if (table.initialSnapshotFilter?.sql) {
+      query += ` WHERE ${table.initialSnapshotFilter.sql}`;
+      this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`);
+    }
+
+    const queryStream = connection.query(query);
+    const stream = queryStream.stream();
 
     let columns: Map | undefined = undefined;
     stream.on('fields', (fields: mysql.FieldPacket[]) => {
diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index 5d35428b7..f02b212c8 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -401,4 +401,207 @@ function defineBinlogStreamTests(factory: storage.TestStorageFactory) {
       ]);
     }
   });
+
+  test('Snapshot filter - replicate only filtered rows', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
+      initial_snapshot_filters:
+        users:
+          sql: "status = 'active'"
+
+      bucket_definitions:
+        active_users:
+          data:
+            - SELECT id, name, status FROM "users"`);
+
+    await
connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))` + ); + + // Insert rows before snapshot + const activeId = uuid(); + const inactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')` + ); + + await context.replicateSnapshot(); + + const data = await context.getBucketData('active_users[]'); + + // Should only have the active user, not the inactive one + expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Active User', status: 'active' })]); + expect(data.length).toBe(1); + }); + + test('Snapshot filter - only specified filter applied globally', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "status = 'active'" + + bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM "users" WHERE status = 'active' + + admin_users: + data: + - SELECT id, name, is_admin FROM "users" WHERE is_admin = true`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20), is_admin BOOLEAN)` + ); + + // Insert test data + const activeUserId = uuid(); + const adminUserId = uuid(); + const regularUserId = uuid(); + + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${activeUserId}', 'Active User', 'active', false)` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${adminUserId}', 'Admin User', 'inactive', true)` + ); + await connectionManager.query( + `INSERT INTO users(id, name, status, is_admin) VALUES('${regularUserId}', 'Regular User', 'inactive', false)` + ); + + await context.replicateSnapshot(); + 
+ const activeData = await context.getBucketData('active_users[]'); + const adminData = await context.getBucketData('admin_users[]'); + + // Active bucket should have the active user + expect(activeData).toMatchObject([ + putOp('users', { id: activeUserId, name: 'Active User', status: 'active', is_admin: 0n }) + ]); + + // Admin bucket is empty because global filter only allows active users, not admin users + expect(adminData).toMatchObject([]); + + // Regular user should not be in either bucket (filtered out by snapshot filter) + }); + + test('Snapshot filter - CDC changes only affect filtered rows', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "status = 'active'" + + bucket_definitions: + active_users: + data: + - SELECT id, name, status FROM "users" WHERE status = 'active'`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20))` + ); + + // Insert an active user before snapshot + const activeId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${activeId}', 'Active User', 'active')` + ); + + await context.replicateSnapshot(); + await context.startStreaming(); + + // Insert an inactive user - should not appear in bucket + const inactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status) VALUES('${inactiveId}', 'Inactive User', 'inactive')` + ); + + // Update the active user - should appear in bucket + await connectionManager.query(`UPDATE users SET name = 'Updated Active' WHERE id = '${activeId}'`); + + const data = await context.getBucketData('active_users[]'); + + // Should only have the active user with updated name + expect(data).toMatchObject([putOp('users', { id: activeId, name: 'Updated Active', status: 'active' })]); + expect(data.length).toBe(1); + }); + + 
test('Snapshot filter - complex WHERE clause', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + initial_snapshot_filters: + users: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY) AND status = 'active'" + + bucket_definitions: + recent_active_users: + data: + - SELECT id, name, created_at FROM "users"`); + + await connectionManager.query( + `CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT, status VARCHAR(20), created_at DATETIME)` + ); + + // Insert recent active user + const recentActiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${recentActiveId}', 'Recent Active', 'active', NOW())` + ); + + // Insert old active user + const oldActiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${oldActiveId}', 'Old Active', 'active', DATE_SUB(NOW(), INTERVAL 30 DAY))` + ); + + // Insert recent inactive user + const recentInactiveId = uuid(); + await connectionManager.query( + `INSERT INTO users(id, name, status, created_at) VALUES('${recentInactiveId}', 'Recent Inactive', 'inactive', NOW())` + ); + + await context.replicateSnapshot(); + + const data = await context.getBucketData('recent_active_users[]'); + + // Should only have the recent active user + expect(data.length).toBe(1); + expect(data[0]).toMatchObject({ + op: 'PUT', + object_type: 'users', + object_id: recentActiveId + }); + }); + + test('Snapshot filter - no filter means all rows replicated', async () => { + await using context = await BinlogStreamTestContext.open(factory); + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + all_users: + data: + - SELECT id, name FROM "users"`); + + await connectionManager.query(`CREATE TABLE users (id CHAR(36) PRIMARY KEY, name TEXT)`); + + // Insert multiple users + const user1Id = uuid(); + const 
user2Id = uuid(); + await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user1Id}', 'User 1')`); + await connectionManager.query(`INSERT INTO users(id, name) VALUES('${user2Id}', 'User 2')`); + + await context.replicateSnapshot(); + + const data = await context.getBucketData('all_users[]'); + + // Should have both users when no filter is specified + expect(data.length).toBe(2); + }); } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c7a3c2c29..98a3dc36b 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -249,7 +249,8 @@ export class PostgresSyncRulesStorage schema: schema, name: table, replicaIdColumns: replicaIdColumns, - snapshotComplete: sourceTableRow!.snapshot_done ?? true + snapshotComplete: sourceTableRow!.snapshot_done ?? true, + initialSnapshotFilter: options.sync_rules.definition.getInitialSnapshotFilter(connection_tag, schema, table, 'sql') }); if (!sourceTable.snapshotComplete) { sourceTable.snapshotStatus = { diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts index 8e5951540..2b88cc5bd 100644 --- a/packages/service-core/src/storage/SourceTable.ts +++ b/packages/service-core/src/storage/SourceTable.ts @@ -2,6 +2,17 @@ import { DEFAULT_TAG } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { ColumnDescriptor, SourceEntityDescriptor } from './SourceEntity.js'; +/** + * Filter definition for initial snapshot replication. 
+ * Object with database-specific filters: + * - sql: MySQL, PostgreSQL, SQL Server syntax + * - mongo: MongoDB query syntax (BSON/EJSON format) + */ +export type InitialSnapshotFilter = { + sql?: string; + mongo?: any; +}; + export interface SourceTableOptions { id: any; connectionTag: string; @@ -10,6 +21,7 @@ export interface SourceTableOptions { name: string; replicaIdColumns: ColumnDescriptor[]; snapshotComplete: boolean; + initialSnapshotFilter?: InitialSnapshotFilter; } export interface TableSnapshotStatus { @@ -84,6 +96,10 @@ export class SourceTable implements SourceEntityDescriptor { return this.options.replicaIdColumns; } + get initialSnapshotFilter() { + return this.options.initialSnapshotFilter; + } + /** * Sanitized name of the entity in the format of "{schema}.{entity name}" * Suitable for safe use in Postgres queries. @@ -107,7 +123,8 @@ export class SourceTable implements SourceEntityDescriptor { schema: this.schema, name: this.name, replicaIdColumns: this.replicaIdColumns, - snapshotComplete: this.snapshotComplete + snapshotComplete: this.snapshotComplete, + initialSnapshotFilter: this.initialSnapshotFilter }); copy.syncData = this.syncData; copy.syncParameters = this.syncParameters; diff --git a/packages/sync-rules/src/SyncConfig.ts b/packages/sync-rules/src/SyncConfig.ts index a8103131d..18a358cb2 100644 --- a/packages/sync-rules/src/SyncConfig.ts +++ b/packages/sync-rules/src/SyncConfig.ts @@ -10,6 +10,17 @@ import { TablePattern } from './TablePattern.js'; import { SqliteInputValue, SqliteRow, SqliteValue } from './types.js'; import { applyRowContext } from './utils.js'; +/** + * Filter definition for initial snapshot replication. + * Object with database-specific filters. + */ +export type InitialSnapshotFilter = { + sql?: string; + mongo?: any; +}; + +export type DatabaseType = 'sql' | 'mongo'; + /** * A class describing how the sync process has been configured (i.e. 
which buckets and parameters to create and how to
  * resolve buckets for connections).
  */
@@ -21,6 +32,13 @@ export abstract class SyncConfig {
   compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
   eventDescriptors: SqlEventDescriptor[] = [];
 
+  /**
+   * Global initial snapshot filters for source tables.
+   * Map structure: tableName -> InitialSnapshotFilter
+   * Filters are applied globally during the initial snapshot, regardless of bucket definitions.
+   */
+  initialSnapshotFilters: Map<string, InitialSnapshotFilter> = new Map();
+
   /**
    * The (YAML-based) source contents from which these sync rules have been derived.
    */
@@ -122,6 +140,44 @@
   debugRepresentation() {
     return this.bucketSources.map((rules) => rules.debugRepresentation());
   }
+
+  /**
+   * Get the initial snapshot filter for a given table.
+   * Filters are applied globally and support wildcard matching.
+   *
+   * @param connectionTag Connection tag (e.g., 'default')
+   * @param schema Schema name
+   * @param tableName Table name (without wildcard %)
+   * @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB)
+   * @returns WHERE clause/query, or undefined if no filter is specified
+   */
+  getInitialSnapshotFilter(connectionTag: string, schema: string, tableName: string, dbType: DatabaseType = 'sql'): string | any | undefined {
+    // Patterns are checked in declaration order; keys may be exact, schema-qualified or wildcard patterns.
+    for (const [pattern, filterDef] of this.initialSnapshotFilters) {
+      const tablePattern = this.parseTablePattern(connectionTag, schema, pattern);
+      if (tablePattern.matches({ connectionTag, schema, name: tableName })) {
+        // Return the appropriate filter based on database type
+        return filterDef[dbType];
+      }
+    }
+
+    return undefined;
+  }
+
+  /**
+   * Helper to parse a table pattern string into a TablePattern object
+   */
+  private parseTablePattern(connectionTag: string, defaultSchema: string, pattern: string): TablePattern {
+    // Split on '.'
to extract schema and table parts + const parts = pattern.split('.'); + if (parts.length === 1) { + // Just table name, use default schema + return new TablePattern(defaultSchema, parts[0]); + } else { + // schema.table format + return new TablePattern(parts[0], parts[1]); + } + } } export interface SyncConfigWithErrors { config: SyncConfig; diff --git a/packages/sync-rules/src/from_yaml.ts b/packages/sync-rules/src/from_yaml.ts index c4986dec0..1d9c6662f 100644 --- a/packages/sync-rules/src/from_yaml.ts +++ b/packages/sync-rules/src/from_yaml.ts @@ -12,7 +12,7 @@ import { validateSyncRulesSchema } from './json_schema.js'; import { SqlEventDescriptor } from './events/SqlEventDescriptor.js'; import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js'; import { QueryParseOptions, SourceSchema, StreamParseOptions } from './types.js'; -import { SyncConfig, SyncConfigWithErrors } from './SyncConfig.js'; +import { InitialSnapshotFilter, SyncConfig, SyncConfigWithErrors } from './SyncConfig.js'; import { ParsingErrorListener, SyncStreamsCompiler } from './compiler/compiler.js'; import { syncStreamFromSql } from './streams/from_sql.js'; import { PrecompiledSyncConfig } from './sync_plan/evaluator/index.js'; @@ -20,6 +20,34 @@ import { javaScriptExpressionEngine } from './sync_plan/engine/javascript.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); +/** + * Parse initial_snapshot_filter from YAML object with sql and/or mongo properties + */ +function parseInitialSnapshotFilter(value: unknown): InitialSnapshotFilter | undefined { + if (value instanceof YAMLMap) { + // Object format with db-specific filters + const result: { sql?: string; mongo?: any } = {}; + + const sqlValue = value.get('sql', true); + if (sqlValue instanceof Scalar) { + result.sql = sqlValue.toString(); + } + + const mongoValue = value.get('mongo', true); + if (mongoValue) { + // Parse mongo value - can be any YAML structure + 
result.mongo = mongoValue.toJSON(); + } + + // Only return if at least one property is set + if (result.sql !== undefined || result.mongo !== undefined) { + return result; + } + } + + return undefined; +} + /** * Reads `sync_rules.yaml` files containing a sync configuration. * @@ -88,6 +116,12 @@ export class SyncConfigFromYaml { result = this.#legacyParseBucketDefinitionsAndStreams(bucketMap, streamMap, compatibility); } + // Parse global initial_snapshot_filters + const filtersMap = parsed.get('initial_snapshot_filters') as YAMLMap | null; + if (filtersMap instanceof YAMLMap) { + this.#parseInitialSnapshotFilters(filtersMap, result); + } + const eventDefinitions = this.#parseEventDefinitions(parsed, compatibility); result.eventDescriptors.push(...eventDefinitions); @@ -111,6 +145,21 @@ export class SyncConfigFromYaml { } } + /** + * Parse the global initial_snapshot_filters configuration + */ + #parseInitialSnapshotFilters(filtersMap: YAMLMap, result: SyncConfig) { + for (const entry of filtersMap.items) { + const { key: tableKey, value: filterValue } = entry as { key: Scalar; value: unknown }; + const tableName = tableKey.toString(); + + const filter = parseInitialSnapshotFilter(filterValue); + if (filter) { + result.initialSnapshotFilters.set(tableName, filter); + } + } + } + /** * Parses the `config` block of a sync configuration. 
* diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index 0050ebbe8..dc8b5f5c4 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -7,6 +7,37 @@ const ajv = new Ajv({ allErrors: true, verbose: true }); export const syncRulesSchema: ajvModule.Schema = { type: 'object', properties: { + initial_snapshot_filters: { + type: 'object', + description: 'Global filters applied during initial snapshot replication for specific tables', + examples: [ + { + users: { + sql: "status = 'active'", + mongo: { status: 'active' } + }, + orders: { + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' + } + } + ], + patternProperties: { + '.*': { + type: 'object', + description: 'Database-specific filter syntax for a table', + properties: { + sql: { + type: 'string', + description: 'SQL WHERE clause for MySQL, PostgreSQL, SQL Server' + }, + mongo: { + description: 'MongoDB query filter (BSON/EJSON format)' + } + }, + additionalProperties: false + } + } + }, bucket_definitions: { type: 'object', description: 'List of bucket definitions', diff --git a/packages/sync-rules/test/src/snapshot_filter.test.ts b/packages/sync-rules/test/src/snapshot_filter.test.ts new file mode 100644 index 000000000..f28818964 --- /dev/null +++ b/packages/sync-rules/test/src/snapshot_filter.test.ts @@ -0,0 +1,224 @@ +import { describe, expect, test } from 'vitest'; +import { SqlSyncRules } from '../../src/index.js'; +import { PARSE_OPTIONS } from './util.js'; + +describe('snapshot filter tests', () => { + test('parse initial_snapshot_filter from global config', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(rules.initialSnapshotFilters.size).toBe(1); + expect(rules.initialSnapshotFilters.get('users')).toEqual({ + sql: "status = 
'active'" + }); + }); + + test('parse multiple tables with global filters', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + orders: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users WHERE status = 'active' + + recent_orders: + data: + - SELECT * FROM orders + `, + PARSE_OPTIONS + ); + + expect(rules.initialSnapshotFilters.size).toBe(2); + expect(rules.initialSnapshotFilters.get('users')).toEqual({ sql: "status = 'active'" }); + expect(rules.initialSnapshotFilters.get('orders')).toEqual({ sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' }); + }); + + test('getInitialSnapshotFilter returns undefined when no filters', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +bucket_definitions: + all_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users'); + expect(filter).toBeUndefined(); + }); + + test('getInitialSnapshotFilter returns single filter', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(filter).toBe("status = 'active'"); + }); + + test('getInitialSnapshotFilter handles schema-qualified table names', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + public.users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM public.users + `, + PARSE_OPTIONS + ); + + const filter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql'); + expect(filter).toBe("status = 'active'"); + }); + + test('initial_snapshot_filter with 
complex WHERE clause', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + todos: + sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND archived = false" + +bucket_definitions: + recent_todos: + data: + - SELECT * FROM todos + `, + PARSE_OPTIONS + ); + + const filter = rules.initialSnapshotFilters.get('todos'); + expect(filter).toEqual({ + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND archived = false' + }); + }); + + test('parse succeeds without errors when using initial_snapshot_filter', () => { + const { config: rules, errors } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(errors).toHaveLength(0); + expect(rules.initialSnapshotFilters.size).toBe(1); + }); + + test('parse initial_snapshot_filter with object format', () => { + const { config: rules, errors } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + PARSE_OPTIONS + ); + + expect(errors).toHaveLength(0); + expect(rules.initialSnapshotFilters.size).toBe(1); + const filter = rules.initialSnapshotFilters.get('users'); + expect(filter).toEqual({ + sql: "status = 'active'", + mongo: { status: 'active' } + }); + }); + + test('getInitialSnapshotFilter with object format - SQL', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + source_tables: + users: + test('getInitialSnapshotFilter with object format - SQL and Mongo', () => { + const { config: rules } = SqlSyncRules.fromYaml( + ` +initial_snapshot_filters: + users: + sql: "status = 'active'" + mongo: {status: 'active'} + +bucket_definitions: + active_users: + data: + - SELECT id, name FROM users + `, + 
PARSE_OPTIONS
+    );
+
+    const sqlFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql');
+    expect(sqlFilter).toBe("status = 'active'");
+
+    const mongoFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'mongo');
+    expect(mongoFilter).toEqual({ status: 'active' });
+  });
+
+  test('getInitialSnapshotFilter with object format - only sql specified', () => {
+    const { config: rules } = SqlSyncRules.fromYaml(
+      `
+initial_snapshot_filters:
+  users:
+    sql: "archived = false"
+
+bucket_definitions:
+  active_users:
+    data:
+      - SELECT id, name FROM users
+      `,
+      PARSE_OPTIONS
+    );
+
+    const sqlFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'sql');
+    expect(sqlFilter).toBe('archived = false');
+
+    const mongoFilter = rules.getInitialSnapshotFilter('default', 'public', 'users', 'mongo');
+    expect(mongoFilter).toBeUndefined();
+  });
+});

From be0c7cd954f0fb71cea87d2c8dd3bffe14cc556d Mon Sep 17 00:00:00 2001
From: raffidahmad
Date: Thu, 12 Feb 2026 15:08:39 +0000
Subject: [PATCH 2/4] hotfix: Remove stray closing brace and broken test case

Removes an extra closing curly brace in BinLogStream's snapshot query
construction, which unbalanced the block and prevented the filter from
being applied. Also removes a malformed, unterminated test case.
--- modules/module-mysql/src/replication/BinLogStream.ts | 1 - packages/sync-rules/test/src/snapshot_filter.test.ts | 9 --------- 2 files changed, 10 deletions(-) diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 9e11bf766..dcfe02e42 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -318,7 +318,6 @@ export class BinLogStream { if (table.initialSnapshotFilter?.sql) { query += ` WHERE ${table.initialSnapshotFilter.sql}`; this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`); - } } const queryStream = connection.query(query); diff --git a/packages/sync-rules/test/src/snapshot_filter.test.ts b/packages/sync-rules/test/src/snapshot_filter.test.ts index f28818964..e698b3005 100644 --- a/packages/sync-rules/test/src/snapshot_filter.test.ts +++ b/packages/sync-rules/test/src/snapshot_filter.test.ts @@ -168,15 +168,6 @@ bucket_definitions: }); }); - test('getInitialSnapshotFilter with object format - SQL', () => { - const { config: rules } = SqlSyncRules.fromYaml( - ` -bucket_definitions: - active_users: - data: - - SELECT id, name FROM users - source_tables: - users: test('getInitialSnapshotFilter with object format - SQL and Mongo', () => { const { config: rules } = SqlSyncRules.fromYaml( ` From e5c9853821af7304040c318f6569ea872556486f Mon Sep 17 00:00:00 2001 From: raffidahmad Date: Thu, 12 Feb 2026 15:28:02 +0000 Subject: [PATCH 3/4] feat: implement global initial_snapshot_filters for all database modules --- docs/initial-snapshot-filters.md | 73 +++++++++++++++++++ .../implementation/MongoSyncBucketStorage.ts | 3 +- .../src/replication/ChangeStream.ts | 6 +- .../src/replication/MongoSnapshotQuery.ts | 26 ++++++- .../module-mssql/src/replication/CDCStream.ts | 5 ++ .../src/replication/MSSQLSnapshotQuery.ts | 27 +++++-- .../src/replication/BinLogStream.ts | 10 ++- 
.../src/replication/SnapshotQuery.ts | 32 ++++++-- .../src/replication/WalStream.ts | 5 ++ packages/sync-rules/src/SyncConfig.ts | 39 +++++----- packages/sync-rules/src/from_yaml.ts | 54 +++++++++++--- packages/sync-rules/src/json_schema.ts | 5 +- .../test/src/snapshot_filter.test.ts | 4 +- 13 files changed, 237 insertions(+), 52 deletions(-) diff --git a/docs/initial-snapshot-filters.md b/docs/initial-snapshot-filters.md index 91e9eeab1..4b8e40964 100644 --- a/docs/initial-snapshot-filters.md +++ b/docs/initial-snapshot-filters.md @@ -288,6 +288,79 @@ bucket_definitions: - SELECT * FROM events_2025 ``` +## Special Characters and Identifiers + +⚠️ **Important**: Filter expressions are embedded directly into SQL/MongoDB queries. You must properly quote identifiers and escape string literals according to your database's syntax rules. + +### SQL Identifier Quoting + +**PostgreSQL** - Use double quotes for identifiers with spaces or special characters: +```yaml +initial_snapshot_filters: + users: + sql: "\"User Status\" = 'active' AND \"created-at\" > NOW() - INTERVAL '30 days'" +``` + +**MySQL** - Use backticks for identifiers with spaces or special characters: +```yaml +initial_snapshot_filters: + users: + sql: "`User Status` = 'active' AND `created-at` > NOW() - INTERVAL 30 DAY" +``` + +**SQL Server** - Use square brackets for identifiers with spaces or special characters: +```yaml +initial_snapshot_filters: + users: + sql: "[User Status] = 'active' AND [created-at] > DATEADD(day, -30, GETDATE())" +``` + +### String Literal Escaping + +Always use proper escaping for string literals containing quotes: + +```yaml +initial_snapshot_filters: + comments: + # Single quotes must be escaped as '' in SQL + sql: "content NOT LIKE '%can''t%' AND status = 'approved'" +``` + +### Complex Expressions with OR Operators + +PowerSync automatically wraps your filter in parentheses to prevent operator precedence issues: + +```yaml +initial_snapshot_filters: + users: + # This is 
wrapped as: WHERE (status = 'active' OR status = 'pending') + sql: "status = 'active' OR status = 'pending'" +``` + +### MongoDB Filters + +MongoDB filters use native BSON query syntax, which is safer than string concatenation: + +```yaml +initial_snapshot_filters: + users: + mongo: + $or: + - status: 'active' + - status: 'pending' + "special field": { $exists: true } +``` + +### Security Considerations + +- Filters are defined in `sync_rules.yaml` by administrators, not by end users +- Filters are static configuration, not dynamic user input +- Still follow security best practices: + - Avoid including sensitive data in filters + - Test filters in development before production + - Review filter changes carefully during deployment +- PowerSync does not parameterize filters since they are arbitrary SQL expressions, similar to bucket query definitions + ## Limitations - Filters are applied **globally** across all buckets using that table diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index ed33379b4..a01d4059a 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -239,7 +239,8 @@ export class MongoSyncBucketStorage schema: schema, name: name, replicaIdColumns: replicaIdColumns, - snapshotComplete: doc.snapshot_done ?? true + snapshotComplete: doc.snapshot_done ?? 
true, + initialSnapshotFilter: options.sync_rules.definition.getInitialSnapshotFilter(connection_tag, schema, name, 'mongo') }); sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable); sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable); diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 0347edf3c..a9aba0846 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -483,7 +483,8 @@ export class ChangeStream { await using query = new ChunkedSnapshotQuery({ collection, key: table.snapshotStatus?.lastKey, - batchSize: this.snapshotChunkLength + batchSize: this.snapshotChunkLength, + filter: table.initialSnapshotFilter?.mongo }); if (query.lastKey != null) { this.logger.info( @@ -492,6 +493,9 @@ export class ChangeStream { } else { this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); } + if (table.initialSnapshotFilter?.mongo) { + this.logger.info(`Applying initial snapshot filter: ${JSON.stringify(table.initialSnapshotFilter.mongo)}`); + } let lastBatch = performance.now(); let nextChunkPromise = query.nextChunk(); diff --git a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts index 64b64f9b5..e83c15cd1 100644 --- a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts +++ b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts @@ -13,12 +13,19 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { private lastCursor: mongo.FindCursor | null = null; private collection: mongo.Collection; private batchSize: number; + private snapshotFilter: any; - public constructor(options: { collection: mongo.Collection; batchSize: number; key?: Uint8Array | null }) { + public constructor(options: { + collection: mongo.Collection; + batchSize: number; + key?: 
Uint8Array | null; + filter?: any; + }) { this.lastKey = options.key ? bson.deserialize(options.key, { useBigInt64: true })._id : null; this.lastCursor = null; this.collection = options.collection; this.batchSize = options.batchSize; + this.snapshotFilter = options.filter; } async nextChunk(): Promise<{ docs: mongo.Document[]; lastKey: Uint8Array } | { docs: []; lastKey: null }> { @@ -35,8 +42,23 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { // any parsing as an operator. // Starting in MongoDB 5.0, this filter can use the _id index. Source: // https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements - const filter: mongo.Filter = + + // Build base filter for _id + const idFilter: mongo.Filter = this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }; + + // Combine with snapshot filter if present + let filter: mongo.Filter; + if (this.snapshotFilter) { + if (this.lastKey == null) { + filter = this.snapshotFilter; + } else { + filter = { $and: [idFilter, this.snapshotFilter] }; + } + } else { + filter = idFilter; + } + cursor = this.collection.find(filter, { readConcern: 'majority', limit: this.batchSize, diff --git a/modules/module-mssql/src/replication/CDCStream.ts b/modules/module-mssql/src/replication/CDCStream.ts index d2b109f62..b2d571df1 100644 --- a/modules/module-mssql/src/replication/CDCStream.ts +++ b/modules/module-mssql/src/replication/CDCStream.ts @@ -356,6 +356,11 @@ export class CDCStream { query = new SimpleSnapshotQuery(transaction, table); replicatedCount = 0; } + + if (table.sourceTable.initialSnapshotFilter?.sql) { + this.logger.info(`Applying initial snapshot filter: ${table.sourceTable.initialSnapshotFilter.sql}`); + } + await query.initialize(); let hasRemainingData = true; diff --git a/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts b/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts index 25ce2f4ca..57ebd3af6 100644 --- 
a/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts +++ b/modules/module-mssql/src/replication/MSSQLSnapshotQuery.ts @@ -44,7 +44,11 @@ export class SimpleSnapshotQuery implements MSSQLSnapshotQuery { const request = this.transaction.request(); const stream = request.toReadableStream(); - request.query(`SELECT * FROM ${this.table.toQualifiedName()}`); + let query = `SELECT * FROM ${this.table.toQualifiedName()}`; + if (this.table.sourceTable.initialSnapshotFilter?.sql) { + query += ` WHERE (${this.table.sourceTable.initialSnapshotFilter.sql})`; + } + request.query(query); // MSSQL only streams one row at a time for await (const row of stream) { @@ -141,17 +145,26 @@ export class BatchedSnapshotQuery implements MSSQLSnapshotQuery { const request = this.transaction.request(); const stream = request.toReadableStream(); + const snapshotFilter = this.table.sourceTable.initialSnapshotFilter?.sql; + if (this.lastKey == null) { - request.query(`SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} ORDER BY ${escapedKeyName}`); + let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()}`; + if (snapshotFilter) { + query += ` WHERE (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName}`; + request.query(query); } else { if (this.key.typeId == null) { throw new Error(`typeId required for primary key ${this.key.name}`); } - request - .input('lastKey', this.lastKey) - .query( - `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey ORDER BY ${escapedKeyName}` - ); + request.input('lastKey', this.lastKey); + let query = `SELECT TOP(${this.batchSize}) * FROM ${this.table.toQualifiedName()} WHERE ${escapedKeyName} > @lastKey`; + if (snapshotFilter) { + query += ` AND (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName}`; + request.query(query); } // MSSQL only streams one row at a time diff --git a/modules/module-mysql/src/replication/BinLogStream.ts 
b/modules/module-mysql/src/replication/BinLogStream.ts index dcfe02e42..374ebfa9a 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -307,17 +307,21 @@ export class BinLogStream { batch: storage.BucketStorageBatch, table: storage.SourceTable ) { - this.logger.info(`Replicating ${qualifiedMySQLTable(table)}`); + const qualifiedMySQLTableName = qualifiedMySQLTable(table); + + this.logger.info(`Replicating ${qualifiedMySQLTableName}`); // TODO count rows and log progress at certain batch sizes // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query - let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`; + let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTableName}`; // Apply snapshot filter if it exists. This allows us to do partial snapshots, // for example for large tables where we only want to snapshot recent data. if (table.initialSnapshotFilter?.sql) { - query += ` WHERE ${table.initialSnapshotFilter.sql}`; + query += ` WHERE (${table.initialSnapshotFilter.sql})`; this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`); + } else { + this.logger.info(`No initial snapshot filter applied for ${qualifiedMySQLTableName}`); } const queryStream = connection.query(query); diff --git a/modules/module-postgres/src/replication/SnapshotQuery.ts b/modules/module-postgres/src/replication/SnapshotQuery.ts index 039185d60..04005c8e8 100644 --- a/modules/module-postgres/src/replication/SnapshotQuery.ts +++ b/modules/module-postgres/src/replication/SnapshotQuery.ts @@ -36,7 +36,11 @@ export class SimpleSnapshotQuery implements SnapshotQuery { ) {} public async initialize(): Promise { - await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${this.table.qualifiedName}`); + let query = `SELECT * FROM ${this.table.qualifiedName}`; + if (this.table.initialSnapshotFilter?.sql) { + 
query += ` WHERE (${this.table.initialSnapshotFilter.sql})`; + } + await this.connection.query(`DECLARE snapshot_cursor CURSOR FOR ${query}`); } public nextChunk(): AsyncIterableIterator { @@ -119,17 +123,27 @@ export class ChunkedSnapshotQuery implements SnapshotQuery { public async *nextChunk(): AsyncIterableIterator { let stream: AsyncIterableIterator; const escapedKeyName = escapeIdentifier(this.key.name); + const snapshotFilter = this.table.initialSnapshotFilter?.sql; + if (this.lastKey == null) { - stream = this.connection.stream( - `SELECT * FROM ${this.table.qualifiedName} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}` - ); + let query = `SELECT * FROM ${this.table.qualifiedName}`; + if (snapshotFilter) { + query += ` WHERE (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`; + stream = this.connection.stream(query); } else { if (this.key.typeId == null) { throw new Error(`typeId required for primary key ${this.key.name}`); } const type = Number(this.key.typeId); + let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1`; + if (snapshotFilter) { + query += ` AND (${snapshotFilter})`; + } + query += ` ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`; stream = this.connection.stream({ - statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`, + statement: query, params: [{ value: this.lastKey, type }] }); } @@ -200,8 +214,14 @@ export class IdSnapshotQuery implements SnapshotQuery { if (type == null) { throw new Error(`Cannot determine primary key array type for ${JSON.stringify(keyDefinition)}`); } + + let query = `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`; + if (this.table.initialSnapshotFilter?.sql) { + query += ` AND (${this.table.initialSnapshotFilter.sql})`; + } + yield* this.connection.stream({ - statement: `SELECT * FROM 
${this.table.qualifiedName} WHERE ${escapeIdentifier(keyDefinition.name)} = ANY($1)`, + statement: query, params: [ { type: type, diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 1dd2e23be..176ec5cd3 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -558,6 +558,11 @@ WHERE oid = $1::regclass`, q = new SimpleSnapshotQuery(db, table, this.snapshotChunkLength); at = 0; } + + if (table.initialSnapshotFilter?.sql) { + this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`); + } + await q.initialize(); let hasRemainingData = true; diff --git a/packages/sync-rules/src/SyncConfig.ts b/packages/sync-rules/src/SyncConfig.ts index 18a358cb2..a10b33a46 100644 --- a/packages/sync-rules/src/SyncConfig.ts +++ b/packages/sync-rules/src/SyncConfig.ts @@ -144,25 +144,26 @@ export abstract class SyncConfig { /** * Get the initial snapshot filter for a given table. * Filters are applied globally and support wildcard matching. 
- * - * @param connectionTag Connection tag (e.g., 'default') + * + * @param connectionTag Connection tag for the active source connection * @param schema Schema name - * @param tableName Table name (without wildcard %) - * @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB) - * @returns WHERE clause/query, or undefined if no filter is specified + * @param tableName Concrete table name (wildcards are allowed in the filter patterns, not here) + * @param dbType Database type ('sql' for MySQL/Postgres/MSSQL, 'mongo' for MongoDB) - currently unused, returns full filter object + * @returns Filter object with sql/mongo properties, or undefined if no filter is specified */ - getInitialSnapshotFilter(connectionTag: string, schema: string, tableName: string, dbType: DatabaseType = 'sql'): string | any | undefined { - const fullTableName = `${schema}.${tableName}`; - - // Check for exact matches first + getInitialSnapshotFilter( + connectionTag: string, + schema: string, + tableName: string, + dbType: DatabaseType = 'sql' + ): InitialSnapshotFilter | undefined { for (const [pattern, filterDef] of this.initialSnapshotFilters) { const tablePattern = this.parseTablePattern(connectionTag, schema, pattern); if (tablePattern.matches({ connectionTag, schema, name: tableName })) { - // Return the appropriate filter based on database type - return filterDef[dbType]; + return filterDef; } } - + return undefined; } @@ -170,15 +171,17 @@ export abstract class SyncConfig { * Helper to parse a table pattern string into a TablePattern object */ private parseTablePattern(connectionTag: string, defaultSchema: string, pattern: string): TablePattern { - // Split on '.' 
to extract schema and table parts const parts = pattern.split('.'); if (parts.length === 1) { - // Just table name, use default schema - return new TablePattern(defaultSchema, parts[0]); - } else { - // schema.table format - return new TablePattern(parts[0], parts[1]); + return new TablePattern(`${connectionTag}.${defaultSchema}`, parts[0]); + } + if (parts.length === 2) { + return new TablePattern(`${connectionTag}.${parts[0]}`, parts[1]); } + const tag = parts[0]; + const schema = parts[1]; + const tableName = parts.slice(2).join('.'); + return new TablePattern(`${tag}.${schema}`, tableName); } } export interface SyncConfigWithErrors { diff --git a/packages/sync-rules/src/from_yaml.ts b/packages/sync-rules/src/from_yaml.ts index 1d9c6662f..ed8a07ddb 100644 --- a/packages/sync-rules/src/from_yaml.ts +++ b/packages/sync-rules/src/from_yaml.ts @@ -27,24 +27,32 @@ function parseInitialSnapshotFilter(value: unknown): InitialSnapshotFilter | und if (value instanceof YAMLMap) { // Object format with db-specific filters const result: { sql?: string; mongo?: any } = {}; - + const sqlValue = value.get('sql', true); - if (sqlValue instanceof Scalar) { - result.sql = sqlValue.toString(); + if (sqlValue) { + if (sqlValue instanceof Scalar) { + result.sql = sqlValue.value as string; + } else { + // If it's not a scalar, try to convert it + result.sql = String(sqlValue); + } } - - const mongoValue = value.get('mongo', true); - if (mongoValue) { - // Parse mongo value - can be any YAML structure + + const mongoValue = value.get('mongo', true) as any; + if (mongoValue instanceof Scalar) { + result.mongo = mongoValue.value; + } else if (mongoValue instanceof YAMLMap || mongoValue instanceof YAMLSeq) { result.mongo = mongoValue.toJSON(); + } else if (mongoValue !== null && mongoValue !== undefined) { + result.mongo = mongoValue; } - + // Only return if at least one property is set if (result.sql !== undefined || result.mongo !== undefined) { return result; } } - + return undefined; 
} @@ -151,11 +159,35 @@ export class SyncConfigFromYaml { #parseInitialSnapshotFilters(filtersMap: YAMLMap, result: SyncConfig) { for (const entry of filtersMap.items) { const { key: tableKey, value: filterValue } = entry as { key: Scalar; value: unknown }; - const tableName = tableKey.toString(); - + const tableName = tableKey.value as string; + + if (!filterValue) { + this.#errors.push( + this.#tokenError(tableKey, `Initial snapshot filter for table '${tableName}' cannot be empty`) + ); + continue; + } + + if (!(filterValue instanceof YAMLMap)) { + this.#errors.push( + this.#tokenError( + filterValue as any, + `Initial snapshot filter for table '${tableName}' must be an object with 'sql' and/or 'mongo' properties` + ) + ); + continue; + } + const filter = parseInitialSnapshotFilter(filterValue); if (filter) { result.initialSnapshotFilters.set(tableName, filter); + } else { + this.#errors.push( + this.#tokenError( + filterValue as any, + `Initial snapshot filter for table '${tableName}' must have at least 'sql' or 'mongo' property` + ) + ); } } } diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index dc8b5f5c4..731e54b67 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -31,10 +31,11 @@ export const syncRulesSchema: ajvModule.Schema = { description: 'SQL WHERE clause for MySQL, PostgreSQL, SQL Server' }, mongo: { - description: 'MongoDB query filter (BSON/EJSON format)' + description: 'MongoDB query filter (BSON/EJSON format) - can be any valid MongoDB query object' } }, - additionalProperties: false + additionalProperties: false, + minProperties: 1 } } }, diff --git a/packages/sync-rules/test/src/snapshot_filter.test.ts b/packages/sync-rules/test/src/snapshot_filter.test.ts index e698b3005..94945ca1e 100644 --- a/packages/sync-rules/test/src/snapshot_filter.test.ts +++ b/packages/sync-rules/test/src/snapshot_filter.test.ts @@ -47,7 +47,9 @@ bucket_definitions: 
expect(rules.initialSnapshotFilters.size).toBe(2); expect(rules.initialSnapshotFilters.get('users')).toEqual({ sql: "status = 'active'" }); - expect(rules.initialSnapshotFilters.get('orders')).toEqual({ sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' }); + expect(rules.initialSnapshotFilters.get('orders')).toEqual({ + sql: 'created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)' + }); }); test('getInitialSnapshotFilter returns undefined when no filters', () => { From f19190ebd651893a998dad153931a3626aab85cf Mon Sep 17 00:00:00 2001 From: raffidahmad Date: Fri, 13 Feb 2026 10:21:06 +0000 Subject: [PATCH 4/4] Clarifies filter update process in docs. Explains that widening filters after the initial snapshot requires a resnapshot or backfill to include previously excluded rows. Also updates the summary of initial snapshot filters to reflect the same. --- docs/initial-snapshot-filters.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/initial-snapshot-filters.md b/docs/initial-snapshot-filters.md index 4b8e40964..46c878a8c 100644 --- a/docs/initial-snapshot-filters.md +++ b/docs/initial-snapshot-filters.md @@ -201,6 +201,10 @@ bucket_definitions: 3. **Storage**: Only filtered rows are stored in PostgreSQL/MongoDB storage 4. **Buckets**: All buckets using the filtered table will only see the filtered data +### Changing Filters Later + +If you widen a filter after the initial snapshot, CDC will **not** backfill older rows that were previously excluded because those rows did not change. To include them, run a **targeted resnapshot** of the affected tables or perform a **one-time backfill** job, then let CDC keep the data up to date. 
+ ## Database-Specific Syntax ### MySQL @@ -365,7 +369,7 @@ initial_snapshot_filters: - Filters are applied **globally** across all buckets using that table - CDC changes only affect rows that were **initially snapshotted** -- Changing filters requires a **full re-snapshot** of affected tables +- Widening filters requires a **resnapshot or backfill** to pick up previously excluded rows - Filter syntax must be valid for your **source database** - Some buckets may be **empty** if their queries don't align with the global filter
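The MySQL, Postgres, and MSSQL hunks above all follow the same query-construction pattern: wrap the configured snapshot filter in parentheses (so `OR` expressions cannot change the surrounding query's meaning) and `AND` it onto any existing keyset-pagination condition. A minimal standalone sketch of that pattern follows — `buildSnapshotQuery`, `SnapshotFilter`, and `lastKeyClause` are illustrative names for this sketch, not the actual PowerSync API:

```typescript
// Illustrative sketch of the snapshot-query construction pattern used in the
// patches above. Not the real PowerSync implementation.

interface SnapshotFilter {
  /** SQL WHERE-clause fragment from initial_snapshot_filters, e.g. "status = 'active'" */
  sql?: string;
}

function buildSnapshotQuery(
  qualifiedTable: string,
  filter?: SnapshotFilter,
  lastKeyClause?: string // e.g. `"id" > $1` when resuming a chunked snapshot
): string {
  let query = `SELECT * FROM ${qualifiedTable}`;
  const conditions: string[] = [];
  if (lastKeyClause) {
    conditions.push(lastKeyClause);
  }
  if (filter?.sql) {
    // Parentheses guard against operator-precedence issues when the
    // filter contains OR, as noted in the docs section above.
    conditions.push(`(${filter.sql})`);
  }
  if (conditions.length > 0) {
    query += ` WHERE ${conditions.join(' AND ')}`;
  }
  return query;
}
```

With no filter configured, `buildSnapshotQuery('logs')` yields the unfiltered `SELECT * FROM logs`; with both a filter and a resume key, the filter is appended as an `AND (...)` clause after the pagination condition.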