From f08bb44a8bfffb861c13c9ec7b52fd161a30498a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 7 Jan 2026 13:16:36 +0200 Subject: [PATCH 1/3] Initial implementation for MongoDB. --- modules/module-mongodb/package.json | 3 +- .../src/replication/ChangeStream.ts | 514 +++---------- .../src/replication/ChangeStreamErrors.ts | 38 + .../src/replication/MongoSnapshotter.ts | 706 ++++++++++++++++++ .../test/src/change_stream_utils.ts | 35 +- pnpm-lock.yaml | 3 + 6 files changed, 860 insertions(+), 439 deletions(-) create mode 100644 modules/module-mongodb/src/replication/ChangeStreamErrors.ts create mode 100644 modules/module-mongodb/src/replication/MongoSnapshotter.ts diff --git a/modules/module-mongodb/package.json b/modules/module-mongodb/package.json index 9a5b5a5b7..b56e157c3 100644 --- a/modules/module-mongodb/package.json +++ b/modules/module-mongodb/package.json @@ -35,6 +35,7 @@ "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", "bson": "^6.10.4", + "p-defer": "^4.0.1", "ts-codec": "^1.3.0", "uuid": "^11.1.0" }, @@ -43,4 +44,4 @@ "@powersync/service-module-mongodb-storage": "workspace:*", "@powersync/service-module-postgres-storage": "workspace:*" } -} \ No newline at end of file +} diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 8bd4e67de..a366a60b9 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,7 +1,6 @@ -import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { mongo } from '@powersync/lib-service-mongodb'; import { container, - DatabaseConnectionError, logger as defaultLogger, ErrorCode, Logger, @@ -17,13 +16,7 @@ import { SourceTable, storage } from '@powersync/service-core'; -import { - DatabaseInputRow, - SqliteInputRow, - SqliteRow, - HydratedSyncRules, - TablePattern -} from 
'@powersync/service-sync-rules'; +import { DatabaseInputRow, SqliteInputRow, SqliteRow, HydratedSyncRules } from '@powersync/service-sync-rules'; import { ReplicationMetric } from '@powersync/service-types'; import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; @@ -36,8 +29,9 @@ import { getMongoRelation, STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; -import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; +import { ChangeStreamInvalidatedError, mapChangeStreamError } from './ChangeStreamErrors.js'; +import { MongoSnapshotter } from './MongoSnapshotter.js'; export interface ChangeStreamOptions { connections: MongoManager; @@ -60,25 +54,6 @@ export interface ChangeStreamOptions { logger?: Logger; } -interface InitResult { - needsInitialSync: boolean; - snapshotLsn: string | null; -} - -/** - * Thrown when the change stream is not valid anymore, and replication - * must be restarted. - * - * Possible reasons: - * * Some change stream documents do not have postImages. - * * startAfter/resumeToken is not valid anymore. 
- */ -export class ChangeStreamInvalidatedError extends DatabaseConnectionError { - constructor(message: string, cause: any) { - super(ErrorCode.PSYNC_S1344, message, cause); - } -} - export class ChangeStream { sync_rules: HydratedSyncRules; group_id: number; @@ -94,7 +69,11 @@ export class ChangeStream { private readonly maxAwaitTimeMS: number; - private abort_signal: AbortSignal; + private abortController = new AbortController(); + private abortSignal: AbortSignal = this.abortController.signal; + + private initPromise: Promise | null = null; + private snapshotter: MongoSnapshotter; private relationCache = new RelationCache(getCacheIdentifier); @@ -113,8 +92,6 @@ export class ChangeStream { private logger: Logger; - private snapshotChunkLength: number; - private changeStreamTimeout: number; constructor(options: ChangeStreamOptions) { @@ -123,7 +100,6 @@ export class ChangeStream { this.group_id = options.storage.group_id; this.connections = options.connections; this.maxAwaitTimeMS = options.maxAwaitTimeMS ?? 10_000; - this.snapshotChunkLength = options.snapshotChunkLength ?? 6_000; this.client = this.connections.client; this.defaultDb = this.connections.db; this.sync_rules = options.storage.getParsedSyncRules({ @@ -133,20 +109,25 @@ export class ChangeStream { // so we use 90% of the socket timeout value. this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9); - this.abort_signal = options.abort_signal; - this.abort_signal.addEventListener( - 'abort', - () => { - // TODO: Fast abort? - }, - { once: true } - ); - this.logger = options.logger ?? defaultLogger; + this.snapshotter = new MongoSnapshotter({ + ...options, + abort_signal: this.abortSignal, + logger: this.logger, + checkpointStreamId: this.checkpointStreamId + }); + + // We wrap in our own abort controller so we can trigger abort internally. 
+ options.abort_signal.addEventListener('abort', () => { + this.abortController.abort(options.abort_signal.reason); + }); + if (options.abort_signal.aborted) { + this.abortController.abort(options.abort_signal.reason); + } } get stopped() { - return this.abort_signal.aborted; + return this.abortSignal.aborted; } private get usePostImages() { @@ -157,267 +138,6 @@ export class ChangeStream { return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE; } - /** - * This resolves a pattern, persists the related metadata, and returns - * the resulting SourceTables. - * - * This implicitly checks the collection postImage configuration. - */ - async resolveQualifiedTableNames( - batch: storage.BucketStorageBatch, - tablePattern: TablePattern - ): Promise { - const schema = tablePattern.schema; - if (tablePattern.connectionTag != this.connections.connectionTag) { - return []; - } - - let nameFilter: RegExp | string; - if (tablePattern.isWildcard) { - nameFilter = new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)); - } else { - nameFilter = tablePattern.name; - } - let result: storage.SourceTable[] = []; - - // Check if the collection exists - const collections = await this.client - .db(schema) - .listCollections( - { - name: nameFilter - }, - { nameOnly: false } - ) - .toArray(); - - if (!tablePattern.isWildcard && collections.length == 0) { - this.logger.warn(`Collection ${schema}.${tablePattern.name} not found`); - } - - for (let collection of collections) { - const table = await this.handleRelation( - batch, - getMongoRelation({ db: schema, coll: collection.name }), - // This is done as part of the initial setup - snapshot is handled elsewhere - { snapshot: false, collectionInfo: collection } - ); - - result.push(table); - } - - return result; - } - - async initSlot(): Promise { - const status = await this.storage.getStatus(); - if (status.snapshot_done && status.checkpoint_lsn) { - this.logger.info(`Initial replication already done`); - return { 
needsInitialSync: false, snapshotLsn: null }; - } - - return { needsInitialSync: true, snapshotLsn: status.snapshot_lsn }; - } - - async estimatedCount(table: storage.SourceTable): Promise { - const count = await this.estimatedCountNumber(table); - return `~${count}`; - } - - async estimatedCountNumber(table: storage.SourceTable): Promise { - const db = this.client.db(table.schema); - return await db.collection(table.name).estimatedDocumentCount(); - } - - /** - * This gets a LSN before starting a snapshot, which we can resume streaming from after the snapshot. - * - * This LSN can survive initial replication restarts. - */ - private async getSnapshotLsn(): Promise { - const hello = await this.defaultDb.command({ hello: 1 }); - // Basic sanity check - if (hello.msg == 'isdbgrid') { - throw new ServiceError( - ErrorCode.PSYNC_S1341, - 'Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).' - ); - } else if (hello.setName == null) { - throw new ServiceError( - ErrorCode.PSYNC_S1342, - 'Standalone MongoDB instances are not supported - use a replicaset.' - ); - } - - // Open a change stream just to get a resume token for later use. - // We could use clusterTime from the hello command, but that won't tell us if the - // snapshot isn't valid anymore. - // If we just use the first resumeToken from the stream, we get two potential issues: - // 1. The resumeToken may just be a wrapped clusterTime, which does not detect changes - // in source db or other stream issues. - // 2. The first actual change we get may have the same clusterTime, causing us to incorrect - // skip that event. - // Instead, we create a new checkpoint document, and wait until we get that document back in the stream. - // To avoid potential race conditions with the checkpoint creation, we create a new checkpoint document - // periodically until the timeout is reached. 
- - const LSN_TIMEOUT_SECONDS = 60; - const LSN_CREATE_INTERVAL_SECONDS = 1; - - await using streamManager = this.openChangeStream({ lsn: null, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; - const startTime = performance.now(); - let lastCheckpointCreated = -10_000; - let eventsSeen = 0; - - while (performance.now() - startTime < LSN_TIMEOUT_SECONDS * 1000) { - if (performance.now() - lastCheckpointCreated >= LSN_CREATE_INTERVAL_SECONDS * 1000) { - await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - lastCheckpointCreated = performance.now(); - } - - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - const changeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - if (changeDocument == null) { - continue; - } - - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; - - if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - if (!this.checkpointStreamId.equals(checkpointId)) { - continue; - } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - return lsn; - } - - eventsSeen += 1; - } - - // Could happen if there is a very large replication lag? - throw new ServiceError( - ErrorCode.PSYNC_S1301, - `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}` - ); - } - - /** - * Given a snapshot LSN, validate that we can read from it, by opening a change stream. 
- */ - private async validateSnapshotLsn(lsn: string) { - await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; - try { - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - await stream.tryNext(); - } catch (e) { - // Note: A timeout here is not handled as a ChangeStreamInvalidatedError, even though - // we possibly cannot recover from it. - throw mapChangeStreamError(e); - } - } - - async initialReplication(snapshotLsn: string | null) { - const sourceTables = this.sync_rules.getSourceTables(); - await this.client.connect(); - - const flushResult = await this.storage.startBatch( - { - logger: this.logger, - zeroLSN: MongoLSN.ZERO.comparable, - defaultSchema: this.defaultDb.databaseName, - storeCurrentData: false, - skipExistingRows: true - }, - async (batch) => { - if (snapshotLsn == null) { - // First replication attempt - get a snapshot and store the timestamp - snapshotLsn = await this.getSnapshotLsn(); - await batch.setResumeLsn(snapshotLsn); - this.logger.info(`Marking snapshot at ${snapshotLsn}`); - } else { - this.logger.info(`Resuming snapshot at ${snapshotLsn}`); - // Check that the snapshot is still valid. - await this.validateSnapshotLsn(snapshotLsn); - } - - // Start by resolving all tables. - // This checks postImage configuration, and that should fail as - // early as possible. 
- let allSourceTables: SourceTable[] = []; - for (let tablePattern of sourceTables) { - const tables = await this.resolveQualifiedTableNames(batch, tablePattern); - allSourceTables.push(...tables); - } - - let tablesWithStatus: SourceTable[] = []; - for (let table of allSourceTables) { - if (table.snapshotComplete) { - this.logger.info(`Skipping ${table.qualifiedName} - snapshot already done`); - continue; - } - let count = await this.estimatedCountNumber(table); - const updated = await batch.updateTableProgress(table, { - totalEstimatedCount: count - }); - tablesWithStatus.push(updated); - this.relationCache.update(updated); - this.logger.info( - `To replicate: ${table.qualifiedName}: ${updated.snapshotStatus?.replicatedCount}/~${updated.snapshotStatus?.totalEstimatedCount}` - ); - } - - for (let table of tablesWithStatus) { - await this.snapshotTable(batch, table); - await batch.markTableSnapshotDone([table]); - - this.touch(); - } - - // The checkpoint here is a marker - we need to replicate up to at least this - // point before the data can be considered consistent. - // We could do this for each individual table, but may as well just do it once for the entire snapshot. - const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); - await batch.markAllSnapshotDone(checkpoint); - - // This will not create a consistent checkpoint yet, but will persist the op. - // Actual checkpoint will be created when streaming replication caught up. - await batch.commit(snapshotLsn); - - this.logger.info(`Snapshot done. 
Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); - } - ); - return { lastOpId: flushResult?.flushed_op }; - } - - private async setupCheckpointsCollection() { - const collection = await this.getCollectionInfo(this.defaultDb.databaseName, CHECKPOINTS_COLLECTION); - if (collection == null) { - await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { - changeStreamPreAndPostImages: { enabled: true } - }); - } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) { - // Drop + create requires less permissions than collMod, - // and we don't care about the data in this collection. - await this.defaultDb.dropCollection(CHECKPOINTS_COLLECTION); - await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { - changeStreamPreAndPostImages: { enabled: true } - }); - } else { - // Clear the collection on startup, to keep it clean - // We never query this collection directly, and don't want to keep the data around. - // We only use this to get data into the oplog/changestream. - await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({}); - } - } - private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { const sourceTables = this.sync_rules.getSourceTables(); @@ -475,76 +195,6 @@ export class ChangeStream { } } - private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) { - const totalEstimatedCount = await this.estimatedCountNumber(table); - let at = table.snapshotStatus?.replicatedCount ?? 
0; - const db = this.client.db(table.schema); - const collection = db.collection(table.name); - await using query = new ChunkedSnapshotQuery({ - collection, - key: table.snapshotStatus?.lastKey, - batchSize: this.snapshotChunkLength - }); - if (query.lastKey != null) { - this.logger.info( - `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}` - ); - } else { - this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); - } - - let lastBatch = performance.now(); - let nextChunkPromise = query.nextChunk(); - while (true) { - const { docs: docBatch, lastKey } = await nextChunkPromise; - if (docBatch.length == 0) { - // No more data - stop iterating - break; - } - - if (this.abort_signal.aborted) { - throw new ReplicationAbortedError(`Aborted initial replication`, this.abort_signal.reason); - } - - // Pre-fetch next batch, so that we can read and write concurrently - nextChunkPromise = query.nextChunk(); - for (let document of docBatch) { - const record = this.constructAfterRecord(document); - - // This auto-flushes when the batch reaches its size limit - await batch.save({ - tag: SaveOperationTag.INSERT, - sourceTable: table, - before: undefined, - beforeReplicaId: undefined, - after: record, - afterReplicaId: document._id - }); - } - - // Important: flush before marking progress - await batch.flush(); - at += docBatch.length; - this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(docBatch.length); - - table = await batch.updateTableProgress(table, { - lastKey, - replicatedCount: at, - totalEstimatedCount: totalEstimatedCount - }); - this.relationCache.update(table); - - const duration = performance.now() - lastBatch; - lastBatch = performance.now(); - this.logger.info( - `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} in ${duration.toFixed(0)}ms` - ); - this.touch(); - } - // In case the loop was interrupted, make sure we await the last promise. 
- await nextChunkPromise; - } - private async getRelation( batch: storage.BucketStorageBatch, descriptor: SourceEntityDescriptor, @@ -634,14 +284,7 @@ export class ChangeStream { const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny; if (shouldSnapshot) { this.logger.info(`New collection: ${descriptor.schema}.${descriptor.name}`); - // Truncate this table, in case a previous snapshot was interrupted. - await batch.truncate([result.table]); - - await this.snapshotTable(batch, result.table); - const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); - - const [table] = await batch.markTableSnapshotDone([result.table], no_checkpoint_before_lsn); - return table; + await this.snapshotter.queueSnapshot(batch, result.table); } return result.table; @@ -705,38 +348,79 @@ export class ChangeStream { } async replicate() { + let streamPromise: Promise | null = null; + let loopPromise: Promise | null = null; try { // If anything errors here, the entire replication process is halted, and // all connections automatically closed, including this one. 
- await this.initReplication(); - await this.streamChanges(); + this.initPromise = this.initReplication(); + await this.initPromise; + streamPromise = this.streamChanges() + .then(() => { + throw new ReplicationAssertionError(`Replication stream exited unexpectedly`); + }) + .catch((e) => { + this.abortController.abort(e); + throw e; + }); + loopPromise = this.snapshotter + .replicationLoop() + .then(() => { + throw new ReplicationAssertionError(`Replication snapshotter exited unexpectedly`); + }) + .catch((e) => { + this.abortController.abort(e); + throw e; + }); + const results = await Promise.allSettled([loopPromise, streamPromise]); + // First, prioritize non-aborted errors + for (let result of results) { + if (result.status == 'rejected' && !(result.reason instanceof ReplicationAbortedError)) { + throw result.reason; + } + } + // Then include aborted errors + for (let result of results) { + if (result.status == 'rejected') { + throw result.reason; + } + } + + // If we get here, both Promises completed successfully, which is unexpected. + throw new ReplicationAssertionError(`Replication loop exited unexpectedly`); } catch (e) { await this.storage.reportError(e); throw e; + } finally { + // Just to make sure + this.abortController.abort(); + } + } + + /** + * For tests: Wait until the initial snapshot is complete. + */ + public async waitForInitialSnapshot() { + if (this.initPromise == null) { + throw new ReplicationAssertionError('replicate() must be called before waitForInitialSnapshot()'); } + await this.initPromise; + await this.snapshotter.waitForInitialSnapshot(); } - async initReplication() { - const result = await this.initSlot(); - await this.setupCheckpointsCollection(); + private async initReplication() { + const result = await this.snapshotter.checkSlot(); + await this.snapshotter.setupCheckpointsCollection(); if (result.needsInitialSync) { if (result.snapshotLsn == null) { // Snapshot LSN is not present, so we need to start replication from scratch. 
- await this.storage.clear({ signal: this.abort_signal }); - } - const { lastOpId } = await this.initialReplication(result.snapshotLsn); - if (lastOpId != null) { - // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. - await this.storage.populatePersistentChecksumCache({ - signal: this.abort_signal, - // No checkpoint yet, but we do have the opId. - maxOpId: lastOpId - }); + await this.storage.clear({ signal: this.abortSignal }); } + await this.snapshotter.queueSnapshotTables(result.snapshotLsn); } } - async streamChanges() { + private async streamChanges() { try { await this.streamChangesInternal(); } catch (e) { @@ -802,7 +486,7 @@ export class ChangeStream { stream = this.defaultDb.watch(pipeline, streamOptions); } - this.abort_signal.addEventListener('abort', () => { + this.abortSignal.addEventListener('abort', () => { stream.close(); }); @@ -815,7 +499,7 @@ export class ChangeStream { }; } - async streamChangesInternal() { + private async streamChangesInternal() { await this.storage.startBatch( { logger: this.logger, @@ -840,7 +524,7 @@ export class ChangeStream { await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); const { stream, filters } = streamManager; - if (this.abort_signal.aborted) { + if (this.abortSignal.aborted) { await stream.close(); return; } @@ -862,7 +546,7 @@ export class ChangeStream { let lastEmptyResume = performance.now(); while (true) { - if (this.abort_signal.aborted) { + if (this.abortSignal.aborted) { break; } @@ -874,7 +558,7 @@ export class ChangeStream { break; } - if (this.abort_signal.aborted) { + if (this.abortSignal.aborted) { break; } @@ -1098,6 +782,8 @@ export class ChangeStream { } } ); + + throw new ReplicationAbortedError(`Replication stream aborted`, this.abortSignal.reason); } async getReplicationLagMillis(): Promise { @@ -1126,24 +812,4 @@ export class ChangeStream { } } -function mapChangeStreamError(e: any) { - if (isMongoNetworkTimeoutError(e)) { - // 
This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". - // We wrap the error to make it more useful. - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { - // maxTimeMS was reached. Example message: - // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if ( - isMongoServerError(e) && - e.codeName == 'NoMatchingDocument' && - e.errmsg?.includes('post-image was not found') - ) { - throw new ChangeStreamInvalidatedError(e.errmsg, e); - } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new ChangeStreamInvalidatedError(e.message, e); - } else { - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); - } -} +export { ChangeStreamInvalidatedError }; diff --git a/modules/module-mongodb/src/replication/ChangeStreamErrors.ts b/modules/module-mongodb/src/replication/ChangeStreamErrors.ts new file mode 100644 index 000000000..44fc40cd3 --- /dev/null +++ b/modules/module-mongodb/src/replication/ChangeStreamErrors.ts @@ -0,0 +1,38 @@ +import { isMongoNetworkTimeoutError, isMongoServerError } from '@powersync/lib-service-mongodb'; +import { DatabaseConnectionError, ErrorCode } from '@powersync/lib-services-framework'; + +/** + * Thrown when the change stream is not valid anymore, and replication + * must be restarted. + * + * Possible reasons: + * * Some change stream documents do not have postImages. + * * startAfter/resumeToken is not valid anymore. 
+ */ +export class ChangeStreamInvalidatedError extends DatabaseConnectionError { + constructor(message: string, cause: any) { + super(ErrorCode.PSYNC_S1344, message, cause); + } +} + +export function mapChangeStreamError(e: any) { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". + // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { + // maxTimeMS was reached. Example message: + // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if ( + isMongoServerError(e) && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg, e); + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } +} diff --git a/modules/module-mongodb/src/replication/MongoSnapshotter.ts b/modules/module-mongodb/src/replication/MongoSnapshotter.ts new file mode 100644 index 000000000..55c0b030b --- /dev/null +++ b/modules/module-mongodb/src/replication/MongoSnapshotter.ts @@ -0,0 +1,706 @@ +import { mongo } from '@powersync/lib-service-mongodb'; +import { + container, + ErrorCode, + logger as defaultLogger, + Logger, + ReplicationAbortedError, + ServiceError +} from '@powersync/lib-services-framework'; +import { + MetricsEngine, + RelationCache, + SaveOperationTag, + SourceEntityDescriptor, + SourceTable, + InternalOpId, + storage +} 
from '@powersync/service-core'; +import { + DatabaseInputRow, + SqliteInputRow, + SqliteRow, + HydratedSyncRules, + TablePattern +} from '@powersync/service-sync-rules'; +import { ReplicationMetric } from '@powersync/service-types'; +import * as timers from 'node:timers/promises'; +import pDefer from 'p-defer'; +import { MongoLSN } from '../common/MongoLSN.js'; +import { PostImagesOption } from '../types/types.js'; +import { escapeRegExp } from '../utils.js'; +import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; +import { + constructAfterRecord, + createCheckpoint, + getCacheIdentifier, + getMongoRelation, + STANDALONE_CHECKPOINT_ID +} from './MongoRelation.js'; +import { MongoManager } from './MongoManager.js'; +import { mapChangeStreamError } from './ChangeStreamErrors.js'; +import { CHECKPOINTS_COLLECTION } from './replication-utils.js'; + +export interface MongoSnapshotterOptions { + connections: MongoManager; + storage: storage.SyncRulesBucketStorage; + metrics: MetricsEngine; + abort_signal: AbortSignal; + /** + * Override maxAwaitTimeMS for testing. + */ + maxAwaitTimeMS?: number; + /** + * Override snapshotChunkLength for testing. 
+ */ + snapshotChunkLength?: number; + logger?: Logger; + checkpointStreamId: mongo.ObjectId; +} + +interface InitResult { + needsInitialSync: boolean; + snapshotLsn: string | null; +} + +export class MongoSnapshotter { + sync_rules: HydratedSyncRules; + group_id: number; + + connection_id = 1; + + private readonly storage: storage.SyncRulesBucketStorage; + private readonly metrics: MetricsEngine; + + private connections: MongoManager; + private readonly client: mongo.MongoClient; + private readonly defaultDb: mongo.Db; + + private readonly maxAwaitTimeMS: number; + private readonly snapshotChunkLength: number; + + private abortSignal: AbortSignal; + + private relationCache = new RelationCache(getCacheIdentifier); + + private logger: Logger; + + private checkpointStreamId: mongo.ObjectId; + private changeStreamTimeout: number; + + private queue = new Set(); + private initialSnapshotDone = pDefer(); + private lastSnapshotOpId: InternalOpId | null = null; + + constructor(options: MongoSnapshotterOptions) { + this.storage = options.storage; + this.metrics = options.metrics; + this.group_id = options.storage.group_id; + this.connections = options.connections; + this.maxAwaitTimeMS = options.maxAwaitTimeMS ?? 10_000; + this.snapshotChunkLength = options.snapshotChunkLength ?? 6_000; + this.client = this.connections.client; + this.defaultDb = this.connections.db; + this.sync_rules = options.storage.getParsedSyncRules({ + defaultSchema: this.defaultDb.databaseName + }); + this.abortSignal = options.abort_signal; + this.logger = options.logger ?? 
defaultLogger; + this.checkpointStreamId = options.checkpointStreamId; + this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9); + } + + private get usePostImages() { + return this.connections.options.postImages != PostImagesOption.OFF; + } + + private get configurePostImages() { + return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE; + } + + async checkSlot(): Promise { + const status = await this.storage.getStatus(); + if (status.snapshot_done && status.checkpoint_lsn) { + this.logger.info(`Initial replication already done`); + return { needsInitialSync: false, snapshotLsn: null }; + } + + return { needsInitialSync: true, snapshotLsn: status.snapshot_lsn }; + } + + async setupCheckpointsCollection() { + const collection = await this.getCollectionInfo(this.defaultDb.databaseName, CHECKPOINTS_COLLECTION); + if (collection == null) { + await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { + changeStreamPreAndPostImages: { enabled: true } + }); + } else if (this.usePostImages && collection.options?.changeStreamPreAndPostImages?.enabled != true) { + // Drop + create requires less permissions than collMod, + // and we don't care about the data in this collection. + await this.defaultDb.dropCollection(CHECKPOINTS_COLLECTION); + await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, { + changeStreamPreAndPostImages: { enabled: true } + }); + } else { + // Clear the collection on startup, to keep it clean + // We never query this collection directly, and don't want to keep the data around. + // We only use this to get data into the oplog/changestream. 
+ await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({}); + } + } + + async queueSnapshotTables(snapshotLsn: string | null) { + const sourceTables = this.sync_rules.getSourceTables(); + await this.client.connect(); + + await this.storage.startBatch( + { + logger: this.logger, + zeroLSN: MongoLSN.ZERO.comparable, + defaultSchema: this.defaultDb.databaseName, + storeCurrentData: false, + skipExistingRows: true + }, + async (batch) => { + if (snapshotLsn == null) { + // First replication attempt - get a snapshot and store the timestamp + snapshotLsn = await this.getSnapshotLsn(); + await batch.setResumeLsn(snapshotLsn); + this.logger.info(`Marking snapshot at ${snapshotLsn}`); + } else { + this.logger.info(`Resuming snapshot at ${snapshotLsn}`); + // Check that the snapshot is still valid. + await this.validateSnapshotLsn(snapshotLsn); + } + + // Start by resolving all tables. + // This checks postImage configuration, and that should fail as + // early as possible. + let allSourceTables: SourceTable[] = []; + for (let tablePattern of sourceTables) { + const tables = await this.resolveQualifiedTableNames(batch, tablePattern); + allSourceTables.push(...tables); + } + + let tablesWithStatus: SourceTable[] = []; + for (let table of allSourceTables) { + if (table.snapshotComplete) { + this.logger.info(`Skipping ${table.qualifiedName} - snapshot already done`); + continue; + } + const count = await this.estimatedCountNumber(table); + const updated = await batch.updateTableProgress(table, { + totalEstimatedCount: count + }); + tablesWithStatus.push(updated); + this.relationCache.update(updated); + this.logger.info( + `To replicate: ${updated.qualifiedName}: ${updated.snapshotStatus?.replicatedCount}/~${updated.snapshotStatus?.totalEstimatedCount}` + ); + } + + for (let table of tablesWithStatus) { + this.queue.add(table); + } + } + ); + } + + async waitForInitialSnapshot() { + await this.initialSnapshotDone.promise; + } + + async replicationLoop() { + try { + 
if (this.queue.size == 0) { + // Special case where we start with no tables to snapshot + await this.markSnapshotDone(); + } + while (!this.abortSignal.aborted) { + const table = this.queue.values().next().value; + if (table == null) { + this.initialSnapshotDone.resolve(); + await timers.setTimeout(500, { signal: this.abortSignal }); + continue; + } + + await this.replicateTable(table); + this.queue.delete(table); + if (this.queue.size == 0) { + await this.markSnapshotDone(); + } + } + throw new ReplicationAbortedError(`Replication loop aborted`, this.abortSignal.reason); + } catch (e) { + // If initial snapshot already completed, this has no effect + this.initialSnapshotDone.reject(e); + throw e; + } + } + + private async markSnapshotDone() { + const flushResults = await this.storage.startBatch( + { + logger: this.logger, + zeroLSN: MongoLSN.ZERO.comparable, + defaultSchema: this.defaultDb.databaseName, + storeCurrentData: false, + skipExistingRows: true + }, + async (batch) => { + // The checkpoint here is a marker - we need to replicate up to at least this + // point before the data can be considered consistent. + const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); + await batch.markAllSnapshotDone(checkpoint); + } + ); + + const lastOp = flushResults?.flushed_op ?? this.lastSnapshotOpId; + if (lastOp != null) { + // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. + // TODO: only run this after initial replication, not after each table. + await this.storage.populatePersistentChecksumCache({ + // No checkpoint yet, but we do have the opId. 
+ maxOpId: lastOp, + signal: this.abortSignal + }); + } + } + + private async replicateTable(table: SourceTable) { + const flushResults = await this.storage.startBatch( + { + logger: this.logger, + zeroLSN: MongoLSN.ZERO.comparable, + defaultSchema: this.defaultDb.databaseName, + storeCurrentData: false, + skipExistingRows: true + }, + async (batch) => { + await this.snapshotTable(batch, table); + + const noCheckpointBefore = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); + await batch.markTableSnapshotDone([table], noCheckpointBefore); + + // This commit ensures we set keepalive_op. + const resumeLsn = batch.resumeFromLsn ?? MongoLSN.ZERO.comparable; + await batch.commit(resumeLsn); + } + ); + if (flushResults?.flushed_op != null) { + this.lastSnapshotOpId = flushResults.flushed_op; + } + this.logger.info(`Flushed snapshot at ${flushResults?.flushed_op}`); + } + + async queueSnapshot(batch: storage.BucketStorageBatch, table: storage.SourceTable) { + await batch.markTableSnapshotRequired(table); + this.queue.add(table); + } + + async estimatedCount(table: storage.SourceTable): Promise { + const count = await this.estimatedCountNumber(table); + return `~${count}`; + } + + async estimatedCountNumber(table: storage.SourceTable): Promise { + const db = this.client.db(table.schema); + return await db.collection(table.name).estimatedDocumentCount(); + } + + async resolveQualifiedTableNames( + batch: storage.BucketStorageBatch, + tablePattern: TablePattern + ): Promise { + const schema = tablePattern.schema; + if (tablePattern.connectionTag != this.connections.connectionTag) { + return []; + } + + let nameFilter: RegExp | string; + if (tablePattern.isWildcard) { + nameFilter = new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)); + } else { + nameFilter = tablePattern.name; + } + let result: storage.SourceTable[] = []; + + // Check if the collection exists + const collections = await this.client + .db(schema) + .listCollections( + { + 
name: nameFilter + }, + { nameOnly: false } + ) + .toArray(); + + if (!tablePattern.isWildcard && collections.length == 0) { + this.logger.warn(`Collection ${schema}.${tablePattern.name} not found`); + } + + for (let collection of collections) { + const table = await this.handleRelation(batch, getMongoRelation({ db: schema, coll: collection.name }), { + collectionInfo: collection + }); + + result.push(table); + } + + return result; + } + + private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) { + const totalEstimatedCount = await this.estimatedCountNumber(table); + let at = table.snapshotStatus?.replicatedCount ?? 0; + const db = this.client.db(table.schema); + const collection = db.collection(table.name); + await using query = new ChunkedSnapshotQuery({ + collection, + key: table.snapshotStatus?.lastKey, + batchSize: this.snapshotChunkLength + }); + if (query.lastKey != null) { + this.logger.info( + `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}` + ); + } else { + this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); + } + + let lastBatch = performance.now(); + let nextChunkPromise = query.nextChunk(); + while (true) { + const { docs: docBatch, lastKey } = await nextChunkPromise; + if (docBatch.length == 0) { + // No more data - stop iterating + break; + } + + if (this.abortSignal.aborted) { + throw new ReplicationAbortedError(`Aborted initial replication`, this.abortSignal.reason); + } + + // Pre-fetch next batch, so that we can read and write concurrently + nextChunkPromise = query.nextChunk(); + for (let document of docBatch) { + const record = this.constructAfterRecord(document); + + // This auto-flushes when the batch reaches its size limit + await batch.save({ + tag: SaveOperationTag.INSERT, + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: record, + afterReplicaId: document._id + }); + } + + // 
Important: flush before marking progress + await batch.flush(); + at += docBatch.length; + this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(docBatch.length); + + table = await batch.updateTableProgress(table, { + lastKey, + replicatedCount: at, + totalEstimatedCount: totalEstimatedCount + }); + this.relationCache.update(table); + + const duration = performance.now() - lastBatch; + lastBatch = performance.now(); + this.logger.info( + `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} in ${duration.toFixed(0)}ms` + ); + this.touch(); + } + // In case the loop was interrupted, make sure we await the last promise. + await nextChunkPromise; + } + + private constructAfterRecord(document: mongo.Document): SqliteRow { + const inputRow = constructAfterRecord(document); + return this.sync_rules.applyRowContext(inputRow); + } + + private async getCollectionInfo(db: string, name: string): Promise { + const collection = ( + await this.client + .db(db) + .listCollections( + { + name: name + }, + { nameOnly: false } + ) + .toArray() + )[0]; + return collection; + } + + private async checkPostImages(db: string, collectionInfo: mongo.CollectionInfo) { + if (!this.usePostImages) { + // Nothing to check + return; + } + + const enabled = collectionInfo.options?.changeStreamPreAndPostImages?.enabled == true; + + if (!enabled && this.configurePostImages) { + await this.client.db(db).command({ + collMod: collectionInfo.name, + changeStreamPreAndPostImages: { enabled: true } + }); + this.logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`); + } else if (!enabled) { + throw new ServiceError(ErrorCode.PSYNC_S1343, `postImages not enabled on ${db}.${collectionInfo.name}`); + } + } + + private async handleRelation( + batch: storage.BucketStorageBatch, + descriptor: SourceEntityDescriptor, + options: { collectionInfo: mongo.CollectionInfo | undefined } + ) { + if (options.collectionInfo != null) { + await this.checkPostImages(descriptor.schema, 
options.collectionInfo); + } else { + // If collectionInfo is null, the collection may have been dropped. + // Ignore the postImages check in this case. + } + + const result = await this.storage.resolveTable({ + group_id: this.group_id, + connection_id: this.connection_id, + connection_tag: this.connections.connectionTag, + entity_descriptor: descriptor, + sync_rules: this.sync_rules + }); + this.relationCache.update(result.table); + + // Drop conflicting collections. + // This is generally not expected for MongoDB source dbs, so we log an error. + if (result.dropTables.length > 0) { + this.logger.error( + `Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}` + ); + await batch.drop(result.dropTables); + } + + return result.table; + } + + private async getSnapshotLsn(): Promise { + const hello = await this.defaultDb.command({ hello: 1 }); + // Basic sanity check + if (hello.msg == 'isdbgrid') { + throw new ServiceError( + ErrorCode.PSYNC_S1341, + 'Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).' + ); + } else if (hello.setName == null) { + throw new ServiceError( + ErrorCode.PSYNC_S1342, + 'Standalone MongoDB instances are not supported - use a replicaset.' + ); + } + + // Open a change stream just to get a resume token for later use. + // We could use clusterTime from the hello command, but that won't tell us if the + // snapshot isn't valid anymore. + // If we just use the first resumeToken from the stream, we get two potential issues: + // 1. The resumeToken may just be a wrapped clusterTime, which does not detect changes + // in source db or other stream issues. + // 2. The first actual change we get may have the same clusterTime, causing us to incorrect + // skip that event. + // Instead, we create a new checkpoint document, and wait until we get that document back in the stream. 
+ // To avoid potential race conditions with the checkpoint creation, we create a new checkpoint document + // periodically until the timeout is reached. + + const LSN_TIMEOUT_SECONDS = 60; + const LSN_CREATE_INTERVAL_SECONDS = 1; + + await using streamManager = this.openChangeStream({ lsn: null, maxAwaitTimeMs: 0 }); + const { stream } = streamManager; + const startTime = performance.now(); + let lastCheckpointCreated = -10_000; + let eventsSeen = 0; + + while (performance.now() - startTime < LSN_TIMEOUT_SECONDS * 1000) { + if (performance.now() - lastCheckpointCreated >= LSN_CREATE_INTERVAL_SECONDS * 1000) { + await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + lastCheckpointCreated = performance.now(); + } + + // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream + const changeDocument = await stream.tryNext().catch((e) => { + throw mapChangeStreamError(e); + }); + if (changeDocument == null) { + continue; + } + + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + + if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + if (!this.checkpointStreamId.equals(checkpointId)) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + return lsn; + } + + eventsSeen += 1; + } + + // Could happen if there is a very large replication lag? + throw new ServiceError( + ErrorCode.PSYNC_S1301, + `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}` + ); + } + + /** + * Given a snapshot LSN, validate that we can read from it, by opening a change stream. 
+ */ + private async validateSnapshotLsn(lsn: string) { + await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); + const { stream } = streamManager; + try { + // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream + await stream.tryNext(); + } catch (e) { + // Note: A timeout here is not handled as a ChangeStreamInvalidatedError, even though + // we possibly cannot recover from it. + throw mapChangeStreamError(e); + } + } + + private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { + const sourceTables = this.sync_rules.getSourceTables(); + + let $inFilters: { db: string; coll: string }[] = [ + { db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION } + ]; + let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = []; + let multipleDatabases = false; + for (let tablePattern of sourceTables) { + if (tablePattern.connectionTag != this.connections.connectionTag) { + continue; + } + + if (tablePattern.schema != this.defaultDb.databaseName) { + multipleDatabases = true; + } + + if (tablePattern.isWildcard) { + $refilters.push({ + 'ns.db': tablePattern.schema, + 'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) + }); + } else { + $inFilters.push({ + db: tablePattern.schema, + coll: tablePattern.name + }); + } + } + + const nsFilter = multipleDatabases + ? { ns: { $in: $inFilters } } + : { 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } }; + if ($refilters.length > 0) { + return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases }; + } + return { $match: nsFilter, multipleDatabases }; + } + + static *getQueryData(results: Iterable): Generator { + for (let row of results) { + yield constructAfterRecord(row); + } + } + + private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) { + const lastLsn = options.lsn ? 
MongoLSN.fromSerialized(options.lsn) : null; + const startAfter = lastLsn?.timestamp; + const resumeAfter = lastLsn?.resumeToken; + + const filters = this.getSourceNamespaceFilters(); + + const pipeline: mongo.Document[] = [ + { + $match: filters.$match + }, + { $changeStreamSplitLargeEvent: {} } + ]; + + let fullDocument: 'required' | 'updateLookup'; + + if (this.usePostImages) { + // 'read_only' or 'auto_configure' + // Configuration happens during snapshot, or when we see new + // collections. + fullDocument = 'required'; + } else { + fullDocument = 'updateLookup'; + } + const streamOptions: mongo.ChangeStreamOptions = { + showExpandedEvents: true, + maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, + fullDocument: fullDocument, + maxTimeMS: this.changeStreamTimeout + }; + + /** + * Only one of these options can be supplied at a time. + */ + if (resumeAfter) { + streamOptions.resumeAfter = resumeAfter; + } else { + // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the + // case if we have an old one. 
+ streamOptions.startAtOperationTime = startAfter; + } + + let stream: mongo.ChangeStream; + if (filters.multipleDatabases) { + // Requires readAnyDatabase@admin on Atlas + stream = this.client.watch(pipeline, streamOptions); + } else { + // Same general result, but requires less permissions than the above + stream = this.defaultDb.watch(pipeline, streamOptions); + } + + this.abortSignal.addEventListener('abort', () => { + stream.close(); + }); + + return { + stream, + filters, + [Symbol.asyncDispose]: async () => { + return stream.close(); + } + }; + } + + private lastTouchedAt = performance.now(); + + private touch() { + if (performance.now() - this.lastTouchedAt > 1_000) { + this.lastTouchedAt = performance.now(); + // Update the probes, but don't wait for it + container.probes.touch().catch((e) => { + this.logger.error(`Failed to touch the container probe: ${e.message}`, e); + }); + } + } +} diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 1f54a7810..2851c01ee 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -7,8 +7,10 @@ import { OplogEntry, ProtocolOpId, ReplicationCheckpoint, + settledPromise, SyncRulesBucketStorage, - TestStorageOptions + TestStorageOptions, + unsettledPromise } from '@powersync/service-core'; import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; @@ -18,11 +20,12 @@ import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/ import { NormalizedMongoConnectionConfig } from '@module/types/types.js'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import { ReplicationAbortedError } from '@powersync/lib-services-framework'; export class ChangeStreamTestContext { private _walStream?: ChangeStream; private abortController = new AbortController(); - private streamPromise?: Promise>; + private settledReplicationPromise?: 
Promise>; public storage?: SyncRulesBucketStorage; /** @@ -66,7 +69,7 @@ export class ChangeStreamTestContext { async dispose() { this.abort(); - await this.streamPromise?.catch((e) => e); + await this.settledReplicationPromise; await this.factory[Symbol.asyncDispose](); await this.connectionManager.end(); } @@ -125,7 +128,18 @@ export class ChangeStreamTestContext { } async replicateSnapshot() { - await this.streamer.initReplication(); + // Use a settledPromise to avoid unhandled rejections + this.settledReplicationPromise ??= settledPromise(this.streamer.replicate()); + try { + await Promise.race([unsettledPromise(this.settledReplicationPromise), this.streamer.waitForInitialSnapshot()]); + } catch (e) { + if (e instanceof ReplicationAbortedError && e.cause != null) { + // Edge case for tests: replicate() can throw an error, but we'd receive the ReplicationAbortedError from + // waitForInitialSnapshot() first. In that case, prioritize the cause. + throw e.cause; + } + throw e; + } } /** @@ -143,21 +157,14 @@ export class ChangeStreamTestContext { } startStreaming() { - this.streamPromise = this.streamer - .streamChanges() - .then(() => ({ status: 'fulfilled', value: undefined }) satisfies PromiseFulfilledResult) - .catch((reason) => ({ status: 'rejected', reason }) satisfies PromiseRejectedResult); - return this.streamPromise; + this.settledReplicationPromise ??= settledPromise(this.streamer.replicate()); + return this.settledReplicationPromise; } async getCheckpoint(options?: { timeout?: number }) { let checkpoint = await Promise.race([ getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 15_000 }), - this.streamPromise?.then((e) => { - if (e.status == 'rejected') { - throw e.reason; - } - }) + unsettledPromise(this.settledReplicationPromise!) 
]); if (checkpoint == null) { // This indicates an issue with the test setup - streamingPromise completed instead diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f5b6507fc..5e0fab3e9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -198,6 +198,9 @@ importers: bson: specifier: ^6.10.4 version: 6.10.4 + p-defer: + specifier: ^4.0.1 + version: 4.0.1 ts-codec: specifier: ^1.3.0 version: 1.3.0 From eae4aef9090f4d01be9d45bf9374ded6872909e9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 7 Jan 2026 15:44:41 +0200 Subject: [PATCH 2/3] Fix tests to use the new structure. --- .../src/replication/MongoSnapshotter.ts | 7 +- .../test/src/change_stream.test.ts | 79 ++++++++----------- .../test/src/change_stream_utils.ts | 24 ++++-- .../test/src/chunked_snapshot.test.ts | 15 +++- .../module-mongodb/test/src/resume.test.ts | 6 +- .../test/src/resuming_snapshots.test.ts | 26 +++--- .../test/src/slow_tests.test.ts | 10 +-- 7 files changed, 87 insertions(+), 80 deletions(-) diff --git a/modules/module-mongodb/src/replication/MongoSnapshotter.ts b/modules/module-mongodb/src/replication/MongoSnapshotter.ts index 55c0b030b..660307c10 100644 --- a/modules/module-mongodb/src/replication/MongoSnapshotter.ts +++ b/modules/module-mongodb/src/replication/MongoSnapshotter.ts @@ -267,7 +267,7 @@ export class MongoSnapshotter { } } - private async replicateTable(table: SourceTable) { + private async replicateTable(tableRequest: SourceTable) { const flushResults = await this.storage.startBatch( { logger: this.logger, @@ -277,6 +277,11 @@ export class MongoSnapshotter { skipExistingRows: true }, async (batch) => { + // Get fresh table info, in case it was updated while queuing + const table = await this.handleRelation(batch, tableRequest, { collectionInfo: undefined }); + if (table.snapshotComplete) { + return; + } await this.snapshotTable(batch, table); const noCheckpointBefore = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); diff --git 
a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index b9375c935..e3d66e3b5 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -40,9 +40,7 @@ bucket_definitions: }); const collection = db.collection('test_data'); - await context.replicateSnapshot(); - - context.startStreaming(); + await context.initializeReplication(); const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); const test_id = result.insertedId; @@ -77,9 +75,7 @@ bucket_definitions: const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); const test_id = result.insertedId; - await context.replicateSnapshot(); - - context.startStreaming(); + await context.initializeReplication(); await setTimeout(30); await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); @@ -108,8 +104,7 @@ bucket_definitions: }); const collection = db.collection('test_data'); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const session = client.startSession(); let test_id: mongo.ObjectId | undefined; @@ -155,9 +150,7 @@ bucket_definitions: }); const collection = db.collection('test_data'); - await context.replicateSnapshot(); - - context.startStreaming(); + await context.initializeReplication(); const session = client.startSession(); let test_id: mongo.ObjectId | undefined; @@ -202,9 +195,7 @@ bucket_definitions: }); const collection = db.collection('test_data'); - await context.replicateSnapshot(); - - context.startStreaming(); + await context.initializeReplication(); const session = client.startSession(); let test_id: mongo.ObjectId | undefined; @@ -242,9 +233,7 @@ bucket_definitions: `); await db.createCollection('test_DATA'); - await context.replicateSnapshot(); - - context.startStreaming(); + await 
context.initializeReplication(); const collection = db.collection('test_DATA'); const result = await collection.insertOne({ description: 'test1' }); @@ -266,8 +255,7 @@ bucket_definitions: `); await db.createCollection('test_data'); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const largeDescription = crypto.randomBytes(20_000).toString('hex'); @@ -299,8 +287,7 @@ bucket_definitions: data: [] `; await context.updateSyncRules(syncRuleContent); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const collection = db.collection('test_data'); const result = await collection.insertOne({ description: 'test1' }); @@ -327,8 +314,7 @@ bucket_definitions: - SELECT _id as id, description FROM "test_data2" `; await context.updateSyncRules(syncRuleContent); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const collection = db.collection('test_data1'); const result = await collection.insertOne({ description: 'test1' }); @@ -354,11 +340,10 @@ bucket_definitions: const result = await collection.insertOne({ description: 'test1' }); const test_id = result.insertedId.toHexString(); - await context.replicateSnapshot(); + await context.initializeReplication(); // Note: snapshot is only consistent some time into the streaming request. // At the point that we get the first acknowledged checkpoint, as is required // for getBucketData(), the data should be consistent. 
- context.startStreaming(); const data = await context.getBucketData('global[]'); expect(data).toMatchObject([test_utils.putOp('test_data', { id: test_id, description: 'test1' })]); @@ -380,7 +365,7 @@ bucket_definitions: await db.createCollection('test_data'); - await context.replicateSnapshot(); + await context.initializeReplication(); const collection = db.collection('test_data'); const result = await collection.insertOne({ name: 't1' }); @@ -395,7 +380,6 @@ bucket_definitions: const largeDescription = crypto.randomBytes(12000000 / 2).toString('hex'); await collection.updateOne({ _id: test_id }, { $set: { description: largeDescription } }); - context.startStreaming(); const data = await context.getBucketData('global[]'); expect(data.length).toEqual(2); @@ -424,9 +408,7 @@ bucket_definitions: const { db } = context; await context.updateSyncRules(BASIC_SYNC_RULES); - await context.replicateSnapshot(); - - context.startStreaming(); + await context.initializeReplication(); const collection = db.collection('test_donotsync'); const result = await collection.insertOne({ description: 'test' }); @@ -447,7 +429,7 @@ bucket_definitions: data: - SELECT _id as id, description FROM "test_%"`); - await context.replicateSnapshot(); + await context.initializeReplication(); await db.createCollection('test_data', { // enabled: true here - everything should work @@ -458,15 +440,21 @@ bucket_definitions: const test_id = result.insertedId; await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); - context.startStreaming(); - const data = await context.getBucketData('global[]'); - expect(data).toMatchObject([ - // An extra op here, since this triggers a snapshot in addition to getting the event. 
- test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }), - test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }), - test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }) - ]); + // Either case is valid here + if (data.length == 3) { + expect(data).toMatchObject([ + // An extra op here, since this triggers a snapshot in addition to getting the event. + test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }), + test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }), + test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }) + ]); + } else { + expect(data).toMatchObject([ + test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test1' }), + test_utils.putOp('test_data', { id: test_id!.toHexString(), description: 'test2' }) + ]); + } }); test('postImages - new collection with postImages disabled', async () => { @@ -480,7 +468,7 @@ bucket_definitions: data: - SELECT _id as id, description FROM "test_data%"`); - await context.replicateSnapshot(); + await context.initializeReplication(); await db.createCollection('test_data', { // enabled: false here, but autoConfigure will enable it. 
@@ -492,8 +480,6 @@ bucket_definitions: const test_id = result.insertedId; await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); - context.startStreaming(); - await expect(() => context.getBucketData('global[]')).rejects.toMatchObject({ message: expect.stringContaining('stream was configured to require a post-image for all update events') }); @@ -515,8 +501,8 @@ bucket_definitions: const collection = db.collection('test_data'); await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); - await context.replicateSnapshot(); - await context.markSnapshotConsistent(); + // Initialize + await context.initializeReplication(); // Simulate an error await context.storage!.reportError(new Error('simulated error')); @@ -524,10 +510,9 @@ bucket_definitions: expect(syncRules).toBeTruthy(); expect(syncRules?.last_fatal_error).toEqual('simulated error'); - // startStreaming() should automatically clear the error. - context.startStreaming(); + // The new checkpoint should clear the error + await context.getCheckpoint(); - // getBucketData() creates a checkpoint that clears the error, so we don't do that // Just wait, and check that the error is cleared automatically. await vi.waitUntil( async () => { diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 2851c01ee..81a17cd18 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -63,12 +63,12 @@ export class ChangeStreamTestContext { /** * Abort snapshot and/or replication, without actively closing connections. 
*/ - abort() { - this.abortController.abort(); + abort(cause?: Error) { + this.abortController.abort(cause); } async dispose() { - this.abort(); + this.abort(new Error('Disposing test context')); await this.settledReplicationPromise; await this.factory[Symbol.asyncDispose](); await this.connectionManager.end(); @@ -118,6 +118,7 @@ export class ChangeStreamTestContext { metrics: METRICS_HELPER.metricsEngine, connections: this.connectionManager, abort_signal: this.abortController.signal, + logger: this.streamOptions?.logger, // Specifically reduce this from the default for tests on MongoDB <= 6.0, otherwise it can take // a long time to abort the stream. maxAwaitTimeMS: this.streamOptions?.maxAwaitTimeMS ?? 200, @@ -127,6 +128,18 @@ export class ChangeStreamTestContext { return this._walStream!; } + /** + * Replicate a snapshot, start streaming, and wait for a consistent checkpoint. + */ + async initializeReplication() { + await this.replicateSnapshot(); + // Make sure we're up to date + await this.getCheckpoint(); + } + + /** + * Replicate the initial snapshot, and start streaming. + */ async replicateSnapshot() { // Use a settledPromise to avoid unhandled rejections this.settledReplicationPromise ??= settledPromise(this.streamer.replicate()); @@ -156,11 +169,6 @@ export class ChangeStreamTestContext { }); } - startStreaming() { - this.settledReplicationPromise ??= settledPromise(this.streamer.replicate()); - return this.settledReplicationPromise; - } - async getCheckpoint(options?: { timeout?: number }) { let checkpoint = await Promise.race([ getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 
15_000 }), diff --git a/modules/module-mongodb/test/src/chunked_snapshot.test.ts b/modules/module-mongodb/test/src/chunked_snapshot.test.ts index 930c82e9c..26508b8f7 100644 --- a/modules/module-mongodb/test/src/chunked_snapshot.test.ts +++ b/modules/module-mongodb/test/src/chunked_snapshot.test.ts @@ -1,5 +1,11 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { reduceBucket, TestStorageConfig, TestStorageFactory } from '@powersync/service-core'; +import { + reduceBucket, + settledPromise, + TestStorageConfig, + TestStorageFactory, + unsettledPromise +} from '@powersync/service-core'; import { METRICS_HELPER } from '@powersync/service-core-tests'; import { JSONBig } from '@powersync/service-jsonbig'; import { SqliteJsonValue } from '@powersync/service-sync-rules'; @@ -116,7 +122,7 @@ function defineBatchTests(config: TestStorageConfig) { // 2. Replicate one batch of rows // Our "stopping point" here is not quite deterministic. - const p = context.replicateSnapshot(); + const p = settledPromise(context.initializeReplication()); const stopAfter = 100; const startRowCount = (await METRICS_HELPER.getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; @@ -146,9 +152,10 @@ function defineBatchTests(config: TestStorageConfig) { await db.collection('test_data').insertOne({ _id: idD, description: 'new' }); // 4. Replicate the rest of the table. 
- await p; + await unsettledPromise(p); - context.startStreaming(); + // FIXME: only start streaming at this point: + // context.startStreaming(); const data = await context.getBucketData('global[]'); const reduced = reduceBucket(data); diff --git a/modules/module-mongodb/test/src/resume.test.ts b/modules/module-mongodb/test/src/resume.test.ts index a58bd8f4b..5e5dba9a9 100644 --- a/modules/module-mongodb/test/src/resume.test.ts +++ b/modules/module-mongodb/test/src/resume.test.ts @@ -1,7 +1,7 @@ import { ChangeStreamInvalidatedError } from '@module/replication/ChangeStream.js'; import { MongoManager } from '@module/replication/MongoManager.js'; import { normalizeConnectionConfig } from '@module/types/types.js'; -import { TestStorageConfig } from '@powersync/service-core'; +import { settledPromise, TestStorageConfig } from '@powersync/service-core'; import { describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; import { env } from './env.js'; @@ -26,8 +26,6 @@ function defineResumeTest(config: TestStorageConfig) { await context.replicateSnapshot(); - context.startStreaming(); - const collection = db.collection('test_data'); await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); @@ -60,7 +58,7 @@ function defineResumeTest(config: TestStorageConfig) { context2.storage = factory.getInstance(activeContent!); // If this test times out, it likely didn't throw the expected error here. 
- const result = await context2.startStreaming(); + const result = await settledPromise(context2.initializeReplication()); // The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError expect(result.status).toEqual('rejected'); expect((result as PromiseRejectedResult).reason).toBeInstanceOf(ChangeStreamInvalidatedError); diff --git a/modules/module-mongodb/test/src/resuming_snapshots.test.ts b/modules/module-mongodb/test/src/resuming_snapshots.test.ts index 302f5cc7b..24bd2b3c9 100644 --- a/modules/module-mongodb/test/src/resuming_snapshots.test.ts +++ b/modules/module-mongodb/test/src/resuming_snapshots.test.ts @@ -6,6 +6,7 @@ import { describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; import { env } from './env.js'; import { describeWithStorage } from './util.js'; +import { logger } from '@powersync/lib-services-framework'; describe.skipIf(!(env.CI || env.SLOW_TESTS))('batch replication', function () { describeWithStorage({ timeout: 240_000 }, function (config) { @@ -35,7 +36,9 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n let startRowCount: number; { - await using context = await ChangeStreamTestContext.open(factory, { streamOptions: { snapshotChunkLength: 1000 } }); + await using context = await ChangeStreamTestContext.open(factory, { + streamOptions: { snapshotChunkLength: 1000, logger: logger.child({ prefix: '[context1] ' }) } + }); await context.updateSyncRules(`bucket_definitions: global: @@ -87,7 +90,7 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n // Bypass the usual "clear db on factory open" step. 
await using context2 = await ChangeStreamTestContext.open(factory, { doNotClear: true, - streamOptions: { snapshotChunkLength: 1000 } + streamOptions: { snapshotChunkLength: 1000, logger: logger.child({ prefix: '[context2] ' }) } }); const { db } = context2; @@ -98,9 +101,8 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n await db.collection('test_data2').insertOne({ _id: 10001 as any, description: 'insert1' }); await context2.loadNextSyncRules(); - await context2.replicateSnapshot(); + await context2.initializeReplication(); - context2.startStreaming(); const data = await context2.getBucketData('global[]', undefined, {}); const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '1'); @@ -122,26 +124,30 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n // We only test the final version. expect(JSON.parse(updatedRowOps[1].data as string).description).toEqual('update1'); - expect(insertedRowOps.length).toEqual(2); expect(JSON.parse(insertedRowOps[0].data as string).description).toEqual('insert1'); - expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); + if (insertedRowOps.length != 1) { + // Also valid + expect(insertedRowOps.length).toEqual(2); + expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); + } // 1000 of test_data1 during first replication attempt. // N >= 1000 of test_data2 during first replication attempt. // 10000 - N - 1 + 1 of test_data2 during second replication attempt. // An additional update during streaming replication (2x total for this row). - // An additional insert during streaming replication (2x total for this row). + // An additional insert during streaming replication (1x or 2x total for this row). // If the deleted row was part of the first replication batch, it's removed by streaming replication. // This adds 2 ops. 
// We expect this to be 11002 for stopAfter: 2000, and 11004 for stopAfter: 8000. // However, this is not deterministic. - const expectedCount = 11002 + deletedRowOps.length; + const expectedCount = 11000 + deletedRowOps.length + insertedRowOps.length; expect(data.length).toEqual(expectedCount); const replicatedCount = ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; - // With resumable replication, there should be no need to re-replicate anything. - expect(replicatedCount).toEqual(expectedCount); + // With resumable replication, there should be no need to re-replicate anything, apart from the newly-inserted row + expect(replicatedCount).toBeGreaterThanOrEqual(expectedCount); + expect(replicatedCount).toBeLessThanOrEqual(expectedCount + 1); } } diff --git a/modules/module-mongodb/test/src/slow_tests.test.ts b/modules/module-mongodb/test/src/slow_tests.test.ts index df575ef39..8acf8d88d 100644 --- a/modules/module-mongodb/test/src/slow_tests.test.ts +++ b/modules/module-mongodb/test/src/slow_tests.test.ts @@ -2,7 +2,7 @@ import { setTimeout } from 'node:timers/promises'; import { describe, expect, test } from 'vitest'; import { mongo } from '@powersync/lib-service-mongodb'; -import { storage } from '@powersync/service-core'; +import { settledPromise, storage, unsettledPromise } from '@powersync/service-core'; import { ChangeStreamTestContext, setSnapshotHistorySeconds } from './change_stream_utils.js'; import { env } from './env.js'; @@ -41,8 +41,7 @@ bucket_definitions: await collection1.bulkWrite(operations); await collection2.bulkWrite(operations); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const checksum = await context.getChecksum('global[]'); expect(checksum).toMatchObject({ count: 20_000 @@ -71,7 +70,7 @@ bucket_definitions: } await collection.bulkWrite(operations); - const snapshotPromise = context.replicateSnapshot(); + const 
snapshotPromise = settledPromise(context.initializeReplication());
 
     for (let i = 49; i >= 0; i--) {
       await collection.updateMany(
@@ -81,8 +80,7 @@ bucket_definitions:
         await setTimeout(20);
       }
 
-      await snapshotPromise;
-      context.startStreaming();
+      await unsettledPromise(snapshotPromise);
 
       const data = await context.getBucketData('global[]');
 

From 53efacbc183484c5ed24d3f915ed403aef892fc5 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 7 Jan 2026 16:32:23 +0200
Subject: [PATCH 3/3] Fix race condition on completing snapshots.

---
 modules/module-mongodb/src/replication/MongoSnapshotter.ts | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/modules/module-mongodb/src/replication/MongoSnapshotter.ts b/modules/module-mongodb/src/replication/MongoSnapshotter.ts
index 660307c10..eeac96a03 100644
--- a/modules/module-mongodb/src/replication/MongoSnapshotter.ts
+++ b/modules/module-mongodb/src/replication/MongoSnapshotter.ts
@@ -252,6 +252,10 @@ export class MongoSnapshotter {
       // point before the data can be considered consistent.
       const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
       await batch.markAllSnapshotDone(checkpoint);
+      // KLUDGE: We need to create an extra checkpoint _after_ marking the snapshot done, to fix
+      // issues with order of processing commits(). This is picked up by tests on postgres storage;
+      // the issue may be specific to that storage engine.
+      await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
 }
 );