@@ -105,19 +105,19 @@ export class MergedSyncRules implements RowProcessor {
* @param pattern The source table pattern from the sync rules, _not_ the individually derived SourceTables.
* @returns The bucket data sources and parameter index lookup creators that match the pattern.
*/
getMatchingSources(table: SourceTableInterface): {
getMatchingSources(pattern: TablePattern): {
bucketDataSources: BucketDataSource[];
parameterIndexLookupCreators: ParameterIndexLookupCreator[];
} {
const bucketDataSources = [...this.resolvedDataSources.values()]
.map((dataSource) => dataSource.source)
.filter((ds) => ds.tableSyncsData(table));
.filter((ds) => ds.getSourceTables().some((table) => table.equals(pattern)));

const parameterIndexLookupCreators: ParameterIndexLookupCreator[] = [
...this.resolvedParameterLookupSources.values()
]
.map((dataSource) => dataSource.source)
.filter((ds) => ds.tableSyncsParameters(table));
.filter((ds) => ds.getSourceTables().some((table) => table.equals(pattern)));
return {
bucketDataSources,
parameterIndexLookupCreators
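The hunk above switches matching from a concrete resolved table to table-pattern equality. A minimal self-contained sketch of the new selection logic (the interfaces here are simplified stand-ins for illustration, not the real sync-rules types):

// Simplified stand-ins for the real sync-rules types (illustration only).
interface TablePatternLike {
  equals(other: TablePatternLike): boolean;
}
interface DataSourceLike {
  getSourceTables(): TablePatternLike[];
}

// A source matches when any of its declared table patterns equals the queried pattern.
function matchingSources<T extends DataSourceLike>(sources: T[], pattern: TablePatternLike): T[] {
  return sources.filter((ds) => ds.getSourceTables().some((t) => t.equals(pattern)));
}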
@@ -284,7 +284,8 @@ export class MongoBucketDataWriter implements storage.BucketDataWriter {
replicaIdColumns: ref.replicaIdColumns,
snapshotComplete: doc.snapshot_done ?? true,
bucketDataSourceIds: doc.bucket_data_source_ids ?? [],
parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? []
parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [],
pattern: ref.pattern
});
sourceTable.snapshotStatus =
doc.snapshot_status == null
@@ -302,11 +303,7 @@
}

async resolveTables(options: storage.ResolveTablesOptions): Promise<storage.ResolveTablesResult> {
const sources = this.rowProcessor.getMatchingSources({
connectionTag: options.connection_tag,
name: options.entity_descriptor.name,
schema: options.entity_descriptor.schema
});
const sources = this.rowProcessor.getMatchingSources(options.pattern);
const bucketDataSourceIds = sources.bucketDataSources.map((source) => this.mapping.bucketSourceId(source));
const parameterLookupSourceIds = sources.parameterIndexLookupCreators.map((source) =>
this.mapping.parameterLookupId(source)
@@ -390,7 +387,8 @@
replicaIdColumns: replicaIdColumns,
snapshotComplete: doc.snapshot_done ?? true,
bucketDataSourceIds: doc.bucket_data_source_ids ?? [],
parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? []
parameterLookupSourceIds: doc.parameter_lookup_source_ids ?? [],
pattern: options.pattern
});
sourceTable.snapshotStatus =
doc.snapshot_status == null
@@ -83,6 +83,7 @@ export interface SourceTableDocument {
replica_id_columns2: { name: string; type_oid?: number; type?: string }[] | undefined;
snapshot_done: boolean | undefined;
snapshot_status: SourceTableDocumentSnapshotStatus | undefined;
filter?: string;
}
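// Illustrative stored document (values are hypothetical, not from this diff):
// the new optional `filter` field would persist the table pattern's serialized
// static row filter alongside the snapshot state, e.g.
//   { ..., snapshot_done: false, filter: "archived = false" }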

export interface SourceTableDocumentSnapshotStatus {
112 changes: 81 additions & 31 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -9,6 +9,7 @@
ServiceError
} from '@powersync/lib-services-framework';
import {
BucketDataWriter,
BucketStorageFactory,
MetricsEngine,
SaveOperationTag,
@@ -33,6 +34,8 @@
} from './MongoRelation.js';
import { MongoSnapshotter } from './MongoSnapshotter.js';
import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js';
import { staticFilterToMongoExpression } from './staticFilters.js';
import { inspect } from 'node:util';

export interface ChangeStreamOptions {
connections: MongoManager;
@@ -166,7 +169,7 @@

const snapshotter = new MongoSnapshotter({
writer: async () => {
const writer = await this.factory.createCombinedWriter(

// CI: every "postgres storage" change_stream test (replicating basic values,
// replicating wildcard, replicating case sensitive table, postImages - on,
// postImages - autoConfigure, updateLookup - no fullDocument available) fails
// here on MongoDB 6.0 / 7.0 / 8.0 with:
//   Error: Not implemented yet
//     ❯ PostgresBucketStorageFactory.createCombinedWriter ../module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts:50:11
//     ❯ MongoSnapshotter.writer [as writerFactory] src/replication/ChangeStream.ts:172:43
//     ❯ MongoSnapshotter.queueSnapshotTables src/replication/MongoSnapshotter.ts:91:37
//     ❯ ChangeStream.initReplication src/replication/ChangeStream.ts:337:32
this.substreams.map((s) => s.storage),
{
defaultSchema: this.defaultDb.databaseName,
@@ -208,13 +211,18 @@
return this.abortSignal.aborted;
}

private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } {
const sourceTables = this.substreams.flatMap((s) => s.syncRules.getSourceTables());
private getSourceNamespaceFilters(writer: BucketDataWriter): {
$match: any;
multipleDatabases: boolean;
filters: any[];
} {
const sourceTables = writer.rowProcessor.getSourceTables();

let $inFilters: { db: string; coll: string }[] = [
{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }
];
let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = [];
let filters: any[] = [];
let multipleDatabases = false;
for (let tablePattern of sourceTables) {
if (tablePattern.connectionTag != this.connections.connectionTag) {
@@ -225,19 +233,36 @@
multipleDatabases = true;
}

let filterExpression =
tablePattern.filter == null
? { $literal: true }
: staticFilterToMongoExpression(tablePattern.filter, { columnPrefix: '$fullDocument.' });

if (tablePattern.isWildcard) {
$refilters.push({
'ns.db': tablePattern.schema,
'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix))
});
filters.push({
'ns.db': tablePattern.schema,
'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)),
$expr: filterExpression
});
} else {
$inFilters.push({
db: tablePattern.schema,
coll: tablePattern.name
});
filters.push({
'ns.db': tablePattern.schema,
'ns.coll': tablePattern.name,
$expr: filterExpression
});
}
}

// FIXME: deduplicate filters

// When we have a large number of collections, the performance of the pipeline
// depends a lot on how the filters here are specified.
// Currently, only the multipleDatabases == false case is optimized, and the
@@ -254,9 +279,9 @@
: // collection-level: filter on coll only
{ 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } };
if ($refilters.length > 0) {
return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases };
return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases, filters };
}
return { $match: nsFilter, multipleDatabases };
return { $match: nsFilter, multipleDatabases, filters };
}
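// Illustrative example (not from this diff): in the single-database case, with a
// plain table 'app.todos' and a wildcard pattern 'app.audit_%', the resulting
// change-stream stage would be roughly:
//   { $match: { $or: [
//     { 'ns.coll': { $in: [CHECKPOINTS_COLLECTION, 'todos'] } },
//     { 'ns.db': 'app', 'ns.coll': /^audit_/ }
//   ] } }
// i.e. exact collection names filter on 'ns.coll' only, while wildcard prefixes
// keep the anchored-regex form built above.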

private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) {
@@ -398,17 +423,23 @@
}
}

private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) {
private openChangeStream(writer: BucketDataWriter, options: { lsn: string | null; maxAwaitTimeMs?: number }) {
const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null;
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

const filters = this.getSourceNamespaceFilters();
const filters = this.getSourceNamespaceFilters(writer);

const pipeline: mongo.Document[] = [
{
$match: filters.$match
},
// Not working currently - getting "resumeToken not found"
// {
// $match: {
// $or: filters.filters
// }
// },
{ $changeStreamSplitLargeEvent: {} }
];

Expand Down Expand Up @@ -474,37 +505,56 @@
// Ignore the postImages check in this case.
}

const result = await writer.resolveTables({
connection_id: this.connection_id,
connection_tag: this.connections.connectionTag,
entity_descriptor: descriptor
// What happens here:
// 1. We see a new collection that we haven't observed before.
// 2. We check which table pattern(s) match this collection, _regardless of specific row filters_.
// 3. We resolve the tables for those patterns.

// FIXME: don't scan through it all
// FIXME: handle wildcards
const patterns = writer.rowProcessor.getSourceTables().filter((t) => {
return (
t.connectionTag == this.connections.connectionTag && t.name == descriptor.name && t.schema == descriptor.schema

// CI: "postImages - new collection with postImages enabled" and
// "replicating wildcard" (mongodb storage) fail here on MongoDB 6.0 / 7.0 / 8.0 with:
//   Error: Cannot get name for wildcard table
//     ❯ TablePattern.get name [as name] ../../packages/sync-rules/src/TablePattern.ts:58:13
//     ❯ src/replication/ChangeStream.ts:517:64
//     ❯ ChangeStream.handleRelations src/replication/ChangeStream.ts:515:60
);
});
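// Note: the t.name comparison above throws for wildcard patterns (see the CI
// failures recorded above — TablePattern.name is not defined for wildcards).
// A wildcard-aware predicate could look like this sketch, reusing the
// tablePrefix semantics from getSourceNamespaceFilters:
//   t.connectionTag == this.connections.connectionTag &&
//   t.schema == descriptor.schema &&
//   (t.isWildcard ? descriptor.name.startsWith(t.tablePrefix) : t.name == descriptor.name)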

const snapshot = options.snapshot;
this.relationCache.set(getCacheIdentifier(descriptor), result.tables);
let allTables: SourceTable[] = [];
for (let pattern of patterns) {
const result = await writer.resolveTables({
connection_id: this.connection_id,
connection_tag: this.connections.connectionTag,
entity_descriptor: descriptor,
pattern: pattern
});

// Drop conflicting collections.
// This is generally not expected for MongoDB source dbs, so we log an error.
if (result.dropTables.length > 0) {
this.logger.error(
`Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}`
);
await writer.drop(result.dropTables);
}
const snapshot = options.snapshot;
this.relationCache.set(getCacheIdentifier(descriptor), result.tables);

// Drop conflicting collections.
// This is generally not expected for MongoDB source dbs, so we log an error.
if (result.dropTables.length > 0) {
this.logger.error(
`Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}`
);
await writer.drop(result.dropTables);
}

// Snapshot if:
// 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
// 2. Snapshot is not already done, AND:
// 3. The table is used in sync rules.
for (let table of result.tables) {
const shouldSnapshot = snapshot && !table.snapshotComplete && table.syncAny;
if (shouldSnapshot) {
this.logger.info(`New collection: ${descriptor.schema}.${descriptor.name}`);
await this.snapshotter.queueSnapshot(writer, table);
// Snapshot if:
// 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
// 2. Snapshot is not already done, AND:
// 3. The table is used in sync rules.
for (let table of result.tables) {
const shouldSnapshot = snapshot && !table.snapshotComplete && table.syncAny;
if (shouldSnapshot) {
this.logger.info(`New collection: ${descriptor.schema}.${descriptor.name}`);
await this.snapshotter.queueSnapshot(writer, table);
}
}

allTables.push(...result.tables);
}

return result.tables;
return allTables;
}

private async drop(writer: storage.BucketDataWriter, entity: SourceEntityDescriptor): Promise<void> {
Expand Down Expand Up @@ -577,7 +627,7 @@

this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`);

await using streamManager = this.openChangeStream({ lsn: resumeFromLsn });
await using streamManager = this.openChangeStream(writer, { lsn: resumeFromLsn });
const { stream, filters } = streamManager;
if (this.abortSignal.aborted) {
await stream.close();
23 changes: 20 additions & 3 deletions modules/module-mongodb/src/replication/MongoSnapshotQuery.ts
@@ -1,6 +1,7 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { ReplicationAssertionError } from '@powersync/lib-services-framework';
import { bson } from '@powersync/service-core';
import { MongoExpression } from '@powersync/service-sync-rules';

/**
* Performs a collection snapshot query, chunking by ranges of _id.
@@ -13,12 +14,19 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
private lastCursor: mongo.FindCursor | null = null;
private collection: mongo.Collection;
private batchSize: number;
private filter: MongoExpression | null = null;

public constructor(options: { collection: mongo.Collection; batchSize: number; key?: Uint8Array | null }) {
public constructor(options: {
collection: mongo.Collection;
batchSize: number;
key?: Uint8Array | null;
filter?: MongoExpression;
}) {
this.lastKey = options.key ? bson.deserialize(options.key, { useBigInt64: true })._id : null;
this.lastCursor = null;
this.collection = options.collection;
this.batchSize = options.batchSize;
this.filter = options.filter ?? null;
}

async nextChunk(): Promise<{ docs: mongo.Document[]; lastKey: Uint8Array } | { docs: []; lastKey: null }> {
@@ -35,8 +43,17 @@ export class ChunkedSnapshotQuery implements AsyncDisposable {
// any parsing as an operator.
// Starting in MongoDB 5.0, this filter can use the _id index. Source:
// https://www.mongodb.com/docs/manual/release-notes/5.0/#general-aggregation-improvements
const filter: mongo.Filter<mongo.Document> =
this.lastKey == null ? {} : { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } };
let filter: mongo.Filter<mongo.Document>;
if (this.lastKey == null) {
filter = this.filter == null ? {} : { $expr: this.filter };
} else {
if (this.filter == null) {
filter = { $expr: { $gt: ['$_id', { $literal: this.lastKey }] } };
} else {
filter = { $and: [{ $expr: { $gt: ['$_id', { $literal: this.lastKey }] } }, { $expr: this.filter }] };
}
}

cursor = this.collection.find(filter, {
readConcern: 'majority',
limit: this.batchSize,
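For context, a sketch of how this chunked query could be consumed, based on the nextChunk contract above. It assumes this file's imports; the persistence steps are hypothetical:

// Drain a collection snapshot in _id-ordered chunks; `lastKey` makes it resumable.
async function snapshotAll(collection: mongo.Collection, filter?: MongoExpression): Promise<void> {
  await using query = new ChunkedSnapshotQuery({ collection, batchSize: 1000, filter });
  for (;;) {
    const { docs, lastKey } = await query.nextChunk();
    if (lastKey == null) break; // collection fully replicated
    // Persist `docs`, then record `lastKey` (hypothetical step): passing it back
    // as `key` in the constructor resumes the snapshot after a restart.
  }
}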
31 changes: 12 additions & 19 deletions modules/module-mongodb/src/replication/MongoSnapshotter.ts
@@ -20,6 +20,8 @@ import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getMongoRelation, STANDALONE_CHECKPOINT_ID } from './MongoRelation.js';
import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { staticFilterToMongoExpression } from './staticFilters.js';
import { JSONBig } from '@powersync/service-jsonbig';

export interface MongoSnapshotterOptions {
connections: MongoManager;
@@ -266,7 +268,8 @@ export class MongoSnapshotter {
const sourceTables = await writer.resolveTables({
connection_id: this.connection_id,
connection_tag: this.connections.connectionTag,
entity_descriptor: getMongoRelation({ db: schema, coll: collection.name })
entity_descriptor: getMongoRelation({ db: schema, coll: collection.name }),
pattern: tablePattern
});
// TODO: dropTables?
result.push(...sourceTables.tables);
@@ -280,17 +283,22 @@
let at = table.snapshotStatus?.replicatedCount ?? 0;
const db = this.client.db(table.schema);
const collection = db.collection(table.name);

const mongoFilter = table.pattern?.filter ? staticFilterToMongoExpression(table.pattern.filter) : null;
const filterLog = mongoFilter ? ` | filter: ${JSONBig.stringify(mongoFilter)}` : '';

await using query = new ChunkedSnapshotQuery({
collection,
key: table.snapshotStatus?.lastKey,
batchSize: this.snapshotChunkLength
batchSize: this.snapshotChunkLength,
filter: mongoFilter
});
if (query.lastKey != null) {
this.logger.info(
`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}`
`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}${filterLog}`
);
} else {
this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}${filterLog}`);
}

let lastBatch = performance.now();
@@ -352,21 +360,6 @@
return rowProcessor.applyRowContext<never>(inputRow);
}

private async getCollectionInfo(db: string, name: string): Promise<mongo.CollectionInfo | undefined> {
const collection = (
await this.client
.db(db)
.listCollections(
{
name: name
},
{ nameOnly: false }
)
.toArray()
)[0];
return collection;
}

private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) {
if (!this.usePostImages) {
// Nothing to check
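The exact expression shape produced by staticFilterToMongoExpression is not shown in this diff. As an assumption for illustration only, a static sync-rules filter such as archived = false might compile to an aggregation expression along these lines:

// Hypothetical compiled shape — the real implementation lives in ./staticFilters.ts.
const snapshotFilter = { $eq: ['$archived', { $literal: false }] };
// With the columnPrefix option used for change streams ('$fullDocument.'):
const streamFilter = { $eq: ['$fullDocument.archived', { $literal: false }] };
// ChunkedSnapshotQuery then wraps the expression as { $expr: ... }, $and-ed with
// the _id resume condition when resuming mid-snapshot.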