diff --git a/packages/sync-rules/src/compiler/compiler.ts b/packages/sync-rules/src/compiler/compiler.ts index 1bcf2228a..6d2d0b74c 100644 --- a/packages/sync-rules/src/compiler/compiler.ts +++ b/packages/sync-rules/src/compiler/compiler.ts @@ -7,6 +7,8 @@ import { CompilerModelToSyncPlan } from './ir_to_sync_plan.js'; import { QuerierGraphBuilder } from './querier_graph.js'; import { StreamQueryParser } from './parser.js'; import { NodeLocations } from './expression.js'; +import { SqlScope } from './scope.js'; +import { PreparedSubquery } from './sqlite.js'; /** * State for compiling sync streams. @@ -23,9 +25,33 @@ import { NodeLocations } from './expression.js'; */ export class SyncStreamsCompiler { readonly output = new CompiledStreamQueries(); + private readonly locations = new NodeLocations(); constructor(readonly defaultSchema: string) {} + /** + * Tries to parse the SQL query as a `SELECT` statement into a form supported for common table expressions. + * + * Common table expressions are parsed and validated independently and without a shared scope, meaning that CTEs are + * not allowed to reference other CTEs. This limitation is deliberate, but we _could_ support it (referenced CTEs + * would just get inlined into the new CTE by the parser). So we can revisit this and potentially support that in the + * future. + * + * Returns null and reports errors if that fails. + */ + commonTableExpression(sql: string, errors: ParsingErrorListener): PreparedSubquery | null { + const parser = new StreamQueryParser({ + compiler: this, + originalText: sql, + locations: this.locations, + parentScope: new SqlScope({}), + errors + }); + + const [stmt] = parse(sql, { locationTracking: true }); + return parser.parseAsSubquery(stmt); + } + /** * Utility for compiling a sync stream. 
* @@ -33,14 +59,19 @@ export class SyncStreamsCompiler { */ stream(options: StreamOptions): IndividualSyncStreamCompiler { const builder = new QuerierGraphBuilder(this, options); + const rootScope = new SqlScope({}); return { + registerCommonTableExpression: (name, cte) => { + rootScope.registerCommonTableExpression(name, cte); + }, addQuery: (sql: string, errors: ParsingErrorListener) => { const [stmt] = parse(sql, { locationTracking: true }); const parser = new StreamQueryParser({ compiler: this, originalText: sql, - locations: new NodeLocations(), + locations: this.locations, + parentScope: rootScope, errors }); const query = parser.parse(stmt); @@ -57,6 +88,12 @@ export class SyncStreamsCompiler { * Utility for compiling a single sync stream. */ export interface IndividualSyncStreamCompiler { + /** + * Makes a common table expression prepared through {@link SyncStreamsCompiler.commonTableExpression} available when + * parsing queries for this stream. + */ + registerCommonTableExpression(name: string, cte: PreparedSubquery): void; + /** * Validates and adds a parameter query to this stream. * diff --git a/packages/sync-rules/src/compiler/expression.ts b/packages/sync-rules/src/compiler/expression.ts index 83950cda3..bb331bb51 100644 --- a/packages/sync-rules/src/compiler/expression.ts +++ b/packages/sync-rules/src/compiler/expression.ts @@ -129,9 +129,9 @@ export class ConnectionParameter implements EqualsIgnoringResultSet { * in-memory map. 
*/ export class NodeLocations { - readonly sourceForNode = new Map, PGNode | NodeLocation>(); + readonly sourceForNode = new Map, PGNode | NodeLocation>(); - locationFor(source: SqlExpression): NodeLocation { + locationFor(source: SqlExpression): NodeLocation { const location = getLocation(this.sourceForNode.get(source)); if (location == null) { throw new Error('Missing location'); diff --git a/packages/sync-rules/src/compiler/filter_simplifier.ts b/packages/sync-rules/src/compiler/filter_simplifier.ts index 5af8b5312..84db2ed46 100644 --- a/packages/sync-rules/src/compiler/filter_simplifier.ts +++ b/packages/sync-rules/src/compiler/filter_simplifier.ts @@ -1,12 +1,10 @@ import { And, BaseTerm, EqualsClause, isBaseTerm, Or, SingleDependencyExpression } from './filter.js'; -import { SyncExpression } from './expression.js'; +import { NodeLocations, SyncExpression } from './expression.js'; import { SourceResultSet } from './table.js'; -import { BinaryOperator } from '../sync_plan/expression.js'; +import { BinaryOperator, SqlExpression } from '../sync_plan/expression.js'; import { expandNodeLocations } from '../errors.js'; export class FilterConditionSimplifier { - constructor(private readonly originalText: string) {} - simplifyOr(or: Or): Or { const andTerms: And[] = []; let baseTerms: BaseTerm[] = []; @@ -116,18 +114,36 @@ export class FilterConditionSimplifier { throw new Error("Can't compose zero expressions"); } - const [first, ...rest] = terms; - const locations = first.expression.locations; - let inner = first.expression.node; - for (const additional of rest) { - inner = { type: 'binary', operator, left: inner, right: additional.expression.node }; - } - - const location = expandNodeLocations(terms.map((e) => e.expression.location)); - if (location) { - locations.sourceForNode.set(inner, location); - } + const locations = terms[0].expression.locations; + const inner = composeExpressionNodes( + locations, + operator, + terms.map((t) => t.expression.node) + ); 
return new SingleDependencyExpression(new SyncExpression(inner, locations)); } } + +export function composeExpressionNodes( + locations: NodeLocations, + operator: BinaryOperator, + terms: SqlExpression[] +) { + if (terms.length == 0) { + throw new Error("Can't compose zero expressions"); + } + + const [first, ...rest] = terms; + let inner = first; + for (const additional of rest) { + inner = { type: 'binary', operator, left: inner, right: additional }; + } + + const location = expandNodeLocations(terms.map((e) => locations.locationFor(e))); + if (location) { + locations.sourceForNode.set(inner, location); + } + + return inner; +} diff --git a/packages/sync-rules/src/compiler/parser.ts b/packages/sync-rules/src/compiler/parser.ts index 0bdf66a1e..b61c37c3e 100644 --- a/packages/sync-rules/src/compiler/parser.ts +++ b/packages/sync-rules/src/compiler/parser.ts @@ -1,5 +1,17 @@ -import { Expr, ExprCall, ExprRef, From, nil, NodeLocation, PGNode, SelectedColumn, Statement } from 'pgsql-ast-parser'; import { + Expr, + ExprCall, + ExprRef, + From, + nil, + NodeLocation, + PGNode, + SelectedColumn, + SelectFromStatement, + Statement +} from 'pgsql-ast-parser'; +import { + BaseSourceResultSet, PhysicalSourceResultSet, RequestTableValuedResultSet, SourceResultSet, @@ -19,11 +31,11 @@ import { } from './filter.js'; import { expandNodeLocations } from '../errors.js'; import { cartesianProduct } from '../streams/utils.js'; -import { PostgresToSqlite } from './sqlite.js'; +import { PostgresToSqlite, PreparedSubquery } from './sqlite.js'; import { SqlScope } from './scope.js'; import { ParsingErrorListener, SyncStreamsCompiler } from './compiler.js'; import { TablePattern } from '../TablePattern.js'; -import { FilterConditionSimplifier } from './filter_simplifier.js'; +import { composeExpressionNodes, FilterConditionSimplifier } from './filter_simplifier.js'; import { SqlExpression } from '../sync_plan/expression.js'; /** @@ -73,7 +85,7 @@ export interface 
StreamQueryParserOptions { compiler: SyncStreamsCompiler; originalText: string; errors: ParsingErrorListener; - parentScope?: SqlScope; + parentScope: SqlScope; locations: NodeLocations; } @@ -85,6 +97,8 @@ export class StreamQueryParser { // Note: This is not the same as SqlScope since some result sets are inlined from CTEs or subqueries. These are not in // scope, but we still add them here to correctly track dependencies. private readonly resultSets = new Map(); + private readonly subqueryResultSets = new Map(); + private readonly resultColumns: ColumnSource[] = []; private where: SqlExpression[] = []; @@ -105,6 +119,7 @@ export class StreamQueryParser { originalText: this.originalText, errors: this.errors, locations: this.nodeLocations, + scope: this.statementScope, resolveTableName: this.resolveTableName.bind(this), generateTableAlias: () => { const counter = this.syntheticSubqueryCounter++; @@ -112,13 +127,7 @@ export class StreamQueryParser { }, joinSubqueryExpression: (expr) => { // Independently analyze the inner query. - const parseInner = new StreamQueryParser({ - compiler: this.compiler, - originalText: this.originalText, - errors: this.errors, - parentScope: this.statementScope, - locations: this.nodeLocations - }); + const parseInner = this.nestedParser(this.statementScope); let success = parseInner.processAst(expr, { forSubquery: true }); if (!success) { return null; @@ -140,6 +149,7 @@ export class StreamQueryParser { return null; } + parseInner.resultSets.forEach((v, k) => this.resultSets.set(k, v)); return { filters: parseInner.where, output: parseInner.parseExpression(resultColumn).node }; } }); @@ -170,6 +180,56 @@ export class StreamQueryParser { } } + parseAsSubquery(stmt: Statement, columnNames?: string[]): PreparedSubquery | null { + if (this.processAst(stmt, { forSubquery: true })) { + const resultColumns: Record> = {}; + let columnCount = 0; + + for (const column of (stmt as SelectFromStatement).columns ?? 
[]) { + if (column.expr.type == 'ref' && column.expr.name == '*') { + // We don't support * columns in subqueries. The reason is that we want to be able to parse queries without + // knowing the schema, so we can't know what * would resolve to. + this.errors.report('* columns are not allowed in subqueries or common table expressions', column.expr); + } else { + const name = (columnNames && columnNames[columnCount]) ?? this.inferColumnName(column); + const expr = this.parseExpression(column.expr); + + if (Object.hasOwn(resultColumns, name)) { + this.errors.report(`There is a column named '${name}' already.`, column); + } + + resultColumns[name] = expr.node; + columnCount++; + } + } + + if (columnNames && columnNames.length != columnCount) { + this.errors.report( + `Expected this subquery to have ${columnNames.length} columns, it actually has ${columnCount}`, + stmt + ); + } + + return { + resultColumns, + tables: this.resultSets, + where: this.where.length == 0 ? null : composeExpressionNodes(this.nodeLocations, 'and', this.where) + }; + } else { + return null; + } + } + + private nestedParser(parentScope: SqlScope): StreamQueryParser { + return new StreamQueryParser({ + compiler: this.compiler, + originalText: this.originalText, + errors: this.errors, + parentScope, + locations: this.nodeLocations + }); + } + /** * Process the AST of a statement, returning whether it's close enough to a valid select statement to be supported for * sync streams (allowing us to process an invalid statement further to perhaps collect more errors). 
@@ -204,32 +264,60 @@ export class StreamQueryParser { this.where.push(this.parseExpression(expr).node); } + private addSubquery(source: SyntacticResultSetSource, subquery: PreparedSubquery) { + subquery.tables.forEach((v, k) => this.resultSets.set(k, v)); + if (subquery.where) { + this.where.push(subquery.where); + } + + this.subqueryResultSets.set(source, subquery); + } + private processFrom(from: From) { const scope = this.statementScope; - let handled = false; if (from.type == 'table') { + const name = from.name.alias ?? from.name.name; const source = new SyntacticResultSetSource(from.name, from.name.alias ?? null); - const resultSet = new PhysicalSourceResultSet( - new TablePattern(from.name.schema ?? this.compiler.defaultSchema, from.name.name), - source - ); - scope.registerResultSet(this.errors, from.name.alias ?? from.name.name, source); - this.resultSets.set(source, resultSet); - handled = true; + scope.registerResultSet(this.errors, name, source); + + // If this references a CTE in scope, use that instead of names. + const cte = from.name.schema == null ? scope.resolveCommonTableExpression(from.name.name) : null; + if (cte) { + this.addSubquery(source, cte); + } else { + // Not a CTE, so treat it as a source database table. + const resultSet = new PhysicalSourceResultSet( + new TablePattern(from.name.schema ?? this.compiler.defaultSchema, from.name.name), + source + ); + + this.resultSets.set(source, resultSet); + } } else if (from.type == 'call') { const source = new SyntacticResultSetSource(from, from.alias?.name ?? null); - scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); this.resultSets.set(source, this.resolveTableValued(from, source)); - handled = true; + scope.registerResultSet(this.errors, from.alias?.name ?? from.function.name, source); } else if (from.type == 'statement') { - // TODO: We could technically allow selecting from subqueries once we support CTEs. 
+ const source = new SyntacticResultSetSource(from, from.alias); + + // For subqueries in FROM, existing expressions are not in scope. So fork from the root scope instead. + const parseInner = this.nestedParser(scope.rootScope); + const parsedSubquery = parseInner.parseAsSubquery( + from.statement, + from.columnNames?.map((c) => c.name) + ); + + if (parsedSubquery) { + scope.registerResultSet(this.errors, from.alias, source); + this.addSubquery(source, parsedSubquery); + } } const join = from.join; - if (join && handled) { + if (join) { if (join.type != 'INNER JOIN') { // We only support inner joins. this.warnUnsupported(join, join.type); } if (join.using) { @@ -242,10 +330,6 @@ export class StreamQueryParser { this.addAndTermToWhereClause(join.on); } } - - if (!handled) { - this.warnUnsupported(from, 'This source'); - } } private resolveTableValued(call: ExprCall, source: SyntacticResultSetSource): RequestTableValuedResultSet { @@ -267,6 +351,10 @@ export class StreamQueryParser { return new RequestTableValuedResultSet(call.function.name, resolvedArguments, source); } + private inferColumnName(column: SelectedColumn): string { + return column.alias?.name ?? 
this.originalText.substring(column._location!.start, column._location!.end); + } + private processResultColumns(stmt: PGNode, columns: SelectedColumn[]) { const selectsFrom = (source: SourceResultSet, node: PGNode) => { if (source instanceof PhysicalSourceResultSet) { @@ -283,40 +371,47 @@ export class StreamQueryParser { } }; - for (const column of columns) { - if (column.expr.type == 'ref' && column.expr.name == '*') { - const resolved = this.resolveTableName(column.expr, column.expr.table?.name); - if (resolved != null) { - selectsFrom(resolved, column.expr); + const addColumn = (expr: SyncExpression, name: string) => { + for (const dependency of expr.instantiation) { + if (dependency instanceof ColumnInRow) { + selectsFrom(dependency.resultSet, dependency.syntacticOrigin); + } else { + this.errors.report( + 'This attempts to sync a connection parameter. Only values from the source database can be synced.', + dependency.syntacticOrigin + ); } + } - this.resultColumns.push(StarColumnSource.instance); - } else { - const expr = this.parseExpression(column.expr); - - for (const dependency of expr.instantiation) { - if (dependency instanceof ColumnInRow) { - selectsFrom(dependency.resultSet, dependency.syntacticOrigin); - } else { - this.errors.report( - 'This attempts to sync a connection parameter. Only values from the source database can be synced.', - dependency.syntacticOrigin - ); - } + try { + this.resultColumns.push(new ExpressionColumnSource(new RowExpression(expr), name)); + } catch (e) { + if (e instanceof InvalidExpressionError) { + // Invalid dependencies, we've already logged errors for this. Ignore. + } else { + throw e; } + } + }; - try { - const outputName = - column.alias?.name ?? 
this.originalText.substring(column._location!.start, column._location!.end); - - this.resultColumns.push(new ExpressionColumnSource(new RowExpression(expr), outputName)); - } catch (e) { - if (e instanceof InvalidExpressionError) { - // Invalid dependencies, we've already logged errors for this. Ignore. + for (const column of columns) { + if (column.expr.type == 'ref' && column.expr.name == '*') { + const resolved = this.resolveTableName(column.expr, column.expr.table?.name); + if (resolved != null) { + if (resolved instanceof BaseSourceResultSet) { + selectsFrom(resolved, column.expr); + this.resultColumns.push(StarColumnSource.instance); } else { - throw e; + // Selecting from a subquery, add all columns. + for (const [name, column] of Object.entries(resolved.resultColumns)) { + addColumn(new SyncExpression(column, this.nodeLocations), name); + } } } + } else { + const expr = this.parseExpression(column.expr); + const outputName = this.inferColumnName(column); + addColumn(expr, outputName); } } @@ -329,7 +424,7 @@ export class StreamQueryParser { return this.exprParser.translateExpression(source); } - resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | null { + private resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | PreparedSubquery | null { if (name == null) { // For unqualified references, there must be a single table in scope. We don't allow unqualified references if // there are multiple tables because we don't know which column is available in which table with certainty (and @@ -337,7 +432,7 @@ export class StreamQueryParser { // references. 
const resultSets = this.statementScope.resultSets; if (resultSets.length == 1) { - return this.resultSets.get(resultSets[0])!; + return this.resolveSource(resultSets[0]); } else { this.errors.report('Invalid unqualified reference since multiple tables are in scope', node); return null; @@ -349,10 +444,20 @@ export class StreamQueryParser { return null; } - return this.resultSets.get(result)!; + return this.resolveSource(result); } } + private resolveSource(source: SyntacticResultSetSource): SourceResultSet | PreparedSubquery { + if (this.resultSets.has(source)) { + return this.resultSets.get(source)!; + } else if (this.subqueryResultSets.has(source)) { + return this.subqueryResultSets.get(source)!; + } + + throw new Error('internal error: result set from scope has not been registered'); + } + private warnUnsupported(node: PGNode | PGNode[] | nil, description: string) { if (node != null) { let location: PGNode | NodeLocation; @@ -402,7 +507,7 @@ export class StreamQueryParser { }; }); - return new FilterConditionSimplifier(this.originalText).simplifyOr({ terms: mappedTerms }); + return new FilterConditionSimplifier().simplifyOr({ terms: mappedTerms }); } private mapBaseExpression(pending: PendingBaseTerm): BaseTerm { diff --git a/packages/sync-rules/src/compiler/scope.ts b/packages/sync-rules/src/compiler/scope.ts index 3ed310b36..63b42843c 100644 --- a/packages/sync-rules/src/compiler/scope.ts +++ b/packages/sync-rules/src/compiler/scope.ts @@ -1,5 +1,6 @@ import { SyntacticResultSetSource } from './table.js'; import { ParsingErrorListener } from './compiler.js'; +import { PreparedSubquery } from './sqlite.js'; /** * Utilities for resolving references in SQL statements where multiple tables are in scope. 
@@ -9,11 +10,21 @@ import { ParsingErrorListener } from './compiler.js'; export class SqlScope { readonly parent?: SqlScope; private readonly nameToResultSet = new Map(); + private readonly commonTableExpressions = new Map(); constructor(options: { parent?: SqlScope }) { this.parent = options.parent; } + get rootScope(): SqlScope { + let maybeRoot: SqlScope = this; + while (maybeRoot.parent) { + maybeRoot = maybeRoot.parent; + } + + return maybeRoot; + } + get resultSets(): SyntacticResultSetSource[] { return [...this.nameToResultSet.values()]; } @@ -31,4 +42,17 @@ export class SqlScope { resolveResultSetForReference(name: string): SyntacticResultSetSource | undefined { return this.nameToResultSet.get(name.toLowerCase()) ?? this.parent?.resolveResultSetForReference(name); } + + registerCommonTableExpression(name: string, subquery: PreparedSubquery) { + this.commonTableExpressions.set(name.toLowerCase(), subquery); + } + + resolveCommonTableExpression(name: string): PreparedSubquery | null { + const inThisScope = this.commonTableExpressions.get(name.toLowerCase()); + if (inThisScope) { + return inThisScope; + } + + return this.parent ? this.parent.resolveCommonTableExpression(name) : null; + } } diff --git a/packages/sync-rules/src/compiler/sqlite.ts b/packages/sync-rules/src/compiler/sqlite.ts index b0a268ff6..79c6aa68c 100644 --- a/packages/sync-rules/src/compiler/sqlite.ts +++ b/packages/sync-rules/src/compiler/sqlite.ts @@ -19,15 +19,37 @@ import { } from '../sync_plan/expression.js'; import { ConnectionParameterSource } from '../sync_plan/plan.js'; import { ParsingErrorListener } from './compiler.js'; -import { SourceResultSet } from './table.js'; +import { BaseSourceResultSet, SourceResultSet, SyntacticResultSetSource } from './table.js'; +import { SqlScope } from './scope.js'; export interface ResolvedSubqueryExpression { filters: SqlExpression[]; output: SqlExpression; } +/** + * A prepared subquery or common table expression. 
+ */ +export interface PreparedSubquery { + /** + * Columns selected by the query, indexed by their name. + */ + resultColumns: Record>; + + /** + * Tables the subquery selects from. + */ + tables: Map; + + /** + * Filters affecting the subquery. + */ + where: SqlExpression | null; +} + export interface PostgresToSqliteOptions { readonly originalText: string; + readonly scope: SqlScope; readonly errors: ParsingErrorListener; readonly locations: NodeLocations; @@ -36,7 +58,7 @@ export interface PostgresToSqliteOptions { * * Should report an error if resolving the table failed, using `node` as the source location for the error. */ - resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | null; + resolveTableName(node: ExprRef, name: string | nil): SourceResultSet | PreparedSubquery | null; /** * Generates a table alias for synthetic subqueries like those generated to desugar `IN` expressions to `json_each` @@ -104,11 +126,22 @@ export class PostgresToSqlite { return this.invalidExpression(expr, '* columns are not supported here'); } - const instantiation = new ColumnInRow(expr, resultSet, expr.name); - return { - type: 'data', - source: instantiation - }; + if (resultSet instanceof BaseSourceResultSet) { + // This is an actual result set. + const instantiation = new ColumnInRow(expr, resultSet, expr.name); + return { + type: 'data', + source: instantiation + }; + } else { + // Resolved to a subquery, inline the reference. 
+ const expression = resultSet.resultColumns[expr.name]; + if (expression == null) { + return this.invalidExpression(expr, 'Column not found in subquery.'); + } + + return expression; + } } case 'parameter': return this.invalidExpression( @@ -314,6 +347,14 @@ export class PostgresToSqlite { } else if (right.type == 'call' && right.function.name.toLowerCase() == 'row') { return this.desugarInValues(negated, expr.left, right.args); } else { + if (right.type == 'ref' && right.table == null) { + const cte = this.options.scope.resolveCommonTableExpression(right.name); + if (cte) { + // Something of the form x IN $cte. + return this.desugarInCte(negated, expr, right.name, cte); + } + } + return this.desugarInScalar(negated, expr, right); } } @@ -372,6 +413,28 @@ export class PostgresToSqlite { }); } + /** + * Desugar `$left IN cteName` to `$left IN (SELECT cteName.onlyColumn FROM cteName)`. + */ + private desugarInCte( + negated: boolean, + binary: ExprBinary, + cteName: string, + cte: PreparedSubquery + ): SqlExpression { + const columns = Object.keys(cte.resultColumns); + if (columns.length != 1) { + return this.invalidExpression(binary.right, 'Common-table expression must return a single column'); + } + + const name = columns[0]; + return this.desugarInSubquery(negated, binary, { + type: 'select', + columns: [{ expr: { type: 'ref', name, table: { name: cteName } } }], + from: [{ type: 'table', name: { name: cteName } }] + }); + } + private translateRequestParameter(source: ConnectionParameterSource, expr: ExprCall): SqlExpression { const parameter = new ConnectionParameter(expr, source); const replacement: SqlExpression = { diff --git a/packages/sync-rules/src/compiler/table.ts b/packages/sync-rules/src/compiler/table.ts index f387d9896..73d7f8148 100644 --- a/packages/sync-rules/src/compiler/table.ts +++ b/packages/sync-rules/src/compiler/table.ts @@ -27,7 +27,7 @@ export class SyntacticResultSetSource { ) {} } -abstract class BaseSourceResultSet { +export abstract 
class BaseSourceResultSet { constructor(readonly source: SyntacticResultSetSource) {} abstract get description(): string; diff --git a/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap b/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap new file mode 100644 index 000000000..845d8f8cf --- /dev/null +++ b/packages/sync-rules/test/src/compiler/__snapshots__/cte.test.ts.snap @@ -0,0 +1,307 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`common table expressions > as data source 1`] = ` +{ + "buckets": [ + { + "hash": 385206737, + "sources": [ + 0, + ], + "uniqueName": "stream|0", + }, + ], + "dataSources": [ + { + "columns": [ + { + "alias": "id", + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + { + "alias": "name", + "expr": { + "source": { + "column": "name", + }, + "type": "data", + }, + }, + ], + "filters": [], + "hash": 148949426, + "outputTableName": "organisations", + "partitionBy": [ + { + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "organisations", + }, + }, + ], + "parameterIndexes": [ + { + "filters": [], + "hash": 482282063, + "output": [ + { + "source": { + "column": "org_id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "user_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "org_memberships", + }, + }, + ], + "streams": [ + { + "queriers": [ + { + "bucket": 0, + "lookupStages": [ + [ + { + "instantiation": [ + { + "expr": { + "function": "->>", + "parameters": [ + { + "source": { + "request": "auth", + }, + "type": "data", + }, + { + "type": "lit_string", + "value": "$.sub", + }, + ], + "type": "function", + }, + "type": "request", + }, + ], + "lookup": 0, + "type": "parameter", + }, + ], + ], + "requestFilters": [], + 
"sourceInstantiation": [ + { + "lookup": { + "idInStage": 0, + "stageId": 0, + }, + "resultIndex": 0, + "type": "lookup", + }, + ], + }, + ], + "stream": { + "isSubscribedByDefault": true, + "name": "stream", + "priority": 3, + }, + }, + ], + "version": "unstable", +} +`; + +exports[`common table expressions > as parameter query 1`] = ` +{ + "buckets": [ + { + "hash": 402167552, + "sources": [ + 0, + ], + "uniqueName": "stream|0", + }, + ], + "dataSources": [ + { + "columns": [ + "star", + ], + "filters": [], + "hash": 151623292, + "outputTableName": "projects", + "partitionBy": [ + { + "expr": { + "source": { + "column": "org_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "projects", + }, + }, + ], + "parameterIndexes": [ + { + "filters": [], + "hash": 482282063, + "output": [ + { + "source": { + "column": "org_id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "user_id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "org_memberships", + }, + }, + { + "filters": [], + "hash": 158472400, + "output": [ + { + "source": { + "column": "id", + }, + "type": "data", + }, + ], + "partitionBy": [ + { + "expr": { + "source": { + "column": "id", + }, + "type": "data", + }, + }, + ], + "table": { + "connection": "default", + "schema": "test_schema", + "table": "organisations", + }, + }, + ], + "streams": [ + { + "queriers": [ + { + "bucket": 0, + "lookupStages": [ + [ + { + "instantiation": [ + { + "expr": { + "function": "->>", + "parameters": [ + { + "source": { + "request": "auth", + }, + "type": "data", + }, + { + "type": "lit_string", + "value": "$.sub", + }, + ], + "type": "function", + }, + "type": "request", + }, + ], + "lookup": 0, + "type": "parameter", + }, + ], + [ + { + "instantiation": [ + { + "lookup": { + "idInStage": 0, + "stageId": 0, + }, + "resultIndex": 0, + "type": 
"lookup", + }, + ], + "lookup": 1, + "type": "parameter", + }, + ], + ], + "requestFilters": [], + "sourceInstantiation": [ + { + "lookup": { + "idInStage": 0, + "stageId": 1, + }, + "resultIndex": 0, + "type": "lookup", + }, + ], + }, + ], + "stream": { + "isSubscribedByDefault": true, + "name": "stream", + "priority": 3, + }, + }, + ], + "version": "unstable", +} +`; diff --git a/packages/sync-rules/test/src/compiler/cte.test.ts b/packages/sync-rules/test/src/compiler/cte.test.ts new file mode 100644 index 000000000..c37a4702d --- /dev/null +++ b/packages/sync-rules/test/src/compiler/cte.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, test } from 'vitest'; +import { compileToSyncPlan, compileToSyncPlanWithoutErrors } from './utils.js'; +import { serializeSyncPlan } from '../../../src/index.js'; + +describe('common table expressions', () => { + test('as data source', () => { + const plan = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM org_of_user;'] + } + ]); + + expect(serializeSyncPlan(plan)).toMatchSnapshot(); + }); + + test('as parameter query', () => { + const plan = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN (SELECT id FROM org_of_user);'] + } + ]); + + expect(serializeSyncPlan(plan)).toMatchSnapshot(); + + const withDirectReference = compileToSyncPlanWithoutErrors([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN org_of_user'] + } + ]); + 
expect(serializeSyncPlan(withDirectReference)).toStrictEqual(serializeSyncPlan(plan)); + }); + + describe('errors', () => { + test('shorthand syntax for CTE with multiple columns', () => { + const [errors, _] = compileToSyncPlan([ + { + name: 'stream', + ctes: { + org_of_user: `SELECT id, name FROM organisations + WHERE id IN (SELECT org_id FROM org_memberships WHERE user_id = auth.user_id())` + }, + queries: ['SELECT * FROM projects WHERE org_id IN org_of_user'] + } + ]); + + expect(errors).toStrictEqual([ + { + message: 'Common-table expression must return a single column', + source: 'org_of_user' + } + ]); + }); + }); +}); diff --git a/packages/sync-rules/test/src/compiler/sqlite.test.ts b/packages/sync-rules/test/src/compiler/sqlite.test.ts index cc5dd2532..ff8e2f544 100644 --- a/packages/sync-rules/test/src/compiler/sqlite.test.ts +++ b/packages/sync-rules/test/src/compiler/sqlite.test.ts @@ -4,6 +4,7 @@ import { describe, expect, test } from 'vitest'; import { getLocation } from '../../../src/errors.js'; import { ExpressionToSqlite } from '../../../src/sync_plan/expression_to_sql.js'; import { NodeLocations } from '../../../src/compiler/expression.js'; +import { SqlScope } from '../../../src/compiler/scope.js'; describe('sqlite conversion', () => { test('literals', () => { @@ -189,6 +190,7 @@ function translate(source: string): [string, TranslationError[]] { const translator = new PostgresToSqlite({ originalText: source, + scope: new SqlScope({}), errors: { report: (message, location) => { const resolved = getLocation(location); diff --git a/packages/sync-rules/test/src/compiler/subqueries.test.ts b/packages/sync-rules/test/src/compiler/subqueries.test.ts new file mode 100644 index 000000000..f8f8b7962 --- /dev/null +++ b/packages/sync-rules/test/src/compiler/subqueries.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, test } from 'vitest'; +import { compilationErrorsForSingleStream, compileToSyncPlanWithoutErrors } from './utils.js'; + 
+describe('subqueries', () => { + describe('names', () => { + test('from inner columns', () => { + const plan = compileToSyncPlanWithoutErrors([ + { name: 'a', queries: ['SELECT * FROM (SELECT id, name FROM users) AS subquery'] } + ]); + const sources = plan.dataSources; + expect(sources).toHaveLength(1); + + const [source] = sources; + expect(source.sourceTable.tablePattern).toBe('users'); + + expect(source.columns.map((e) => (e as any).alias)).toEqual(['id', 'name']); + }); + + test('from outer alias', () => { + const plan = compileToSyncPlanWithoutErrors([ + { name: 'a', queries: ['SELECT * FROM (SELECT id, name FROM users) AS subquery (my_id, custom_name)'] } + ]); + const sources = plan.dataSources; + expect(sources).toHaveLength(1); + + const [source] = sources; + expect(source.sourceTable.tablePattern).toBe('users'); + + expect(source.columns.map((e) => (e as any).alias)).toEqual(['my_id', 'custom_name']); + }); + }); + + describe('errors', () => { + test('select star', () => { + expect(compilationErrorsForSingleStream('SELECT 1 FROM (SELECT * FROM users) AS u')).toStrictEqual([ + { + message: '* columns are not allowed in subqueries or common table expressions', + source: '*' + }, + { + message: 'Must have a result column selecting from a table', + source: 'SELECT 1 FROM (SELECT * FROM users) AS u' + } + ]); + }); + + test('name vs colum mismatch', () => { + expect(compilationErrorsForSingleStream('SELECT a FROM (SELECT id FROM users) AS u (a, b, c)')).toStrictEqual([ + { + message: 'Expected this subquery to have 3 columns, it actually has 1', + source: '(SELECT id FROM users)' + } + ]); + }); + + test('duplicate column', () => { + expect(compilationErrorsForSingleStream('SELECT id FROM (SELECT id, a AS id FROM users) AS u')).toStrictEqual([ + { + message: "There is a column named 'id' already.", + source: 'a AS id' + } + ]); + }); + }); +}); diff --git a/packages/sync-rules/test/src/compiler/utils.ts b/packages/sync-rules/test/src/compiler/utils.ts index 
c252ea6e1..81b55457b 100644 --- a/packages/sync-rules/test/src/compiler/utils.ts +++ b/packages/sync-rules/test/src/compiler/utils.ts @@ -11,6 +11,7 @@ import { interface SyncStreamInput { name: string; queries: string[]; + ctes?: Record; } interface TranslationError { @@ -51,18 +52,27 @@ export function compileToSyncPlan(inputs: SyncStreamInput[]): [TranslationError[ const compiler = new SyncStreamsCompiler('test_schema'); const errors: TranslationError[] = []; + function errorListenerOnSql(sql: string): ParsingErrorListener { + return { + report(message, location) { + const resolved = getLocation(location); + errors.push({ message, source: sql.substring(resolved?.start ?? 0, resolved?.end) }); + } + }; + } + for (const input of inputs) { const builder = compiler.stream({ name: input.name, isSubscribedByDefault: true, priority: 3 }); - for (const sql of input.queries) { - const listener: ParsingErrorListener = { - report(message, location) { - const resolved = getLocation(location); - errors.push({ message, source: sql.substring(resolved?.start ?? 0, resolved?.end) }); - } - }; + for (const [name, sql] of Object.entries(input.ctes ?? {})) { + const cte = compiler.commonTableExpression(sql, errorListenerOnSql(sql)); + if (cte) { + builder.registerCommonTableExpression(name, cte); + } + } - builder.addQuery(sql, listener); + for (const sql of input.queries) { + builder.addQuery(sql, errorListenerOnSql(sql)); } builder.finish();