Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
510173c
Postgres: stream while snapshotting.
rkistner Dec 4, 2025
9d50970
Merge remote-tracking branch 'origin/concurrent-storage-batches' into…
rkistner Jan 7, 2026
689dcee
Fix error code check and improve test stability.
rkistner Jan 7, 2026
5a1caed
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Jan 7, 2026
f08bb44
Initial implementation for MongoDB.
rkistner Jan 7, 2026
eae4aef
Fix tests to use the new structure.
rkistner Jan 7, 2026
96cb151
Fix race conditions with table snapshot state.
rkistner Jan 7, 2026
53efacb
Fix race condition on completing snapshots.
rkistner Jan 7, 2026
a845758
Refactor ChangeStream implementation.
rkistner Jan 8, 2026
8e28411
Fixes.
rkistner Jan 8, 2026
f2b0b6c
Fix potential race condition leading to locks held for longer than
rkistner Jan 8, 2026
ffe752f
Implement actual multiplexing for mongodb change streams.
rkistner Jan 9, 2026
60e5897
Fix tests.
rkistner Jan 9, 2026
22f5904
Consistent sort order for sync rules.
rkistner Jan 12, 2026
55adf3a
Persist mappings.
rkistner Jan 12, 2026
ed9e90c
Refactor deletes.
rkistner Jan 12, 2026
46ca53e
Scoped bucket_data.
rkistner Jan 12, 2026
24c5484
Fix sync rules tests.
rkistner Jan 12, 2026
3e45cc7
Tests build again.
rkistner Jan 12, 2026
9f50346
Minor restructure and test fixes.
rkistner Jan 12, 2026
25344b8
Fixes for checksums and some tests.
rkistner Jan 13, 2026
99bca49
Refactor current_data.
rkistner Jan 13, 2026
017e6dd
Fix more tests.
rkistner Jan 13, 2026
03b07dd
More fixes for clearing data.
rkistner Jan 13, 2026
df561c0
Fix test build issues.
rkistner Jan 13, 2026
5714d9f
Initial fixes for parameter lookups.
rkistner Jan 13, 2026
d26a01f
Cleanup.
rkistner Jan 13, 2026
306beb6
Hack: re-use existing mappings.
rkistner Jan 13, 2026
2d14290
Initial writer restructuring.
rkistner Jan 13, 2026
d73fad6
WIP: merged processing.
rkistner Jan 14, 2026
872d3a9
Initial working through errors.
rkistner Jan 14, 2026
d816b6b
Restructure snapshotter.
rkistner Jan 14, 2026
f9dfbfc
Fix job wiring up.
rkistner Jan 14, 2026
44ac2b7
Fixes.
rkistner Jan 14, 2026
8b923a5
Fixes to source table filtering.
rkistner Jan 14, 2026
436e6f4
Fix source table filtering.
rkistner Jan 14, 2026
bb63762
Fix initialization of sync rule rules where no new snapshots are
rkistner Jan 14, 2026
596343d
Work around build issues.
rkistner Jan 15, 2026
40baa93
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Jan 15, 2026
8075d95
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Jan 15, 2026
cbd97b0
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Jan 15, 2026
e71bc49
Fix test build errors.
rkistner Jan 15, 2026
9cffa33
Support parameter lookups again; simplify a bit.
rkistner Jan 15, 2026
7acc0f7
Fix parameter row filtering.
rkistner Jan 15, 2026
24ba4d0
Minor code simplification.
rkistner Jan 15, 2026
54e5f59
Use TablePattern[] instead of Set<TablePattern>.
rkistner Jan 15, 2026
bb9fb76
Refactor to pull through TablePattern.
rkistner Jan 19, 2026
13d2a31
Improve lookup performance in some cases.
rkistner Jan 19, 2026
b9d517d
Re-add check for resumeToken order.
rkistner Jan 20, 2026
048675f
Fix batch logic in tests.
rkistner Jan 20, 2026
08089fd
Fix metadata going missing.
rkistner Jan 20, 2026
370bbdd
Fix detecting new changes.
rkistner Jan 20, 2026
3d9bbe8
Initial round of storage test fixes.
rkistner Jan 20, 2026
604461e
Round 2 of storage test fixes.
rkistner Jan 20, 2026
4f802cc
Test fix round 3.
rkistner Jan 20, 2026
c58bdd6
Initial compacting fixes.
rkistner Jan 20, 2026
b5baf5c
Fix parameter compacting.
rkistner Jan 20, 2026
e81181a
Add missing test file.
rkistner Jan 20, 2026
c46ce88
Test fix round 4.
rkistner Jan 21, 2026
ebdae0e
Another test fix.
rkistner Jan 21, 2026
bafd322
Improve queue responsiveness.
rkistner Jan 21, 2026
ab4a867
Postgres storage fixes part 1.
rkistner Jan 21, 2026
a85771e
Some postgres storage test fixes.
rkistner Jan 21, 2026
18f2d99
Implement getTable.
rkistner Jan 21, 2026
130041f
Fix updating table progress.
rkistner Jan 21, 2026
a93c031
Remove some startBatch() usage from tests.
rkistner Jan 21, 2026
b70f3be
Move WalStream to new BucketDataWriter.
rkistner Jan 21, 2026
8d9ca0d
Further compatibility fixes for postgres snapshotter.
rkistner Jan 21, 2026
7aa1754
Partial SQL Server refactor to new APIs.
rkistner Jan 21, 2026
7d3a091
Refactor BinLogStreams to use the new APIs (untested).
rkistner Jan 21, 2026
a209643
Remove startBatch.
rkistner Jan 21, 2026
a30c825
Remove resolveTable from the public API.
rkistner Jan 21, 2026
93f6103
Remove some BucketStorageBatch usages.
rkistner Jan 22, 2026
50b4b5b
Simplify createWriter usage.
rkistner Jan 22, 2026
fdd7668
Rename methods.
rkistner Jan 22, 2026
99a7590
Rename BucketDataWriters.
rkistner Jan 22, 2026
422d90f
Map by table id instead of reference.
rkistner Jan 22, 2026
fb1bb7f
Remove mapping for source tables - it is not reliable enough.
rkistner Jan 22, 2026
21517d7
Fix dropping tables.
rkistner Jan 22, 2026
5b30f54
Fix skipIf.
rkistner Jan 22, 2026
2489278
Fix sorting in test.
rkistner Jan 22, 2026
b4e03b7
Restructure logic to drop tables.
rkistner Jan 22, 2026
e32357d
Change drop/snapshot order for test compatibility.
rkistner Jan 22, 2026
c2cfe8d
Fix more tests.
rkistner Feb 2, 2026
a1aebc2
Remove duplicate processing issue.
rkistner Feb 2, 2026
a759c74
Another test fix.
rkistner Feb 2, 2026
52ae861
Rewrite parameter tests to be independent of storage format.
rkistner Feb 2, 2026
8681011
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Feb 2, 2026
1ee9e94
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Feb 2, 2026
e10bc50
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Feb 2, 2026
d169562
Fix initial compile issues post merge.
rkistner Feb 2, 2026
ec975c8
Fix circular import.
rkistner Feb 2, 2026
b236426
Fix some tests.
rkistner Feb 2, 2026
127d416
Fix test build issue.
rkistner Feb 3, 2026
d885186
Add missing file.
rkistner Feb 3, 2026
d45305f
Restructure cast functions.
rkistner Feb 3, 2026
997145b
Fix storage parameter tests.
rkistner Feb 3, 2026
cdbece1
Fix type issues.
rkistner Feb 3, 2026
745d85f
Disable more unstable tests.
rkistner Feb 3, 2026
dfa7e32
Fix MS SQL replication.
rkistner Feb 3, 2026
44edd30
Restructure parameter lookups.
rkistner Feb 3, 2026
6285c9f
Fix test.
rkistner Feb 3, 2026
6c06fd0
Fix sync-rule tests to use the correct source.
rkistner Feb 3, 2026
2c31123
Fix truncating tables.
rkistner Feb 3, 2026
674f05a
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Feb 17, 2026
3b24010
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Feb 17, 2026
11de7bb
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Feb 17, 2026
8bc2aec
Post-merge type fixes.
rkistner Feb 17, 2026
dfe726a
Fix sync-rule tests.
rkistner Feb 17, 2026
6184ebf
Fix more type issues.
rkistner Feb 17, 2026
ada05fd
Fix tsconfig.
rkistner Feb 18, 2026
d6807a2
Fix hydration issue.
rkistner Feb 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 131 additions & 10 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { BucketDataSource, ParameterIndexLookupCreator, SqlSyncRules } from '@powersync/service-sync-rules';

