Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
86b8c00
Add storage version.
rkistner Feb 4, 2026
7869fa5
Always use versioned bucket names with the new storage version.
rkistner Feb 4, 2026
8ba91e3
Fix some tests and imports.
rkistner Feb 4, 2026
9ee46f2
Fix sync rule tests.
rkistner Feb 4, 2026
f292250
Fix more tests.
rkistner Feb 4, 2026
df9e2aa
Merge remote-tracking branch 'origin/main' into storage-version
rkistner Feb 4, 2026
4cceebd
Rename STORAGE_VERSION_CONFIG.
rkistner Feb 5, 2026
9f22f56
Merge remote-tracking branch 'origin/main' into storage-version
rkistner Feb 10, 2026
d6f27ab
Update hydrate method post merge.
rkistner Feb 10, 2026
f771dcd
Merge remote-tracking branch 'origin/main' into storage-version
rkistner Feb 11, 2026
6999e38
Merge remote-tracking branch 'origin/main' into storage-version
rkistner Feb 16, 2026
a2402f8
Update tests.
rkistner Feb 16, 2026
088de1c
Add a check for down migrations.
rkistner Feb 16, 2026
3bc9fec
Add postgres migration.
rkistner Feb 16, 2026
db17781
Port storage version to Postgres storage.
rkistner Feb 16, 2026
6109f85
Expand comments.
rkistner Feb 16, 2026
4e8b465
Tweak Postgres migration script.
rkistner Feb 16, 2026
a7c299c
Enable versioned bucket ids on storage version 2; Test with version 1.
rkistner Feb 16, 2026
bc9bdef
Test with both storage versions.
rkistner Feb 16, 2026
26e5c2d
Further mongodb test refactoring.
rkistner Feb 16, 2026
a584973
Truncate tables instead of running down migrations for postgres storage
rkistner Feb 16, 2026
52564fa
Run postgres tests on multiple storage versions.
rkistner Feb 16, 2026
0046203
Fix schema test.
rkistner Feb 16, 2026
bb1fbc0
Update storage tests.
rkistner Feb 16, 2026
0414461
Add back timeout increases.
rkistner Feb 16, 2026
cfa3c5d
Update snapshots.
rkistner Feb 16, 2026
a353187
Fix migration tests.
rkistner Feb 16, 2026
56ff6c0
Merge remote-tracking branch 'origin/main' into storage-version
rkistner Feb 17, 2026
f2bfba7
Change error code for storage version issues.
rkistner Feb 17, 2026
255181b
Fix some tests.
rkistner Feb 17, 2026
22b84bc
Remove obsolete snapshot.
rkistner Feb 17, 2026
b609604
Remove more obsolete snapshots.
rkistner Feb 17, 2026
5042648
Changeset.
rkistner Feb 17, 2026
b69bf69
Fix imports.
rkistner Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/short-cheetahs-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mssql': minor
'@powersync/service-module-mysql': minor
'@powersync/service-sync-rules': minor
'@powersync/service-errors': patch
---

Introduce storage versions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { migrations, storage as core_storage } from '@powersync/service-core';
import * as mongo_storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = mongo_storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.sync_rules.updateMany(
{ storage_version: { $exists: false } },
{ $set: { storage_version: core_storage.LEGACY_STORAGE_VERSION } }
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = mongo_storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
const newRules = await db.sync_rules
.find({ storage_version: { $gt: core_storage.LEGACY_STORAGE_VERSION } })
.toArray();
if (newRules.length > 0) {
throw new Error(
`Cannot revert migration due to newer storage versions in use: ${newRules.map((r) => `${r._id}: v${r.storage_version}`).join(', ')}`
);
}
await db.sync_rules.updateMany(
{ storage_version: core_storage.LEGACY_STORAGE_VERSION },
{ $unset: { storage_version: 1 } }
);
} finally {
await db.client.close();
}
};
16 changes: 14 additions & 2 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import { SyncRuleDocument } from './implementation/models.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js';
import { generateSlotName } from '../utils/util.js';
import { MongoChecksumOptions } from './implementation/MongoChecksums.js';

export interface MongoBucketStorageOptions {
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
}

