From bc04bf30eca27a14c31d343241a943d5639b5df1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 12:55:47 +0100 Subject: [PATCH 01/14] Add evaluators --- packages/sync-rules/src/BaseSqlDataQuery.ts | 13 +- .../sync_plan/evaluator/bucket_data_source.ts | 155 +++++++ .../src/sync_plan/evaluator/bucket_source.ts | 193 +++++++++ .../src/sync_plan/evaluator/index.ts | 40 ++ .../evaluator/lookup_stage_evaluator.ts | 381 ++++++++++++++++++ .../parameter_index_lookup_creator.ts | 78 ++++ .../evaluator/scalar_expression_evaluator.ts | 166 ++++++++ .../src/sync_plan/expression_to_sql.ts | 7 +- packages/sync-rules/src/utils.ts | 18 + tsconfig.base.json | 4 +- 10 files changed, 1039 insertions(+), 16 deletions(-) create mode 100644 packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts create mode 100644 packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts create mode 100644 packages/sync-rules/src/sync_plan/evaluator/index.ts create mode 100644 packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts create mode 100644 packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts create mode 100644 packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index 24ab1138b..9d86a157d 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -15,7 +15,7 @@ import { SqliteJsonRow, SqliteRow } from './types.js'; -import { filterJsonRow } from './utils.js'; +import { filterJsonRow, idFromData } from './utils.js'; export interface RowValueExtractor { extract(tables: QueryParameters, into: SqliteRow): void; @@ -183,16 +183,7 @@ export class BaseSqlDataQuery { } const data = this.transformRow(tables); - let id = data.id; - if (typeof id != 'string') { - // While an explicit cast would be better, this covers against very common - // issues when initially testing out sync, for example when the id column is an - // auto-incrementing integer. - // If there is no id column, we use a blank id. This will result in the user syncing - // a single arbitrary row for this table - better than just not being able to sync - // anything. - id = castAsText(id) ?? ''; - } + const id = idFromData(data); const outputTable = this.getOutputName(table.name); return resolvedBucketParameters.map((serializedBucketParameters) => { diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts new file mode 100644 index 000000000..0e2e67962 --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts @@ -0,0 +1,155 @@ +import { BucketDataSource } from '../../BucketSource.js'; +import { ColumnDefinition } from '../../ExpressionType.js'; +import { SourceTableInterface } from '../../SourceTableInterface.js'; +import { TablePattern } from '../../TablePattern.js'; +import { + EvaluateRowOptions, + SourceSchema, + SqliteJsonRow, + UnscopedEvaluatedRow, + UnscopedEvaluationResult +} from '../../types.js'; +import { filterJsonRow, idFromData, isJsonValue, JSONBucketNameSerialize } from '../../utils.js'; +import { SqlExpression } from '../expression.js'; +import { ExpressionToSqlite } from '../expression_to_sql.js'; +import * as plan from '../plan.js'; +import { StreamEvaluationContext } from './index.js'; +import { isValidParameterValue } from './lookup_stage_evaluator.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from './scalar_expression_evaluator.js'; + +export class PreparedStreamBucketDataSource implements BucketDataSource { + private readonly sourceTables = new Set(); + private readonly sources: PreparedStreamDataSource[] = []; + + constructor( + readonly source: plan.StreamBucketDataSource, + context: StreamEvaluationContext + ) { + for (const data of source.sources) { + const prepared = new PreparedStreamDataSource(data, context); + + this.sources.push(prepared); + this.sourceTables.add(prepared.tablePattern); + } + } + + get uniqueName(): string { + return this.source.uniqueName; + } + + get bucketParameters(): string[] { + // We can pick an arbitrary evaluator within the source, since they're all guaranteed to have the same parameters. + const evaluator = this.source.sources[0]; + + // It doesn't matter what we return here because it's for debugging purposes only. + return evaluator.parameters.map((p) => ExpressionToSqlite.toSqlite(p.expr)); + } + + getSourceTables(): Set { + return this.sourceTables; + } + + private *sourcesForTable(table: SourceTableInterface) { + for (const source of this.sources) { + if (source.tablePattern.matches(table)) { + yield source; + } + } + } + + tableSyncsData(table: SourceTableInterface): boolean { + return !this.sourcesForTable(table).next().done; + } + + evaluateRow(options: EvaluateRowOptions): UnscopedEvaluationResult[] { + const results: UnscopedEvaluationResult[] = []; + for (const source of this.sourcesForTable(options.sourceTable)) { + source.evaluateRow(options, results); + } + + return results; + } + + resolveResultSets(schema: SourceSchema, tables: Record>): void { + throw new Error('resolveResultSets not implemented.'); + } + + debugWriteOutputTables(result: Record): void { + throw new Error('debugWriteOutputTables not implemented.'); + } +} + +class PreparedStreamDataSource { + readonly tablePattern: TablePattern; + private readonly outputs: ('star' | { index: number; alias: string })[] = []; + private readonly numberOfOutputExpressions: number; + private readonly evaluator: ScalarExpressionEvaluator; + private readonly evaluatorInputs: plan.ColumnSqlParameterValue[]; + private readonly fixedOutputTableName?: string; + + constructor(evaluator: plan.StreamDataSource, { engine }: StreamEvaluationContext) { + const mapExpressions = mapExternalDataToInstantiation(); + const outputExpressions: SqlExpression[] = []; + for (const column of evaluator.columns) { + if (column === 'star') { + this.outputs.push('star'); + } else { + const expressionIndex = outputExpressions.length; + outputExpressions.push(mapExpressions.transform(column.expr)); + this.outputs.push({ index: expressionIndex, alias: column.alias }); + } + } + + this.numberOfOutputExpressions = outputExpressions.length; + for (const parameter of evaluator.parameters) { + outputExpressions.push(mapExpressions.transform(parameter.expr)); + } + + this.evaluator = engine.prepareEvaluator({ + outputs: outputExpressions, + filters: evaluator.filters.map((f) => mapExpressions.transform(f)) + }); + this.fixedOutputTableName = evaluator.outputTableName; + this.tablePattern = evaluator.sourceTable; + this.evaluatorInputs = mapExpressions.instantiation; + } + + evaluateRow(options: EvaluateRowOptions, results: UnscopedEvaluationResult[]) { + try { + const inputInstantiation = this.evaluatorInputs.map((input) => options.record[input.column]); + row: for (const source of this.evaluator.evaluate(inputInstantiation)) { + const record: SqliteJsonRow = {}; + for (const output of this.outputs) { + if (output === 'star') { + Object.assign(record, filterJsonRow(options.record)); + } else { + const value = source[output.index]; + if (isJsonValue(value)) { + record[output.alias] = value; + } + } + } + const id = idFromData(record); + // source is [...outputs, ...partitionValues] + const partitionValues = source.splice(0, this.numberOfOutputExpressions); + + for (const bucketParameter of partitionValues) { + if (!isValidParameterValue(bucketParameter)) { + continue row; + } + } + + results.push({ + id, + data: record, + table: this.fixedOutputTableName ?? options.sourceTable.name, + serializedBucketParameters: JSONBucketNameSerialize.stringify(partitionValues) + } satisfies UnscopedEvaluatedRow); + } + + return results; + } catch (e) { + return results.push({ error: e.message }); + } + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts new file mode 100644 index 000000000..bf73d028c --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts @@ -0,0 +1,193 @@ +import { + BucketDataSource, + BucketSource, + BucketSourceType, + CreateSourceParams, + HydratedBucketSource, + ParameterIndexLookupCreator +} from '../../BucketSource.js'; +import { StreamEvaluationContext } from './index.js'; +import * as plan from '../plan.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEngine } from './scalar_expression_evaluator.js'; +import { SqlExpression } from '../expression.js'; +import { RequestParameters } from '../../types.js'; +import { parametersForRequest, RequestParameterEvaluators, SqliteParameterValue } from './lookup_stage_evaluator.js'; +import { PendingQueriers } from '../../BucketParameterQuerier.js'; +import { RequestedStream } from '../../SqlSyncRules.js'; +import { BucketInclusionReason, ResolvedBucket } from '../../BucketDescription.js'; +import { buildBucketName, JSONBucketNameSerialize } from '../../utils.js'; + +export interface StreamInput extends StreamEvaluationContext { + preparedBuckets: Map; + preparedLookups: Map; +} + +export class StreamBucketSource implements BucketSource { + readonly dataSources: BucketDataSource[] = []; + readonly parameterIndexLookupCreators: ParameterIndexLookupCreator[] = []; + + constructor( + readonly stream: plan.CompiledSyncStream, + private readonly input: StreamInput + ) { + for (const querier of stream.queriers) { + const mappedSource = input.preparedBuckets.get(querier.bucket)!; + this.dataSources.push(mappedSource); + } + } + + get name(): string { + return this.stream.stream.name; + } + + get subscribedToByDefault(): boolean { + return this.stream.stream.isSubscribedByDefault; + } + + get type(): BucketSourceType { + return BucketSourceType.SYNC_STREAM; + } + + debugRepresentation() { + // TODO: Implement debugRepresentation for compiled sync streams + return `stream ${this.stream.stream.name}`; + } + + hydrate(params: CreateSourceParams): HydratedBucketSource { + const queriers = this.stream.queriers.map((q) => new PreparedQuerier(this.stream.stream, q, this.input)); + + return { + definition: this, + pushBucketParameterQueriers: (result, options) => { + const subscriptions = options.streams[this.name] ?? []; + if (!this.subscribedToByDefault && !subscriptions.length) { + // The client is not subscribing to this stream, so don't query buckets related to it. + return; + } + + let hasExplicitDefaultSubscription = false; + for (const subscription of subscriptions) { + let subscriptionParams = options.globalParameters; + if (subscription.parameters != null) { + subscriptionParams = subscriptionParams.withAddedStreamParameters(subscription.parameters); + } else { + hasExplicitDefaultSubscription = true; + } + + for (const querier of queriers) { + querier.querierForSubscription(params, result, subscriptionParams, subscription); + } + } + + // If the stream is subscribed to by default and there is no explicit subscription that would match the default + // subscription, also include the default querier. + if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) { + for (const querier of queriers) { + querier.querierForSubscription(params, result, options.globalParameters, null); + } + } + } + }; + } +} + +class PreparedQuerier { + private readonly matchesParameters: (parameters: RequestParameters) => boolean; + private readonly lookupStages: RequestParameterEvaluators; + private readonly dataSource: BucketDataSource; + + constructor( + readonly stream: plan.StreamOptions, + querier: plan.StreamQuerier, + options: StreamInput + ) { + this.dataSource = options.preparedBuckets.get(querier.bucket)!; + this.matchesParameters = PreparedQuerier.prepareFilters(options.engine, querier.requestFilters); + + this.lookupStages = RequestParameterEvaluators.prepare(querier.lookupStages, querier.sourceInstantiation, options); + } + + querierForSubscription( + hydration: CreateSourceParams, + result: PendingQueriers, + params: RequestParameters, + subscription: RequestedStream | null + ) { + const reason: BucketInclusionReason = subscription != null ? { subscription: subscription.opaque_id } : 'default'; + + try { + if (!this.matchesParameters(params)) { + return; + } + + const subscriptionEvaluators = this.lookupStages.clone(); + + subscriptionEvaluators.partiallyInstantiate({ request: params }); + if (subscriptionEvaluators.isDefinitelyUninstantiable()) { + return; + } + + const bucketScope = hydration.hydrationState.getBucketSourceScope(this.dataSource); + + const parametersToBucket = (instantiation: SqliteParameterValue[]): ResolvedBucket => { + return { + definition: this.stream.name, + inclusion_reasons: [reason], + bucket: buildBucketName(bucketScope, JSONBucketNameSerialize.stringify(instantiation)), + priority: this.stream.priority + }; + }; + + // Do we need parameter lookups to resolve parameters? + const staticInstantiation = subscriptionEvaluators.extractFullInstantiation(); + if (staticInstantiation) { + // We don't! Return static querier. + result.queriers.push({ + staticBuckets: staticInstantiation.map(parametersToBucket), + hasDynamicBuckets: false, + async queryDynamicBucketDescriptions() { + return []; + } + }); + } else { + result.queriers.push({ + staticBuckets: [], + hasDynamicBuckets: true, + async queryDynamicBucketDescriptions(source) { + const evaluators = subscriptionEvaluators.clone(); + const instantiation = await evaluators.instantiate({ + hydrationState: hydration.hydrationState, + source, + request: params + }); + + return [...instantiation.map(parametersToBucket)]; + } + }); + } + } catch (e) { + result.errors.push({ + descriptor: this.stream.name, + message: `Error evaluating bucket ids: ${e.message}`, + subscription: subscription ?? undefined + }); + } + } + + private static prepareFilters( + engine: ScalarExpressionEngine, + requestFilters: SqlExpression[] + ) { + const mapExpressions = mapExternalDataToInstantiation(); + const evaluator = engine.prepareEvaluator({ + outputs: [], + filters: requestFilters.map((f) => mapExpressions.transform(f)) + }); + const instantiation = mapExpressions.instantiation; + + return (parameters: RequestParameters) => { + // We've added request filters as filters to prepareEvaluator, so if we get a row then the subscription matches. + return evaluator.evaluate(parametersForRequest(parameters, instantiation)).length != 0; + }; + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/index.ts b/packages/sync-rules/src/sync_plan/evaluator/index.ts new file mode 100644 index 000000000..92edd3672 --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/index.ts @@ -0,0 +1,40 @@ +import { SqlSyncRules } from '../../SqlSyncRules.js'; +import * as plan from '../plan.js'; +import { PreparedStreamBucketDataSource } from './bucket_data_source.js'; +import { PreparedParameterIndexLookupCreator } from './parameter_index_lookup_creator.js'; +import { StreamBucketSource, StreamInput } from './bucket_source.js'; +import { ScalarExpressionEngine } from './scalar_expression_evaluator.js'; + +export interface StreamEvaluationContext { + engine: ScalarExpressionEngine; +} + +export function addPrecompiledSyncPlanToRules( + plan: plan.SyncPlan, + rules: SqlSyncRules, + context: StreamEvaluationContext +) { + const preparedBuckets = new Map(); + const preparedLookups = new Map(); + + for (const bucket of plan.buckets) { + const prepared = new PreparedStreamBucketDataSource(bucket, context); + preparedBuckets.set(bucket, prepared); + rules.bucketDataSources.push(prepared); + } + + for (const parameter of plan.parameterIndexes) { + const prepared = new PreparedParameterIndexLookupCreator(parameter, context); + preparedLookups.set(parameter, prepared); + rules.bucketParameterLookupSources.push(prepared); + } + + const streamInput: StreamInput = { + ...context, + preparedBuckets, + preparedLookups + }; + for (const stream of plan.streams) { + rules.bucketSources.push(new StreamBucketSource(stream, streamInput)); + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts new file mode 100644 index 000000000..b7e95ef75 --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts @@ -0,0 +1,381 @@ +import { ParameterLookupSource, ScopedParameterLookup, UnscopedParameterLookup } from '../../BucketParameterQuerier.js'; +import { ParameterIndexLookupCreator } from '../../BucketSource.js'; +import { HydrationState } from '../../HydrationState.js'; +import { cartesianProduct } from '../../streams/utils.js'; +import { RequestParameters, SqliteJsonValue, SqliteValue } from '../../types.js'; +import { isJsonValue } from '../../utils.js'; +import { MapSourceVisitor, visitExpr } from '../expression_visitor.js'; +import * as plan from '../plan.js'; +import { StreamInput } from './bucket_source.js'; +import { + mapExternalDataToInstantiation, + TableValuedFunction, + TableValuedFunctionOutput +} from './scalar_expression_evaluator.js'; + +export type PreparedExpandingLookup = + | { type: 'parameter'; lookup: ParameterIndexLookupCreator; instantiation: PreparedParameterValue[] } + | { type: 'table_valued'; read(request: RequestParameters): SqliteParameterValue[][] } + | { type: 'cached'; values: SqliteParameterValue[][] }; + +/** + * A {@link plan.ParameterValue} that can be evaluated against request parameters. + * + * Additionally, this includes the `cached` variant which allows partially instantiating parameters. + */ +export type PreparedParameterValue = + | { type: 'request'; read(request: RequestParameters): SqliteValue } + | { type: 'lookup'; lookup: { stage: number; index: number }; resultIndex: number } + | { type: 'intersection'; values: PreparedParameterValue[] } + | { type: 'cached'; values: SqliteParameterValue[] }; + +export interface PartialInstantiationInput { + request: RequestParameters; +} + +export interface InstantiationInput extends PartialInstantiationInput { + hydrationState: HydrationState; + source: ParameterLookupSource; +} + +export class RequestParameterEvaluators { + private constructor( + readonly lookupStages: PreparedExpandingLookup[][], + readonly parameterValues: PreparedParameterValue[] + ) {} + + /** + * Returns a copy of this instance. + * + * We use this to be able to "fork" partial instantiations. For instance, we can evaluate paremeters not depending on + * subscription data as soon as the user connects (and reuse those instantiations for each subscription). + * + * Then for each subscription, we can further instantiate lookup stages and parameter values. + */ + clone(): RequestParameterEvaluators { + return new RequestParameterEvaluators( + this.lookupStages.map((stage) => [...stage]), + [...this.parameterValues] + ); + } + + /** + * Evaluates lookups and parameter values that be evaluated with a subset of the final input. + * + * This is used to determine whether a querier is static - the partial instantiation depending on request data fully + * resolves the stream, we don't need to lookup any parameters. + */ + partiallyInstantiate(input: PartialInstantiationInput) { + const helper = new PartialInstantiator(input, this); + + this.lookupStages.forEach((stage, stageIndex) => { + stage.forEach((_, indexInStage) => helper.expandingLookupSync(stageIndex, indexInStage)); + }); + + this.parameterValues.forEach((_, i) => helper.parameterSync(this.parameterValues, i)); + } + + async instantiate(input: InstantiationInput): Promise> { + const helper = new FullInstantiator(input, this); + + for (let i = 0; i < this.lookupStages.length; i++) { + // Within a stage, we can resolve lookups concurrently. + await Promise.all(this.lookupStages[i].map((_, j) => helper.expandingLookup(i, j))); + } + + // At this point, all lookups have been resolved and we can synchronously evaluate parameters which might depend on + // those lookups. + return helper.resolveInputs(this.parameterValues); + } + + /** + * Whether these evaluators are known to not result in any buckets, for instance because parameters are instanted to + * `NULL` values that aren't equal to anything. + * + * This is fairly efficient to compute and can be used to short-circuit further evaluation. + */ + isDefinitelyUninstantiable() { + for (const parameter of this.parameterValues) { + if (parameter.type != 'cached') { + return false; // Unknown + } + + if (parameter.values.length === 0) { + // Missing parameter. + return true; + } + } + + return false; + } + + extractFullInstantiation(): SqliteParameterValue[][] | undefined { + // All lookup stages need to be resolved, even if they're not used in a parameter. The reason is that queries like + // `WHERE 'static_value' IN (SELECT name FROM users WHERE id = auth.user_id())` are implemented as lookup stages, + // so we can't ignore them. + for (const stage of this.lookupStages) { + for (const element of stage) { + if (element.type !== 'cached') { + return undefined; + } + } + } + + // Outer array represents parameter, inner array represents values for a given parameter. + const parameters: SqliteParameterValue[][] = []; + for (const parameter of this.parameterValues) { + if (parameter.type !== 'cached') { + return undefined; + } + + parameters.push(parameter.values); + } + + // Transform to array of complete instantiations. + return [...cartesianProduct(...parameters)]; + } + + static prepare(lookupStages: plan.ExpandingLookup[][], values: plan.ParameterValue[], input: StreamInput) { + const mappedStages: PreparedExpandingLookup[][] = []; + const lookupToStage = new Map(); + + function mapParameterValue(value: plan.ParameterValue): PreparedParameterValue { + if (value.type == 'request') { + const mapper = mapExternalDataToInstantiation(); + const prepared = input.engine.prepareEvaluator({ filters: [], outputs: [mapper.transform(value.expr)] }); + const instantiation = mapper.instantiation; + + return { + type: 'request', + read(request) { + return prepared.evaluate(parametersForRequest(request, instantiation))[0][0]; + } + }; + } else if (value.type == 'lookup') { + const stagePosition = lookupToStage.get(value.lookup)!; + return { type: 'lookup', lookup: stagePosition, resultIndex: value.resultIndex }; + } else { + return { type: 'intersection', values: mapParameterValues(value.values) }; + } + } + + function mapParameterValues(values: plan.ParameterValue[]) { + return values.map(mapParameterValue); + } + + for (const stage of lookupStages) { + const stageIndex = mappedStages.length; + const mappedStage: PreparedExpandingLookup[] = []; + + for (const lookup of stage) { + const index = mappedStage.length; + lookupToStage.set(lookup, { stage: stageIndex, index }); + + if (lookup.type == 'parameter') { + mappedStage.push({ + type: 'parameter', + lookup: input.preparedLookups.get(lookup.lookup)!, + instantiation: mapParameterValues(lookup.instantiation) + }); + } else { + // Create an expression like SELECT FROM table_valued() WHERE + const mapInputs = mapExternalDataToInstantiation(); + const fn: TableValuedFunction = { + name: lookup.functionName, + inputs: lookup.functionInputs.map((e) => mapInputs.transform(e)) + }; + const mapOutputs = new MapSourceVisitor( + ({ column }) => ({ + function: fn, + column + }) + ); + + const prepared = input.engine.prepareEvaluator({ + tableValuedFunctions: [fn], + outputs: lookup.outputs.map((e) => visitExpr(mapOutputs, e, null)), + filters: lookup.filters.map((e) => visitExpr(mapOutputs, e, null)) + }); + + mappedStage.push({ + type: 'table_valued', + read(request) { + return [ + ...filterParameterRows(prepared.evaluate(parametersForRequest(request, mapInputs.instantiation))) + ]; + } + }); + } + } + } + + return new RequestParameterEvaluators(mappedStages, mapParameterValues(values)); + } +} + +class PartialInstantiator { + constructor( + protected readonly input: I, + protected readonly evaluators: RequestParameterEvaluators + ) {} + + parameterSync(parent: PreparedParameterValue[], index: number): SqliteParameterValue[] | undefined { + const current = parent[index]; + if (current.type === 'cached') { + return current.values; + } else if (current.type === 'intersection') { + let intersection: Set | null = null; + for (let i = 0; i < current.values.length; i++) { + const evaluated = this.parameterSync(current.values, i); + if (evaluated == null) { + return undefined; // Can't evaluate sub-parameter + } + + if (intersection == null) { + intersection = new Set(evaluated); + } else { + intersection = intersection.intersection(new Set(evaluated)); + } + + if (intersection.size == 0) { + // We don't even need to evaluate the rest. + break; + } + } + + let values: SqliteParameterValue[] = []; + if (intersection) { + values.push(...intersection.keys()); + } + + parent[index] = { type: 'cached', values }; + return values; + } else if (current.type === 'lookup') { + const resolvedLookup = this.expandingLookupSync(current.lookup.stage, current.lookup.index); + if (resolvedLookup) { + const values = resolvedLookup.map((row) => row[current.resultIndex]); + parent[index] = { type: 'cached', values }; + return values; + } + } else if (current.type === 'request') { + const value = current.read(this.input.request); + const values: SqliteParameterValue[] = isValidParameterValue(value) ? [value] : []; + + parent[index] = { type: 'cached', values }; + return values; + } + + return undefined; + } + + expandingLookupSync(stage: number, index: number): SqliteParameterValue[][] | undefined { + const lookup = this.evaluators.lookupStages[stage][index]; + if (lookup.type == 'table_valued') { + // We can evaluate this table-valued function already. + const values = lookup.read(this.input.request); + this.evaluators.lookupStages[stage][index] = { type: 'cached', values }; + return values; + } else if (lookup.type == 'cached') { + return lookup.values; + } + + return undefined; + } +} + +class FullInstantiator extends PartialInstantiator { + *resolveInputs(params: PreparedParameterValue[]): Generator { + const parameterValues = params.map((_, index) => { + const cached = this.parameterSync(params, index); + if (cached == null) { + // This method is only called for inputs from an earlier stage, which should have been resolved at this point. + throw new Error('Should have been able to resolve parameter from earlier stage synchronously.'); + } + return cached; + }); + + yield* cartesianProduct(...parameterValues); + } + + async expandingLookup(stage: number, index: number): Promise { + const lookup = this.evaluators.lookupStages[stage][index]; + if (lookup.type == 'parameter') { + const scope = this.input.hydrationState.getParameterIndexLookupScope(lookup.lookup); + lookup.instantiation; + + const outputs = await this.input.source.getParameterSets([ + ...this.resolveInputs(lookup.instantiation).map((instantiation) => + ScopedParameterLookup.normalized(scope, UnscopedParameterLookup.normalized(instantiation)) + ) + ]); + + // Stream parameters generate an output row like {0: , 1: , ...}. + const values = outputs.map((row) => { + const length = Object.entries(row).length; + const asArray: SqliteParameterValue[] = []; + + for (let i = 0; i < length; i++) { + asArray.push(row[i.toString()] as SqliteParameterValue); + } + return asArray; + }); + + this.evaluators.lookupStages[stage][index] = { type: 'cached', values }; + return values; + } + + const other = this.expandingLookupSync(stage, index); + if (other == null) { + throw new Error('internal error: Unable to resolve non-parameter lookup synchronously?'); + } + return other; + } +} + +/** + * A value that can be used as a bucket parameter. + * + * We don't support binary bucket parameters, so this needs to be a {@link SqliteJsonValue}. Further, bucket parameters + * are always instantiated through the `=` operator, and `NULL` values in SQLite don't compare via `=`. So, `null` + * values also aren't allowed as parameters. + */ +export type SqliteParameterValue = NonNullable; + +export function isValidParameterValue(value: SqliteValue): value is SqliteParameterValue { + return value != null && isJsonValue(value); +} + +export function isValidParameterValueRow(row: SqliteValue[]): row is SqliteParameterValue[] { + for (const value of row) { + if (!isValidParameterValue(value)) { + return false; + } + } + + return true; +} + +export function parametersForRequest(parameters: RequestParameters, values: plan.SqlParameterValue[]): string[] { + return values.map((v) => { + if ('request' in v) { + switch (v.request) { + case 'auth': + return parameters.rawTokenPayload; + case 'subscription': + return parameters.rawStreamParameters!; + case 'connection': + return parameters.rawUserParameters; + } + } else { + throw new Error('Illegal column reference in request filter'); + } + }); +} + +function* filterParameterRows(rows: SqliteValue[][]): Generator { + for (const row of rows) { + if (isValidParameterValueRow(row)) { + yield row; + } + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts new file mode 100644 index 000000000..6200026ca --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts @@ -0,0 +1,78 @@ +import { ParameterIndexLookupCreator } from '../../BucketSource.js'; +import { ParameterLookupScope } from '../../HydrationState.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from './scalar_expression_evaluator.js'; +import * as plan from '../plan.js'; +import { StreamEvaluationContext } from './index.js'; +import { TablePattern } from '../../TablePattern.js'; +import { SourceTableInterface } from '../../SourceTableInterface.js'; +import { SqliteJsonValue, SqliteRow, UnscopedEvaluatedParametersResult } from '../../types.js'; +import { isValidParameterValueRow } from './lookup_stage_evaluator.js'; +import { UnscopedParameterLookup } from '../../BucketParameterQuerier.js'; + +export class PreparedParameterIndexLookupCreator implements ParameterIndexLookupCreator { + readonly defaultLookupScope: ParameterLookupScope; + private readonly evaluator: ScalarExpressionEvaluator; + private readonly evaluatorInputs: plan.ColumnSqlParameterValue[]; + private readonly numberOfOutputs: number; + + constructor( + private readonly source: plan.StreamParameterIndexLookupCreator, + { engine }: StreamEvaluationContext + ) { + this.defaultLookupScope = source.defaultLookupScope; + const mapExpressions = mapExternalDataToInstantiation(); + const expressions = source.outputs.map((o) => mapExpressions.transform(o)); + + this.numberOfOutputs = expressions.length; + for (const parameter of source.parameters) { + expressions.push(mapExpressions.transform(parameter.expr)); + } + + this.evaluator = engine.prepareEvaluator({ + outputs: expressions, + filters: source.filters.map((f) => mapExpressions.transform(f)) + }); + this.evaluatorInputs = mapExpressions.instantiation; + } + + getSourceTables(): Set { + const set = new Set(); + set.add(this.source.sourceTable); + return set; + } + + evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): UnscopedEvaluatedParametersResult[] { + const results: UnscopedEvaluatedParametersResult[] = []; + if (!this.source.sourceTable.matches(sourceTable)) { + return results; + } + + try { + const inputInstantiation = this.evaluatorInputs.map((input) => row[input.column]); + + for (const outputRow of this.evaluator.evaluate(inputInstantiation)) { + if (!isValidParameterValueRow(outputRow)) { + continue; + } + + const outputs: Record = {}; + for (let i = 0; i < this.numberOfOutputs; i++) { + outputs[i.toString()] = outputRow[i]; + } + + // source is [...outputs, ...partitionValues] + const partitionValues = outputRow.splice(0, this.numberOfOutputs); + const lookup = UnscopedParameterLookup.normalized(partitionValues); + results.push({ lookup, bucketParameters: [outputs] }); + } + } catch (e) { + results.push({ error: e.message }); + } + + return results; + } + + tableSyncsParameters(table: SourceTableInterface): boolean { + return this.source.sourceTable.matches(table); + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts new file mode 100644 index 000000000..d7bce7613 --- /dev/null +++ b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts @@ -0,0 +1,166 @@ +import { SqliteValue } from '../../types.js'; +import { ExternalData, SqlExpression } from '../expression.js'; +import { ExpressionToSqlite } from '../expression_to_sql.js'; +import { MapSourceVisitor, visitExpr } from '../expression_visitor.js'; +import { ColumnSqlParameterValue, RequestSqlParameterValue, SqlParameterValue } from '../plan.js'; + +/** + * Description of a scalar SQL statement (without external dependencies on tables). + * + * This corresponds to the SQL statement `SELECT $outputs FROM $tableValuedFunctions WHERE $filters`. + * + * Each output and filter expression can reference: + * + * 1. An external parameter, passed in {@link ScalarExpressionEvaluator.evaluate}. The number corresponds to the index + * of the parameter to use. + * 2. An output column of a table-valued function added to the statement. + */ +export interface ScalarStatement { + outputs: SqlExpression[]; + filters: SqlExpression[]; + tableValuedFunctions?: TableValuedFunction[]; +} + +export interface TableValuedFunction { + name: string; + inputs: SqlExpression[]; +} + +export interface TableValuedFunctionOutput { + function: TableValuedFunction; + column: string; +} + +export interface ScalarExpressionEngine { + prepareEvaluator(statement: ScalarStatement): ScalarExpressionEvaluator; + + /** + * Disposes all evaluators and closes this engine. + */ + close(): void; +} + +export interface ScalarExpressionEvaluator { + evaluate(inputs: SqliteValue[]): SqliteValue[][]; +} + +/** + * Creates a {@link ScalarExpressionEngine} backed by an in-memory SQLite database using `node:sqlite` APIs. + * + * @param module The imported `node:sqlite` module (passed as a parameter to ensure this package keeps working in + * browsers). + */ +export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')): ScalarExpressionEngine { + const db = new module.DatabaseSync(':memory:', { readOnly: true, readBigInts: true, returnArrays: true } as any); + + return { + prepareEvaluator({ outputs, filters, tableValuedFunctions = [] }): ScalarExpressionEvaluator { + const tableValuedFunctionNames = new Map(); + for (const fn of tableValuedFunctions) { + tableValuedFunctionNames.set(fn, `tbl_${tableValuedFunctionNames.size}`); + } + + const toSqlite = new StatementToSqlite(tableValuedFunctionNames); + toSqlite.addLexeme('SELECT'); + + if (outputs.length === 0) { + // We need to add a bogus expression to avoid a syntax error (`SELECT WHERE ...` alone is invalid). + toSqlite.addLexeme('1'); + } else { + outputs.forEach((expr, i) => { + if (i != 0) toSqlite.comma(); + visitExpr(toSqlite, expr, null); + }); + } + + if (tableValuedFunctionNames.size != 0) { + toSqlite.addLexeme('FROM'); + + tableValuedFunctionNames.forEach((name, fn) => { + visitExpr(toSqlite, { type: 'function', function: fn.name, parameters: fn.inputs }, null); + toSqlite.addLexeme('AS'); + toSqlite.identifier(name); + }); + } + + if (filters.length != 0) { + toSqlite.addLexeme('WHERE'); + filters.forEach((expr, i) => { + if (i != 0) toSqlite.comma(); + visitExpr(toSqlite, expr, null); + }); + } + + const stmt = db.prepare(toSqlite.sql); + return { + evaluate(inputs) { + // Types are wrong, all() will return a SqliteValue[][] because returnArrays is enabled. + return stmt.all(...inputs) as unknown as SqliteValue[][]; + } + }; + }, + close() { + db.close(); + } + }; +} + +class StatementToSqlite extends ExpressionToSqlite { + constructor(private readonly tableValuedFunctionNames: Map) { + super(); + } + + comma() { + this.addLexeme(',', { spaceLeft: false }); + } + + visitExternalData(expr: ExternalData): void { + if (typeof expr.source === 'number') { + this.addLexeme(`?${expr.source}`); + } else { + const fn = this.tableValuedFunctionNames.get(expr.source.function)!; + this.identifier(fn); + this.addLexeme('.', { spaceLeft: false, spaceRight: false }); + this.identifier(expr.source.column); + } + } +} + +/** + * Utility to transform multiple expressions embedding parameter values into expressions that reference a parameter + * index representing that value. + * + * Parameter values used multiple times are de-duplicated into the same SQL parameter. + */ +export function mapExternalDataToInstantiation() { + const instantiation: T[] = []; + const columnsToIndex = new Map(); + const requestToIndex = new Map(); + + const visitor = new MapSourceVisitor((data) => { + const oldLength = instantiation.length; + if ('column' in data) { + if (columnsToIndex.has(data)) { + return columnsToIndex.get(data)!; + } + + columnsToIndex.set(data, oldLength); + } else { + if (requestToIndex.has(data)) { + return requestToIndex.get(data)!; + } + + requestToIndex.set(data, oldLength); + } + + instantiation.push(data); + return oldLength; + }); + + return { + instantiation, + transform(expr: SqlExpression) { + return visitExpr(visitor, expr, null); + } + }; +} diff --git a/packages/sync-rules/src/sync_plan/expression_to_sql.ts b/packages/sync-rules/src/sync_plan/expression_to_sql.ts index 1e8d0c55a..123c59212 100644 --- a/packages/sync-rules/src/sync_plan/expression_to_sql.ts +++ b/packages/sync-rules/src/sync_plan/expression_to_sql.ts @@ -4,6 +4,7 @@ import { BinaryOperator, CaseWhenExpression, CastExpression, + ExternalData, LiteralExpression, ScalarFunctionCallExpression, ScalarInExpression, @@ -29,7 +30,7 @@ export class ExpressionToSqlite implements ExpressionVisitor implements ExpressionVisitor implements ExpressionVisitor): void { this.addLexeme('?'); } diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts index cd5ed1566..774458229 100644 --- a/packages/sync-rules/src/utils.ts +++ b/packages/sync-rules/src/utils.ts @@ -15,6 +15,7 @@ import { SqliteValue } from './types.js'; import { CustomArray, CustomObject, CustomSqliteValue } from './types/custom_sqlite_value.js'; +import { castAsText } from './sql_functions.js'; export function isSelectStatement(q: Statement): q is SelectFromStatement { return q.type == 'select'; @@ -233,3 +234,20 @@ export function normalizeParameterValue(value: SqliteJsonValue): SqliteJsonValue } return value; } + +/** + * Extracts and normalizes the ID column from a row. + */ +export function idFromData(data: SqliteJsonRow): string { + let id = data.id; + if (typeof id != 'string') { + // While an explicit cast would be better, this covers against very common + // issues when initially testing out sync, for example when the id column is an + // auto-incrementing integer. + // If there is no id column, we use a blank id. This will result in the user syncing + // a single arbitrary row for this table - better than just not being able to sync + // anything. + id = castAsText(id) ?? ''; + } + return id; +} diff --git a/tsconfig.base.json b/tsconfig.base.json index b2bd00df4..8fca2db15 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1,7 +1,7 @@ { "compilerOptions": { - "lib": ["ES2024"], - "target": "ES2024", + "lib": ["ESNext"], + "target": "esnext", "module": "NodeNext", "moduleResolution": "NodeNext", "strict": true, From dec2dee31a726117ffac30c900305d074265972d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 14:52:16 +0100 Subject: [PATCH 02/14] Add node expression test --- .../sync_plan/evaluator/bucket_data_source.ts | 3 +- .../src/sync_plan/evaluator/bucket_source.ts | 4 +- ...ge_evaluator.ts => parameter_evaluator.ts} | 126 ++++++++++++------ .../parameter_index_lookup_creator.ts | 2 +- .../evaluator/scalar_expression_evaluator.ts | 84 ++++++------ packages/sync-rules/src/types.ts | 9 ++ packages/sync-rules/src/utils.ts | 5 + .../scalar_expression_evaluator.test.ts | 111 +++++++++++++++ 8 files changed, 260 insertions(+), 84 deletions(-) rename packages/sync-rules/src/sync_plan/evaluator/{lookup_stage_evaluator.ts => parameter_evaluator.ts} (77%) create mode 100644 packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts index 0e2e67962..b3bb9ae83 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts @@ -9,12 +9,11 @@ import { UnscopedEvaluatedRow, UnscopedEvaluationResult } from '../../types.js'; -import { filterJsonRow, idFromData, isJsonValue, JSONBucketNameSerialize } from '../../utils.js'; +import { filterJsonRow, idFromData, isJsonValue, isValidParameterValue, JSONBucketNameSerialize } from '../../utils.js'; import { SqlExpression } from '../expression.js'; import { ExpressionToSqlite } from '../expression_to_sql.js'; import * as plan from '../plan.js'; import { StreamEvaluationContext } from './index.js'; -import { isValidParameterValue } from './lookup_stage_evaluator.js'; import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from './scalar_expression_evaluator.js'; export class PreparedStreamBucketDataSource implements BucketDataSource { diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts index bf73d028c..ecd576a68 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts @@ -10,8 +10,8 @@ import { StreamEvaluationContext } from './index.js'; import * as plan from '../plan.js'; import { mapExternalDataToInstantiation, ScalarExpressionEngine } from './scalar_expression_evaluator.js'; import { SqlExpression } from '../expression.js'; -import { RequestParameters } from '../../types.js'; -import { parametersForRequest, RequestParameterEvaluators, SqliteParameterValue } from './lookup_stage_evaluator.js'; +import { RequestParameters, SqliteParameterValue } from '../../types.js'; +import { parametersForRequest, RequestParameterEvaluators } from './parameter_evaluator.js'; import { PendingQueriers } from '../../BucketParameterQuerier.js'; import { RequestedStream } from '../../SqlSyncRules.js'; import { BucketInclusionReason, ResolvedBucket } from '../../BucketDescription.js'; diff --git a/packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts similarity index 77% rename from packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts rename to packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts index b7e95ef75..c42bb30b6 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/lookup_stage_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts @@ -2,8 +2,8 @@ import { ParameterLookupSource, ScopedParameterLookup, UnscopedParameterLookup } import { ParameterIndexLookupCreator } from '../../BucketSource.js'; import { HydrationState } from '../../HydrationState.js'; import { cartesianProduct } from '../../streams/utils.js'; -import { RequestParameters, SqliteJsonValue, SqliteValue } from '../../types.js'; -import { isJsonValue } from '../../utils.js'; +import { RequestParameters, SqliteParameterValue, SqliteValue } from '../../types.js'; +import { isValidParameterValue } from '../../utils.js'; import { MapSourceVisitor, visitExpr } from '../expression_visitor.js'; import * as plan from '../plan.js'; import { StreamInput } from './bucket_source.js'; @@ -13,34 +13,53 @@ import { TableValuedFunctionOutput } from './scalar_expression_evaluator.js'; -export type PreparedExpandingLookup = - | { type: 'parameter'; lookup: ParameterIndexLookupCreator; instantiation: PreparedParameterValue[] } - | { type: 'table_valued'; read(request: RequestParameters): SqliteParameterValue[][] } - | { type: 'cached'; values: SqliteParameterValue[][] }; - /** - * A {@link plan.ParameterValue} that can be evaluated against request parameters. + * Finds bucket parameters for a given request or subscription. * - * Additionally, this includes the `cached` variant which allows partially instantiating parameters. + * In sync streams, queriers are represented as a DAG structure describing how to get from connection data to bucket + * parameters. + * + * As an example, consider the following stream: + * + * ``` + * SELECT projects.* FROM projects + * INNER JOIN orgs ON orgs.id = projects.org_id + * WHERE orgs.name = auth.parameter('org') + * ``` + * + * This would partition data into a bucket with a single parameter (grouping by `projects.org_id`). It would also + * prepare a lookup from `orgs.name` to `orgs.id`. + * + * The querier for this would have: + * + * 1. A single lookup stage with a single {@link plan.ParameterLookup}. That lookup would have an instantiation + * reflecting `auth.parameter('org')` as a `request` {@link plan.ParameterValue}. + * 2. A single {@link plan.StreamQuerier.sourceInstantiation}, a `lookup` {@link plan.ParameterValue} referencing the + * lookup from step 1. + * + * On this prepared evaluator, lookup stages and parameter values are tracked as {@link PreparedExpandingLookup}s and + * {@link PreparedParameterValue}s, respectively. These correspond to their definitions on sync plans, except that: + * + * 1. Instead of being a description of the parameter, they're a JavaScript function that can be invoked to compute + * parameters. + * 2. After being called once, we can replace them with a cached value. This enables a partial instantiation, and + * avoids recomputing everything whenever a parameter lookup changes. In the example stream, we would run and cache + * the outputs of `auth.parameter('org')` for a given connection. This sub-expression would not get re-evaluated + * when the `org-name` -> `org.id` lookup changes. + * + * For queriers that don't use parameter lookups, e.g. for streams like `SELECT * FROM users WHERE id = auth.user_id()`, + * the partial instantiation based on connection data happens to be a complete instantiation. We use this when building + * queriers by indicating that no lookups will be used. */ -export type PreparedParameterValue = - | { type: 'request'; read(request: RequestParameters): SqliteValue } - | { type: 'lookup'; lookup: { stage: number; index: number }; resultIndex: number } - | { type: 'intersection'; values: PreparedParameterValue[] } - | { type: 'cached'; values: SqliteParameterValue[] }; - -export interface PartialInstantiationInput { - request: RequestParameters; -} - -export interface InstantiationInput extends PartialInstantiationInput { - hydrationState: HydrationState; - source: ParameterLookupSource; -} - export class RequestParameterEvaluators { private constructor( + /** + * Pending lookup stages, or their cached outputs. + */ readonly lookupStages: PreparedExpandingLookup[][], + /** + * Pending parameter values, or their cached outputs. + */ readonly parameterValues: PreparedParameterValue[] ) {} @@ -48,9 +67,8 @@ export class RequestParameterEvaluators { * Returns a copy of this instance. * * We use this to be able to "fork" partial instantiations. For instance, we can evaluate paremeters not depending on - * subscription data as soon as the user connects (and reuse those instantiations for each subscription). - * - * Then for each subscription, we can further instantiate lookup stages and parameter values. + * parameter lookups as soon as the user connects (and keep that instantiation static along the lifetime of the + * connection). */ clone(): RequestParameterEvaluators { return new RequestParameterEvaluators( @@ -60,10 +78,10 @@ export class RequestParameterEvaluators { } /** - * Evaluates lookups and parameter values that be evaluated with a subset of the final input. + * Evaluates those lookups and parameter values that be evaluated without looking up parameter indexes. * - * This is used to determine whether a querier is static - the partial instantiation depending on request data fully - * resolves the stream, we don't need to lookup any parameters. + * This is also used to determine whether a querier is static - if the partial instantiation depending on request data + * fully resolves the stream, we don't need to lookup any parameters. */ partiallyInstantiate(input: PartialInstantiationInput) { const helper = new PartialInstantiator(input, this); @@ -75,6 +93,11 @@ export class RequestParameterEvaluators { this.parameterValues.forEach((_, i) => helper.parameterSync(this.parameterValues, i)); } + /** + * Resolves and caches all lookup stages and parameter values. + * + * Because this needs to lookup parameter indexes, it is asynchronous. + */ async instantiate(input: InstantiationInput): Promise> { const helper = new FullInstantiator(input, this); @@ -121,7 +144,7 @@ export class RequestParameterEvaluators { } } - // Outer array represents parameter, inner array represents values for a given parameter. + // Outer array represents parameters, inner array represents values for a given parameter. const parameters: SqliteParameterValue[][] = []; for (const parameter of this.parameterValues) { if (parameter.type !== 'cached') { @@ -135,12 +158,21 @@ export class RequestParameterEvaluators { return [...cartesianProduct(...parameters)]; } + /** + * Prepares evaluators for a description of parameter values obtained from a compiled querier in the sync plan. + * + * @param lookupStages The {@link plan.StreamQuerier.lookupStages} of the querier to compile. + * @param values The {@link plan.StreamQuerier.sourceInstantiation} of the querier to compile. + * @param input Access to bucket and parameter sources generated for buckets and parameter lookups referenced by the + * querier. + */ static prepare(lookupStages: plan.ExpandingLookup[][], values: plan.ParameterValue[], input: StreamInput) { const mappedStages: PreparedExpandingLookup[][] = []; const lookupToStage = new Map(); function mapParameterValue(value: plan.ParameterValue): PreparedParameterValue { if (value.type == 'request') { + // Prepare an expression evaluating the expression derived from request data. const mapper = mapExternalDataToInstantiation(); const prepared = input.engine.prepareEvaluator({ filters: [], outputs: [mapper.transform(value.expr)] }); const instantiation = mapper.instantiation; @@ -219,6 +251,10 @@ class PartialInstantiator { } } +export type PreparedExpandingLookup = + | { type: 'parameter'; lookup: ParameterIndexLookupCreator; instantiation: PreparedParameterValue[] } + | { type: 'table_valued'; read(request: RequestParameters): SqliteParameterValue[][] } + | { type: 'cached'; values: SqliteParameterValue[][] }; + /** - * A value that can be used as a bucket parameter. + * A {@link plan.ParameterValue} that can be evaluated against request parameters. * - * We don't support binary bucket parameters, so this needs to be a {@link SqliteJsonValue}. Further, bucket parameters - * are always instantiated through the `=` operator, and `NULL` values in SQLite don't compare via `=`. So, `null` - * values also aren't allowed as parameters. + * Additionally, this includes the `cached` variant which allows partially instantiating parameters. */ -export type SqliteParameterValue = NonNullable; +export type PreparedParameterValue = + | { type: 'request'; read(request: RequestParameters): SqliteValue } + | { type: 'lookup'; lookup: { stage: number; index: number }; resultIndex: number } + | { type: 'intersection'; values: PreparedParameterValue[] } + | { type: 'cached'; values: SqliteParameterValue[] }; -export function isValidParameterValue(value: SqliteValue): value is SqliteParameterValue { - return value != null && isJsonValue(value); +export interface PartialInstantiationInput { + request: RequestParameters; +} + +export interface InstantiationInput extends PartialInstantiationInput { + hydrationState: HydrationState; + source: ParameterLookupSource; } export function isValidParameterValueRow(row: SqliteValue[]): row is SqliteParameterValue[] { diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts index 6200026ca..11cef0d6f 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts @@ -6,7 +6,7 @@ import { StreamEvaluationContext } from './index.js'; import { TablePattern } from '../../TablePattern.js'; import { SourceTableInterface } from '../../SourceTableInterface.js'; import { SqliteJsonValue, SqliteRow, UnscopedEvaluatedParametersResult } from '../../types.js'; -import { isValidParameterValueRow } from './lookup_stage_evaluator.js'; +import { isValidParameterValueRow } from './parameter_evaluator.js'; import { UnscopedParameterLookup } from '../../BucketParameterQuerier.js'; export class PreparedParameterIndexLookupCreator implements ParameterIndexLookupCreator { diff --git a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts index d7bce7613..34a1c965e 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts @@ -16,8 +16,8 @@ import { ColumnSqlParameterValue, RequestSqlParameterValue, SqlParameterValue } * 2. An output column of a table-valued function added to the statement. */ export interface ScalarStatement { - outputs: SqlExpression[]; - filters: SqlExpression[]; + outputs?: SqlExpression[]; + filters?: SqlExpression[]; tableValuedFunctions?: TableValuedFunction[]; } @@ -54,44 +54,8 @@ export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')) const db = new module.DatabaseSync(':memory:', { readOnly: true, readBigInts: true, returnArrays: true } as any); return { - prepareEvaluator({ outputs, filters, tableValuedFunctions = [] }): ScalarExpressionEvaluator { - const tableValuedFunctionNames = new Map(); - for (const fn of tableValuedFunctions) { - tableValuedFunctionNames.set(fn, `tbl_${tableValuedFunctionNames.size}`); - } - - const toSqlite = new StatementToSqlite(tableValuedFunctionNames); - toSqlite.addLexeme('SELECT'); - - if (outputs.length === 0) { - // We need to add a bogus expression to avoid a syntax error (`SELECT WHERE ...` alone is invalid). - toSqlite.addLexeme('1'); - } else { - outputs.forEach((expr, i) => { - if (i != 0) toSqlite.comma(); - visitExpr(toSqlite, expr, null); - }); - } - - if (tableValuedFunctionNames.size != 0) { - toSqlite.addLexeme('FROM'); - - tableValuedFunctionNames.forEach((name, fn) => { - visitExpr(toSqlite, { type: 'function', function: fn.name, parameters: fn.inputs }, null); - toSqlite.addLexeme('AS'); - toSqlite.identifier(name); - }); - } - - if (filters.length != 0) { - toSqlite.addLexeme('WHERE'); - filters.forEach((expr, i) => { - if (i != 0) toSqlite.comma(); - visitExpr(toSqlite, expr, null); - }); - } - - const stmt = db.prepare(toSqlite.sql); + prepareEvaluator(input): ScalarExpressionEvaluator { + const stmt = db.prepare(scalarStatementToSql(input)); return { evaluate(inputs) { // Types are wrong, all() will return a SqliteValue[][] because returnArrays is enabled. @@ -105,6 +69,46 @@ export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')) }; } +export function scalarStatementToSql({ filters = [], outputs = [], tableValuedFunctions = [] }: ScalarStatement) { + const tableValuedFunctionNames = new Map(); + for (const fn of tableValuedFunctions) { + tableValuedFunctionNames.set(fn, `tbl_${tableValuedFunctionNames.size}`); + } + + const toSqlite = new StatementToSqlite(tableValuedFunctionNames); + toSqlite.addLexeme('SELECT'); + + if (outputs.length === 0) { + // We need to add a bogus expression to avoid a syntax error (`SELECT WHERE ...` alone is invalid). + toSqlite.addLexeme('1'); + } else { + outputs.forEach((expr, i) => { + if (i != 0) toSqlite.comma(); + visitExpr(toSqlite, expr, null); + }); + } + + if (tableValuedFunctionNames.size != 0) { + toSqlite.addLexeme('FROM'); + + tableValuedFunctionNames.forEach((name, fn) => { + visitExpr(toSqlite, { type: 'function', function: fn.name, parameters: fn.inputs }, null); + toSqlite.addLexeme('AS'); + toSqlite.identifier(name); + }); + } + + if (filters.length != 0) { + toSqlite.addLexeme('WHERE'); + filters.forEach((expr, i) => { + if (i != 0) toSqlite.comma(); + visitExpr(toSqlite, expr, null); + }); + } + + return toSqlite.sql; +} + class StatementToSqlite extends ExpressionToSqlite { constructor(private readonly tableValuedFunctionNames: Map) { super(); diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 44bc7ee30..2967ac37f 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -237,6 +237,15 @@ export class RequestParameters implements ParameterValueSet { */ export type SqliteJsonValue = number | string | bigint | null; +/** + * A value that can be used as a bucket parameter. + * + * We don't support binary bucket parameters, so this needs to be a {@link SqliteJsonValue}. Further, bucket parameters + * are always instantiated through the `=` operator, and `NULL` values in SQLite don't compare via `=`. So, `null` + * values also aren't allowed as parameters. + */ +export type SqliteParameterValue = NonNullable; + /** * A value supported by the SQLite type system. */ diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts index 774458229..825660e00 100644 --- a/packages/sync-rules/src/utils.ts +++ b/packages/sync-rules/src/utils.ts @@ -11,6 +11,7 @@ import { SqliteInputValue, SqliteJsonRow, SqliteJsonValue, + SqliteParameterValue, SqliteRow, SqliteValue } from './types.js'; @@ -81,6 +82,10 @@ export function isJsonValue(value: SqliteValue): value is SqliteJsonValue { return value == null || typeof value == 'string' || typeof value == 'number' || typeof value == 'bigint'; } +export function isValidParameterValue(value: SqliteValue): value is SqliteParameterValue { + return value != null && isJsonValue(value); +} + function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any { if (depth > DEPTH_LIMIT) { // This is primarily to prevent infinite recursion diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts b/packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts new file mode 100644 index 000000000..6d41688a5 --- /dev/null +++ b/packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts @@ -0,0 +1,111 @@ +import * as sqlite from 'node:sqlite'; +import { describe, expect, onTestFinished, test } from 'vitest'; +import { + nodeSqliteExpressionEngine, + scalarStatementToSql, + TableValuedFunction +} from '../../../../src/sync_plan/evaluator/scalar_expression_evaluator.js'; + +describe('scalarStatementToSql', () => { + test('empty', () => { + expect(scalarStatementToSql({})).toStrictEqual('SELECT 1'); + }); + + test('outputs', () => { + expect( + scalarStatementToSql({ + outputs: [ + { + type: 'function', + function: 'foo', + parameters: [ + { type: 'data', source: 1 }, + { type: 'lit_string', value: 'hello' } + ] + } + ] + }) + ).toStrictEqual(`SELECT "foo"(?1, 'hello')`); + }); + + test('filters', () => { + expect( + scalarStatementToSql({ + filters: [ + { + type: 'lit_int', + base10: '1' + } + ] + }) + ).toStrictEqual(`SELECT 1 WHERE 1`); + }); + + test('output and filters', () => { + expect( + scalarStatementToSql({ + outputs: [ + { + type: 'lit_string', + value: 'foo' + } + ], + filters: [ + { + type: 'function', + function: 'foo', + parameters: [] + } + ] + }) + ).toStrictEqual(`SELECT 'foo' WHERE "foo"()`); + }); + + test('table-valued functions', () => { + const fn: TableValuedFunction = { + name: 'json_each', + inputs: [{ type: 'data', source: 1 }] + }; + + expect( + scalarStatementToSql({ + outputs: [ + { + type: 'data', + source: { function: fn, column: 'value' } + } + ], + filters: [ + { + type: 'function', + function: 'foo', + parameters: [ + { + type: 'data', + source: { function: fn, column: 'key' } + } + ] + } + ], + tableValuedFunctions: [fn] + }) + ).toStrictEqual(`SELECT "tbl_0"."value" FROM "json_each"(?1) AS "tbl_0" WHERE "foo"("tbl_0"."key")`); + }); +}); + +test('nodeSqliteExpressionEngine', () => { + const engine = nodeSqliteExpressionEngine(sqlite); + onTestFinished(() => engine.close()); + + const fn: TableValuedFunction = { + name: 'json_each', + inputs: [{ type: 'data', source: 1 }] + }; + + const stmt = engine.prepareEvaluator({ + outputs: [{ type: 'data', source: { function: fn, column: 'value' } }], + tableValuedFunctions: [fn] + }); + + expect(stmt.evaluate([JSON.stringify([1, 'two', 3.2])])).toStrictEqual([[1n], ['two'], [3.2]]); +}); From 10c11ed2a161660b4816d6db50f833409e0edc26 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 15:07:45 +0100 Subject: [PATCH 03/14] Add a few more tests --- packages/sync-rules/src/index.ts | 2 + .../sync_plan/evaluator/bucket_data_source.ts | 4 +- .../parameter_index_lookup_creator.ts | 4 +- .../evaluator/scalar_expression_evaluator.ts | 8 +- .../sync-rules/test/src/compiler/utils.ts | 2 +- .../src/sync_plan/evaluator/evaluator.test.ts | 179 ++++++++++++++++++ .../test/src/sync_plan/evaluator/utils.ts | 36 ++++ 7 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts create mode 100644 packages/sync-rules/test/src/sync_plan/evaluator/utils.ts diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index a51bfc908..d12aafac3 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -33,3 +33,5 @@ export * from './HydratedSyncRules.js'; export * from './compiler/compiler.js'; export * from './sync_plan/plan.js'; export { serializeSyncPlan } from './sync_plan/serialize.js'; +export { addPrecompiledSyncPlanToRules } from './sync_plan/evaluator/index.js'; +export { nodeSqliteExpressionEngine } from './sync_plan/evaluator/scalar_expression_evaluator.js'; diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts index b3bb9ae83..e36917040 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts @@ -82,6 +82,7 @@ class PreparedStreamDataSource { readonly tablePattern: TablePattern; private readonly outputs: ('star' | { index: number; alias: string })[] = []; private readonly numberOfOutputExpressions: number; + private readonly numberOfParameters: number; private readonly evaluator: ScalarExpressionEvaluator; private readonly evaluatorInputs: plan.ColumnSqlParameterValue[]; private readonly fixedOutputTableName?: string; @@ -103,6 +104,7 @@ class PreparedStreamDataSource { for (const parameter of evaluator.parameters) { outputExpressions.push(mapExpressions.transform(parameter.expr)); } + this.numberOfParameters = evaluator.parameters.length; this.evaluator = engine.prepareEvaluator({ outputs: outputExpressions, @@ -130,7 +132,7 @@ class PreparedStreamDataSource { } const id = idFromData(record); // source is [...outputs, ...partitionValues] - const partitionValues = source.splice(0, this.numberOfOutputExpressions); + const partitionValues = source.splice(this.numberOfOutputExpressions, this.numberOfParameters); for (const bucketParameter of partitionValues) { if (!isValidParameterValue(bucketParameter)) { diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts index 11cef0d6f..02094fd1c 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts @@ -14,6 +14,7 @@ export class PreparedParameterIndexLookupCreator implements ParameterIndexLookup private readonly evaluator: ScalarExpressionEvaluator; private readonly evaluatorInputs: plan.ColumnSqlParameterValue[]; private readonly numberOfOutputs: number; + private readonly numberOfParameters: number; constructor( private readonly source: plan.StreamParameterIndexLookupCreator, @@ -27,6 +28,7 @@ export class PreparedParameterIndexLookupCreator implements ParameterIndexLookup for (const parameter of source.parameters) { expressions.push(mapExpressions.transform(parameter.expr)); } + this.numberOfParameters = source.parameters.length; this.evaluator = engine.prepareEvaluator({ outputs: expressions, @@ -61,7 +63,7 @@ export class PreparedParameterIndexLookupCreator implements ParameterIndexLookup } // source is [...outputs, ...partitionValues] - const partitionValues = outputRow.splice(0, this.numberOfOutputs); + const partitionValues = outputRow.splice(this.numberOfOutputs, this.numberOfParameters); const lookup = UnscopedParameterLookup.normalized(partitionValues); results.push({ lookup, bucketParameters: [outputs] }); } diff --git a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts index 34a1c965e..e2f837475 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts @@ -142,23 +142,23 @@ export function mapExternalDataToInstantiation() { const requestToIndex = new Map(); const visitor = new MapSourceVisitor((data) => { - const oldLength = instantiation.length; + const indexIfAdded = instantiation.length + 1; if ('column' in data) { if (columnsToIndex.has(data)) { return columnsToIndex.get(data)!; } - columnsToIndex.set(data, oldLength); + columnsToIndex.set(data, indexIfAdded); } else { if (requestToIndex.has(data)) { return requestToIndex.get(data)!; } - requestToIndex.set(data, oldLength); + requestToIndex.set(data, indexIfAdded); } instantiation.push(data); - return oldLength; + return indexIfAdded; }); return { diff --git a/packages/sync-rules/test/src/compiler/utils.ts b/packages/sync-rules/test/src/compiler/utils.ts index 81b55457b..267f7865f 100644 --- a/packages/sync-rules/test/src/compiler/utils.ts +++ b/packages/sync-rules/test/src/compiler/utils.ts @@ -8,7 +8,7 @@ import { } from '../../../src/index.js'; // TODO: Replace with parsing from yaml once we support that -interface SyncStreamInput { +export interface SyncStreamInput { name: string; queries: string[]; ctes?: Record; diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts b/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts new file mode 100644 index 000000000..afc3a4cae --- /dev/null +++ b/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts @@ -0,0 +1,179 @@ +import { describe, expect } from 'vitest'; +import { syncTest } from './utils.js'; +import { + HydratedSyncRules, + ScopedParameterLookup, + SourceTableInterface, + SqliteRow, + SqliteValue +} from '../../../../src/index.js'; +import { TestSourceTable } from '../../util.js'; + +describe('evaluating rows', () => { + syncTest('emits rows', ({ sync }) => { + const desc = sync.prepareSyncStreams([{ name: 'stream', queries: ['SELECT * FROM users'] }]); + + expect( + desc.evaluateRow({ + sourceTable: USERS, + record: { + id: 'foo', + _double: 1, + _int: 1n, + _null: null, + _text: 'text', + _blob: new Uint8Array(10) // non-JSON columns should be removed + } + }) + ).toStrictEqual([ + { + bucket: 'stream|0[]', + id: 'foo', + data: { id: 'foo', _double: 1, _int: 1n, _null: null, _text: 'text' }, + table: 'users' + } + ]); + }); + + syncTest('forwards parameters', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { name: 'stream', queries: ["SELECT * FROM users WHERE value = subscription.parameter('p')"] } + ]); + + function evaluate(value: SqliteValue) { + const rows = desc.evaluateRow({ sourceTable: USERS, record: { id: 'foo', value } }); + if (rows.length == 0) { + return undefined; + } + + return rows[0].bucket; + } + + expect(evaluate(1)).toStrictEqual('stream|0[1]'); + expect(evaluate(1n)).toStrictEqual('stream|0[1]'); + expect(evaluate(1.1)).toStrictEqual('stream|0[1.1]'); + expect(evaluate('1')).toStrictEqual('stream|0["1"]'); + + // null is not equal to itself, so WHERE null = subscription.paraeter('p') should not match any rows. + expect(evaluate(null)).toStrictEqual(undefined); + + // We can't store binary values in bucket parameters + expect(evaluate(new Uint8Array(10))).toStrictEqual(undefined); + }); + + syncTest('output table name', ({ sync }) => { + const desc = sync.prepareSyncStreams([{ name: 'stream', queries: ['SELECT * FROM users u'] }]); + expect( + desc.evaluateRow({ + sourceTable: USERS, + record: { + id: 'foo' + } + }) + ).toStrictEqual([ + { + bucket: 'stream|0[]', + id: 'foo', + data: { id: 'foo' }, + table: 'u' + } + ]); + }); + + syncTest('wildcard with alias', ({ sync }) => { + const desc = sync.prepareSyncStreams([{ name: 'stream', queries: ['SELECT * FROM "%" output'] }]); + expect( + desc.evaluateRow({ + sourceTable: USERS, + record: { + id: 'foo' + } + }) + ).toStrictEqual([ + { + bucket: 'stream|0[]', + id: 'foo', + data: { id: 'foo' }, + table: 'output' + } + ]); + }); + + syncTest('wildcard without alias', ({ sync }) => { + const desc = sync.prepareSyncStreams([{ name: 'stream', queries: ['SELECT * FROM "%"'] }]); + expect( + desc.evaluateRow({ + sourceTable: USERS, + record: { + id: 'foo' + } + }) + ).toStrictEqual([ + { + bucket: 'stream|0[]', + id: 'foo', + data: { id: 'foo' }, + table: 'users' + } + ]); + }); + + syncTest('multiple tables in bucket', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { name: 'stream', queries: ['SELECT * FROM users', 'SELECT * FROM comments'] } + ]); + expect(evaluateBucketIds(desc, USERS, { id: 'foo' })).toStrictEqual(['stream|0[]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo2' })).toStrictEqual(['stream|0[]']); + }); +}); + +describe('evaluating parameters', () => { + syncTest('emits parameters', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { + name: 'stream', + queries: ['SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id())'] + } + ]); + + expect(desc.tableSyncsData(COMMENTS)).toBeTruthy(); + expect(desc.tableSyncsData(ISSUES)).toBeFalsy(); + expect(desc.tableSyncsParameters(ISSUES)).toBeTruthy(); + + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: 'user1', name: 'name' })).toStrictEqual([ + { + lookup: ScopedParameterLookup.direct({ lookupName: 'lookup', queryId: '0' }, ['user1']), + bucketParameters: [ + { + '0': 'issue_id' + } + ] + } + ]); + }); + + syncTest('skips null and binary values', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { + name: 'stream', + queries: ['SELECT * FROM comments WHERE issue_id IN (SELECT id FROM issues WHERE owner_id = auth.user_id())'] + } + ]); + const blob = new Uint8Array(10); + + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: 'user1' })).toHaveLength(1); + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: null })).toHaveLength(0); + expect(desc.evaluateParameterRow(ISSUES, { id: null, owner_id: 'user1' })).toHaveLength(0); + + expect(desc.evaluateParameterRow(ISSUES, { id: 'issue_id', owner_id: blob })).toHaveLength(0); + expect(desc.evaluateParameterRow(ISSUES, { id: blob, owner_id: 'user1' })).toHaveLength(0); + }); +}); + +function evaluateBucketIds(source: HydratedSyncRules, sourceTable: SourceTableInterface, record: SqliteRow) { + return source.evaluateRow({ sourceTable, record }).map((r) => r.bucket); +} + +const USERS = new TestSourceTable('users'); +const COMMENTS = new TestSourceTable('comments'); +const ISSUES = new TestSourceTable('issues'); diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts new file mode 100644 index 000000000..9a638fc7e --- /dev/null +++ b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts @@ -0,0 +1,36 @@ +import * as sqlite from 'node:sqlite'; + +import { + HydratedSyncRules, + SqlSyncRules, + versionedHydrationState, + nodeSqliteExpressionEngine, + addPrecompiledSyncPlanToRules +} from '../../../../src/index.js'; +import { compileToSyncPlanWithoutErrors, SyncStreamInput } from '../../compiler/utils.js'; +import { test } from 'vitest'; +import { ScalarExpressionEngine } from '../../../../src/sync_plan/evaluator/scalar_expression_evaluator.js'; + +interface SyncTest { + engine: ScalarExpressionEngine; + prepareSyncStreams(inputs: SyncStreamInput[]): HydratedSyncRules; +} + +export const syncTest = test.extend<{ sync: SyncTest }>({ + sync: async ({}, use) => { + const engine = nodeSqliteExpressionEngine(sqlite); + + await use({ + engine, + prepareSyncStreams: (inputs) => { + const plan = compileToSyncPlanWithoutErrors(inputs); + const rules = new SqlSyncRules(''); + + addPrecompiledSyncPlanToRules(plan, rules, { engine }); + return rules.hydrate({ hydrationState: versionedHydrationState(1) }); + } + }); + + engine.close(); + } +}); From 91ab2028bd861e1338f870d3ff25c7f6ddbf061a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 17:22:28 +0100 Subject: [PATCH 04/14] More engine tests --- packages/sync-rules/src/index.ts | 3 +- packages/sync-rules/src/sql_functions.ts | 2 +- .../src/sync_plan/engine/javascript.ts | 280 ++++++++++++++++++ .../scalar_expression_engine.ts} | 25 -- .../sync-rules/src/sync_plan/engine/sqlite.ts | 30 ++ .../sync_plan/evaluator/bucket_data_source.ts | 2 +- .../src/sync_plan/evaluator/bucket_source.ts | 2 +- .../src/sync_plan/evaluator/index.ts | 2 +- .../evaluator/parameter_evaluator.ts | 2 +- .../parameter_index_lookup_creator.ts | 2 +- .../sync-rules/src/sync_plan/expression.ts | 2 +- .../src/sync_plan/expression_to_sql.ts | 8 +- .../test/src/sync_plan/engine/engine.test.ts | 205 +++++++++++++ .../scalarStatementToSql.ts} | 23 +- .../test/src/sync_plan/evaluator/utils.ts | 2 +- 15 files changed, 534 insertions(+), 56 deletions(-) create mode 100644 packages/sync-rules/src/sync_plan/engine/javascript.ts rename packages/sync-rules/src/sync_plan/{evaluator/scalar_expression_evaluator.ts => engine/scalar_expression_engine.ts} (83%) create mode 100644 packages/sync-rules/src/sync_plan/engine/sqlite.ts create mode 100644 packages/sync-rules/test/src/sync_plan/engine/engine.test.ts rename packages/sync-rules/test/src/sync_plan/{evaluator/scalar_expression_evaluator.test.ts => engine/scalarStatementToSql.ts} (73%) diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index d12aafac3..fbc4b38cd 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -34,4 +34,5 @@ export * from './compiler/compiler.js'; export * from './sync_plan/plan.js'; export { serializeSyncPlan } from './sync_plan/serialize.js'; export { addPrecompiledSyncPlanToRules } from './sync_plan/evaluator/index.js'; -export { nodeSqliteExpressionEngine } from './sync_plan/evaluator/scalar_expression_evaluator.js'; +export { javaScriptExpressionEngine } from './sync_plan/engine/javascript.js'; +export { nodeSqliteExpressionEngine } from './sync_plan/engine/sqlite.js'; diff --git a/packages/sync-rules/src/sql_functions.ts b/packages/sync-rules/src/sql_functions.ts index 60692edb1..bbf89b662 100644 --- a/packages/sync-rules/src/sql_functions.ts +++ b/packages/sync-rules/src/sql_functions.ts @@ -1102,7 +1102,7 @@ const TYPE_ORDERING = { blob: 3 }; -function compare(a: SqliteValue, b: SqliteValue): number { +export function compare(a: SqliteValue, b: SqliteValue): number { // https://www.sqlite.org/datatype3.html#comparisons if (a == null && b == null) { // Only for IS / IS NOT diff --git a/packages/sync-rules/src/sync_plan/engine/javascript.ts b/packages/sync-rules/src/sync_plan/engine/javascript.ts new file mode 100644 index 000000000..88fa0611c --- /dev/null +++ b/packages/sync-rules/src/sync_plan/engine/javascript.ts @@ -0,0 +1,280 @@ +import { + compare, + CompatibilityContext, + generateSqlFunctions, + SQLITE_FALSE, + SQLITE_TRUE, + sqliteBool, + sqliteNot +} from '../../index.js'; +import { cast, evaluateOperator, SqlFunction } from '../../sql_functions.js'; +import { cartesianProduct } from '../../streams/utils.js'; +import { generateTableValuedFunctions } from '../../TableValuedFunctions.js'; +import { SqliteRow, SqliteValue } from '../../types.js'; +import { + ExternalData, + UnaryExpression, + BinaryExpression, + BetweenExpression, + ScalarInExpression, + CaseWhenExpression, + CastExpression, + ScalarFunctionCallExpression, + LiteralExpression, + SqlExpression +} from '../expression.js'; +import { ExpressionVisitor, visitExpr } from '../expression_visitor.js'; +import { + ScalarExpressionEngine, + ScalarExpressionEvaluator, + TableValuedFunction, + TableValuedFunctionOutput +} from './scalar_expression_engine.js'; + +/** + * Creates a {@link ScalarExpressionEngine} implemented by evaluating scalar expressions in JavaScript. + */ +export function javaScriptExpressionEngine(compatibility: CompatibilityContext): ScalarExpressionEngine { + const tableValued = generateTableValuedFunctions(compatibility); + const regularFunctions = generateSqlFunctions(compatibility); + const compiler = new ExpressionToJavaScriptFunction({ + named: regularFunctions.named, + jsonExtractJson: regularFunctions.operatorJsonExtractJson, + jsonExtractSql: regularFunctions.operatorJsonExtractSql + }); + + return { + close() {}, + prepareEvaluator({ outputs = [], filters = [], tableValuedFunctions = [] }): ScalarExpressionEvaluator { + function compileScalar(expr: SqlExpression) { + return compiler.compile(expr); + } + + const resolvedTableValuedFunctions = tableValuedFunctions.map((fn) => { + const found = tableValued[fn.name]; + if (found == null) { + throw new Error(`Unknown table-valued function: ${fn.name}`); + } + + const inputs = fn.inputs.map(compileScalar); + return { + original: fn, + evaluate: (input: PendingStatementEvaluation) => { + return found.call(inputs.map((f) => f(input))); + } + }; + }); + + const columns = outputs.map(compileScalar); + const compiledFilters = filters.map(compileScalar); + + return { + evaluate(inputs) { + // First, evaluate table-valued functions (if any). + const perFunctionResults: [TableValuedFunction, SqliteRow][][] = []; + + for (const { original, evaluate } of resolvedTableValuedFunctions) { + perFunctionResults.push( + evaluate({ inputs }).map((row) => [original, row] satisfies [TableValuedFunction, SqliteRow]) + ); + } + + const rows: SqliteValue[][] = []; + // We're doing an inner join on all table-valued functions, which we implement as a cross join on which each + // filter is evaluated. Having more than one table-valued function per statement would be very rare in + // practice. + row: for (const sourceRow of cartesianProduct(...perFunctionResults)) { + const byFunction = new Map(); + for (const [fn, output] of sourceRow) { + byFunction.set(fn, output); + } + + const input: PendingStatementEvaluation = { inputs, row: byFunction }; + + for (const filter of compiledFilters) { + if (!sqliteBool(filter(input))) { + continue row; + } + } + rows.push(columns.map((c) => c(input))); + } + + return rows; + } + }; + } + }; +} + +interface PendingStatementEvaluation { + inputs: SqliteValue[]; + row?: Map; +} + +type ExpressionImplementation = (input: PendingStatementEvaluation) => SqliteValue; + +interface KnownFunctions { + named: Record; + jsonExtractJson: SqlFunction; // -> operator + jsonExtractSql: SqlFunction; // ->> operator +} + +class ExpressionToJavaScriptFunction + implements ExpressionVisitor +{ + constructor(readonly functions: KnownFunctions) {} + + compile(expr: SqlExpression): ExpressionImplementation { + return visitExpr(this, expr, null); + } + + visitExternalData(expr: ExternalData, arg: null): ExpressionImplementation { + if (typeof expr.source === 'number') { + const index = expr.source; + // -1 because variables in SQLite are 1-indexed. + return ({ inputs }) => inputs[index - 1]; + } else { + const { column, function: fn } = expr.source; + + return ({ row }) => { + const result = row!.get(fn)!; + return result[column]; + }; + } + } + + visitUnaryExpression(expr: UnaryExpression): ExpressionImplementation { + const operand = this.compile(expr.operand); + + switch (expr.operator) { + case '+': + return operand; + case 'not': + return (input) => sqliteNot(operand(input)); + case '~': + case '-': + throw new Error(`unary operator not supported: ${expr.operator}`); + } + } + + visitBinaryExpression(expr: BinaryExpression): ExpressionImplementation { + const left = this.compile(expr.left); + const right = this.compile(expr.right); + const operator = expr.operator.toUpperCase(); + + return (input) => evaluateOperator(operator, left(input), right(input)); + } + + visitBetweenExpression(expr: BetweenExpression): ExpressionImplementation { + const low = this.compile(expr.low); + const high = this.compile(expr.high); + const value = this.compile(expr.value); + + return (input) => { + const evaluatedValue = value(input); + + return ( + sqliteBool(evaluateOperator('>=', evaluatedValue, low(input))) && + sqliteBool(evaluateOperator('<=', evaluatedValue, high(input))) + ); + }; + } + + visitScalarInExpression(expr: ScalarInExpression): ExpressionImplementation { + const target = this.compile(expr.target); + const inQuery = expr.in.map((q) => this.compile(q)); + + return (input) => { + const evaluatedTarget = target(input); + if (evaluatedTarget == null) { + return null; + } + + let hasNullQuery = false; + for (const q of inQuery) { + const evaluated = q(input); + if (evaluated == null) { + hasNullQuery = true; + continue; + } + + if (compare(evaluatedTarget, evaluated) == 0) { + return SQLITE_TRUE; + } + } + + return hasNullQuery ? null : SQLITE_FALSE; + }; + } + + visitCaseWhenExpression(expr: CaseWhenExpression): ExpressionImplementation { + const compiledWhens = expr.whens.map((w) => ({ when: this.compile(w.when), then: this.compile(w.then) })); + const compiledElse = expr.else && this.compile(expr.else); + + if (expr.operand) { + const operand = this.compile(expr.operand); + return (input) => { + const evaluatedOperand = operand(input); + + for (const { when, then } of compiledWhens) { + if (evaluateOperator('=', evaluatedOperand, when(input))) { + return then(input); + } + } + + return compiledElse ? compiledElse(input) : null; + }; + } else { + return (input) => { + for (const { when, then } of compiledWhens) { + if (sqliteBool(when(input))) { + return then(input); + } + } + + return compiledElse ? compiledElse(input) : null; + }; + } + } + + visitCastExpression(expr: CastExpression): ExpressionImplementation { + const operand = this.compile(expr.operand); + return (input) => { + return cast(operand(input), expr.cast_as); + }; + } + + visitScalarFunctionCallExpression( + expr: ScalarFunctionCallExpression + ): ExpressionImplementation { + let fnImpl: SqlFunction; + if (expr.function === '->') { + fnImpl = this.functions.jsonExtractJson; + } else if (expr.function === '->>') { + fnImpl = this.functions.jsonExtractSql; + } else { + fnImpl = this.functions.named[expr.function]; + if (!fnImpl) { + throw new Error(`Function not implemented: ${expr.function}`); + } + } + + const args = expr.parameters.map((p) => this.compile(p)); + return (input) => { + return fnImpl.call(...args.map((f) => f(input))); + }; + } + + visitLiteralExpression(expr: LiteralExpression): ExpressionImplementation { + switch (expr.type) { + case 'lit_null': + return () => null; + case 'lit_double': + return () => expr.value; + case 'lit_int': + return () => BigInt(expr.base10); + case 'lit_string': + return () => expr.value; + } + } +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts similarity index 83% rename from packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts rename to packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts index e2f837475..e8c6bd740 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/scalar_expression_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts @@ -44,31 +44,6 @@ export interface ScalarExpressionEvaluator { evaluate(inputs: SqliteValue[]): SqliteValue[][]; } -/** - * Creates a {@link ScalarExpressionEngine} backed by an in-memory SQLite database using `node:sqlite` APIs. - * - * @param module The imported `node:sqlite` module (passed as a parameter to ensure this package keeps working in - * browsers). - */ -export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')): ScalarExpressionEngine { - const db = new module.DatabaseSync(':memory:', { readOnly: true, readBigInts: true, returnArrays: true } as any); - - return { - prepareEvaluator(input): ScalarExpressionEvaluator { - const stmt = db.prepare(scalarStatementToSql(input)); - return { - evaluate(inputs) { - // Types are wrong, all() will return a SqliteValue[][] because returnArrays is enabled. - return stmt.all(...inputs) as unknown as SqliteValue[][]; - } - }; - }, - close() { - db.close(); - } - }; -} - export function scalarStatementToSql({ filters = [], outputs = [], tableValuedFunctions = [] }: ScalarStatement) { const tableValuedFunctionNames = new Map(); for (const fn of tableValuedFunctions) { diff --git a/packages/sync-rules/src/sync_plan/engine/sqlite.ts b/packages/sync-rules/src/sync_plan/engine/sqlite.ts new file mode 100644 index 000000000..ee55ac832 --- /dev/null +++ b/packages/sync-rules/src/sync_plan/engine/sqlite.ts @@ -0,0 +1,30 @@ +import { SqliteValue } from '../../types.js'; +import { ScalarExpressionEngine, ScalarExpressionEvaluator, scalarStatementToSql } from './scalar_expression_engine.js'; + +/** + * Creates a {@link ScalarExpressionEngine} backed by an in-memory SQLite database using `node:sqlite` APIs. + * + * @param module The imported `node:sqlite` module (passed as a parameter to ensure this package keeps working in + * browsers). + * + * @experimental This engine is not drop-in compatible with the JS operator implementations. It also doesn't support + * legacy JSON behavior. So we can only use this engine when a new compatibility option is enabled. + */ +export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')): ScalarExpressionEngine { + const db = new module.DatabaseSync(':memory:', { readOnly: true, readBigInts: true, returnArrays: true } as any); + + return { + prepareEvaluator(input): ScalarExpressionEvaluator { + const stmt = db.prepare(scalarStatementToSql(input)); + return { + evaluate(inputs) { + // Types are wrong, all() will return a SqliteValue[][] because returnArrays is enabled. + return stmt.all(...inputs) as unknown as SqliteValue[][]; + } + }; + }, + close() { + db.close(); + } + }; +} diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts index e36917040..e2fdff51b 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_data_source.ts @@ -14,7 +14,7 @@ import { SqlExpression } from '../expression.js'; import { ExpressionToSqlite } from '../expression_to_sql.js'; import * as plan from '../plan.js'; import { StreamEvaluationContext } from './index.js'; -import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from './scalar_expression_evaluator.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from '../engine/scalar_expression_engine.js'; export class PreparedStreamBucketDataSource implements BucketDataSource { private readonly sourceTables = new Set(); diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts index ecd576a68..40bc2f164 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts @@ -8,7 +8,7 @@ import { } from '../../BucketSource.js'; import { StreamEvaluationContext } from './index.js'; import * as plan from '../plan.js'; -import { mapExternalDataToInstantiation, ScalarExpressionEngine } from './scalar_expression_evaluator.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEngine } from '../engine/scalar_expression_engine.js'; import { SqlExpression } from '../expression.js'; import { RequestParameters, SqliteParameterValue } from '../../types.js'; import { parametersForRequest, RequestParameterEvaluators } from './parameter_evaluator.js'; diff --git a/packages/sync-rules/src/sync_plan/evaluator/index.ts b/packages/sync-rules/src/sync_plan/evaluator/index.ts index 92edd3672..dd3951e7c 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/index.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/index.ts @@ -3,7 +3,7 @@ import * as plan from '../plan.js'; import { PreparedStreamBucketDataSource } from './bucket_data_source.js'; import { PreparedParameterIndexLookupCreator } from './parameter_index_lookup_creator.js'; import { StreamBucketSource, StreamInput } from './bucket_source.js'; -import { ScalarExpressionEngine } from './scalar_expression_evaluator.js'; +import { ScalarExpressionEngine } from '../engine/scalar_expression_engine.js'; export interface StreamEvaluationContext { engine: ScalarExpressionEngine; diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts index c42bb30b6..6d544dca2 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts @@ -11,7 +11,7 @@ import { mapExternalDataToInstantiation, TableValuedFunction, TableValuedFunctionOutput -} from './scalar_expression_evaluator.js'; +} from '../engine/scalar_expression_engine.js'; /** * Finds bucket parameters for a given request or subscription. diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts index 02094fd1c..a0a829ef5 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_index_lookup_creator.ts @@ -1,6 +1,6 @@ import { ParameterIndexLookupCreator } from '../../BucketSource.js'; import { ParameterLookupScope } from '../../HydrationState.js'; -import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from './scalar_expression_evaluator.js'; +import { mapExternalDataToInstantiation, ScalarExpressionEvaluator } from '../engine/scalar_expression_engine.js'; import * as plan from '../plan.js'; import { StreamEvaluationContext } from './index.js'; import { TablePattern } from '../../TablePattern.js'; diff --git a/packages/sync-rules/src/sync_plan/expression.ts b/packages/sync-rules/src/sync_plan/expression.ts index 0100af295..755714c76 100644 --- a/packages/sync-rules/src/sync_plan/expression.ts +++ b/packages/sync-rules/src/sync_plan/expression.ts @@ -101,7 +101,7 @@ export type CaseWhenExpression = { export type CastExpression = { type: 'cast'; operand: SqlExpression; - cast_as: 'string' | 'numeric' | 'real' | 'integer' | 'blob'; + cast_as: 'text' | 'numeric' | 'real' | 'integer' | 'blob'; }; /** diff --git a/packages/sync-rules/src/sync_plan/expression_to_sql.ts b/packages/sync-rules/src/sync_plan/expression_to_sql.ts index 123c59212..2b432c26e 100644 --- a/packages/sync-rules/src/sync_plan/expression_to_sql.ts +++ b/packages/sync-rules/src/sync_plan/expression_to_sql.ts @@ -162,7 +162,13 @@ export class ExpressionToSqlite implements ExpressionVisitor { + defineEngineTests(false, () => nodeSqliteExpressionEngine(sqlite)); +}); + +describe('javascript', () => { + defineEngineTests(true, () => + javaScriptExpressionEngine(new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })) + ); +}); + +function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpressionEngine) { + function prepare(stmt: ScalarStatement) { + const engine = createEngine(); + onTestFinished(() => engine.close()); + return engine.prepareEvaluator(stmt); + } + + function expectBinary(left: SqliteValue, op: BinaryOperator, right: SqliteValue, output: SqliteValue) { + const stmt = prepare({ + outputs: [ + { + type: 'binary', + left: { type: 'data', source: 1 }, + right: { type: 'data', source: 2 }, + operator: op + } + ] + }); + + expect(stmt.evaluate([left, right])).toStrictEqual([[output]]); + } + + test('literal null', () => { + expect(prepare({ outputs: [{ type: 'lit_null' }] }).evaluate([])).toStrictEqual([[null]]); + }); + + test('literal double', () => { + expect(prepare({ outputs: [{ type: 'lit_double', value: 3 }] }).evaluate([])).toStrictEqual([[3]]); + }); + + test('literal int', () => { + expect(prepare({ outputs: [{ type: 'lit_int', base10: '3' }] }).evaluate([])).toStrictEqual([[3n]]); + }); + + test('literal string', () => { + expect(prepare({ outputs: [{ type: 'lit_string', value: 'hello world' }] }).evaluate([])).toStrictEqual([ + ['hello world'] + ]); + }); + + test('length(?)', () => { + const stmt = prepare({ + outputs: [{ type: 'function', function: 'length', parameters: [{ type: 'data', source: 1 }] }] + }); + + expect(stmt.evaluate(['foo'])).toStrictEqual([[3n]]); + }); + + test('cast', () => { + const stmt = prepare({ + outputs: [{ type: 'cast', operand: { type: 'data', source: 1 }, cast_as: 'text' }] + }); + + expect(stmt.evaluate(['foo'])).toStrictEqual([['foo']]); + expect(stmt.evaluate([3])).toStrictEqual([[isJavaScript ? '3' : '3.0']]); + }); + + test('unary not', () => { + const stmt = prepare({ outputs: [{ type: 'unary', operator: 'not', operand: { type: 'data', source: 1 } }] }); + + expect(stmt.evaluate([1n])).toStrictEqual([[0n]]); + expect(stmt.evaluate([0n])).toStrictEqual([[1n]]); + }); + + test('binary', () => { + expectBinary(1n, '=', 1, 1n); + expectBinary(3, '+', 4, 7); + expectBinary(4, 'is', null, 0n); + expectBinary(null, 'is', null, 1n); + + expectBinary(1, 'or', null, 1n); + expectBinary(1, 'or', 0n, 1n); + expectBinary(1, 'and', 0n, 0n); + }); + + test('?1 between ?2 and ?3', () => { + const stmt = prepare({ + outputs: [ + { + type: 'between', + value: { type: 'data', source: 1 }, + low: { type: 'data', source: 2 }, + high: { type: 'data', source: 3 } + } + ] + }); + + expect(stmt.evaluate([1, 0, 2])).toStrictEqual([[1n]]); + expect(stmt.evaluate([1, 2, 3])).toStrictEqual([[0n]]); + expect(stmt.evaluate([4, 2, 3])).toStrictEqual([[0n]]); + expect(stmt.evaluate([1, 1, 1])).toStrictEqual([[1n]]); + }); + + test('scalar in', () => { + // SELECT ?1 IN (?2, ?3) + const stmt = prepare({ + outputs: [ + { + type: 'scalar_in', + target: { type: 'data', source: 1 }, + in: [ + { type: 'data', source: 2 }, + { type: 'data', source: 3 } + ] + } + ] + }); + + expect(stmt.evaluate([1, 2, 3])).toStrictEqual([[0n]]); + expect(stmt.evaluate([2, 2, 3])).toStrictEqual([[1n]]); + expect(stmt.evaluate([3, 2, 3])).toStrictEqual([[1n]]); + + expect(stmt.evaluate([null, 2, 3])).toStrictEqual([[null]]); + expect(stmt.evaluate([1, null, 3])).toStrictEqual([[null]]); + expect(stmt.evaluate([1, null, null])).toStrictEqual([[null]]); + }); + + test('when without operand', () => { + // SELECT CASE WHEN ?1 THEN 'a' WHEN ?2 THEN 'b' ELSE 'c' END + const stmt = prepare({ + outputs: [ + { + type: 'case_when', + whens: [ + { when: { type: 'data', source: 1 }, then: { type: 'lit_string', value: 'a' } }, + { when: { type: 'data', source: 2 }, then: { type: 'lit_string', value: 'b' } } + ], + else: { type: 'lit_string', value: 'c' } + } + ] + }); + + expect(stmt.evaluate([1, 0])).toStrictEqual([['a']]); + expect(stmt.evaluate([0, 1])).toStrictEqual([['b']]); + expect(stmt.evaluate([0, 0])).toStrictEqual([['c']]); + expect(stmt.evaluate([null, 'foo'])).toStrictEqual([['c']]); + }); + + test('when with operand', () => { + // SELECT CASE ?1 WHEN ?2 THEN 'match' ELSE 'else' END + const stmt = prepare({ + outputs: [ + { + type: 'case_when', + operand: { type: 'data', source: 1 }, + whens: [{ when: { type: 'data', source: 2 }, then: { type: 'lit_string', value: 'match' } }], + else: { type: 'lit_string', value: 'else' } + } + ] + }); + + expect(stmt.evaluate([1, 1])).toStrictEqual([['match']]); + expect(stmt.evaluate(['foo', 'foo'])).toStrictEqual([['match']]); + expect(stmt.evaluate(['foo', 'bar'])).toStrictEqual([['else']]); + expect(stmt.evaluate([null, null])).toStrictEqual([['else']]); + }); + + test('SELECT value FROM json_each(?)', () => { + const fn: TableValuedFunction = { + name: 'json_each', + inputs: [{ type: 'data', source: 1 }] + }; + + const stmt = prepare({ + outputs: [{ type: 'data', source: { function: fn, column: 'value' } }], + tableValuedFunctions: [fn] + }); + expect(stmt.evaluate([JSON.stringify([1, 'two', 3.2])])).toStrictEqual([ + [ + // The JavaScript json_each implementation uses JSON.parse. It probably should be using JSONBig, but it's too + // late to fix that now. + isJavaScript ? 1 : 1n + ], + ['two'], + [3.2] + ]); + }); +} diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts b/packages/sync-rules/test/src/sync_plan/engine/scalarStatementToSql.ts similarity index 73% rename from packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts rename to packages/sync-rules/test/src/sync_plan/engine/scalarStatementToSql.ts index 6d41688a5..c5d5fdad9 100644 --- a/packages/sync-rules/test/src/sync_plan/evaluator/scalar_expression_evaluator.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/scalarStatementToSql.ts @@ -1,10 +1,8 @@ -import * as sqlite from 'node:sqlite'; -import { describe, expect, onTestFinished, test } from 'vitest'; +import { describe, expect, test } from 'vitest'; import { - nodeSqliteExpressionEngine, scalarStatementToSql, TableValuedFunction -} from '../../../../src/sync_plan/evaluator/scalar_expression_evaluator.js'; +} from '../../../../src/sync_plan/engine/scalar_expression_engine.js'; describe('scalarStatementToSql', () => { test('empty', () => { @@ -92,20 +90,3 @@ describe('scalarStatementToSql', () => { ).toStrictEqual(`SELECT "tbl_0"."value" FROM "json_each"(?1) AS "tbl_0" WHERE "foo"("tbl_0"."key")`); }); }); - -test('nodeSqliteExpressionEngine', () => { - const engine = nodeSqliteExpressionEngine(sqlite); - onTestFinished(() => engine.close()); - - const fn: TableValuedFunction = { - name: 'json_each', - inputs: [{ type: 'data', source: 1 }] - }; - - const stmt = engine.prepareEvaluator({ - outputs: [{ type: 'data', source: { function: fn, column: 'value' } }], - tableValuedFunctions: [fn] - }); - - expect(stmt.evaluate([JSON.stringify([1, 'two', 3.2])])).toStrictEqual([[1n], ['two'], [3.2]]); -}); diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts index 9a638fc7e..75c6b562c 100644 --- a/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts +++ b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts @@ -9,7 +9,7 @@ import { } from '../../../../src/index.js'; import { compileToSyncPlanWithoutErrors, SyncStreamInput } from '../../compiler/utils.js'; import { test } from 'vitest'; -import { ScalarExpressionEngine } from '../../../../src/sync_plan/evaluator/scalar_expression_evaluator.js'; +import { ScalarExpressionEngine } from '../../../../src/sync_plan/engine/scalar_expression_engine.js'; interface SyncTest { engine: ScalarExpressionEngine; From d03463d08ee3332c9078d2f05d4a197150c43043 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 17:35:32 +0100 Subject: [PATCH 05/14] Back to es2024 --- .../sync-rules/src/sync_plan/evaluator/bucket_source.ts | 2 +- .../src/sync_plan/evaluator/parameter_evaluator.ts | 9 +++++---- tsconfig.base.json | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts index 40bc2f164..c801249c5 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/bucket_source.ts @@ -161,7 +161,7 @@ class PreparedQuerier { request: params }); - return [...instantiation.map(parametersToBucket)]; + return [...instantiation].map(parametersToBucket); } }); } diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts index 6d544dca2..593e79280 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts @@ -270,7 +270,8 @@ class PartialInstantiator; } if (intersection.size == 0) { @@ -339,11 +340,11 @@ class FullInstantiator extends PartialInstantiator { const scope = this.input.hydrationState.getParameterIndexLookupScope(lookup.lookup); lookup.instantiation; - const outputs = await this.input.source.getParameterSets([ - ...this.resolveInputs(lookup.instantiation).map((instantiation) => + const outputs = await this.input.source.getParameterSets( + [...this.resolveInputs(lookup.instantiation)].map((instantiation) => ScopedParameterLookup.normalized(scope, UnscopedParameterLookup.normalized(instantiation)) ) - ]); + ); // Stream parameters generate an output row like {0: , 1: , ...}. const values = outputs.map((row) => { diff --git a/tsconfig.base.json b/tsconfig.base.json index 8fca2db15..b2bd00df4 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1,7 +1,7 @@ { "compilerOptions": { - "lib": ["ESNext"], - "target": "esnext", + "lib": ["ES2024"], + "target": "ES2024", "module": "NodeNext", "moduleResolution": "NodeNext", "strict": true, From 003ebb6b60547d038f59cf1e8a91423b2662210a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 18:04:00 +0100 Subject: [PATCH 06/14] Add querier test --- .../evaluator/parameter_evaluator.ts | 3 +- .../src/sync_plan/evaluator/evaluator.test.ts | 98 +++++++++++++++++++ .../test/src/sync_plan/evaluator/utils.ts | 10 +- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts index 593e79280..ae72a345a 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts @@ -198,6 +198,7 @@ export class RequestParameterEvaluators { for (const stage of lookupStages) { const stageIndex = mappedStages.length; const mappedStage: PreparedExpandingLookup[] = []; + mappedStages.push(mappedStage); for (const lookup of stage) { const index = mappedStage.length; @@ -338,8 +339,6 @@ class FullInstantiator extends PartialInstantiator { const lookup = this.evaluators.lookupStages[stage][index]; if (lookup.type == 'parameter') { const scope = this.input.hydrationState.getParameterIndexLookupScope(lookup.lookup); - lookup.instantiation; - const outputs = await this.input.source.getParameterSets( [...this.resolveInputs(lookup.instantiation)].map((instantiation) => ScopedParameterLookup.normalized(scope, UnscopedParameterLookup.normalized(instantiation)) diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts b/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts index afc3a4cae..678764575 100644 --- a/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts +++ b/packages/sync-rules/test/src/sync_plan/evaluator/evaluator.test.ts @@ -2,8 +2,10 @@ import { describe, expect } from 'vitest'; import { syncTest } from './utils.js'; import { HydratedSyncRules, + RequestParameters, ScopedParameterLookup, SourceTableInterface, + SqliteJsonRow, SqliteRow, SqliteValue } from '../../../../src/index.js'; @@ -170,6 +172,102 @@ describe('evaluating parameters', () => { }); }); +describe('querier', () => { + syncTest('static', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { + name: 'stream', + queries: ['SELECT * FROM issues WHERE is_public'] + } + ]); + + const { querier } = desc.getBucketParameterQuerier({ + globalParameters: new RequestParameters({ sub: 'user' }, {}), + hasDefaultStreams: true, + streams: {} + }); + + expect(querier.staticBuckets.map((e) => e.bucket)).toStrictEqual(['stream|0[]']); + }); + + syncTest('request data', ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { + name: 'stream', + queries: ['SELECT * FROM issues WHERE owner = auth.user_id()'] + } + ]); + + const { querier, errors } = desc.getBucketParameterQuerier({ + globalParameters: new RequestParameters({ sub: 'user' }, {}), + hasDefaultStreams: true, + streams: {} + }); + expect(errors).toStrictEqual([]); + + expect(querier.staticBuckets.map((e) => e.bucket)).toStrictEqual(['stream|0["user"]']); + }); + + syncTest('parameter lookups', async ({ sync }) => { + const desc = sync.prepareSyncStreams([ + { + name: 'stream', + queries: [ + `SELECT c.* FROM comments c + INNER JOIN issues i ON c.issue = i.id + INNER JOIN users owner ON owner.name = i.owned_by + WHERE owner.id = auth.user_id() + ` + ] + } + ]); + + const { querier, errors } = desc.getBucketParameterQuerier({ + globalParameters: new RequestParameters({ sub: 'user' }, {}), + hasDefaultStreams: true, + streams: {} + }); + expect(errors).toStrictEqual([]); + + expect(querier.staticBuckets.map((e) => e.bucket)).toStrictEqual([]); + let call = 0; + const buckets = await querier.queryDynamicBucketDescriptions({ + getParameterSets: async function (lookups: ScopedParameterLookup[]): Promise { + if (call == 0) { + // First call. Lookup from users.id => users.name + call++; + expect(lookups).toStrictEqual([ + ScopedParameterLookup.direct( + { + lookupName: 'lookup', + queryId: '0' + }, + ['user'] + ) + ]); + return [{ '0': 'name' }]; + } else if (call == 1) { + // Second call. Lookup from issues.owned_by => issues.id + call++; + expect(lookups).toStrictEqual([ + ScopedParameterLookup.direct( + { + lookupName: 'lookup', + queryId: '1' + }, + ['name'] + ) + ]); + return [{ '0': 'issue' }]; + } + + throw new Error('Function not implemented.'); + } + }); + expect(buckets.map((b) => b.bucket)).toStrictEqual(['stream|0["issue"]']); + }); +}); + function evaluateBucketIds(source: HydratedSyncRules, sourceTable: SourceTableInterface, record: SqliteRow) { return source.evaluateRow({ sourceTable, record }).map((r) => r.bucket); } diff --git a/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts index 75c6b562c..7dac35a3f 100644 --- a/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts +++ b/packages/sync-rules/test/src/sync_plan/evaluator/utils.ts @@ -1,11 +1,11 @@ -import * as sqlite from 'node:sqlite'; - import { HydratedSyncRules, SqlSyncRules, versionedHydrationState, - nodeSqliteExpressionEngine, - addPrecompiledSyncPlanToRules + addPrecompiledSyncPlanToRules, + javaScriptExpressionEngine, + CompatibilityContext, + CompatibilityEdition } from '../../../../src/index.js'; import { compileToSyncPlanWithoutErrors, SyncStreamInput } from '../../compiler/utils.js'; import { test } from 'vitest'; @@ -18,7 +18,7 @@ interface SyncTest { export const syncTest = test.extend<{ sync: SyncTest }>({ sync: async ({}, use) => { - const engine = nodeSqliteExpressionEngine(sqlite); + const engine = javaScriptExpressionEngine(new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })); await use({ engine, From 7061397fa16ec727a004131671abd9d5ce0b7711 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 18:23:51 +0100 Subject: [PATCH 07/14] One more test --- .../engine/scalar_expression_engine.ts | 7 +++ .../test/src/sync_plan/engine/engine.test.ts | 45 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts index e8c6bd740..44990a47b 100644 --- a/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts +++ b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts @@ -65,11 +65,18 @@ export function scalarStatementToSql({ filters = [], outputs = [], tableValuedFu if (tableValuedFunctionNames.size != 0) { toSqlite.addLexeme('FROM'); + let first = true; tableValuedFunctionNames.forEach((name, fn) => { + if (!first) { + toSqlite.comma(); + } + visitExpr(toSqlite, { type: 'function', function: fn.name, parameters: fn.inputs }, null); toSqlite.addLexeme('AS'); toSqlite.identifier(name); + + first = false; }); } diff --git a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts index 169f0e96f..4eea0515f 100644 --- a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts @@ -202,4 +202,49 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr [3.2] ]); }); + + test('filters and multiple sources', () => { + // SELECT a.value, b.value FROM json_each(?1) a, json_each(?2) b where length(a.value || b.value) > 5 + const a: TableValuedFunction = { + name: 'json_each', + inputs: [{ type: 'data', source: 1 }] + }; + const b: TableValuedFunction = { + name: 'json_each', + inputs: [{ type: 'data', source: 2 }] + }; + + const stmt = prepare({ + outputs: [ + { type: 'data', source: { function: a, column: 'value' } }, + { type: 'data', source: { function: b, column: 'value' } } + ], + tableValuedFunctions: [a, b], + filters: [ + { + type: 'binary', + operator: '>', + left: { + type: 'function', + function: 'length', + parameters: [ + { + type: 'binary', + operator: '||', + left: { type: 'data', source: { function: a, column: 'value' } }, + right: { type: 'data', source: { function: b, column: 'value' } } + } + ] + }, + right: { type: 'lit_int', base10: '5' } + } + ] + }); + + expect(stmt.evaluate([JSON.stringify([]), JSON.stringify(['aaaaaa'])])).toStrictEqual([]); + expect(stmt.evaluate([JSON.stringify(['x', 'y']), JSON.stringify(['a', 'aaaaa'])])).toStrictEqual([ + ['x', 'aaaaa'], + ['y', 'aaaaa'] + ]); + }); } From 45e109a5e519093a744cc400dd3375dd14a54740 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 27 Jan 2026 18:38:40 +0100 Subject: [PATCH 08/14] Suppport legacy JSON --- .../sync-rules/src/sync_plan/engine/sqlite.ts | 36 +++++++++++++++++-- .../test/src/sync_plan/engine/engine.test.ts | 23 +++++++++--- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/packages/sync-rules/src/sync_plan/engine/sqlite.ts b/packages/sync-rules/src/sync_plan/engine/sqlite.ts index ee55ac832..444563d8d 100644 --- a/packages/sync-rules/src/sync_plan/engine/sqlite.ts +++ b/packages/sync-rules/src/sync_plan/engine/sqlite.ts @@ -1,3 +1,5 @@ +import { CompatibilityContext, CompatibilityOption } from '../../compatibility.js'; +import { generateSqlFunctions } from '../../index.js'; import { SqliteValue } from '../../types.js'; import { ScalarExpressionEngine, ScalarExpressionEvaluator, scalarStatementToSql } from './scalar_expression_engine.js'; @@ -7,11 +9,39 @@ import { ScalarExpressionEngine, ScalarExpressionEvaluator, scalarStatementToSql * @param module The imported `node:sqlite` module (passed as a parameter to ensure this package keeps working in * browsers). * - * @experimental This engine is not drop-in compatible with the JS operator implementations. It also doesn't support - * legacy JSON behavior. So we can only use this engine when a new compatibility option is enabled. + * @experimental This engine is not drop-in compatible with the JS operator implementations. So we can only use this + * engine when a new compatibility option is enabled. */ -export function nodeSqliteExpressionEngine(module: typeof import('node:sqlite')): ScalarExpressionEngine { +export function nodeSqliteExpressionEngine( + module: typeof import('node:sqlite'), + compatibility: CompatibilityContext +): ScalarExpressionEngine { const db = new module.DatabaseSync(':memory:', { readOnly: true, readBigInts: true, returnArrays: true } as any); + const functions = generateSqlFunctions(compatibility); + + function registerPowerSyncFunction(name: string) { + db.function(name, { useBigIntArguments: true, varargs: true, deterministic: true }, (...args) => { + const impl = functions.named[name]!; + return impl.call(...args); + }); + } + + // Needed to make them deterministic / prevent passing 'now' + registerPowerSyncFunction('unixepoch'); + registerPowerSyncFunction('datetime'); + + registerPowerSyncFunction('ST_AsGeoJSON'); + registerPowerSyncFunction('AS_AsText'); + registerPowerSyncFunction('ST_X'); + registerPowerSyncFunction('ST_Y'); + + if (!compatibility.isEnabled(CompatibilityOption.fixedJsonExtract)) { + // For backwards compatibility, use the old JSON operators which parse the path argument differently. + registerPowerSyncFunction('->'); + registerPowerSyncFunction('->>'); + registerPowerSyncFunction('json_extract'); + registerPowerSyncFunction('json_array_length'); + } return { prepareEvaluator(input): ScalarExpressionEvaluator { diff --git a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts index 4eea0515f..8b38ac674 100644 --- a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts @@ -15,14 +15,14 @@ import { } from '../../../../src/sync_plan/engine/scalar_expression_engine.js'; import { BinaryOperator } from '../../../../src/sync_plan/expression.js'; +const compatibility = new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS }); + describe('sqlite', () => { - defineEngineTests(false, () => nodeSqliteExpressionEngine(sqlite)); + defineEngineTests(false, () => nodeSqliteExpressionEngine(sqlite, compatibility)); }); describe('javascript', () => { - defineEngineTests(true, () => - javaScriptExpressionEngine(new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })) - ); + defineEngineTests(true, () => javaScriptExpressionEngine(compatibility)); }); function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpressionEngine) { @@ -47,6 +47,16 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr expect(stmt.evaluate([left, right])).toStrictEqual([[output]]); } + function expectFunction(name: string, args: SqliteValue[], output: SqliteValue) { + const stmt = prepare({ + outputs: [ + { type: 'function', function: name, parameters: args.map((arg, i) => ({ type: 'data', source: i + 1 })) } + ] + }); + + expect(stmt.evaluate(args)).toStrictEqual([[output]]); + } + test('literal null', () => { expect(prepare({ outputs: [{ type: 'lit_null' }] }).evaluate([])).toStrictEqual([[null]]); }); @@ -247,4 +257,9 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr ['y', 'aaaaa'] ]); }); + + test('custom functions', () => { + expectFunction('unixepoch', ['now'], null); + expectFunction('datetime', ['now'], null); + }); } From 2b69603d297be2fa80bf1c75ef10c3299f914c42 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 28 Jan 2026 10:06:32 +0100 Subject: [PATCH 09/14] Test legacy json extract operators --- packages/sync-rules/src/sync_plan/engine/sqlite.ts | 14 +++++++++----- .../test/src/sync_plan/engine/engine.test.ts | 8 +++++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/sync-rules/src/sync_plan/engine/sqlite.ts b/packages/sync-rules/src/sync_plan/engine/sqlite.ts index 444563d8d..3c586cad6 100644 --- a/packages/sync-rules/src/sync_plan/engine/sqlite.ts +++ b/packages/sync-rules/src/sync_plan/engine/sqlite.ts @@ -20,8 +20,9 @@ export function nodeSqliteExpressionEngine( const functions = generateSqlFunctions(compatibility); function registerPowerSyncFunction(name: string) { + const impl = functions.named[name]!; + db.function(name, { useBigIntArguments: true, varargs: true, deterministic: true }, (...args) => { - const impl = functions.named[name]!; return impl.call(...args); }); } @@ -37,10 +38,13 @@ export function nodeSqliteExpressionEngine( if (!compatibility.isEnabled(CompatibilityOption.fixedJsonExtract)) { // For backwards compatibility, use the old JSON operators which parse the path argument differently. - registerPowerSyncFunction('->'); - registerPowerSyncFunction('->>'); - registerPowerSyncFunction('json_extract'); - registerPowerSyncFunction('json_array_length'); + db.function('->', { useBigIntArguments: true, varargs: true, deterministic: true }, (...args) => { + return functions.operatorJsonExtractJson.call(...args); + }); + + db.function('->>', { useBigIntArguments: true, varargs: true, deterministic: true }, (...args) => { + return functions.operatorJsonExtractSql.call(...args); + }); } return { diff --git a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts index 8b38ac674..d39c09db0 100644 --- a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts @@ -15,7 +15,7 @@ import { } from '../../../../src/sync_plan/engine/scalar_expression_engine.js'; import { BinaryOperator } from '../../../../src/sync_plan/expression.js'; -const compatibility = new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS }); +const compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; describe('sqlite', () => { defineEngineTests(false, () => nodeSqliteExpressionEngine(sqlite, compatibility)); @@ -261,5 +261,11 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr test('custom functions', () => { expectFunction('unixepoch', ['now'], null); expectFunction('datetime', ['now'], null); + + const jsonObject = JSON.stringify({ foo: { bar: ['baz'] } }); + + // Legacy JSON behavior, support paths without $. prefix + expectFunction('->', [jsonObject, 'foo.bar'], '["baz"]'); + expectFunction('->>', [jsonObject, 'foo.bar.0'], 'baz'); }); } From c4e010a4e117503fa1a051e7033ef38811960f99 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 28 Jan 2026 10:32:24 +0100 Subject: [PATCH 10/14] Add errors on LIKE expressiosn --- packages/sync-rules/src/compiler/sqlite.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index 79c6aa68c..4a6f4fdab 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -216,6 +216,9 @@ export class PostgresToSqlite { const left = this.translateNodeWithLocation(expr.left); const right = this.translateNodeWithLocation(expr.right); if (expr.op === 'LIKE') { + // We don't support LIKE in the old bucket definition system, and want to make sure we're clear about ICU, + // case sensitivity and changing the escape character first. TODO: Support later. + this.options.errors.report('LIKE expressions are not currently supported.', expr); return { type: 'function', function: 'like', parameters: [left, right] }; } else if (expr.op === 'NOT LIKE') { return { From bd66588915bd85f9e1be0a2be0a18eb420627ce2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 28 Jan 2026 10:59:07 +0100 Subject: [PATCH 11/14] Clear up some comments --- packages/sync-rules/src/sync_plan/engine/sqlite.ts | 2 +- .../src/sync_plan/evaluator/parameter_evaluator.ts | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/sync-rules/src/sync_plan/engine/sqlite.ts b/packages/sync-rules/src/sync_plan/engine/sqlite.ts index 3c586cad6..e8264a917 100644 --- a/packages/sync-rules/src/sync_plan/engine/sqlite.ts +++ b/packages/sync-rules/src/sync_plan/engine/sqlite.ts @@ -10,7 +10,7 @@ import { ScalarExpressionEngine, ScalarExpressionEvaluator, scalarStatementToSql * browsers). * * @experimental This engine is not drop-in compatible with the JS operator implementations. So we can only use this - * engine when a new compatibility option is enabled. + * engine when a new compatibility option is enabled. Currently, it is only used in tests. */ export function nodeSqliteExpressionEngine( module: typeof import('node:sqlite'), diff --git a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts index ae72a345a..2640b0eb4 100644 --- a/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts +++ b/packages/sync-rules/src/sync_plan/evaluator/parameter_evaluator.ts @@ -66,9 +66,12 @@ export class RequestParameterEvaluators { /** * Returns a copy of this instance. * - * We use this to be able to "fork" partial instantiations. For instance, we can evaluate paremeters not depending on - * parameter lookups as soon as the user connects (and keep that instantiation static along the lifetime of the - * connection). + * Since resolved values are replaced with their instantiation, we need to use closed evaluators before evaluating + * them on inputs that might change (like parameter lookups). + * + * Static data (like connection parameters) can be resolved sooner, and cloning that partially-instantiated evaluator + * graph essentially forks it. This allows us to cache connection parameters for the lifetime of the connection + * instead of re-evaluating them on every parameter lookup change. */ clone(): RequestParameterEvaluators { return new RequestParameterEvaluators( From eac8cb48ee9e4fb61f2948b5a678e0c1e05d0c4f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 10:03:00 +0100 Subject: [PATCH 12/14] Use AND to compose filters --- .../src/sync_plan/engine/scalar_expression_engine.ts | 7 ++++--- packages/sync-rules/src/sync_plan/expression_to_sql.ts | 2 +- ...calarStatementToSql.ts => scalarStatementToSql.test.ts} | 6 +++++- 3 files changed, 10 insertions(+), 5 deletions(-) rename packages/sync-rules/test/src/sync_plan/engine/{scalarStatementToSql.ts => scalarStatementToSql.test.ts} (94%) diff --git a/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts index 44990a47b..076daab76 100644 --- a/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts +++ b/packages/sync-rules/src/sync_plan/engine/scalar_expression_engine.ts @@ -1,6 +1,6 @@ import { SqliteValue } from '../../types.js'; import { ExternalData, SqlExpression } from '../expression.js'; -import { ExpressionToSqlite } from '../expression_to_sql.js'; +import { ExpressionToSqlite, Precedence } from '../expression_to_sql.js'; import { MapSourceVisitor, visitExpr } from '../expression_visitor.js'; import { ColumnSqlParameterValue, RequestSqlParameterValue, SqlParameterValue } from '../plan.js'; @@ -83,8 +83,9 @@ export function scalarStatementToSql({ filters = [], outputs = [], tableValuedFu if (filters.length != 0) { toSqlite.addLexeme('WHERE'); filters.forEach((expr, i) => { - if (i != 0) toSqlite.comma(); - visitExpr(toSqlite, expr, null); + if (i != 0) toSqlite.addLexeme('AND'); + + visitExpr(toSqlite, expr, Precedence.and); }); } diff --git a/packages/sync-rules/src/sync_plan/expression_to_sql.ts b/packages/sync-rules/src/sync_plan/expression_to_sql.ts index 2b432c26e..83aeaffa7 100644 --- a/packages/sync-rules/src/sync_plan/expression_to_sql.ts +++ b/packages/sync-rules/src/sync_plan/expression_to_sql.ts @@ -183,7 +183,7 @@ export class ExpressionToSqlite implements ExpressionVisitor { { type: 'lit_int', base10: '1' + }, + { + type: 'data', + source: 1 } ] }) - ).toStrictEqual(`SELECT 1 WHERE 1`); + ).toStrictEqual(`SELECT 1 WHERE 1 AND ?1`); }); test('output and filters', () => { From 52677d1505ee46f0e58f49761766934302185ecf Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 10:22:52 +0100 Subject: [PATCH 13/14] Fix geometry functions --- packages/sync-rules/src/compiler/sqlite.ts | 7 ++++--- packages/sync-rules/src/sync_plan/engine/sqlite.ts | 8 ++++---- packages/sync-rules/src/sync_plan/expression.ts | 8 ++++---- .../sync-rules/test/src/sync_plan/engine/engine.test.ts | 9 ++++++++- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index 4a6f4fdab..772154415 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -178,11 +178,12 @@ export class PostgresToSqlite { return this.invalidExpression(expr.function, 'DISTINCT, ORDER BY, FILTER and OVER clauses are not supported'); } - const forbiddenReason = forbiddenFunctions[expr.function.name]; + const functionName = expr.function.name.toLowerCase(); + const forbiddenReason = forbiddenFunctions[functionName]; if (forbiddenReason) { return this.invalidExpression(expr.function, `Forbidden call: ${forbiddenReason}`); } - let allowedArgs = supportedFunctions[expr.function.name]; + let allowedArgs = supportedFunctions[functionName]; if (allowedArgs == null) { return this.invalidExpression(expr.function, 'Unknown function'); } else { @@ -204,7 +205,7 @@ export class PostgresToSqlite { return { type: 'function', - function: expr.function.name, + function: functionName, parameters: expr.args.map((a) => this.translateNodeWithLocation(a)) }; } diff --git a/packages/sync-rules/src/sync_plan/engine/sqlite.ts b/packages/sync-rules/src/sync_plan/engine/sqlite.ts index e8264a917..b0448ca40 100644 --- a/packages/sync-rules/src/sync_plan/engine/sqlite.ts +++ b/packages/sync-rules/src/sync_plan/engine/sqlite.ts @@ -31,10 +31,10 @@ export function nodeSqliteExpressionEngine( registerPowerSyncFunction('unixepoch'); registerPowerSyncFunction('datetime'); - registerPowerSyncFunction('ST_AsGeoJSON'); - registerPowerSyncFunction('AS_AsText'); - registerPowerSyncFunction('ST_X'); - registerPowerSyncFunction('ST_Y'); + registerPowerSyncFunction('st_asgeojson'); + registerPowerSyncFunction('st_astext'); + registerPowerSyncFunction('st_x'); + registerPowerSyncFunction('st_y'); if (!compatibility.isEnabled(CompatibilityOption.fixedJsonExtract)) { // For backwards compatibility, use the old JSON operators which parse the path argument differently. diff --git a/packages/sync-rules/src/sync_plan/expression.ts b/packages/sync-rules/src/sync_plan/expression.ts index 755714c76..e10d56823 100644 --- a/packages/sync-rules/src/sync_plan/expression.ts +++ b/packages/sync-rules/src/sync_plan/expression.ts @@ -212,8 +212,8 @@ export const supportedFunctions: Record = { base64: 1, json_keys: 1, uuid_blob: 1, - ST_AsGeoJSON: 1, - ST_AsText: 1, - ST_X: 1, - ST_: 1 + st_asgeojson: 1, + st_astext: 1, + st_x: 1, + st_y: 1 }; diff --git a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts index d39c09db0..43b1d202e 100644 --- a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts @@ -3,7 +3,6 @@ import * as sqlite from 'node:sqlite'; import { describe, expect, onTestFinished, test } from 'vitest'; import { CompatibilityContext, - CompatibilityEdition, javaScriptExpressionEngine, nodeSqliteExpressionEngine, SqliteValue @@ -267,5 +266,13 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr // Legacy JSON behavior, support paths without $. prefix expectFunction('->', [jsonObject, 'foo.bar'], '["baz"]'); expectFunction('->>', [jsonObject, 'foo.bar.0'], 'baz'); + + const point = '010100000097900F7A36FB40C0F4FDD478E9D63240'; + expectFunction('st_x', [point], -33.9626); + expectFunction('st_y', [point], 18.8395); + + const geography = '010100000097900F7A36FB40C0F4FDD478E9D63240'; + expectFunction('st_asgeojson', [geography], '{"type":"Point","coordinates":[-33.9626,18.8395]}'); + expectFunction('st_astext', [geography], 'POINT(-33.9626 18.8395)'); }); } From f793fab26b3f9cda901091c405bb58a1116772fd Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 10:35:33 +0100 Subject: [PATCH 14/14] Smaller fixes, more tests --- packages/sync-rules/src/compiler/sqlite.ts | 3 +- .../src/sync_plan/engine/javascript.ts | 17 ++++--- .../sync-rules/src/sync_plan/expression.ts | 2 +- .../src/sync_plan/expression_to_sql.ts | 6 +-- .../test/src/sync_plan/engine/engine.test.ts | 46 ++++++++++++++----- 5 files changed, 50 insertions(+), 24 deletions(-) diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index 772154415..7f09c113b 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -247,8 +247,9 @@ export class PostgresToSqlite { let rightHandSideOfIs: SqlExpression; switch (expr.op) { - case '+': case '-': + return this.invalidExpression(expr, 'Unary minus is not currently supported'); + case '+': return { type: 'unary', operator: expr.op, operand: this.translateNodeWithLocation(expr.operand) }; case 'NOT': return { type: 'unary', operator: 'not', operand: this.translateNodeWithLocation(expr.operand) }; diff --git a/packages/sync-rules/src/sync_plan/engine/javascript.ts b/packages/sync-rules/src/sync_plan/engine/javascript.ts index 88fa0611c..7b5d3adde 100644 --- a/packages/sync-rules/src/sync_plan/engine/javascript.ts +++ b/packages/sync-rules/src/sync_plan/engine/javascript.ts @@ -151,9 +151,9 @@ class ExpressionToJavaScriptFunction return operand; case 'not': return (input) => sqliteNot(operand(input)); - case '~': - case '-': - throw new Error(`unary operator not supported: ${expr.operator}`); + // case '~': + // case '-': + // throw new Error(`unary operator not supported: ${expr.operator}`); } } @@ -173,10 +173,13 @@ class ExpressionToJavaScriptFunction return (input) => { const evaluatedValue = value(input); - return ( - sqliteBool(evaluateOperator('>=', evaluatedValue, low(input))) && - sqliteBool(evaluateOperator('<=', evaluatedValue, high(input))) - ); + const geqLow = evaluateOperator('>=', evaluatedValue, low(input)); + const leqHigh = evaluateOperator('<=', evaluatedValue, high(input)); + if (geqLow == null || leqHigh == null) { + return null; + } + + return sqliteBool(geqLow) && sqliteBool(leqHigh); }; } diff --git a/packages/sync-rules/src/sync_plan/expression.ts b/packages/sync-rules/src/sync_plan/expression.ts index e10d56823..4823176d8 100644 --- a/packages/sync-rules/src/sync_plan/expression.ts +++ b/packages/sync-rules/src/sync_plan/expression.ts @@ -26,7 +26,7 @@ export type SqlExpression = */ export type ExternalData = { type: 'data'; source: Data }; -export type UnaryOperator = 'not' | '~' | '+' | '-'; +export type UnaryOperator = 'not' | '+'; //| '-' | '~'; export type UnaryExpression = { type: 'unary'; diff --git a/packages/sync-rules/src/sync_plan/expression_to_sql.ts b/packages/sync-rules/src/sync_plan/expression_to_sql.ts index 83aeaffa7..b966257ba 100644 --- a/packages/sync-rules/src/sync_plan/expression_to_sql.ts +++ b/packages/sync-rules/src/sync_plan/expression_to_sql.ts @@ -221,7 +221,7 @@ const binaryPrecedence: Record = { const unaryPrecedence: Record = { not: Precedence.not, - '~': Precedence.unary, - '+': Precedence.unary, - '-': Precedence.unary + //'~': Precedence.unary, + '+': Precedence.unary + //'-': Precedence.unary }; diff --git a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts index 43b1d202e..f1a9c7aa4 100644 --- a/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts +++ b/packages/sync-rules/test/src/sync_plan/engine/engine.test.ts @@ -3,6 +3,7 @@ import * as sqlite from 'node:sqlite'; import { describe, expect, onTestFinished, test } from 'vitest'; import { CompatibilityContext, + CompatibilityEdition, javaScriptExpressionEngine, nodeSqliteExpressionEngine, SqliteValue @@ -14,19 +15,22 @@ import { } from '../../../../src/sync_plan/engine/scalar_expression_engine.js'; import { BinaryOperator } from '../../../../src/sync_plan/expression.js'; -const compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; - describe('sqlite', () => { - defineEngineTests(false, () => nodeSqliteExpressionEngine(sqlite, compatibility)); + defineEngineTests(false, (c) => nodeSqliteExpressionEngine(sqlite, c)); }); describe('javascript', () => { - defineEngineTests(true, () => javaScriptExpressionEngine(compatibility)); + defineEngineTests(true, javaScriptExpressionEngine); }); -function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpressionEngine) { +function defineEngineTests( + isJavaScript: boolean, + createEngine: (compatibility: CompatibilityContext) => ScalarExpressionEngine +) { + let compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; + function prepare(stmt: ScalarStatement) { - const engine = createEngine(); + const engine = createEngine(compatibility); onTestFinished(() => engine.close()); return engine.prepareEvaluator(stmt); } @@ -60,6 +64,12 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr expect(prepare({ outputs: [{ type: 'lit_null' }] }).evaluate([])).toStrictEqual([[null]]); }); + test('not null', () => { + expect( + prepare({ outputs: [{ type: 'unary', operator: 'not', operand: { type: 'lit_null' } }] }).evaluate([]) + ).toStrictEqual([[isJavaScript ? 1n : null]]); + }); + test('literal double', () => { expect(prepare({ outputs: [{ type: 'lit_double', value: 3 }] }).evaluate([])).toStrictEqual([[3]]); }); @@ -125,6 +135,10 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr expect(stmt.evaluate([1, 2, 3])).toStrictEqual([[0n]]); expect(stmt.evaluate([4, 2, 3])).toStrictEqual([[0n]]); expect(stmt.evaluate([1, 1, 1])).toStrictEqual([[1n]]); + + expect(stmt.evaluate([null, 3, 4])).toStrictEqual([[null]]); + expect(stmt.evaluate([4, null, 4])).toStrictEqual([[null]]); + expect(stmt.evaluate([4, 3, null])).toStrictEqual([[null]]); }); test('scalar in', () => { @@ -261,12 +275,6 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr expectFunction('unixepoch', ['now'], null); expectFunction('datetime', ['now'], null); - const jsonObject = JSON.stringify({ foo: { bar: ['baz'] } }); - - // Legacy JSON behavior, support paths without $. prefix - expectFunction('->', [jsonObject, 'foo.bar'], '["baz"]'); - expectFunction('->>', [jsonObject, 'foo.bar.0'], 'baz'); - const point = '010100000097900F7A36FB40C0F4FDD478E9D63240'; expectFunction('st_x', [point], -33.9626); expectFunction('st_y', [point], 18.8395); @@ -275,4 +283,18 @@ function defineEngineTests(isJavaScript: boolean, createEngine: () => ScalarExpr expectFunction('st_asgeojson', [geography], '{"type":"Point","coordinates":[-33.9626,18.8395]}'); expectFunction('st_astext', [geography], 'POINT(-33.9626 18.8395)'); }); + + test('legacy and fixed JSON behavior', () => { + const jsonObject = JSON.stringify({ foo: { bar: ['baz'] }, 'foo.bar': ['another'] }); + compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; + + // Legacy JSON behavior, support paths without $. prefix + expectFunction('->', [jsonObject, 'foo.bar'], '["baz"]'); + expectFunction('->>', [jsonObject, 'foo.bar.0'], 'baz'); + + // New JSON behavior, require $. syntax. + compatibility = new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS }); + expectFunction('->', [jsonObject, 'foo.bar'], '["another"]'); + expectFunction('->>', [jsonObject, 'foo.bar.0'], null); + }); }