import { GetIntanceOptions, storage } from '@powersync/service-core';
import { GetIntanceOptions, maxLsn, CreateWriterOptions, storage } from '@powersync/service-core';

import { BaseObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';
Expand All @@ -13,6 +13,9 @@ import { SyncRuleDocument } from './implementation/models.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js';
import { generateSlotName } from '../utils/util.js';
import { BucketDefinitionMapping } from './implementation/BucketDefinitionMapping.js';
import { MongoBucketDataWriter } from './storage-index.js';
import { MergedSyncRules } from './implementation/MergedSyncRules.js';

export class MongoBucketStorage
extends BaseObserver<storage.BucketStorageFactoryListener>
Expand Down Expand Up @@ -50,7 +53,14 @@ export class MongoBucketStorage
if ((typeof id as any) == 'bigint') {
id = Number(id);
}
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, this.internalOptions);
const storage = new MongoSyncBucketStorage(
this,
id,
syncRules as MongoPersistedSyncRulesContent,
slot_name,
undefined,
this.internalOptions
);
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}
Expand All @@ -64,6 +74,48 @@ export class MongoBucketStorage
return storage;
}

async createCombinedWriter(
storages: storage.SyncRulesBucketStorage[],
options: CreateWriterOptions
): Promise<MongoBucketDataWriter> {
const mongoStorages = storages as MongoSyncBucketStorage[];
const mappings = mongoStorages.map((s) => s.sync_rules.mapping);
const mergedMappings = BucketDefinitionMapping.merged(mappings);
const mergedProcessor = MergedSyncRules.merge(mongoStorages.map((s) => s.getParsedSyncRules(options)));

const writer = new MongoBucketDataWriter({
db: this.db,
mapping: mergedMappings,
markRecordUnavailable: options.markRecordUnavailable,
rowProcessor: mergedProcessor,
skipExistingRows: options.skipExistingRows ?? false,
slotName: '',
storeCurrentData: options.storeCurrentData,
logger: options.logger
});

for (let storage of mongoStorages) {
const doc = await this.db.sync_rules.findOne(
{
_id: storage.group_id
},
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1, snapshot_lsn: 1 } }
);
const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null;
const parsedSyncRules = storage.getParsedSyncRules(options);
const batch = writer.forSyncRules({
syncRules: parsedSyncRules,

lastCheckpointLsn: checkpoint_lsn,
resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn),
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null
});
storage.iterateListeners((cb) => cb.batchStarted?.(batch));
}

