From ec975f0b0a4faa98115de41145138a24f1a6b4f0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 Jan 2026 14:59:51 +0100 Subject: [PATCH 1/6] Support subqueries in `FROM` --- packages/sync-rules/src/compiler/compiler.ts | 3 + .../sync-rules/src/compiler/expression.ts | 4 +- .../src/compiler/filter_simplifier.ts | 46 +++-- packages/sync-rules/src/compiler/parser.ts | 193 +++++++++++++----- packages/sync-rules/src/compiler/scope.ts | 9 + packages/sync-rules/src/compiler/sqlite.ts | 45 +++- packages/sync-rules/src/compiler/table.ts | 2 +- .../test/src/compiler/subqueries.test.ts | 65 ++++++ 8 files changed, 290 insertions(+), 77 deletions(-) create mode 100644 packages/sync-rules/test/src/compiler/subqueries.test.ts diff --git a/packages/sync-rules/src/compiler/compiler.ts b/packages/sync-rules/src/compiler/compiler.ts index 1bcf2228a..bc4e6a438 100644 --- a/packages/sync-rules/src/compiler/compiler.ts +++ b/packages/sync-rules/src/compiler/compiler.ts @@ -7,6 +7,7 @@ import { CompilerModelToSyncPlan } from './ir_to_sync_plan.js'; import { QuerierGraphBuilder } from './querier_graph.js'; import { StreamQueryParser } from './parser.js'; import { NodeLocations } from './expression.js'; +import { SqlScope } from './scope.js'; /** * State for compiling sync streams. 
@@ -33,6 +34,7 @@ export class SyncStreamsCompiler { */ stream(options: StreamOptions): IndividualSyncStreamCompiler { const builder = new QuerierGraphBuilder(this, options); + const rootScope = new SqlScope({}); return { addQuery: (sql: string, errors: ParsingErrorListener) => { @@ -41,6 +43,7 @@ export class SyncStreamsCompiler { compiler: this, originalText: sql, locations: new NodeLocations(), + parentScope: rootScope, errors }); const query = parser.parse(stmt); diff --git a/packages/sync-rules/src/compiler/expression.ts b/packages/sync-rules/src/compiler/expression.ts index 83950cda3..bb331bb51 100644 --- a/packages/sync-rules/src/compiler/expression.ts +++ b/packages/sync-rules/src/compiler/expression.ts @@ -129,9 +129,9 @@ export class ConnectionParameter implements EqualsIgnoringResultSet { * in-memory map. */ export class NodeLocations { - readonly sourceForNode = new Map, PGNode | NodeLocation>(); + readonly sourceForNode = new Map, PGNode | NodeLocation>(); - locationFor(source: SqlExpression): NodeLocation { + locationFor(source: SqlExpression): NodeLocation { const location = getLocation(this.sourceForNode.get(source)); if (location == null) { throw new Error('Missing location'); diff --git a/packages/sync-rules/src/compiler/filter_simplifier.ts b/packages/sync-rules/src/compiler/filter_simplifier.ts index 5af8b5312..84db2ed46 100644 --- a/packages/sync-rules/src/compiler/filter_simplifier.ts +++ b/packages/sync-rules/src/compiler/filter_simplifier.ts @@ -1,12 +1,10 @@ import { And, BaseTerm, EqualsClause, isBaseTerm, Or, SingleDependencyExpression } from './filter.js'; -import { SyncExpression } from './expression.js'; +import { NodeLocations, SyncExpression } from './expression.js'; import { SourceResultSet } from './table.js'; -import { BinaryOperator } from '../sync_plan/expression.js'; +import { BinaryOperator, SqlExpression } from '../sync_plan/expression.js'; import { expandNodeLocations } from '../errors.js'; export class 
FilterConditionSimplifier { - constructor(private readonly originalText: string) {} - simplifyOr(or: Or): Or { const andTerms: And[] = []; let baseTerms: BaseTerm[] = []; @@ -116,18 +114,36 @@ export class FilterConditionSimplifier { throw new Error("Can't compose zero expressions"); } - const [first, ...rest] = terms; - const locations = first.expression.locations; - let inner = first.expression.node; - for (const additional of rest) { - inner = { type: 'binary', operator, left: inner, right: additional.expression.node }; - } - - const location = expandNodeLocations(terms.map((e) => e.expression.location)); - if (location) { - locations.sourceForNode.set(inner, location); - } + const locations = terms[0].expression.locations; + const inner = composeExpressionNodes( + locations, + operator, + terms.map((t) => t.expression.node) + ); return new SingleDependencyExpression(new SyncExpression(inner, locations)); } } + +export function composeExpressionNodes( + locations: NodeLocations, + operator: BinaryOperator, + terms: SqlExpression[] +) { + if (terms.length == 0) { + throw new Error("Can't compose zero expressions"); + } + + const [first, ...rest] = terms; + let inner = first; + for (const additional of rest) { + inner = { type: 'binary', operator, left: inner, right: additional }; + } + + const location = expandNodeLocations(terms.map((e) => locations.locationFor(e))); + if (location) { + locations.sourceForNode.set(inner, location); + } + + return inner; +} diff --git a/packages/sync-rules/src/compiler/parser.ts b/packages/sync-rules/src/compiler/parser.ts index 0bdf66a1e..4b3c1a8f0 100644 --- a/packages/sync-rules/src/compiler/parser.ts +++ b/packages/sync-rules/src/compiler/parser.ts @@ -1,5 +1,17 @@ -import { Expr, ExprCall, ExprRef, From, nil, NodeLocation, PGNode, SelectedColumn, Statement } from 'pgsql-ast-parser'; import { + Expr, + ExprCall, + ExprRef, + From, + nil, + NodeLocation, + PGNode, + SelectedColumn, + SelectFromStatement, + Statement +} from 
'pgsql-ast-parser'; +import { + BaseSourceResultSet, PhysicalSourceResultSet, RequestTableValuedResultSet, SourceResultSet, @@ -19,11 +31,11 @@ import { } from './filter.js'; import { expandNodeLocations } from '../errors.js'; import { cartesianProduct } from '../streams/utils.js'; -import { PostgresToSqlite } from './sqlite.js'; +import { PostgresToSqlite, PreparedSubquery } from './sqlite.js'; import { SqlScope } from './scope.js'; import { ParsingErrorListener, SyncStreamsCompiler } from './compiler.js'; import { TablePattern } from '../TablePattern.js'; -import { FilterConditionSimplifier } from './filter_simplifier.js'; +import { composeExpressionNodes, FilterConditionSimplifier } from './filter_simplifier.js'; import { SqlExpression } from '../sync_plan/expression.js'; /** @@ -73,7 +85,7 @@ export interface StreamQueryParserOptions { compiler: SyncStreamsCompiler; originalText: string; errors: ParsingErrorListener; - parentScope?: SqlScope; + parentScope: SqlScope; locations: NodeLocations; } @@ -85,6 +97,8 @@ export class StreamQueryParser { // Note: This is not the same as SqlScope since some result sets are inlined from CTEs or subqueries. These are not in // scope, but we still add them here to correctly track dependencies. private readonly resultSets = new Map(); + private readonly subqueryResultSets = new Map(); + private readonly resultColumns: ColumnSource[] = []; private where: SqlExpression[] = []; @@ -112,13 +126,7 @@ export class StreamQueryParser { }, joinSubqueryExpression: (expr) => { // Independently analyze the inner query. 
- const parseInner = new StreamQueryParser({ - compiler: this.compiler, - originalText: this.originalText, - errors: this.errors, - parentScope: this.statementScope, - locations: this.nodeLocations - }); + const parseInner = this.nestedParser(this.statementScope); let success = parseInner.processAst(expr, { forSubquery: true }); if (!success) { return null; @@ -140,6 +148,7 @@ export class StreamQueryParser { return null; } + parseInner.resultSets.forEach((v, k) => this.resultSets.set(k, v)); return { filters: parseInner.where, output: parseInner.parseExpression(resultColumn).node }; } }); @@ -170,6 +179,56 @@ export class StreamQueryParser { } } + parseAsSubquery(stmt: Statement, columnNames?: string[]): PreparedSubquery | null { + if (this.processAst(stmt, { forSubquery: true })) { + const resultColumns: Record> = {}; + let columnCount = 0; + + for (const column of (stmt as SelectFromStatement).columns ?? []) { + if (column.expr.type == 'ref' && column.expr.name == '*') { + // We don't support * columns in subqueries. The reason is that we want to be able to parse queries without + // knowing the schema, so we can't know what * would resolve to. + this.errors.report('* columns are not allowed in subqueries or common table expressions', column.expr); + } else { + const name = (columnNames && columnNames[columnCount]) ?? this.inferColumnName(column); + const expr = this.parseExpression(column.expr); + + if (Object.hasOwn(resultColumns, name)) { + this.errors.report(`There is a column named '${name}' already.`, column); + } + + resultColumns[name] = expr.node; + columnCount++; + } + } + + if (columnNames && columnNames.length != columnCount) { + this.errors.report( + `Expected this subquery to have ${columnNames.length} columns, it actually has ${columnCount}`, + stmt + ); + } + + return { + resultColumns, + tables: [...this.resultSets.values()], + where: this.where.length == 0 ? 
null : composeExpressionNodes(this.nodeLocations, 'and', this.where) + }; + } else { + return null; + } + } + + private nestedParser(parentScope: SqlScope): StreamQueryParser { + return new StreamQueryParser({ + compiler: this.compiler, + originalText: this.originalText, + errors: this.errors, + parentScope, + locations: this.nodeLocations + }); + } + /** * Process the AST of a statement, returning whether it's close enough to a valid select statement to be supported for * sync streams (allowing us to process an invalid statement further to perhaps collect more errors). @@ -206,7 +265,6 @@ export class StreamQueryParser { private processFrom(from: From) { const scope = this.statementScope; - let handled = false; if (from.type == 'table') { const source = new SyntacticResultSetSource(from.name, from.name.alias ?? null); const resultSet = new PhysicalSourceResultSet( @@ -215,21 +273,35 @@ export class StreamQueryParser { ); scope.registerResultSet(this.errors, from.name.alias ?? from.name.name, source); this.resultSets.set(source, resultSet); - handled = true; } else if (from.type == 'call') { const source = new SyntacticResultSetSource(from, from.alias?.name ?? null); scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); this.resultSets.set(source, this.resolveTableValued(from, source)); - handled = true; } else if (from.type == 'statement') { - // TODO: We could technically allow selecting from subqueries once we support CTEs. 
+ const parseInner = this.nestedParser(this.statementScope); + const parsedSubquery = parseInner.parseAsSubquery( + from.statement, + from.columnNames?.map((c) => c.name) + ); + + if (parsedSubquery) { + parseInner.resultSets.forEach((v, k) => this.resultSets.set(k, v)); + + if (parsedSubquery.where) { + this.where.push(parsedSubquery.where); + } + + const source = new SyntacticResultSetSource(from, from.alias); + scope.registerResultSet(this.errors, from.alias, source); + this.subqueryResultSets.set(source, parsedSubquery); + } } const join = from.join; - if (join && handled) { + if (join) { if (join.type != 'INNER JOIN') { // We only support inner joins. - this.warnUnsupported(join, join.type); + this.warnUnsupported(join, 'FULL JOIN'); } if (join.using) { @@ -242,10 +314,6 @@ export class StreamQueryParser { this.addAndTermToWhereClause(join.on); } } - - if (!handled) { - this.warnUnsupported(from, 'This source'); - } } private resolveTableValued(call: ExprCall, source: SyntacticResultSetSource): RequestTableValuedResultSet { @@ -267,6 +335,10 @@ export class StreamQueryParser { return new RequestTableValuedResultSet(call.function.name, resolvedArguments, source); } + private inferColumnName(column: SelectedColumn): string { + return column.alias?.name ?? 
this.originalText.substring(column._location!.start, column._location!.end); + } + private processResultColumns(stmt: PGNode, columns: SelectedColumn[]) { const selectsFrom = (source: SourceResultSet, node: PGNode) => { if (source instanceof PhysicalSourceResultSet) { @@ -283,40 +355,47 @@ export class StreamQueryParser { } }; - for (const column of columns) { - if (column.expr.type == 'ref' && column.expr.name == '*') { - const resolved = this.resolveTableName(column.expr, column.expr.table?.name); - if (resolved != null) { - selectsFrom(resolved, column.expr); + const addColumn = (expr: SyncExpression, name: string) => { + for (const dependency of expr.instantiation) { + if (dependency instanceof ColumnInRow) { + selectsFrom(dependency.resultSet, dependency.syntacticOrigin); + } else { + this.errors.report( + 'This attempts to sync a connection parameter. Only values from the source database can be synced.', + dependency.syntacticOrigin + ); } + } - this.resultColumns.push(StarColumnSource.instance); - } else { - const expr = this.parseExpression(column.expr); - - for (const dependency of expr.instantiation) { - if (dependency instanceof ColumnInRow) { - selectsFrom(dependency.resultSet, dependency.syntacticOrigin); - } else { - this.errors.report( - 'This attempts to sync a connection parameter. Only values from the source database can be synced.', - dependency.syntacticOrigin - ); - } + try { + this.resultColumns.push(new ExpressionColumnSource(new RowExpression(expr), name)); + } catch (e) { + if (e instanceof InvalidExpressionError) { + // Invalid dependencies, we've already logged errors for this. Ignore. + } else { + throw e; } + } + }; - try { - const outputName = - column.alias?.name ?? 
this.originalText.substring(column._location!.start, column._location!.end); - - this.resultColumns.push(new ExpressionColumnSource(new RowExpression(expr), outputName)); - } catch (e) { - if (e instanceof InvalidExpressionError) { - // Invalid dependencies, we've already logged errors for this. Ignore. + for (const column of columns) { + if (column.expr.type == 'ref' && column.expr.name == '*') { + const resolved = this.resolveTableName(column.expr, column.expr.table?.name); + if (resolved != null) { + if (resolved instanceof BaseSourceResultSet) { + selectsFrom(resolved, column.expr); + this.resultColumns.push(StarColumnSource.instance); } else { - throw e; + // Selecting from a subquery, add all columns. + for (const [name, column] of Object.entries(resolved.resultColumns)) { + addColumn(new SyncExpression(column, this.nodeLocations), name); + } } } + } else { + const expr = this.parseExpression(column.expr); + const outputName = this.inferColumnName(column); + addColumn(expr, outputName); } } @@ -329,7 +408,7 @@ export class StreamQueryParser { return this.exprParser.translateExpression(source); } - resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | null { + private resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | PreparedSubquery | null { if (name == null) { // For unqualified references, there must be a single table in scope. We don't allow unqualified references if // there are multiple tables because we don't know which column is available in which table with certainty (and @@ -337,7 +416,7 @@ export class StreamQueryParser { // references. 
const resultSets = this.statementScope.resultSets; if (resultSets.length == 1) { - return this.resultSets.get(resultSets[0])!; + return this.resolveSoure(resultSets[0]); } else { this.errors.report('Invalid unqualified reference since multiple tables are in scope', node); return null; @@ -349,10 +428,20 @@ export class StreamQueryParser { return null; } - return this.resultSets.get(result)!; + return this.resolveSoure(result); } } + private resolveSoure(source: SyntacticResultSetSource): SourceResultSet | PreparedSubquery { + if (this.resultSets.has(source)) { + return this.resultSets.get(source)!; + } else if (this.subqueryResultSets.has(source)) { + return this.subqueryResultSets.get(source)!; + } + + throw new Error('internal error: result set from scope has not been registered'); + } + private warnUnsupported(node: PGNode | PGNode[] | nil, description: string) { if (node != null) { let location: PGNode | NodeLocation; @@ -402,7 +491,7 @@ export class StreamQueryParser { }; }); - return new FilterConditionSimplifier(this.originalText).simplifyOr({ terms: mappedTerms }); + return new FilterConditionSimplifier().simplifyOr({ terms: mappedTerms }); } private mapBaseExpression(pending: PendingBaseTerm): BaseTerm { diff --git a/packages/sync-rules/src/compiler/scope.ts b/packages/sync-rules/src/compiler/scope.ts index 3ed310b36..c782239be 100644 --- a/packages/sync-rules/src/compiler/scope.ts +++ b/packages/sync-rules/src/compiler/scope.ts @@ -14,6 +14,15 @@ export class SqlScope { this.parent = options.parent; } + get rootScope(): SqlScope { + let maybeRoot: SqlScope = this; + while (maybeRoot.parent) { + maybeRoot = maybeRoot.parent; + } + + return maybeRoot; + } + get resultSets(): SyntacticResultSetSource[] { return [...this.nameToResultSet.values()]; } diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index b0a268ff6..75fd812dd 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ 
b/packages/sync-rules/src/compiler/sqlite.ts @@ -19,13 +19,33 @@ import { } from '../sync_plan/expression.js'; import { ConnectionParameterSource } from '../sync_plan/plan.js'; import { ParsingErrorListener } from './compiler.js'; -import { SourceResultSet } from './table.js'; +import { BaseSourceResultSet, SourceResultSet } from './table.js'; export interface ResolvedSubqueryExpression { filters: SqlExpression[]; output: SqlExpression; } +/** + * A prepared subquery or common table expression. + */ +export interface PreparedSubquery { + /** + * Columns selected by the query, indexed by their name. + */ + resultColumns: Record>; + + /** + * Tables the subquery selects from. + */ + tables: SourceResultSet[]; + + /** + * Filters affecting the subquery. + */ + where: SqlExpression | null; +} + export interface PostgresToSqliteOptions { readonly originalText: string; readonly errors: ParsingErrorListener; @@ -36,7 +56,7 @@ export interface PostgresToSqliteOptions { * * Should report an error if resolving the table failed, using `node` as the source location for the error. */ - resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | null; + resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | PreparedSubquery | null; /** * Generates a table alias for synthetic subqueries like those generated to desugar `IN` expressions to `json_each` @@ -104,11 +124,22 @@ export class PostgresToSqlite { return this.invalidExpression(expr, '* columns are not supported here'); } - const instantiation = new ColumnInRow(expr, resultSet, expr.name); - return { - type: 'data', - source: instantiation - }; + if (resultSet instanceof BaseSourceResultSet) { + // This is an actual result set. + const instantiation = new ColumnInRow(expr, resultSet, expr.name); + return { + type: 'data', + source: instantiation + }; + } else { + // Resolved to a subquery, inline the reference. 
+ const expression = resultSet.resultColumns[expr.name]; + if (expression == null) { + return this.invalidExpression(expr, 'Column not found in subquery.'); + } + + return expression; + } } case 'parameter': return this.invalidExpression( diff --git a/packages/sync-rules/src/compiler/table.ts b/packages/sync-rules/src/compiler/table.ts index f387d9896..73d7f8148 100644 --- a/packages/sync-rules/src/compiler/table.ts +++ b/packages/sync-rules/src/compiler/table.ts @@ -27,7 +27,7 @@ export class SyntacticResultSetSource { ) {} } -abstract class BaseSourceResultSet { +export abstract class BaseSourceResultSet { constructor(readonly source: SyntacticResultSetSource) {} abstract get description(): string; diff --git a/packages/sync-rules/test/src/compiler/subqueries.test.ts b/packages/sync-rules/test/src/compiler/subqueries.test.ts new file mode 100644 index 000000000..f8f8b7962 --- /dev/null +++ b/packages/sync-rules/test/src/compiler/subqueries.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, test } from 'vitest'; +import { compilationErrorsForSingleStream, compileToSyncPlanWithoutErrors } from './utils.js'; + +describe('subqueries', () => { + describe('names', () => { + test('from inner columns', () => { + const plan = compileToSyncPlanWithoutErrors([ + { name: 'a', queries: ['SELECT * FROM (SELECT id, name FROM users) AS subquery'] } + ]); + const sources = plan.dataSources; + expect(sources).toHaveLength(1); + + const [source] = sources; + expect(source.sourceTable.tablePattern).toBe('users'); + + expect(source.columns.map((e) => (e as any).alias)).toEqual(['id', 'name']); + }); + + test('from outer alias', () => { + const plan = compileToSyncPlanWithoutErrors([ + { name: 'a', queries: ['SELECT * FROM (SELECT id, name FROM users) AS subquery (my_id, custom_name)'] } + ]); + const sources = plan.dataSources; + expect(sources).toHaveLength(1); + + const [source] = sources; + expect(source.sourceTable.tablePattern).toBe('users'); + + expect(source.columns.map((e) 
=> (e as any).alias)).toEqual(['my_id', 'custom_name']); + }); + }); + + describe('errors', () => { + test('select star', () => { + expect(compilationErrorsForSingleStream('SELECT 1 FROM (SELECT * FROM users) AS u')).toStrictEqual([ + { + message: '* columns are not allowed in subqueries or common table expressions', + source: '*' + }, + { + message: 'Must have a result column selecting from a table', + source: 'SELECT 1 FROM (SELECT * FROM users) AS u' + } + ]); + }); + + test('name vs colum mismatch', () => { + expect(compilationErrorsForSingleStream('SELECT a FROM (SELECT id FROM users) AS u (a, b, c)')).toStrictEqual([ + { + message: 'Expected this subquery to have 3 columns, it actually has 1', + source: '(SELECT id FROM users)' + } + ]); + }); + + test('duplicate column', () => { + expect(compilationErrorsForSingleStream('SELECT id FROM (SELECT id, a AS id FROM users) AS u')).toStrictEqual([ + { + message: "There is a column named 'id' already.", + source: 'a AS id' + } + ]); + }); + }); +}); From 89f8ffd444418e9187aaa567522add5fd7d71c62 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 Jan 2026 15:57:35 +0100 Subject: [PATCH 2/6] Support common table expressions --- packages/sync-rules/src/compiler/compiler.ts | 31 +- packages/sync-rules/src/compiler/parser.ts | 44 ++- packages/sync-rules/src/compiler/scope.ts | 15 + packages/sync-rules/src/compiler/sqlite.ts | 4 +- .../compiler/__snapshots__/cte.test.ts.snap | 307 ++++++++++++++++++ .../sync-rules/test/src/compiler/cte.test.ts | 35 ++ .../sync-rules/test/src/compiler/utils.ts | 26 +- 7 files changed, 436 insertions(+), 26 deletions(-) create mode 100644 packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap create mode 100644 packages/sync-rules/test/src/compiler/cte.test.ts diff --git a/packages/sync-rules/src/compiler/compiler.ts b/packages/sync-rules/src/compiler/compiler.ts index bc4e6a438..b8749c19f 100644 --- a/packages/sync-rules/src/compiler/compiler.ts +++ 
b/packages/sync-rules/src/compiler/compiler.ts @@ -8,6 +8,7 @@ import { QuerierGraphBuilder } from './querier_graph.js'; import { StreamQueryParser } from './parser.js'; import { NodeLocations } from './expression.js'; import { SqlScope } from './scope.js'; +import { PreparedSubquery } from './sqlite.js'; /** * State for compiling sync streams. @@ -24,9 +25,28 @@ import { SqlScope } from './scope.js'; */ export class SyncStreamsCompiler { readonly output = new CompiledStreamQueries(); + private readonly locations = new NodeLocations(); constructor(readonly defaultSchema: string) {} + /** + * Tries to parse the SQL query as a `SELECT` statement into a form supported for common table expressions. + * + * Returns null and reports errors if that fails. + */ + commonTableExpression(sql: string, errors: ParsingErrorListener): PreparedSubquery | null { + const parser = new StreamQueryParser({ + compiler: this, + originalText: sql, + locations: this.locations, + parentScope: new SqlScope({}), + errors + }); + + const [stmt] = parse(sql, { locationTracking: true }); + return parser.parseAsSubquery(stmt); + } + /** * Utility for compiling a sync stream. * @@ -37,12 +57,15 @@ export class SyncStreamsCompiler { const rootScope = new SqlScope({}); return { + registerCommonTableExpression: (name, cte) => { + rootScope.registerCommonTableExpression(name, cte); + }, addQuery: (sql: string, errors: ParsingErrorListener) => { const [stmt] = parse(sql, { locationTracking: true }); const parser = new StreamQueryParser({ compiler: this, originalText: sql, - locations: new NodeLocations(), + locations: this.locations, parentScope: rootScope, errors }); @@ -60,6 +83,12 @@ export class SyncStreamsCompiler { * Utility for compiling a single sync stream. */ export interface IndividualSyncStreamCompiler { + /** + * Makes a common table expression prepared through {@link SyncStreamsCompiler.commonTableExpression} available when + * parsing queries for this stream. 
+ */ + registerCommonTableExpression(name: string, cte: PreparedSubquery): void; + /** * Validates and adds a parameter query to this stream. * diff --git a/packages/sync-rules/src/compiler/parser.ts b/packages/sync-rules/src/compiler/parser.ts index 4b3c1a8f0..591b96992 100644 --- a/packages/sync-rules/src/compiler/parser.ts +++ b/packages/sync-rules/src/compiler/parser.ts @@ -211,7 +211,7 @@ export class StreamQueryParser { return { resultColumns, - tables: [...this.resultSets.values()], + tables: this.resultSets, where: this.where.length == 0 ? null : composeExpressionNodes(this.nodeLocations, 'and', this.where) }; } else { @@ -263,21 +263,42 @@ export class StreamQueryParser { this.where.push(this.parseExpression(expr).node); } + private addSubquery(source: SyntacticResultSetSource, subquery: PreparedSubquery) { + subquery.tables.forEach((v, k) => this.resultSets.set(k, v)); + if (subquery.where) { + this.where.push(subquery.where); + } + + this.subqueryResultSets.set(source, subquery); + } + private processFrom(from: From) { const scope = this.statementScope; if (from.type == 'table') { + const name = from.name.alias ?? from.name.name; const source = new SyntacticResultSetSource(from.name, from.name.alias ?? null); - const resultSet = new PhysicalSourceResultSet( - new TablePattern(from.name.schema ?? this.compiler.defaultSchema, from.name.name), - source - ); - scope.registerResultSet(this.errors, from.name.alias ?? from.name.name, source); - this.resultSets.set(source, resultSet); + scope.registerResultSet(this.errors, name, source); + + // If this references a CTE in scope, use that instead of names. + const cte = from.name.schema == null ? scope.resolveCommonTableExpression(from.name.name) : null; + if (cte) { + this.addSubquery(source, cte); + } else { + // Not a CTE, so treat it as a source database table. + const resultSet = new PhysicalSourceResultSet( + new TablePattern(from.name.schema ?? 
this.compiler.defaultSchema, from.name.name), + source + ); + + this.resultSets.set(source, resultSet); + } } else if (from.type == 'call') { const source = new SyntacticResultSetSource(from, from.alias?.name ?? null); scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); this.resultSets.set(source, this.resolveTableValued(from, source)); } else if (from.type == 'statement') { + const source = new SyntacticResultSetSource(from, from.alias); + const parseInner = this.nestedParser(this.statementScope); const parsedSubquery = parseInner.parseAsSubquery( from.statement, @@ -285,15 +306,8 @@ export class StreamQueryParser { ); if (parsedSubquery) { - parseInner.resultSets.forEach((v, k) => this.resultSets.set(k, v)); - - if (parsedSubquery.where) { - this.where.push(parsedSubquery.where); - } - - const source = new SyntacticResultSetSource(from, from.alias); scope.registerResultSet(this.errors, from.alias, source); - this.subqueryResultSets.set(source, parsedSubquery); + this.addSubquery(source, parsedSubquery); } } diff --git a/packages/sync-rules/src/compiler/scope.ts b/packages/sync-rules/src/compiler/scope.ts index c782239be..1b3c392dd 100644 --- a/packages/sync-rules/src/compiler/scope.ts +++ b/packages/sync-rules/src/compiler/scope.ts @@ -1,5 +1,6 @@ import { SyntacticResultSetSource } from './table.js'; import { ParsingErrorListener } from './compiler.js'; +import { PreparedSubquery } from './sqlite.js'; /** * Utilities for resolving references in SQL statements where multiple tables are in scope. 
@@ -9,6 +10,7 @@ import { ParsingErrorListener } from './compiler.js'; export class SqlScope { readonly parent?: SqlScope; private readonly nameToResultSet = new Map<string, SyntacticResultSetSource>(); + private readonly commonTableExpressions = new Map<string, PreparedSubquery>(); constructor(options: { parent?: SqlScope }) { this.parent = options.parent; @@ -40,4 +42,17 @@ export class SqlScope { resolveResultSetForReference(name: string): SyntacticResultSetSource | undefined { return this.nameToResultSet.get(name.toLowerCase()) ?? this.parent?.resolveResultSetForReference(name); } + + registerCommonTableExpression(name: string, subquery: PreparedSubquery) { + this.commonTableExpressions.set(name, subquery); + } + + resolveCommonTableExpression(name: string): PreparedSubquery | null { + const inThisScope = this.commonTableExpressions.get(name); + if (inThisScope) { + return inThisScope; + } + + return this.parent ? this.parent.resolveCommonTableExpression(name) : null; + } } diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index 75fd812dd..b917b1064 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -19,7 +19,7 @@ import { } from '../sync_plan/expression.js'; import { ConnectionParameterSource } from '../sync_plan/plan.js'; import { ParsingErrorListener } from './compiler.js'; -import { BaseSourceResultSet, SourceResultSet } from './table.js'; +import { BaseSourceResultSet, SourceResultSet, SyntacticResultSetSource } from './table.js'; export interface ResolvedSubqueryExpression { filters: SqlExpression[]; @@ -38,7 +38,7 @@ export interface PreparedSubquery { /** * Tables the subquery selects from. */ - tables: SourceResultSet[]; + tables: Map<SyntacticResultSetSource, SourceResultSet>; /** * Filters affecting the subquery.
diff --git a/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap b/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap new file mode 100644 index 000000000..845d8f8cf --- /dev/null +++ b/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap @@ -0,0 +1,307 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`common table expressions > as data source 1`] = ` +{ + "buckets": [ + { + "hash": 385206737, + "sources": [ + 0, + ], + "uniqueName": "stream|0", + }, + ], + "dataSources": [ + { + "columns": [ + { + "alias": "id", + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + { + "alias": "name", + "expr": { + "source": { + "column": "name", + }, + "type": "data", + }, + }, + ], + "filters": [], + "hash": 148949426, + "outputTableName": "organisations", + "partitionBy": [ + { + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "organisations", + }, + }, + ], + "parameterIndexes": [ + { + "filters": [], + "hash": 482282063, + "output": [ + { + "source": { + "column": "org_id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "user_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "org_memberships", + }, + }, + ], + "streams": [ + { + "queriers": [ + { + "bucket": 0, + "lookupStages": [ + [ + { + "instantiation": [ + { + "expr": { + "function": "->>", + "parameters": [ + { + "source": { + "request": "auth", + }, + "type": "data", + }, + { + "type": "lit_string", + "value": "$.sub", + }, + ], + "type": "function", + }, + "type": "request", + }, + ], + "lookup": 0, + "type": "parameter", + }, + ], + ], + "requestFilters": [], + "sourceInstantiation": [ + { + "lookup": { + "idInStage": 0, + "stageId": 0, + }, + "resultIndex": 0, + "type": "lookup", + }, + ], 
+ }, + ], + "stream": { + "isSubscribedByDefault": true, + "name": "stream", + "priority": 3, + }, + }, + ], + "version": "unstable", +} +`; + +exports[`common table expressions > as parameter query 1`] = ` +{ + "buckets": [ + { + "hash": 402167552, + "sources": [ + 0, + ], + "uniqueName": "stream|0", + }, + ], + "dataSources": [ + { + "columns": [ + "star", + ], + "filters": [], + "hash": 151623292, + "outputTableName": "projects", + "partitionBy": [ + { + "expr": { + "source": { + "column": "org_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "projects", + }, + }, + ], + "parameterIndexes": [ + { + "filters": [], + "hash": 482282063, + "output": [ + { + "source": { + "column": "org_id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "user_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "org_memberships", + }, + }, + { + "filters": [], + "hash": 158472400, + "output": [ + { + "source": { + "column": "id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "organisations", + }, + }, + ], + "streams": [ + { + "queriers": [ + { + "bucket": 0, + "lookupStages": [ + [ + { + "instantiation": [ + { + "expr": { + "function": "->>", + "parameters": [ + { + "source": { + "request": "auth", + }, + "type": "data", + }, + { + "type": "lit_string", + "value": "$.sub", + }, + ], + "type": "function", + }, + "type": "request", + }, + ], + "lookup": 0, + "type": "parameter", + }, + ], + [ + { + "instantiation": [ + { + "lookup": { + "idInStage": 0, + "stageId": 0, + }, + "resultIndex": 0, + "type": "lookup", + }, + ], + "lookup": 1, + "type": "parameter", + }, + ], + ], + "requestFilters": [], + "sourceInstantiation": [ + { +
"lookup": { + "idInStage": 0, + "stageId": 1, + }, + "resultIndex": 0, + "type": "lookup", + }, + ], + }, + ], + "stream": { + "isSubscribedByDefault": true, + "name": "stream", + "priority": 3, + }, + }, + ], + "version": "unstable", +} +`; diff --git a/packages/sync-rules/test/src/compiler/cte.test.ts b/packages/sync-rules/test/src/compiler/cte.test.ts new file mode 100644 index 000000000..fde34ea17 --- /dev/null +++ b/packages/sync-rules/test/src/compiler/cte.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, test } from 'vitest'; +import { compileToSyncPlanWithoutErrors } from './utils.js'; +import { serializeSyncPlan } from '../../../src/index.js'; + +describe('common table expressions', () => { + test('as data source', () => { + const plan = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM org_of_user;'] + } + ]); + + expect(serializeSyncPlan(plan)).toMatchSnapshot(); + }); + + test('as parameter query', () => { + const plan = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN (SELECT id FROM org_of_user);'] + } + ]); + + expect(serializeSyncPlan(plan)).toMatchSnapshot(); + }); +}); diff --git a/packages/sync-rules/test/src/compiler/utils.ts b/packages/sync-rules/test/src/compiler/utils.ts index c252ea6e1..81b55457b 100644 --- a/packages/sync-rules/test/src/compiler/utils.ts +++ b/packages/sync-rules/test/src/compiler/utils.ts @@ -11,6 +11,7 @@ import { interface SyncStreamInput { name: string; queries: string[]; + ctes?: Record<string, string>; } interface TranslationError { @@ -51,18 +52,27 @@ export function compileToSyncPlan(inputs: SyncStreamInput[]): [TranslationError[ const compiler =
new SyncStreamsCompiler('test_schema'); const errors: TranslationError[] = []; + function errorListenerOnSql(sql: string): ParsingErrorListener { + return { + report(message, location) { + const resolved = getLocation(location); + errors.push({ message, source: sql.substring(resolved?.start ?? 0, resolved?.end) }); + } + }; + } + for (const input of inputs) { const builder = compiler.stream({ name: input.name, isSubscribedByDefault: true, priority: 3 }); - for (const sql of input.queries) { - const listener: ParsingErrorListener = { - report(message, location) { - const resolved = getLocation(location); - errors.push({ message, source: sql.substring(resolved?.start ?? 0, resolved?.end) }); - } - }; + for (const [name, sql] of Object.entries(input.ctes ?? {})) { + const cte = compiler.commonTableExpression(sql, errorListenerOnSql(sql)); + if (cte) { + builder.registerCommonTableExpression(name, cte); + } + } - builder.addQuery(sql, listener); + for (const sql of input.queries) { + builder.addQuery(sql, errorListenerOnSql(sql)); } builder.finish(); From e9f99abba824f63eb0c03814946962354eba19ee Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 Jan 2026 16:39:11 +0100 Subject: [PATCH 3/6] Support short-hand syntax --- packages/sync-rules/src/compiler/parser.ts | 3 +- packages/sync-rules/src/compiler/scope.ts | 4 +-- packages/sync-rules/src/compiler/sqlite.ts | 32 +++++++++++++++++ .../sync-rules/test/src/compiler/cte.test.ts | 36 ++++++++++++++++++- 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/packages/sync-rules/src/compiler/parser.ts b/packages/sync-rules/src/compiler/parser.ts index 591b96992..cce4434a5 100644 --- a/packages/sync-rules/src/compiler/parser.ts +++ b/packages/sync-rules/src/compiler/parser.ts @@ -119,6 +119,7 @@ export class StreamQueryParser { originalText: this.originalText, errors: this.errors, locations: this.nodeLocations, + scope: this.statementScope, resolveTableName: this.resolveTableName.bind(this), 
generateTableAlias: () => { const counter = this.syntheticSubqueryCounter++; @@ -294,8 +295,8 @@ export class StreamQueryParser { } } else if (from.type == 'call') { const source = new SyntacticResultSetSource(from, from.alias?.name ?? null); - scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); this.resultSets.set(source, this.resolveTableValued(from, source)); + scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); } else if (from.type == 'statement') { const source = new SyntacticResultSetSource(from, from.alias); diff --git a/packages/sync-rules/src/compiler/scope.ts b/packages/sync-rules/src/compiler/scope.ts index 1b3c392dd..63b42843c 100644 --- a/packages/sync-rules/src/compiler/scope.ts +++ b/packages/sync-rules/src/compiler/scope.ts @@ -44,11 +44,11 @@ export class SqlScope { } registerCommonTableExpression(name: string, subquery: PreparedSubquery) { - this.commonTableExpressions.set(name, subquery); + this.commonTableExpressions.set(name.toLowerCase(), subquery); } resolveCommonTableExpression(name: string): PreparedSubquery | null { - const inThisScope = this.commonTableExpressions.get(name); + const inThisScope = this.commonTableExpressions.get(name.toLowerCase()); if (inThisScope) { return inThisScope; } diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index b917b1064..79c6aa68c 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -20,6 +20,7 @@ import { import { ConnectionParameterSource } from '../sync_plan/plan.js'; import { ParsingErrorListener } from './compiler.js'; import { BaseSourceResultSet, SourceResultSet, SyntacticResultSetSource } from './table.js'; +import { SqlScope } from './scope.js'; export interface ResolvedSubqueryExpression { filters: SqlExpression[]; @@ -48,6 +49,7 @@ export interface PreparedSubquery { export interface PostgresToSqliteOptions { readonly 
originalText: string; + readonly scope: SqlScope; readonly errors: ParsingErrorListener; readonly locations: NodeLocations; @@ -345,6 +347,14 @@ export class PostgresToSqlite { } else if (right.type == 'call' && right.function.name.toLowerCase() == 'row') { return this.desugarInValues(negated, expr.left, right.args); } else { + if (right.type == 'ref' && right.table == null) { + const cte = this.options.scope.resolveCommonTableExpression(right.name); + if (cte) { + // Something of the form x IN $cte. + return this.desugarInCte(negated, expr, right.name, cte); + } + } + return this.desugarInScalar(negated, expr, right); } } @@ -403,6 +413,28 @@ export class PostgresToSqlite { }); } + /** + * Desugar `$left IN cteName` to `$left IN (SELECT cteName.onlyColumn FROM cteName)`. + */ + private desugarInCte( + negated: boolean, + binary: ExprBinary, + cteName: string, + cte: PreparedSubquery + ): SqlExpression { + const columns = Object.keys(cte.resultColumns); + if (columns.length != 1) { + return this.invalidExpression(binary.right, 'Common-table expression must return a single column'); + } + + const name = columns[0]; + return this.desugarInSubquery(negated, binary, { + type: 'select', + columns: [{ expr: { type: 'ref', name, table: { name: cteName } } }], + from: [{ type: 'table', name: { name: cteName } }] + }); + } + private translateRequestParameter(source: ConnectionParameterSource, expr: ExprCall): SqlExpression { const parameter = new ConnectionParameter(expr, source); const replacement: SqlExpression = { diff --git a/packages/sync-rules/test/src/compiler/cte.test.ts b/packages/sync-rules/test/src/compiler/cte.test.ts index fde34ea17..c37a4702d 100644 --- a/packages/sync-rules/test/src/compiler/cte.test.ts +++ b/packages/sync-rules/test/src/compiler/cte.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from 'vitest'; -import { compileToSyncPlanWithoutErrors } from './utils.js'; +import { compileToSyncPlan, compileToSyncPlanWithoutErrors } from 
'./utils.js'; import { serializeSyncPlan } from '../../../src/index.js'; describe('common table expressions', () => { @@ -31,5 +31,39 @@ describe('common table expressions', () => { ]); expect(serializeSyncPlan(plan)).toMatchSnapshot(); + + const withDirectReference = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN org_of_user'] + } + ]); + expect(serializeSyncPlan(withDirectReference)).toStrictEqual(serializeSyncPlan(plan)); + }); + + describe('errors', () => { + test('shorthand syntax for CTE with multiple columns', () => { + const [errors, _] = compileToSyncPlan([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN org_of_user'] + } + ]); + + expect(errors).toStrictEqual([ + { + message: 'Common-table expression must return a single column', + source: 'org_of_user' + } + ]); + }); }); }); From 1cce7279ce27c8ccae1f22acb13ae49b5ad7f2c2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 Jan 2026 16:47:25 +0100 Subject: [PATCH 4/6] Use root scope --- packages/sync-rules/src/compiler/parser.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sync-rules/src/compiler/parser.ts b/packages/sync-rules/src/compiler/parser.ts index cce4434a5..b61c37c3e 100644 --- a/packages/sync-rules/src/compiler/parser.ts +++ b/packages/sync-rules/src/compiler/parser.ts @@ -300,7 +300,8 @@ export class StreamQueryParser { } else if (from.type == 'statement') { const source = new SyntacticResultSetSource(from, from.alias); - const parseInner = this.nestedParser(this.statementScope); + // For subqueries in FROM, existing expressions are not in scope. So fork from the root scope instead. 
+ const parseInner = this.nestedParser(scope.rootScope); const parsedSubquery = parseInner.parseAsSubquery( from.statement, from.columnNames?.map((c) => c.name) From 903a1b36c1c3340c2b7669d25e77af43e56bb9ab Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 Jan 2026 17:22:31 +0100 Subject: [PATCH 5/6] Fix tests --- packages/sync-rules/test/src/compiler/sqlite.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/sync-rules/test/src/compiler/sqlite.test.ts b/packages/sync-rules/test/src/compiler/sqlite.test.ts index cc5dd2532..ff8e2f544 100644 --- a/packages/sync-rules/test/src/compiler/sqlite.test.ts +++ b/packages/sync-rules/test/src/compiler/sqlite.test.ts @@ -4,6 +4,7 @@ import { describe, expect, test } from 'vitest'; import { getLocation } from '../../../src/errors.js'; import { ExpressionToSqlite } from '../../../src/sync_plan/expression_to_sql.js'; import { NodeLocations } from '../../../src/compiler/expression.js'; +import { SqlScope } from '../../../src/compiler/scope.js'; describe('sqlite conversion', () => { test('literals', () => { @@ -189,6 +190,7 @@ function translate(source: string): [string, TranslationError[]] { const translator = new PostgresToSqlite({ originalText: source, + scope: new SqlScope({}), errors: { report: (message, location) => { const resolved = getLocation(location); From 3f810f9ac1e48ff6104aef8bcc8f86ffa2e0ce0d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 09:43:42 +0100 Subject: [PATCH 6/6] Document that CTEs can't reference CTEs --- packages/sync-rules/src/compiler/compiler.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/sync-rules/src/compiler/compiler.ts b/packages/sync-rules/src/compiler/compiler.ts index b8749c19f..6d2d0b74c 100644 --- a/packages/sync-rules/src/compiler/compiler.ts +++ b/packages/sync-rules/src/compiler/compiler.ts @@ -32,6 +32,11 @@ export class SyncStreamsCompiler { /** * Tries to parse the SQL query as a `SELECT` statement into a form supported 
for common table expressions. * + * Common table expressions are parsed and validated independently and without a shared scope, meaning that CTEs are + * not allowed to reference other CTEs. This limitation is deliberate, but we _could_ support it (referenced CTEs + * would just get inlined into the new CTE by the parser). So we can revisit this and potentially support that in the + * future. + * * Returns null and reports errors if that fails. */ commonTableExpression(sql: string, errors: ParsingErrorListener): PreparedSubquery | null {