Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion packages/sync-rules/src/compiler/compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { CompilerModelToSyncPlan } from './ir_to_sync_plan.js';
import { QuerierGraphBuilder } from './querier_graph.js';
import { StreamQueryParser } from './parser.js';
import { NodeLocations } from './expression.js';
import { SqlScope } from './scope.js';
import { PreparedSubquery } from './sqlite.js';

/**
* State for compiling sync streams.
Expand All @@ -23,24 +25,53 @@ import { NodeLocations } from './expression.js';
*/
export class SyncStreamsCompiler {
readonly output = new CompiledStreamQueries();
private readonly locations = new NodeLocations();

constructor(readonly defaultSchema: string) {}

/**
* Tries to parse the SQL query as a `SELECT` statement into a form supported for common table expressions.
*
* Common table expressions are parsed and validated independently and without a shared scope, meaning that CTEs are
* not allowed to reference other CTEs. This limitation is deliberate, but we _could_ support it (referenced CTEs
* would just get inlined into the new CTE by the parser). So we can revisit this and potentially support that in the
* future.
*
* Returns null and reports errors if that fails.
*/
commonTableExpression(sql: string, errors: ParsingErrorListener): PreparedSubquery | null {
const parser = new StreamQueryParser({
compiler: this,
originalText: sql,
locations: this.locations,
parentScope: new SqlScope({}),
errors
});

const [stmt] = parse(sql, { locationTracking: true });
return parser.parseAsSubquery(stmt);
}

/**
* Utility for compiling a sync stream.
*
* @param options Name, priority and `auto_subscribe` state for the stream.
*/
stream(options: StreamOptions): IndividualSyncStreamCompiler {
const builder = new QuerierGraphBuilder(this, options);
const rootScope = new SqlScope({});

return {
registerCommonTableExpression: (name, cte) => {
rootScope.registerCommonTableExpression(name, cte);
},
addQuery: (sql: string, errors: ParsingErrorListener) => {
const [stmt] = parse(sql, { locationTracking: true });
const parser = new StreamQueryParser({
compiler: this,
originalText: sql,
locations: new NodeLocations(),
locations: this.locations,
parentScope: rootScope,
errors
});
const query = parser.parse(stmt);
Expand All @@ -57,6 +88,12 @@ export class SyncStreamsCompiler {
* Utility for compiling a single sync stream.
*/
export interface IndividualSyncStreamCompiler {
/**
* Makes a common table expression prepared through {@link SyncStreamsCompiler.commonTableExpression} available when
* parsing queries for this stream.
*/
registerCommonTableExpression(name: string, cte: PreparedSubquery): void;

/**
* Validates and adds a parameter query to this stream.
*
Expand Down
4 changes: 2 additions & 2 deletions packages/sync-rules/src/compiler/expression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ export class ConnectionParameter implements EqualsIgnoringResultSet {
* in-memory map.
*/
export class NodeLocations {
readonly sourceForNode = new Map<SqlExpression<ExpressionInput>, PGNode | NodeLocation>();
readonly sourceForNode = new Map<SqlExpression<unknown>, PGNode | NodeLocation>();

locationFor(source: SqlExpression<ExpressionInput>): NodeLocation {
locationFor(source: SqlExpression<unknown>): NodeLocation {
const location = getLocation(this.sourceForNode.get(source));
if (location == null) {
throw new Error('Missing location');
Expand Down
46 changes: 31 additions & 15 deletions packages/sync-rules/src/compiler/filter_simplifier.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { And, BaseTerm, EqualsClause, isBaseTerm, Or, SingleDependencyExpression } from './filter.js';
import { SyncExpression } from './expression.js';
import { NodeLocations, SyncExpression } from './expression.js';
import { SourceResultSet } from './table.js';
import { BinaryOperator } from '../sync_plan/expression.js';
import { BinaryOperator, SqlExpression } from '../sync_plan/expression.js';
import { expandNodeLocations } from '../errors.js';

export class FilterConditionSimplifier {
constructor(private readonly originalText: string) {}

simplifyOr(or: Or): Or {
const andTerms: And[] = [];
let baseTerms: BaseTerm[] = [];
Expand Down Expand Up @@ -116,18 +114,36 @@ export class FilterConditionSimplifier {
throw new Error("Can't compose zero expressions");
}

const [first, ...rest] = terms;
const locations = first.expression.locations;
let inner = first.expression.node;
for (const additional of rest) {
inner = { type: 'binary', operator, left: inner, right: additional.expression.node };
}

const location = expandNodeLocations(terms.map((e) => e.expression.location));
if (location) {
locations.sourceForNode.set(inner, location);
}
const locations = terms[0].expression.locations;
const inner = composeExpressionNodes(
locations,
operator,
terms.map((t) => t.expression.node)
);

return new SingleDependencyExpression(new SyncExpression(inner, locations));
}
}

export function composeExpressionNodes<T>(
locations: NodeLocations,
operator: BinaryOperator,
terms: SqlExpression<T>[]
) {
if (terms.length == 0) {
throw new Error("Can't compose zero expressions");
}

const [first, ...rest] = terms;
let inner = first;
for (const additional of rest) {
inner = { type: 'binary', operator, left: inner, right: additional };
}

const location = expandNodeLocations(terms.map((e) => locations.locationFor(e)));
if (location) {
locations.sourceForNode.set(inner, location);
}

return inner;
}
Loading