return writer;
}

async getSystemIdentifier(): Promise<storage.BucketStorageSystemIdentifier> {
const { setName: id } = await this.db.db.command({
hello: 1
Expand Down Expand Up @@ -184,7 +236,17 @@ export class MongoBucketStorage
{
state: storage.SyncRuleState.PROCESSING
},
{ $set: { state: storage.SyncRuleState.STOP } }
{ $set: { state: storage.SyncRuleState.STOP } },
{
session: this.session
}
);

const activeSyncRules = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{ session: this.session }
);

const id_doc = await this.db.op_id_sequence.findOneAndUpdate(
Expand All @@ -198,34 +260,91 @@ export class MongoBucketStorage
},
{
upsert: true,
returnDocument: 'after'
returnDocument: 'after',
session: this.session
}
);

const id = Number(id_doc!.op_id);
const slot_name = generateSlotName(this.slot_name_prefix, id);

const syncRules = SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: false
});
let bucketDefinitionMapping: Record<string, number> = {};
let parameterDefinitionMapping: Record<string, number> = {};
let bucketDefinitionId = (id << 16) + 1;
let parameterDefinitionId = (id << 17) + 1;

let existingMapping: BucketDefinitionMapping;
if (activeSyncRules != null) {
existingMapping = BucketDefinitionMapping.fromSyncRules(activeSyncRules);
} else {
existingMapping = new BucketDefinitionMapping({}, {});
}

