Conversation
|
aede352 to
4636049
Compare
rkistner
reviewed
Jan 26, 2026
Contributor
rkistner
left a comment
There was a problem hiding this comment.
Some comments on supported functions
3522910 to
79ca7d8
Compare
rkistner
reviewed
Jan 29, 2026
rkistner
reviewed
Jan 29, 2026
fd0d1f6 to
f793fab
Compare
rkistner
approved these changes
Feb 2, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This builds on #451: While that PR adds a tool compiling sync streams from SQL to a sync plan, this adds an evaluator implementing the
BucketSourceinterface over sync plans.The foundational layer to evaluate rows, parameters and queries is
ScalarExpressionEngine(inscalar_expression_engine.ts). This interface evaluates SQL queries of the formSELECT $outputs WHERE $filter, where both outputs and filter are deterministic and scalar expressions. There are two implementations:node:sqlite.Due to some behavioral differences between SQLite and our implementation, the SQL engine is not a drop-in replacement. But having it around helps validate that streams work with both, so we could introduce an option to let users migrate in the future.
Bucket data sources and parameter index lookup creators are implemented in a fairly straightforward way: In both cases, we have scalar filter expressions as well as output columns and parameter keys. These are all evaluated and used to create bucket parameters or lookups, respectively.
The most complex part of evaluating sync streams is finding parameters for connections. In sync plans, parameter instantiations are represented as
ParameterValues, which can be:WHERE id = auth.user_id()).a. A table-valued function using request parameters as input (e.g.
WHERE id IN auth.param('orgs')).b. An index parameter lookup (e.g.
WHERE id IN (SELECT org_id FROM orgs WHERE ...)).Lookups are further categorized in stages, where lookups from later stages are allowed to use lookups from earlier stages as inputs. Outputs from a lookup may be used by multiple later lookups or
ParameterValues, which means that the querier is a DAG structure.This structure is evaluated in
RequestParameterEvaluatorsofparameter_evaluator.ts. The main idea is that nodes in the lookup graph are evaluated and replaced with their instantiation (essentially a form of caching). We perform two evaluation runs:Some notes:
queryDynamicBucketDescriptionsis called. This would allow us to do fine-grained invalidation and potentially improve performance.