diff --git a/modules/module-mongodb-storage/src/storage/implementation/MergedSyncRules.ts b/modules/module-mongodb-storage/src/storage/implementation/MergedSyncRules.ts index 6e5d6b60d..45a2232e1 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MergedSyncRules.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MergedSyncRules.ts @@ -105,19 +105,19 @@ export class MergedSyncRules implements RowProcessor { * @param table The source database table definition, _not_ the individually derived SourceTables. * @returns */ - getMatchingSources(table: SourceTableInterface): { + getMatchingSources(pattern: TablePattern): { bucketDataSources: BucketDataSource[]; parameterIndexLookupCreators: ParameterIndexLookupCreator[]; } { const bucketDataSources = [...this.resolvedDataSources.values()] .map((dataSource) => dataSource.source) - .filter((ds) => ds.tableSyncsData(table)); + .filter((ds) => ds.getSourceTables().some((table) => table.equals(pattern))); const parameterIndexLookupCreators: ParameterIndexLookupCreator[] = [ ...this.resolvedParameterLookupSources.values() ] .map((dataSource) => dataSource.source) - .filter((ds) => ds.tableSyncsParameters(table)); + .filter((ds) => ds.getSourceTables().some((table) => table.equals(pattern))); return { bucketDataSources, parameterIndexLookupCreators diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 35f7739aa..7df0ece1a 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -284,7 +284,8 @@ export class MongoBucketDataWriter implements storage.BucketDataWriter { replicaIdColumns: ref.replicaIdColumns, snapshotComplete: doc.snapshot_done ?? true, bucketDataSourceIds: doc.bucket_data_source_ids ?? [], - parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [] + parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [], + pattern: ref.pattern }); sourceTable.snapshotStatus = doc.snapshot_status == null @@ -302,11 +303,7 @@ export class MongoBucketDataWriter implements storage.BucketDataWriter { } async resolveTables(options: storage.ResolveTablesOptions): Promise { - const sources = this.rowProcessor.getMatchingSources({ - connectionTag: options.connection_tag, - name: options.entity_descriptor.name, - schema: options.entity_descriptor.schema - }); + const sources = this.rowProcessor.getMatchingSources(options.pattern); const bucketDataSourceIds = sources.bucketDataSources.map((source) => this.mapping.bucketSourceId(source)); const parameterLookupSourceIds = sources.parameterIndexLookupCreators.map((source) => this.mapping.parameterLookupId(source) @@ -390,7 +387,8 @@ export class MongoBucketDataWriter implements storage.BucketDataWriter { replicaIdColumns: replicaIdColumns, snapshotComplete: doc.snapshot_done ?? true, bucketDataSourceIds: doc.bucket_data_source_ids ?? [], - parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [] + parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [], + pattern: options.pattern }); sourceTable.snapshotStatus = doc.snapshot_status == null diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 85f36ba8a..d63c2a0c4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -83,6 +83,7 @@ export interface SourceTableDocument { replica_id_columns2: { name: string; type_oid?: number; type?: string }[] | undefined; snapshot_done: boolean | undefined; snapshot_status: SourceTableDocumentSnapshotStatus | undefined; + filter?: string; } export interface SourceTableDocumentSnapshotStatus { diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 5c3fd73d2..ca7469fbe 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -9,6 +9,7 @@ import { ServiceError } from '@powersync/lib-services-framework'; import { + BucketDataWriter, BucketStorageFactory, MetricsEngine, SaveOperationTag, @@ -33,6 +34,8 @@ import { } from './MongoRelation.js'; import { MongoSnapshotter } from './MongoSnapshotter.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; +import { staticFilterToMongoExpression } from './staticFilters.js'; +import { inspect } from 'node:util'; export interface ChangeStreamOptions { connections: MongoManager; @@ -208,13 +211,18 @@ export class ChangeStream { return this.abortSignal.aborted; } - private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { - const sourceTables = this.substreams.flatMap((s) => s.syncRules.getSourceTables()); + private getSourceNamespaceFilters(writer: BucketDataWriter): { + $match: any; + multipleDatabases: boolean; + filters: any[]; + } { + const sourceTables = writer.rowProcessor.getSourceTables(); let $inFilters: { db: string; coll: string }[] = [ { db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION } ]; let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = []; + let filters: any[] = []; let multipleDatabases = false; for (let tablePattern of sourceTables) { if (tablePattern.connectionTag != this.connections.connectionTag) { @@ -225,19 +233,36 @@ export class ChangeStream { multipleDatabases = true; } + let filterExpression = + tablePattern.filter == null + ? { $literal: true } + : staticFilterToMongoExpression(tablePattern.filter, { columnPrefix: '$fullDocument.' }); + if (tablePattern.isWildcard) { $refilters.push({ 'ns.db': tablePattern.schema, 'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) }); + filters.push({ + 'ns.db': tablePattern.schema, + 'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)), + $expr: filterExpression + }); } else { $inFilters.push({ db: tablePattern.schema, coll: tablePattern.name }); + filters.push({ + 'ns.db': tablePattern.schema, + 'ns.coll': tablePattern.name, + $expr: filterExpression + }); } } + // FIXME: deduplicate filters + // When we have a large number of collections, the performance of the pipeline // depends a lot on how the filters here are specified. // Currently, only the multipleDatabases == false case is optimized, and the @@ -254,9 +279,9 @@ export class ChangeStream { : // collection-level: filter on coll only { 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } }; if ($refilters.length > 0) { - return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases }; + return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases, filters }; } - return { $match: nsFilter, multipleDatabases }; + return { $match: nsFilter, multipleDatabases, filters }; } private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) { @@ -398,17 +423,23 @@ export class ChangeStream { } } - private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) { + private openChangeStream(writer: BucketDataWriter, options: { lsn: string | null; maxAwaitTimeMs?: number }) { const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; - const filters = this.getSourceNamespaceFilters(); + const filters = this.getSourceNamespaceFilters(writer); const pipeline: mongo.Document[] = [ { $match: filters.$match }, + // Not working currently - getting "resumeToken not found" + // { + // $match: { + // $or: filters.filters + // } + // }, { $changeStreamSplitLargeEvent: {} } ]; @@ -474,37 +505,56 @@ export class ChangeStream { // Ignore the postImages check in this case. } - const result = await writer.resolveTables({ - connection_id: this.connection_id, - connection_tag: this.connections.connectionTag, - entity_descriptor: descriptor + // What happens here: + // 1. We see a new collection that we haven't observed before. + // 2. We check which table pattern(s) match this collection, _regardless of specific row filters_. + // 3. We resolve the tables for those patterns. + + // FIXME: don't scan through it all + // FIXME: handle wildcards + const patterns = writer.rowProcessor.getSourceTables().filter((t) => { + return ( + t.connectionTag == this.connections.connectionTag && t.name == descriptor.name && t.schema == descriptor.schema + ); }); - const snapshot = options.snapshot; - this.relationCache.set(getCacheIdentifier(descriptor), result.tables); + let allTables: SourceTable[] = []; + for (let pattern of patterns) { + const result = await writer.resolveTables({ + connection_id: this.connection_id, + connection_tag: this.connections.connectionTag, + entity_descriptor: descriptor, + pattern: pattern + }); - // Drop conflicting collections. - // This is generally not expected for MongoDB source dbs, so we log an error. - if (result.dropTables.length > 0) { - this.logger.error( - `Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}` - ); - await writer.drop(result.dropTables); - } + const snapshot = options.snapshot; + this.relationCache.set(getCacheIdentifier(descriptor), result.tables); + + // Drop conflicting collections. + // This is generally not expected for MongoDB source dbs, so we log an error. + if (result.dropTables.length > 0) { + this.logger.error( + `Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}` + ); + await writer.drop(result.dropTables); + } - // Snapshot if: - // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere) - // 2. Snapshot is not already done, AND: - // 3. The table is used in sync rules. - for (let table of result.tables) { - const shouldSnapshot = snapshot && !table.snapshotComplete && table.syncAny; - if (shouldSnapshot) { - this.logger.info(`New collection: ${descriptor.schema}.${descriptor.name}`); - await this.snapshotter.queueSnapshot(writer, table); + // Snapshot if: + // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere) + // 2. Snapshot is not already done, AND: + // 3. The table is used in sync rules. + for (let table of result.tables) { + const shouldSnapshot = snapshot && !table.snapshotComplete && table.syncAny; + if (shouldSnapshot) { + this.logger.info(`New collection: ${descriptor.schema}.${descriptor.name}`); + await this.snapshotter.queueSnapshot(writer, table); + } } + + allTables.push(...result.tables); } - return result.tables; + return allTables; } private async drop(writer: storage.BucketDataWriter, entity: SourceEntityDescriptor): Promise { @@ -577,7 +627,7 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); + await using streamManager = this.openChangeStream(writer, { lsn: resumeFromLsn }); const { stream, filters } = streamManager; if (this.abortSignal.aborted) { await stream.close(); diff --git a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts index 64b64f9b5..f62d4a3cf 100644 --- a/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts +++ b/modules/module-mongodb/src/replication/MongoSnapshotQuery.ts @@ -1,6 +1,7 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { ReplicationAssertionError } from '@powersync/lib-services-framework'; import { bson } from '@powersync/service-core'; +import { MongoExpression } from '@powersync/service-sync-rules'; /** * Performs a collection snapshot query, chunking by ranges of _id. @@ -13,12 +14,19 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { private lastCursor: mongo.FindCursor | null = null; private collection: mongo.Collection; private batchSize: number; + private filter: MongoExpression | null = null; - public constructor(options: { collection: mongo.Collection; batchSize: number; key?: Uint8Array | null }) { + public constructor(options: { + collection: mongo.Collection; + batchSize: number; + key?: Uint8Array | null; + filter?: MongoExpression; + }) { this.lastKey = options.key ? bson.deserialize(options.key, { useBigInt64: true })._id : null; this.lastCursor = null; this.collection = options.collection; this.batchSize = options.batchSize; + this.filter = options.filter ?? null; } async nextChunk(): Promise<{ docs: mongo.Document[]; lastKey: Uint8Array } | { docs: []; lastKey: null }> { @@ -35,8 +43,17 @@ export class ChunkedSnapshotQuery implements AsyncDisposable { // any parsing as an operator. // Starting in MongoDB 5.0, this filter can use the _id index. Source: // https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements - const filter: mongo.Filter = - this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }; + let filter: mongo.Filter; + if (this.lastKey == null) { + filter = this.filter == null ? {} : { $expr: this.filter }; + } else { + if (this.filter == null) { + filter = { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }; + } else { + filter = { $and: [{ $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }, { $expr: this.filter }] }; + } + } + cursor = this.collection.find(filter, { readConcern: 'majority', limit: this.batchSize, diff --git a/modules/module-mongodb/src/replication/MongoSnapshotter.ts b/modules/module-mongodb/src/replication/MongoSnapshotter.ts index c458fb99c..cd2914cd1 100644 --- a/modules/module-mongodb/src/replication/MongoSnapshotter.ts +++ b/modules/module-mongodb/src/replication/MongoSnapshotter.ts @@ -20,6 +20,8 @@ import { MongoManager } from './MongoManager.js'; import { constructAfterRecord, createCheckpoint, getMongoRelation, STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; +import { staticFilterToMongoExpression } from './staticFilters.js'; +import { JSONBig } from '@powersync/service-jsonbig'; export interface MongoSnapshotterOptions { connections: MongoManager; @@ -266,7 +268,8 @@ export class MongoSnapshotter { const sourceTables = await writer.resolveTables({ connection_id: this.connection_id, connection_tag: this.connections.connectionTag, - entity_descriptor: getMongoRelation({ db: schema, coll: collection.name }) + entity_descriptor: getMongoRelation({ db: schema, coll: collection.name }), + pattern: tablePattern }); // TODO: dropTables? result.push(...sourceTables.tables); @@ -280,17 +283,22 @@ export class MongoSnapshotter { let at = table.snapshotStatus?.replicatedCount ?? 0; const db = this.client.db(table.schema); const collection = db.collection(table.name); + + const mongoFilter = table.pattern?.filter ? staticFilterToMongoExpression(table.pattern.filter) : null; + const filterLog = mongoFilter ? ` | filter: ${JSONBig.stringify(mongoFilter)}` : ''; + await using query = new ChunkedSnapshotQuery({ collection, key: table.snapshotStatus?.lastKey, - batchSize: this.snapshotChunkLength + batchSize: this.snapshotChunkLength, + filter: mongoFilter }); if (query.lastKey != null) { this.logger.info( - `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}` + `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}${filterLog}` ); } else { - this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); + this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}${filterLog}`); } let lastBatch = performance.now(); @@ -352,21 +360,6 @@ export class MongoSnapshotter { return rowProcessor.applyRowContext(inputRow); } - private async getCollectionInfo(db: string, name: string): Promise { - const collection = ( - await this.client - .db(db) - .listCollections( - { - name: name - }, - { nameOnly: false } - ) - .toArray() - )[0]; - return collection; - } - private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) { if (!this.usePostImages) { // Nothing to check diff --git a/modules/module-mongodb/src/replication/staticFilters.ts b/modules/module-mongodb/src/replication/staticFilters.ts new file mode 100644 index 000000000..ecbd37bde --- /dev/null +++ b/modules/module-mongodb/src/replication/staticFilters.ts @@ -0,0 +1,138 @@ +import { isAny, SqliteValue, StaticFilter } from '@powersync/service-sync-rules'; + +export type MongoExpression = + | Record + | unknown[] + | string + | number + | boolean + | null + | bigint + | Uint8Array; + +export interface StaticFilterToMongoOptions { + mapValue?: (value: SqliteValue) => unknown; + parseJsonArrayForIn?: boolean; + columnPrefix?: string; +} + +export function staticFilterToMongoExpression( + filter: StaticFilter, + options: StaticFilterToMongoOptions = {} +): MongoExpression { + const mapValue = options.mapValue ?? ((value: SqliteValue) => value); + const parseJsonArrayForIn = options.parseJsonArrayForIn ?? true; + // Marker for "cannot prefilter" so we avoid over-restricting the query. + const ANY = Symbol('static-filter-any'); + + const literalValue = (value: SqliteValue): MongoExpression => ({ $literal: mapValue(value) }); + + const toExpr = (node: StaticFilter): MongoExpression | typeof ANY => { + if (isAny(node)) { + return ANY; + } + + if ('and' in node) { + const parts = node.and.map(toExpr).filter((part) => part !== ANY) as MongoExpression[]; + if (parts.length === 0) { + return ANY; + } + if (parts.length === 1) { + return parts[0]; + } + return { $and: parts }; + } + + if ('or' in node) { + const parts = node.or.map(toExpr); + if (parts.some((part) => part === ANY)) { + return ANY; + } + const expressions = parts.filter((part) => part !== ANY) as MongoExpression[]; + if (expressions.length === 0) { + return { $const: false }; + } + if (expressions.length === 1) { + return expressions[0]; + } + return { $or: expressions }; + } + + if ('operator' in node) { + const left = toExpr(node.left); + const right = toExpr(node.right); + if (left === ANY || right === ANY) { + return ANY; + } + + const leftExpr = left as MongoExpression; + let rightExpr = right as MongoExpression; + const operator = node.operator.toUpperCase(); + + switch (operator) { + case '=': + case '==': + case 'IS': + // Big hack to support boolean values coerced to integers. + if ('value' in node.right && node.right.value == 0n) { + return { $in: [leftExpr, [0n, false]] }; + } else if ('value' in node.right && node.right.value == 1n) { + return { $in: [leftExpr, [1n, true]] }; + } + return { $eq: [leftExpr, rightExpr] }; + case '!=': + case '<>': + case 'IS NOT': + // Big hack to support boolean values coerced to integers. + if ('value' in node.right && node.right.value == 0n) { + return { $nin: [leftExpr, [0n, false]] }; + } else if ('value' in node.right && node.right.value == 1n) { + return { $nin: [leftExpr, [1n, true]] }; + } + return { $ne: [leftExpr, rightExpr] }; + case '<': + return { $lt: [leftExpr, rightExpr] }; + case '<=': + return { $lte: [leftExpr, rightExpr] }; + case '>': + return { $gt: [leftExpr, rightExpr] }; + case '>=': + return { $gte: [leftExpr, rightExpr] }; + case 'IN': { + if (parseJsonArrayForIn && 'value' in node.right) { + const value = node.right.value; + if (typeof value !== 'string') { + throw new Error('IN operator expects JSON array literal'); + } + let parsed: unknown; + try { + parsed = JSON.parse(value); + } catch { + throw new Error('IN operator expects JSON array literal'); + } + if (!Array.isArray(parsed)) { + throw new Error('IN operator expects JSON array literal'); + } + rightExpr = { $literal: parsed }; + } + return { $in: [leftExpr, rightExpr] }; + } + default: + return ANY; + } + } + + if ('column' in node) { + return `${options.columnPrefix ?? '$'}${node.column}`; + } + + if ('value' in node) { + return literalValue(node.value); + } + + throw new Error('Unsupported static filter'); + }; + + const result = toExpr(filter); + return result === ANY ? { $literal: true } : result; +} diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index fdecc59c3..a10385f32 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -143,7 +143,6 @@ export function bucketRequest( throw new Error('Failed to find global bucket'); } const bucketName = hydrationState.getBucketSourceScope(source).bucketPrefix + parameters; - console.log('query for bucket', bucketName); return { bucket: bucketName, start: BigInt(start ?? 0n), diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts index 2a5eb3509..3a096784a 100644 --- a/packages/service-core/src/storage/SourceTable.ts +++ b/packages/service-core/src/storage/SourceTable.ts @@ -1,4 +1,4 @@ -import { DEFAULT_TAG } from '@powersync/service-sync-rules'; +import { DEFAULT_TAG, TablePattern } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { ColumnDescriptor, SourceEntityDescriptor } from './SourceEntity.js'; import { bson } from '../index.js'; @@ -19,6 +19,7 @@ export interface SourceTableOptions { bucketDataSourceIds?: number[]; parameterLookupSourceIds?: number[]; + pattern?: TablePattern; } export interface TableSnapshotStatus { @@ -93,6 +94,10 @@ export class SourceTable implements SourceEntityDescriptor { return this.options.replicaIdColumns; } + get pattern(): TablePattern | undefined { + return this.options.pattern; + } + /** * Sanitized name of the entity in the format of "{schema}.{entity name}" * Suitable for safe use in Postgres queries. @@ -124,7 +129,10 @@ export class SourceTable implements SourceEntityDescriptor { schema: this.schema, name: this.name, replicaIdColumns: this.replicaIdColumns, - snapshotComplete: this.snapshotComplete + snapshotComplete: this.snapshotComplete, + bucketDataSourceIds: this.bucketDataSourceIds, + parameterLookupSourceIds: this.parameterLookupSourceIds, + pattern: this.pattern }); copy.syncData = this.syncData; copy.syncParameters = this.syncParameters; diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index cfcf4678e..8bb4fe0f0 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -3,7 +3,8 @@ import { BucketDataSource, HydratedSyncRules, ScopedParameterLookup, - SqliteJsonRow + SqliteJsonRow, + TablePattern } from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { BucketStorageBatch, FlushedResult, SaveUpdate } from './BucketStorageBatch.js'; @@ -169,6 +170,7 @@ export interface ResolveTablesOptions { connection_id: number; connection_tag: string; entity_descriptor: SourceEntityDescriptor; + pattern: TablePattern; } export interface ResolveTableOptions { diff --git a/packages/sync-rules/src/BucketSource.ts b/packages/sync-rules/src/BucketSource.ts index bdec256d8..fa11157ac 100644 --- a/packages/sync-rules/src/BucketSource.ts +++ b/packages/sync-rules/src/BucketSource.ts @@ -105,6 +105,7 @@ export interface BucketDataSource { readonly bucketParameters: string[]; getSourceTables(): TablePattern[]; + tableSyncsData(table: SourceTableInterface): boolean; /** diff --git a/packages/sync-rules/src/HydratedSyncRules.ts b/packages/sync-rules/src/HydratedSyncRules.ts index 3e9381cb6..c4a45f0e2 100644 --- a/packages/sync-rules/src/HydratedSyncRules.ts +++ b/packages/sync-rules/src/HydratedSyncRules.ts @@ -32,7 +32,7 @@ export interface RowProcessor { getSourceTables(): TablePattern[]; - getMatchingSources(table: SourceTableInterface): { + getMatchingSources(pattern: TablePattern): { bucketDataSources: BucketDataSource[]; parameterIndexLookupCreators: ParameterIndexLookupCreator[]; }; @@ -94,13 +94,15 @@ export class HydratedSyncRules implements RowProcessor { this.bucketSources = this.definition.bucketSources.map((source) => source.hydrate(params.createParams)); } - getMatchingSources(table: SourceTableInterface): { + getMatchingSources(pattern: TablePattern): { bucketDataSources: BucketDataSource[]; parameterIndexLookupCreators: ParameterIndexLookupCreator[]; } { - const bucketDataSources = this.bucketDataSources.filter((ds) => ds.tableSyncsData(table)); + const bucketDataSources = this.bucketDataSources.filter((ds) => + ds.getSourceTables().some((table) => table.equals(pattern)) + ); const parameterIndexLookupCreators: ParameterIndexLookupCreator[] = this.bucketParameterIndexLookupCreators.filter( - (ds) => ds.tableSyncsParameters(table) + (ds) => ds.getSourceTables().some((table) => table.equals(pattern)) ); return { bucketDataSources, diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index e1522fd8f..8e27f0d62 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -193,7 +193,7 @@ export class BucketDefinitionDataSource implements BucketDataSource { getSourceTables(): TablePattern[] { let result: TablePattern[] = []; for (let query of this.descriptor.dataQueries) { - result.push(query.sourceTable); + result.push(query.sourceTable.withFilter(query.filter.staticFilter)); } return result; } diff --git a/packages/sync-rules/src/TablePattern.ts b/packages/sync-rules/src/TablePattern.ts index 7b80a9b11..3ada83905 100644 --- a/packages/sync-rules/src/TablePattern.ts +++ b/packages/sync-rules/src/TablePattern.ts @@ -1,4 +1,6 @@ +import { JSONBig } from '@powersync/service-jsonbig'; import { SourceTableInterface } from './SourceTableInterface.js'; +import { ParameterMatchClause, StaticFilter } from './types.js'; export const DEFAULT_TAG = 'default'; @@ -11,7 +13,9 @@ export class TablePattern { public readonly schema: string; public readonly tablePattern: string; - constructor(schema: string, tablePattern: string) { + public readonly filter?: StaticFilter; + + constructor(schema: string, tablePattern: string, filter?: StaticFilter) { const splitSchema = schema.split('.'); if (splitSchema.length > 2) { throw new Error(`Invalid schema: ${schema}`); @@ -24,13 +28,14 @@ export class TablePattern { this.schema = schema; } this.tablePattern = tablePattern; + this.filter = filter; } /** * Unique lookup key for this pattern. For in-memory use only - no gaurantee of stability across restarts. */ get key(): string { - return JSON.stringify([this.connectionTag, this.schema, this.tablePattern]); + return JSONBig.stringify([this.connectionTag, this.schema, this.tablePattern, this.filter]); } equals(other: TablePattern): boolean { @@ -55,6 +60,14 @@ export class TablePattern { return this.tablePattern; } + withFilter(filter: StaticFilter | undefined): TablePattern { + return new TablePattern( + this.connectionTag == DEFAULT_TAG ? this.schema : `${this.connectionTag}.${this.schema}`, + this.tablePattern, + filter + ); + } + matches(table: SourceTableInterface) { if (this.connectionTag != table.connectionTag || this.schema != table.schema) { return false; diff --git a/packages/sync-rules/src/sql_filters.ts b/packages/sync-rules/src/sql_filters.ts index 2bcb92be1..2dec1fb13 100644 --- a/packages/sync-rules/src/sql_filters.ts +++ b/packages/sync-rules/src/sql_filters.ts @@ -42,6 +42,7 @@ import { QuerySchema, RowValueClause, SqliteValue, + StaticFilter, StaticValueClause, TrueIfParametersMatch } from './types.js'; @@ -277,6 +278,9 @@ export class SqlTools { }, getColumnDefinition(schema) { return schema.getColumn(table.nameInSchema, column); + }, + staticFilter: { + column: column } } satisfies RowValueClause; } else { @@ -495,7 +499,8 @@ export class SqlTools { return { [inputParam.key]: value }; }); }, - visitChildren: (v) => v(leftFilter) + visitChildren: (v) => v(leftFilter), + staticFilter: { any: true } } satisfies ParameterMatchClause; } else if ( this.supportsExpandingParameters && @@ -530,7 +535,8 @@ export class SqlTools { } return [{ [inputParam.key]: value }]; }, - visitChildren: (v) => v(rightFilter) + visitChildren: (v) => v(rightFilter), + staticFilter: { any: true } } satisfies ParameterMatchClause; } else { // Not supported, return the error previously computed @@ -578,7 +584,9 @@ export class SqlTools { return { [inputParam.key]: value }; }); }, - visitChildren: (v) => v(leftFilter) + visitChildren: (v) => v(leftFilter), + + staticFilter: { any: true } } satisfies ParameterMatchClause; } else if ( this.supportsExpandingParameters && @@ -620,7 +628,8 @@ export class SqlTools { return { [inputParam.key]: value }; }); }, - visitChildren: (v) => v(rightFilter) + visitChildren: (v) => v(rightFilter), + staticFilter: { any: true } } satisfies ParameterMatchClause; } else { // Not supported, return the error previously computed @@ -649,7 +658,8 @@ export class SqlTools { return [{ [inputParam.key]: value }]; }, - visitChildren: (v) => v(otherFilter) + visitChildren: (v) => v(otherFilter), + staticFilter: { any: true } } satisfies ParameterMatchClause; } @@ -841,6 +851,11 @@ export class SqlTools { const evaluated = fnImpl.call(...args); return staticValueClause(evaluated); } else if (argsType == 'row') { + let staticFilter: StaticFilter = { any: true }; + if (fnImpl.staticFilter != null) { + const rowValueClauses = argClauses.map((e) => e as RowValueClause); + staticFilter = fnImpl.staticFilter(...rowValueClauses.map((arg) => arg.staticFilter)); + } return { evaluate: (tables) => { const args = argClauses.map((e) => (e as RowValueClause).evaluate(tables)); @@ -851,7 +866,8 @@ export class SqlTools { (e) => (e as RowValueClause).getColumnDefinition(schema)?.type ?? ExpressionType.NONE ); return { name: `${fnImpl}()`, type: fnImpl.getReturnType(argTypes) }; - } + }, + staticFilter } satisfies RowValueClause; } else if (argsType == 'param') { const argStrings = argClauses.map((e) => (e as ParameterValueClause).key); @@ -972,6 +988,9 @@ function staticValueClause(value: SqliteValue): StaticValueClause { key: JSONBig.stringify(value), lookupParameterValue(_parameters) { return value; + }, + staticFilter: { + value: value } }; } diff --git a/packages/sync-rules/src/sql_functions.ts b/packages/sync-rules/src/sql_functions.ts index 60692edb1..9ba6f2b8e 100644 --- a/packages/sync-rules/src/sql_functions.ts +++ b/packages/sync-rules/src/sql_functions.ts @@ -1,6 +1,6 @@ import { JSONBig } from '@powersync/service-jsonbig'; import { SQLITE_FALSE, SQLITE_TRUE, sqliteBool, sqliteNot } from './sql_support.js'; -import { SqliteInputValue, SqliteValue } from './types.js'; +import { isAny, SqliteInputValue, SqliteValue, StaticFilter } from './types.js'; import { jsonValueToSqlite } from './utils.js'; // Declares @syncpoint/wkx module // This allows for consumers of this lib to resolve types correctly @@ -39,6 +39,7 @@ export interface SqlFunction { readonly debugName: string; call: (...args: SqliteValue[]) => SqliteValue; getReturnType(args: ExpressionType[]): ExpressionType; + staticFilter?: (...args: StaticFilter[]) => StaticFilter; } export interface DocumentedSqlFunction extends SqlFunction { @@ -55,6 +56,16 @@ export function getOperatorFunction(op: string): SqlFunction { }, getReturnType(args) { return getOperatorReturnType(op, args[0], args[1]); + }, + staticFilter(...args: StaticFilter[]) { + if (isAny(args[0]) || isAny(args[1])) { + return { any: true }; + } + return { + operator: op as any, + left: args[0], + right: args[1] + }; } }; } diff --git a/packages/sync-rules/src/sql_support.ts b/packages/sync-rules/src/sql_support.ts index 3ffb28b18..6b816b059 100644 --- a/packages/sync-rules/src/sql_support.ts +++ b/packages/sync-rules/src/sql_support.ts @@ -82,7 +82,8 @@ export function composeRowValues>(optio }, getColumnDefinition: function (schema: QuerySchema): ColumnDefinition | undefined { return options.getColumnDefinition(schema); - } + }, + staticFilter: { any: true } }; } @@ -127,6 +128,11 @@ export function compileStaticOperator(op: string, left: RowValueClause, right: R name: '?', type }; + }, + staticFilter: { + operator: op as any, + left: left.staticFilter, + right: right.staticFilter } }; } @@ -135,15 +141,20 @@ export function andFilters(a: CompiledClause, b: CompiledClause): CompiledClause // Optimizations: If the two clauses both only depend on row or parameter data, we can merge them into a single // clause. if (isRowValueClause(a) && isRowValueClause(b)) { - return composeRowValues({ - values: { a, b }, - compose(values) { - return sqliteBool(sqliteBool(values.a) && sqliteBool(values.b)); - }, - getColumnDefinition() { - return { name: 'and', type: ExpressionType.INTEGER }; + return { + ...composeRowValues({ + values: { a, b }, + compose(values) { + return sqliteBool(sqliteBool(values.a) && sqliteBool(values.b)); + }, + getColumnDefinition() { + return { name: 'and', type: ExpressionType.INTEGER }; + } + }), + staticFilter: { + and: [a.staticFilter, b.staticFilter] } - }); + }; } if (isParameterValueClause(a) && isParameterValueClause(b)) { return composeParameterValues({ @@ -197,6 +208,9 @@ export function andFilters(a: CompiledClause, b: CompiledClause): CompiledClause visitChildren: (visitor) => { visitor(aFilter); visitor(bFilter); + }, + staticFilter: { + and: [aFilter.staticFilter, bFilter.staticFilter] } } satisfies ParameterMatchClause; } @@ -205,15 +219,20 @@ export function orFilters(a: CompiledClause, b: CompiledClause): CompiledClause // Optimizations: If the two clauses both only depend on row or parameter data, we can merge them into a single // clause. if (isRowValueClause(a) && isRowValueClause(b)) { - return composeRowValues({ - values: { a, b }, - compose(values) { - return sqliteBool(sqliteBool(values.a) || sqliteBool(values.b)); - }, - getColumnDefinition() { - return { name: 'or', type: ExpressionType.INTEGER }; + return { + ...composeRowValues({ + values: { a, b }, + compose(values) { + return sqliteBool(sqliteBool(values.a) || sqliteBool(values.b)); + }, + getColumnDefinition() { + return { name: 'or', type: ExpressionType.INTEGER }; + } + }), + staticFilter: { + or: [a.staticFilter, b.staticFilter] } - }); + }; } if (isParameterValueClause(a) && isParameterValueClause(b)) { return composeParameterValues({ @@ -265,6 +284,9 @@ export function orParameterSetClauses(a: ParameterMatchClause, b: ParameterMatch visitChildren: (v) => { v(a); v(b); + }, + staticFilter: { + or: [a.staticFilter, b.staticFilter] } } satisfies ParameterMatchClause; } @@ -286,7 +308,8 @@ export function toBooleanParameterSetClause(clause: CompiledClause): ParameterMa const value = sqliteBool(clause.evaluate(tables)); return value ? MATCH_CONST_TRUE : MATCH_CONST_FALSE; }, - visitChildren: (v) => v(clause) + visitChildren: (v) => v(clause), + staticFilter: clause.staticFilter } satisfies ParameterMatchClause; } else if (isClauseError(clause)) { return { @@ -296,7 +319,8 @@ export function toBooleanParameterSetClause(clause: CompiledClause): ParameterMa filterRow(tables: QueryParameters): TrueIfParametersMatch { throw new Error('invalid clause'); }, - visitChildren: (v) => v(clause) + visitChildren: (v) => v(clause), + staticFilter: { any: true } } satisfies ParameterMatchClause; } else { // Equivalent to `bucket.param = true` @@ -321,7 +345,8 @@ export function toBooleanParameterSetClause(clause: CompiledClause): ParameterMa filterRow(tables: QueryParameters): TrueIfParametersMatch { return [{ [key]: SQLITE_TRUE }]; }, - visitChildren: (v) => v(clause) + visitChildren: (v) => v(clause), + staticFilter: { any: true } } satisfies ParameterMatchClause; } } diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 242ac52e8..ba073efa7 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -357,6 +357,54 @@ export interface BaseClause { visitChildren?: (visitor: (clause: CompiledClause) => void) => void; } +export type StaticFilter = + | StaticOperator + | AndOperator + | OrOperator + | AnyOperator + | StaticColumnExpression + | StaticValue; + +export interface AnyOperator { + any: true; +} + +export function isAny(filter: StaticFilter): filter is AnyOperator { + return (filter as AnyOperator).any === true; +} + +export interface StaticOperator { + left: StaticFilter; + operator: string; // '=' | '!=' | '<' | '<=' | '>' | '>=' | 'IS' | 'IS NOT' | 'IN'; + right: StaticFilter; +} + +export interface StaticValue { + value: SqliteValue; +} + +export interface StaticColumnExpression { + column: string; +} + +export interface AndOperator { + and: StaticFilter[]; +} + +export interface OrOperator { + or: StaticFilter[]; +} + +export type MongoExpression = + | Record + | unknown[] + | string + | number + | boolean + | null + | bigint + | Uint8Array; + /** * This is a clause that matches row and parameter values for equality. * @@ -368,6 +416,13 @@ export interface BaseClause { export interface ParameterMatchClause extends BaseClause { error: boolean; + /** + * This is a filter that can be applied "statically" on the row, to pre-filter rows. + * + * This may be under-specific, i.e. it may include rows that do not actually match the full filter. + */ + staticFilter: StaticFilter; + specialType?: 'or'; /** @@ -456,6 +511,7 @@ export interface QuerySchema { export interface RowValueClause extends BaseClause { evaluate(tables: QueryParameters): SqliteValue; getColumnDefinition(schema: QuerySchema): ColumnDefinition | undefined; + staticFilter: StaticFilter; } /** diff --git a/packages/sync-rules/test/src/data_queries.test.ts b/packages/sync-rules/test/src/data_queries.test.ts index 09e615095..d36134e70 100644 --- a/packages/sync-rules/test/src/data_queries.test.ts +++ b/packages/sync-rules/test/src/data_queries.test.ts @@ -25,6 +25,10 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql(['category'], sql, PARSE_OPTIONS, compatibility); expect(query.errors).toEqual([]); + expect(query.filter.staticFilter).toEqual({ + any: true + }); + expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'green']) })).toMatchObject([ { serializedBucketParameters: '["red"]', @@ -46,6 +50,12 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql([], sql, PARSE_OPTIONS, compatibility); expect(query.errors).toEqual([]); + expect(query.filter.staticFilter).toEqual({ + operator: 'IN', + left: { value: 'green' }, + right: { column: 'categories' } + }); + expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'green']) })).toMatchObject([ { serializedBucketParameters: '[]', @@ -61,6 +71,11 @@ describe('data queries', () => { const sql = `SELECT * FROM assets WHERE assets.condition IN '["good","great"]'`; const query = SqlDataQuery.fromSql([], sql, PARSE_OPTIONS, compatibility); expect(query.errors).toEqual([]); + expect(query.filter.staticFilter).toEqual({ + operator: 'IN', + left: { column: 'condition' }, + right: { value: '["good","great"]' } + }); expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'good' })).toMatchObject([ { @@ -73,6 +88,26 @@ describe('data queries', () => { expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'bad' })).toEqual([]); }); + test('AND query', function () { + const sql = `SELECT * FROM assets WHERE archived = false AND condition > 5`; + const query = SqlDataQuery.fromSql([], sql, PARSE_OPTIONS, compatibility); + expect(query.errors).toEqual([]); + expect(query.filter.staticFilter).toEqual({ + and: [ + { + operator: '=', + left: { column: 'archived' }, + right: { value: 0n } + }, + { + operator: '>', + left: { column: 'condition' }, + right: { value: 5n } + } + ] + }); + }); + test('table alias', function () { const sql = 'SELECT * FROM assets as others WHERE others.org_id = bucket.org_id'; const query = SqlDataQuery.fromSql(['org_id'], sql, PARSE_OPTIONS, compatibility);