syncRules.config.hydrate({
hydrationState: {
getBucketSourceScope(source: BucketDataSource) {
const existingId = existingMapping.equivalentBucketSourceId(source);
if (existingId != null) {
bucketDefinitionMapping[source.uniqueName] = existingId;
} else {
bucketDefinitionMapping[source.uniqueName] = bucketDefinitionId;
bucketDefinitionId += 1;
}
return {
// N/A
bucketPrefix: '',
source
};
},
getParameterIndexLookupScope(source: ParameterIndexLookupCreator) {
const key = `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`;
const existingId = existingMapping.equivalentParameterLookupId(source);
if (existingId != null) {
parameterDefinitionMapping[key] = existingId;
} else {
parameterDefinitionMapping[key] = parameterDefinitionId;
parameterDefinitionId += 1;
}
// N/A
return source.defaultLookupScope;
}
}
});

const doc: SyncRuleDocument = {
_id: id,
content: options.content,
last_checkpoint: null,
last_checkpoint: activeSyncRules?.last_checkpoint ?? null,
last_checkpoint_lsn: null,
no_checkpoint_before: null,
keepalive_op: null,
// HACK: copy the op from the active sync rules, if any.
// This specifically helps for the case of the new sync rules not replicating anything new.
// FIXME: Make sure this is properly sound and tested.
keepalive_op: activeSyncRules?.last_checkpoint ? String(activeSyncRules.last_checkpoint) : null,
snapshot_done: false,
snapshot_lsn: undefined,
state: storage.SyncRuleState.PROCESSING,
slot_name: slot_name,
last_checkpoint_ts: null,
last_fatal_error: null,
last_fatal_error_ts: null,
last_keepalive_ts: null
last_keepalive_ts: null,
rule_mapping: {
definitions: bucketDefinitionMapping,
parameter_lookups: parameterDefinitionMapping
}
};
await this.db.sync_rules.insertOne(doc);
await this.db.sync_rules.insertOne(doc, { session: this.session });
await this.db.notifyCheckpoint();
rules = new MongoPersistedSyncRulesContent(this.db, doc);
if (options.lock) {
const lock = await rules.lock();
await rules.lock(this.session);
}
});

Expand Down Expand Up @@ -275,6 +394,8 @@ export class MongoBucketStorage
.find({
state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] }
})
// Prioritize "ACTIVE" first
.sort({ state: 1, _id: 1 })
.toArray();

return docs.map((doc) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { ServiceAssertionError } from '@powersync/lib-services-framework';
import { BucketDataSource, ParameterIndexLookupCreator, SqlSyncRules } from '@powersync/service-sync-rules';
import { SyncRuleDocument } from './models.js';

export class BucketDefinitionMapping {
static fromSyncRules(doc: Pick<SyncRuleDocument, 'rule_mapping'>): BucketDefinitionMapping {
return new BucketDefinitionMapping(doc.rule_mapping.definitions, doc.rule_mapping.parameter_lookups);
}

static merged(mappings: BucketDefinitionMapping[]): BucketDefinitionMapping {
return mappings.reduce((acc, curr) => acc.mergeWith(curr), new BucketDefinitionMapping());
}

constructor(
private definitions: Record<string, number> = {},
private parameterLookupMapping: Record<string, number> = {}
) {}

hasBucketSourceId(id: number) {
return Object.values(this.definitions).includes(id);
}

hasParameterLookupId(id: number) {
return Object.values(this.parameterLookupMapping).includes(id);
}

bucketSourceId(source: BucketDataSource): number {
const defId = this.definitions[source.uniqueName];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for bucket source ${source.uniqueName}`);
}
return defId;
}

parameterLookupId(source: ParameterIndexLookupCreator): number {
const key = `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`;
const defId = this.parameterLookupMapping[key];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for parameter lookup source ${key}`);
}
return defId;
}

equivalentBucketSourceId(source: BucketDataSource): number | null {
// FIXME: Do an actual comparison, instead of just using the unique name
return this.definitions[source.uniqueName] ?? null;
}

equivalentParameterLookupId(source: ParameterIndexLookupCreator): number | null {
// FIXME: Do an actual comparison, instead of just using the scope
const key = `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`;
return this.parameterLookupMapping[key] ?? null;
}

mergeWith(other: BucketDefinitionMapping): BucketDefinitionMapping {
return new BucketDefinitionMapping(
{ ...this.definitions, ...other.definitions },
{ ...this.parameterLookupMapping, ...other.parameterLookupMapping }
);
}
}
Loading
Loading