export class MongoBucketStorage
extends BaseObserver<storage.BucketStorageFactoryListener>
Expand All @@ -32,7 +37,7 @@ export class MongoBucketStorage
options: {
slot_name_prefix: string;
},
private internalOptions?: MongoSyncBucketStorageOptions
private internalOptions?: MongoBucketStorageOptions
) {
super();
this.client = db.client;
Expand All @@ -50,10 +55,15 @@ export class MongoBucketStorage
if ((typeof id as any) == 'bigint') {
id = Number(id);
}
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, this.internalOptions);
const storageConfig = (syncRules as MongoPersistedSyncRulesContent).getStorageConfig();
const storage = new MongoSyncBucketStorage(this, id, syncRules, slot_name, undefined, {
...this.internalOptions,
storageConfig
});
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}

storage.registerListener({
batchStarted: (batch) => {
batch.registerListener({
Expand Down Expand Up @@ -205,8 +215,10 @@ export class MongoBucketStorage
const id = Number(id_doc!.op_id);
const slot_name = generateSlotName(this.slot_name_prefix, id);

const storageVersion = options.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
const doc: SyncRuleDocument = {
_id: id,
storage_version: storageVersion,
content: options.content,
last_checkpoint: null,
last_checkpoint_lsn: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
PartialOrFullChecksum
} from '@powersync/service-core';
import { PowerSyncMongo } from './db.js';
import { StorageConfig } from './models.js';

/**
* Checksum calculation options, primarily for tests.
Expand All @@ -27,6 +28,8 @@ export interface MongoChecksumOptions {
* Limit on the number of documents to calculate a checksum on at a time.
*/
operationBatchLimit?: number;

storageConfig: StorageConfig;
}

const DEFAULT_BUCKET_BATCH_LIMIT = 200;
Expand All @@ -43,12 +46,15 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
*/
export class MongoChecksums {
private _cache: ChecksumCache | undefined;
private readonly storageConfig: StorageConfig;

constructor(
private db: PowerSyncMongo,
private group_id: number,
private options?: MongoChecksumOptions
) {}
private options: MongoChecksumOptions
) {
this.storageConfig = options.storageConfig;
}

/**
* Lazy-instantiated cache.
Expand Down Expand Up @@ -222,6 +228,11 @@ export class MongoChecksums {
});
}

// Historically, checksum may be stored as 'int' or 'double'.
// More recently, this should be a 'long'.
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
const checksumLong = this.storageConfig.longChecksums ? '$checksum' : { $toLong: '$checksum' };

// Aggregate over a max of `batchLimit` operations at a time.
// Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12.
// Then we'll do three batches:
Expand All @@ -245,10 +256,7 @@ export class MongoChecksums {
{
$group: {
_id: '$_id.b',
// Historically, checksum may be stored as 'int' or 'double'.
// More recently, this should be a 'long'.
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
checksum_total: { $sum: { $toLong: '$checksum' } },
checksum_total: { $sum: checksumLong },
count: { $sum: 1 },
has_clear_op: {
$max: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,42 @@
import { SyncConfigWithErrors, HydratedSyncRules, versionedHydrationState } from '@powersync/service-sync-rules';
import {
CompatibilityOption,
DEFAULT_HYDRATION_STATE,
HydratedSyncRules,
HydrationState,
SyncConfigWithErrors,
versionedHydrationState
} from '@powersync/service-sync-rules';

import { storage } from '@powersync/service-core';

import { StorageConfig } from './models.js';

export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
public readonly slot_name: string;
public readonly hydrationState: HydrationState;

constructor(
public readonly id: number,
public readonly sync_rules: SyncConfigWithErrors,
public readonly checkpoint_lsn: string | null,
slot_name: string | null
slot_name: string | null,
public readonly storageConfig: StorageConfig
) {
this.slot_name = slot_name ?? `powersync_${id}`;

if (
storageConfig.versionedBuckets ||
this.sync_rules.config.compatibility.isEnabled(CompatibilityOption.versionedBucketIds)
) {
// For new sync config versions (using the new storage version), we always enable versioned bucket names.
// For older versions, this depends on the compatibility option.
this.hydrationState = versionedHydrationState(this.id);
} else {
this.hydrationState = DEFAULT_HYDRATION_STATE;
}
}

hydratedSyncRules(): HydratedSyncRules {
return this.sync_rules.config.hydrate({ hydrationState: versionedHydrationState(this.id) });
return this.sync_rules.config.hydrate({ hydrationState: this.hydrationState });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { SqlSyncRules } from '@powersync/service-sync-rules';
import { MongoPersistedSyncRules } from './MongoPersistedSyncRules.js';
import { MongoSyncRulesLock } from './MongoSyncRulesLock.js';
import { PowerSyncMongo } from './db.js';
import { SyncRuleDocument } from './models.js';
import { getMongoStorageConfig, SyncRuleDocument } from './models.js';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';

export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRulesContent {
public readonly slot_name: string;
Expand All @@ -17,6 +18,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
public readonly last_keepalive_ts: Date | null;
public readonly last_checkpoint_ts: Date | null;
public readonly active: boolean;
public readonly storageVersion: number;

public current_lock: MongoSyncRulesLock | null = null;

Expand All @@ -34,14 +36,32 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
this.last_checkpoint_ts = doc.last_checkpoint_ts;
this.last_keepalive_ts = doc.last_keepalive_ts;
this.active = doc.state == 'ACTIVE';
this.storageVersion = doc.storage_version ?? storage.LEGACY_STORAGE_VERSION;
}

/**
* Load the storage config.
*
* This may throw if the persisted storage version is not supported.
*/
getStorageConfig() {
const storageConfig = getMongoStorageConfig(this.storageVersion);
if (storageConfig == null) {
throw new ServiceError(
ErrorCode.PSYNC_S1005,
`Unsupported storage version ${this.storageVersion} for sync rules ${this.id}`
);
}
return storageConfig;
}

parsed(options: storage.ParseSyncRulesOptions) {
return new MongoPersistedSyncRules(
this.id,
SqlSyncRules.fromYaml(this.sync_rules_content, options),
this.last_checkpoint_lsn,
this.slot_name
this.slot_name,
this.getStorageConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import * as timers from 'timers/promises';
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
import {
BucketDataDocument,
BucketDataKey,
BucketStateDocument,
SourceKey,
SourceTableDocument,
StorageConfig
} from './models.js';
import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';

export interface MongoSyncBucketStorageOptions {
checksumOptions?: MongoChecksumOptions;
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
storageConfig: StorageConfig;
}

/**
Expand Down Expand Up @@ -69,12 +77,15 @@ export class MongoSyncBucketStorage
public readonly group_id: number,
private readonly sync_rules: storage.PersistedSyncRulesContent,
public readonly slot_name: string,
writeCheckpointMode?: storage.WriteCheckpointMode,
options?: MongoSyncBucketStorageOptions
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
options: MongoSyncBucketStorageOptions
) {
super();
this.db = factory.db;
this.checksums = new MongoChecksums(this.db, this.group_id, options?.checksumOptions);
this.checksums = new MongoChecksums(this.db, this.group_id, {
...options.checksumOptions,
storageConfig: options?.storageConfig
});
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
db: this.db,
mode: writeCheckpointMode ?? storage.WriteCheckpointMode.MANAGED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ export interface SyncRuleDocument {
id: string;
expires_at: Date;
} | null;

storage_version?: number;
}

export interface StorageConfig extends storage.StorageVersionConfig {
/**
* When true, bucket_data.checksum is guaranteed to be persisted as a Long.
*
* When false, it could also have been persisted as an Int32 or Double, in which case it must be converted to
* a Long before summing.
*/
longChecksums: boolean;
}

const LONG_CHECKSUMS_STORAGE_VERSION = 2;

export function getMongoStorageConfig(storageVersion: number): StorageConfig | undefined {
const baseConfig = storage.STORAGE_VERSION_CONFIG[storageVersion];
if (baseConfig == null) {
return undefined;
}

return { ...baseConfig, longChecksums: storageVersion >= LONG_CHECKSUMS_STORAGE_VERSION };
}

export interface CheckpointEventDocument {
Expand Down
7 changes: 3 additions & 4 deletions modules/module-mongodb-storage/src/utils/test-utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { PowerSyncMongo } from '../storage/implementation/db.js';
import { TestStorageOptions } from '@powersync/service-core';
import { MongoBucketStorage, MongoBucketStorageOptions } from '../storage/MongoBucketStorage.js';
import { MongoReportStorage } from '../storage/MongoReportStorage.js';
import { MongoBucketStorage } from '../storage/MongoBucketStorage.js';
import { MongoSyncBucketStorageOptions } from '../storage/implementation/MongoSyncBucketStorage.js';
import { PowerSyncMongo } from '../storage/implementation/db.js';

export type MongoTestStorageOptions = {
url: string;
isCI: boolean;
internalOptions?: MongoSyncBucketStorageOptions;
internalOptions?: MongoBucketStorageOptions;
};

export function mongoTestStorageFactoryGenerator(factoryOptions: MongoTestStorageOptions) {
Expand Down

This file was deleted.

Loading