Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/real-houses-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': patch
---

Mark sync plans as stable.
7 changes: 7 additions & 0 deletions .changeset/thin-paws-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core': minor
---

Store compiled sync plans in bucket storage.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
_id: id,
storage_version: storageVersion,
content: options.config.yaml,
serialized_plan: options.config.plan,
last_checkpoint: null,
last_checkpoint_lsn: null,
no_checkpoint_before: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class MongoPersistedSyncRulesContent extends storage.PersistedSyncRulesCo
super({
id: doc._id,
sync_rules_content: doc.content,
compiled_plan: doc.serialized_plan ?? null,
last_checkpoint_lsn: doc.last_checkpoint_lsn,
// Handle legacy values
slot_name: doc.slot_name ?? `powersync_${doc._id}`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InternalOpId, storage } from '@powersync/service-core';
import { InternalOpId, SerializedSyncPlan, storage } from '@powersync/service-core';
import { SqliteJsonValue } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import { event_types } from '@powersync/service-types';
Expand Down Expand Up @@ -199,6 +199,7 @@ export interface SyncRuleDocument {
last_fatal_error_ts: Date | null;

content: string;
serialized_plan?: SerializedSyncPlan | null;

lock?: {
id: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { migrations } from '@powersync/service-core';
import { openMigrationDB } from '../migration-utils.js';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
await using client = openMigrationDB(configuration.storage);
await client.transaction(async (db) => {
await db.sql`
ALTER TABLE sync_rules
ADD COLUMN sync_plan JSON;
`.execute();
});
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
await using client = openMigrationDB(configuration.storage);
await client.sql`ALTER TABLE sync_rules DROP COLUMN sync_plan`.execute();
};
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { GetIntanceOptions, storage, SyncRulesBucketStorage, UpdateSyncRulesOptions } from '@powersync/service-core';
import { GetIntanceOptions, storage, SyncRulesBucketStorage } from '@powersync/service-core';
import * as pg_wire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import crypto from 'crypto';
import * as uuid from 'uuid';

Expand Down Expand Up @@ -159,7 +158,14 @@ export class PostgresBucketStorageFactory extends storage.BucketStorageFactory {
nextval('sync_rules_id_sequence') AS id
)
INSERT INTO
sync_rules (id, content, state, slot_name, storage_version)
sync_rules (
id,
content,
sync_plan,
state,
slot_name,
storage_version
)
VALUES
(
(
Expand All @@ -169,6 +175,7 @@ export class PostgresBucketStorageFactory extends storage.BucketStorageFactory {
next_id
),
${{ type: 'varchar', value: options.config.yaml }},
${{ type: 'json', value: options.config.plan }},
${{ type: 'varchar', value: storage.SyncRuleState.PROCESSING }},
CONCAT(
${{ type: 'varchar', value: this.slot_name_prefix }},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import * as lib_postgres from '@powersync/lib-service-postgres';
import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { storage } from '@powersync/service-core';
import {
CompatibilityOption,
DEFAULT_HYDRATION_STATE,
HydrationState,
SqlSyncRules,
versionedHydrationState
} from '@powersync/service-sync-rules';
import { models } from '../../types/types.js';

export class PostgresPersistedSyncRulesContent extends storage.PersistedSyncRulesContent {
Expand All @@ -20,6 +13,7 @@ export class PostgresPersistedSyncRulesContent extends storage.PersistedSyncRule
super({
id: Number(row.id),
sync_rules_content: row.content,
compiled_plan: row.sync_plan,
last_checkpoint_lsn: row.last_checkpoint_lsn,
slot_name: row.slot_name,
last_fatal_error: row.last_fatal_error,
Expand Down
16 changes: 15 additions & 1 deletion modules/module-postgres-storage/src/types/models/SyncRules.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { framework, storage } from '@powersync/service-core';
import * as t from 'ts-codec';
import { bigint, pgwire_number } from '../codecs.js';
import { jsonContainerObject } from './json.js';

export const SyncRules = t.object({
id: pgwire_number,
Expand Down Expand Up @@ -48,7 +49,20 @@ export const SyncRules = t.object({
last_fatal_error: t.Null.or(t.string),
keepalive_op: t.Null.or(bigint),
storage_version: t.Null.or(pgwire_number).optional(),
content: t.string
content: t.string,
sync_plan: t.Null.or(
jsonContainerObject(
t.object({
plan: t.any,
compatibility: t.object({
edition: t.number,
overrides: t.record(t.boolean),
maxTimeValuePrecision: t.number.optional()
}),
eventDescriptors: t.record(t.array(t.string))
})
)
)
});

export type SyncRules = t.Encoded<typeof SyncRules>;
Expand Down
26 changes: 26 additions & 0 deletions modules/module-postgres-storage/src/types/models/json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { JsonContainer } from '@powersync/service-jsonbig';
import { Codec, codec } from 'ts-codec';

/**
* Wraps a codec to support {@link JsonContainer} values.
*
* Because our postgres client implementation wraps JSON objects in a {@link JsonContainer}, this intermediate layer is
* required to use JSON columns from Postgres in `ts-codec` models.
*
* Note that this serializes and deserializes values using {@link JSON}, so bigints are not supported.
*/
export function jsonContainerObject<I, O>(inner: Codec<I, O>): Codec<I, JsonContainer> {
return codec(
inner._tag,
(input) => {
return new JsonContainer(JSON.stringify(inner.encode(input)));
},
(json) => {
if (!(json instanceof JsonContainer)) {
throw new Error('Expected JsonContainer');
}

return inner.decode(JSON.parse(json.data));
}
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,4 +608,52 @@ bucket_definitions:
expect(parsedSchema3).not.equals(parsedSchema2);
expect(parsedSchema3.getSourceTables()[0].schema).equals('databasename');
});

test('sync streams smoke test', async () => {
await using factory = await generateStorageFactory();
const syncRules = await factory.updateSyncRules(
updateSyncRulesFromYaml(`
config:
edition: 2
sync_config_compiler: true

streams:
stream:
query: |
SELECT data.* FROM test AS data, test AS param
WHERE data.foo = param.bar AND param.baz = auth.user_id()
`)
);
const bucketStorage = factory.getInstance(syncRules);

await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
baz: 'baz',
bar: 'bar'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.commit('1/1');
});

const checkpoint = await bucketStorage.getCheckpoint();
const parameters = await checkpoint.getParameterSets([
ScopedParameterLookup.direct(
{
lookupName: 'lookup',
queryId: '0'
},
['baz']
)
]);
expect(parameters).toEqual([
{
'0': 'bar'
}
]);
});
}
3 changes: 2 additions & 1 deletion packages/service-core/src/routes/endpoints/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ export const validate = routeDefinition({
active: false,
last_checkpoint_lsn: '',
storageVersion: storage.LEGACY_STORAGE_VERSION,
sync_rules_content: content
sync_rules_content: content,
compiled_plan: null
});

const connectionStatus = await apiHandler.getConnectionStatus();
Expand Down
51 changes: 47 additions & 4 deletions packages/service-core/src/storage/BucketStorageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import { ReplicationEventPayload } from './ReplicationEventPayload.js';
import { ReplicationLock } from './ReplicationLock.js';
import { SyncRulesBucketStorage } from './SyncRulesBucketStorage.js';
import { ReportStorage } from './ReportStorage.js';
import { SqlSyncRules, SyncConfig } from '@powersync/service-sync-rules';
import {
PrecompiledSyncConfig,
SerializedCompatibilityContext,
serializeSyncPlan,
SqlSyncRules,
SyncConfig
} from '@powersync/service-sync-rules';

/**
* Represents a configured storage provider.
Expand Down Expand Up @@ -148,12 +154,32 @@ export interface StorageMetrics {
export interface UpdateSyncRulesOptions {
config: {
yaml: string;
// TODO: Add serialized sync plan if available
/**
* The serialized sync plan for the sync configuration, or `null` for configurations not using the sync stream
* compiler.
*/
plan: SerializedSyncPlan | null;
};
lock?: boolean;
storageVersion?: number;
}

export interface SerializedSyncPlan {
/**
* The serialized plan, from {@link serializeSyncPlan}.
*/
plan: unknown;
compatibility: SerializedCompatibilityContext;
/**
* Event descriptors are not currently represented in the sync plan because they don't use the sync streams compiler
* yet.
*
* We might revisit that in the future, but for now we store SQL text of their definitions here to be able to restore
* them.
*/
eventDescriptors: Record<string, string[]>;
}

export function updateSyncRulesFromYaml(
content: string,
options?: Omit<UpdateSyncRulesOptions, 'config'> & { validate?: boolean }
Expand All @@ -168,8 +194,25 @@ export function updateSyncRulesFromYaml(
return updateSyncRulesFromConfig(config, options);
}

export function updateSyncRulesFromConfig(parsed: SyncConfig, options?: Omit<UpdateSyncRulesOptions, 'config'>) {
return { config: { yaml: parsed.content }, ...options };
export function updateSyncRulesFromConfig(
parsed: SyncConfig,
options?: Omit<UpdateSyncRulesOptions, 'config'>
): UpdateSyncRulesOptions {
let plan: SerializedSyncPlan | null = null;
if (parsed instanceof PrecompiledSyncConfig) {
const eventDescriptors: Record<string, string[]> = {};
for (const event of parsed.eventDescriptors) {
eventDescriptors[event.name] = event.sourceQueries.map((q) => q.sql);
}

plan = {
compatibility: parsed.compatibility.serialize(),
plan: serializeSyncPlan(parsed.plan),
eventDescriptors
};
}

return { config: { yaml: parsed.content, plan }, ...options };
}

export interface GetIntanceOptions {
Expand Down
Loading