diff --git a/packages/powersync-db-collection/package.json b/packages/powersync-db-collection/package.json index 587b3298c..0f763b3cf 100644 --- a/packages/powersync-db-collection/package.json +++ b/packages/powersync-db-collection/package.json @@ -55,6 +55,7 @@ "@standard-schema/spec": "^1.1.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", + "async-mutex": "^0.5.0", "debug": "^4.4.3", "p-defer": "^4.0.1" }, diff --git a/packages/powersync-db-collection/src/index.ts b/packages/powersync-db-collection/src/index.ts index 6879d7a22..f8d092805 100644 --- a/packages/powersync-db-collection/src/index.ts +++ b/packages/powersync-db-collection/src/index.ts @@ -1,3 +1,4 @@ export * from './definitions' export * from './powersync' export * from './PowerSyncTransactor' +export * from './sqlite-compiler' diff --git a/packages/powersync-db-collection/src/powersync.ts b/packages/powersync-db-collection/src/powersync.ts index bc2e85bc2..a35b0d777 100644 --- a/packages/powersync-db-collection/src/powersync.ts +++ b/packages/powersync-db-collection/src/powersync.ts @@ -1,10 +1,14 @@ import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common' +import { Mutex } from 'async-mutex' +import { or } from '@tanstack/db' +import { compileSQLite } from './sqlite-compiler' import { PendingOperationStore } from './PendingOperationStore' import { PowerSyncTransactor } from './PowerSyncTransactor' import { DEFAULT_BATCH_SIZE } from './definitions' import { asPowerSyncRecord, mapOperation } from './helpers' import { convertTableToSchema } from './schema' import { serializeForSQLite } from './serialization' +import type { LoadSubsetOptions, OperationType, SyncConfig } from '@tanstack/db' import type { AnyTableColumnType, ExtractedTable, @@ -24,9 +28,8 @@ import type { PowerSyncCollectionUtils, } from './definitions' import type { PendingOperation } from './PendingOperationStore' -import type { SyncConfig } from '@tanstack/db' import type { StandardSchemaV1 } from '@standard-schema/spec' -import type { Table, TriggerDiffRecord } from '@powersync/common' +import type { LockContext, Table, TriggerDiffRecord } from '@powersync/common' /** * Creates PowerSync collection options for use with a standard Collection. @@ -225,6 +228,7 @@ export function powerSyncCollectionOptions< table, schema: inputSchema, syncBatchSize = DEFAULT_BATCH_SIZE, + syncMode = 'eager', ...restConfig } = config @@ -296,11 +300,66 @@ export function powerSyncCollectionOptions< */ const sync: SyncConfig = { sync: (params) => { - const { begin, write, commit, markReady } = params + const { begin, write, collection, commit, markReady } = params const abortController = new AbortController() - // The sync function needs to be synchronous - async function start() { + let disposeTracking: (() => Promise) | null = null + + if (syncMode === `eager`) { + return runEagerSync() + } else { + return runOnDemandSync() + } + + async function createDiffTrigger(options: { + when: Record + writeType: (rowId: string) => OperationType + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => Promise> + onReady: () => void + }) { + const { when, writeType, batchQuery, onReady } = options + + return await database.triggers.createDiffTrigger({ + source: viewName, + destination: trackedTableName, + when, + hooks: { + beforeCreate: async (context) => { + let currentBatchCount = syncBatchSize + let cursor = 0 + while (currentBatchCount == syncBatchSize) { + begin() + + const batchItems = await batchQuery( + context, + syncBatchSize, + cursor, + ) + currentBatchCount = batchItems.length + cursor += currentBatchCount + for (const row of batchItems) { + write({ + type: writeType(row.id), + value: deserializeSyncRow(row), + }) + } + commit() + } + onReady() + database.logger.info( + `Sync is ready for ${viewName} into ${trackedTableName}`, + ) + }, + }, + }) + } + + // The sync function needs to be synchronous. + async function start(afterOnChangeRegistered?: () => Promise) { database.logger.info( `Sync is starting for ${viewName} into ${trackedTableName}`, ) @@ -362,68 +421,200 @@ export function powerSyncCollectionOptions< }, ) - const disposeTracking = await database.triggers.createDiffTrigger({ - source: viewName, - destination: trackedTableName, - when: { - [DiffTriggerOperation.INSERT]: `TRUE`, - [DiffTriggerOperation.UPDATE]: `TRUE`, - [DiffTriggerOperation.DELETE]: `TRUE`, - }, - hooks: { - beforeCreate: async (context) => { - let currentBatchCount = syncBatchSize - let cursor = 0 - while (currentBatchCount == syncBatchSize) { - begin() - const batchItems = await context.getAll( - sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`, - [syncBatchSize, cursor], - ) - currentBatchCount = batchItems.length - cursor += currentBatchCount - for (const row of batchItems) { - write({ - type: `insert`, - value: deserializeSyncRow(row), - }) - } - commit() - } - markReady() - database.logger.info( - `Sync is ready for ${viewName} into ${trackedTableName}`, - ) - }, - }, - }) + await afterOnChangeRegistered?.() // If the abort controller was aborted while processing the request above if (abortController.signal.aborted) { - await disposeTracking() + await disposeTracking?.() } else { abortController.signal.addEventListener( `abort`, () => { - disposeTracking() + disposeTracking?.() }, { once: true }, ) } } - start().catch((error) => - database.logger.error( - `Could not start syncing process for ${viewName} into ${trackedTableName}`, - error, - ), - ) + // Eager mode. + // Registers a diff trigger for the entire table. + function runEagerSync() { + start(async () => { + disposeTracking = await createDiffTrigger({ + when: { + [DiffTriggerOperation.INSERT]: `TRUE`, + [DiffTriggerOperation.UPDATE]: `TRUE`, + [DiffTriggerOperation.DELETE]: `TRUE`, + }, + writeType: (_rowId: string) => `insert`, + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => + lockContext.getAll( + sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`, + [batchSize, cursor], + ), + onReady: () => markReady(), + }) + }).catch((error) => + database.logger.error( + `Could not start syncing process for ${viewName} into ${trackedTableName}`, + error, + ), + ) + + return () => { + database.logger.info( + `Sync has been stopped for ${viewName} into ${trackedTableName}`, + ) + abortController.abort() + } + } - return () => { - database.logger.info( - `Sync has been stopped for ${viewName} into ${trackedTableName}`, + // On-demand mode. + // Registers a diff trigger for the active WHERE expressions. + function runOnDemandSync() { + start().catch((error) => + database.logger.error( + `Could not start syncing process for ${viewName} into ${trackedTableName}`, + error, + ), ) - abortController.abort() + + // Tracks all active WHERE expressions for on-demand sync filtering. + // Each loadSubset call pushes its predicate; unloadSubset removes it. + const activeWhereExpressions: Array = [] + const mutex = new Mutex() + + const loadSubset = async (options?: LoadSubsetOptions): Promise => { + if (options) { + activeWhereExpressions.push(options.where) + } + + if (activeWhereExpressions.length === 0) { + await disposeTracking?.() + return + } + + const combinedWhere = + activeWhereExpressions.length === 1 + ? activeWhereExpressions[0] + : or( + activeWhereExpressions[0]!, + activeWhereExpressions[1]!, + ...activeWhereExpressions.slice(2), + ) + + const compiledNewData = compileSQLite( + { where: combinedWhere }, + { jsonColumn: 'NEW.data' }, + ) + + const compiledOldData = compileSQLite( + { where: combinedWhere }, + { jsonColumn: 'OLD.data' }, + ) + + const compiledView = compileSQLite({ where: combinedWhere }) + + const newDataWhenClause = toInlinedWhereClause(compiledNewData) + const oldDataWhenClause = toInlinedWhereClause(compiledOldData) + const viewWhereClause = toInlinedWhereClause(compiledView) + + await disposeTracking?.() + + disposeTracking = await createDiffTrigger({ + when: { + [DiffTriggerOperation.INSERT]: newDataWhenClause, + [DiffTriggerOperation.UPDATE]: `(${newDataWhenClause}) OR (${oldDataWhenClause})`, + [DiffTriggerOperation.DELETE]: oldDataWhenClause, + }, + writeType: (rowId: string) => + collection.has(rowId) ? `update` : `insert`, + batchQuery: ( + lockContext: LockContext, + batchSize: number, + cursor: number, + ) => + lockContext.getAll( + `SELECT * FROM ${viewName} WHERE ${viewWhereClause} LIMIT ? OFFSET ?`, + [batchSize, cursor], + ), + onReady: () => {}, + }) + } + + const toInlinedWhereClause = (compiled: { + where?: string + params: Array + }): string => { + if (!compiled.where) return 'TRUE' + const sqlParts = compiled.where.split('?') + return sanitizeSQL( + sqlParts as unknown as TemplateStringsArray, + ...compiled.params, + ) + } + + const unloadSubset = async (options: LoadSubsetOptions) => { + const idx = activeWhereExpressions.indexOf(options.where) + if (idx !== -1) { + activeWhereExpressions.splice(idx, 1) + } + + // Evict rows that were exclusively loaded by the departing predicate. + // These are rows matching the departing WHERE that are no longer covered + // by any remaining active predicate. + const compiledDeparting = compileSQLite({ where: options.where }) + const departingWhereSQL = toInlinedWhereClause(compiledDeparting) + + let evictionSQL: string + if (activeWhereExpressions.length === 0) { + evictionSQL = `SELECT id FROM ${viewName} WHERE ${departingWhereSQL}` + } else { + const combinedRemaining = + activeWhereExpressions.length === 1 + ? activeWhereExpressions[0]! + : or( + activeWhereExpressions[0]!, + activeWhereExpressions[1]!, + ...activeWhereExpressions.slice(2), + ) + const compiledRemaining = compileSQLite({ where: combinedRemaining }) + const remainingWhereSQL = toInlinedWhereClause(compiledRemaining) + evictionSQL = `SELECT id FROM ${viewName} WHERE (${departingWhereSQL}) AND NOT (${remainingWhereSQL})` + } + + const rowsToEvict = await database.getAll<{ id: string }>(evictionSQL) + if (rowsToEvict.length > 0) { + begin() + for (const { id } of rowsToEvict) { + write({ type: `delete`, key: id }) + } + commit() + } + + // Recreate the diff trigger for the remaining active WHERE expressions. + await loadSubset() + } + + markReady() + + return { + cleanup: () => { + database.logger.info( + `Sync has been stopped for ${viewName} into ${trackedTableName}`, + ) + abortController.abort() + }, + loadSubset: (options: LoadSubsetOptions) => + mutex.runExclusive(() => loadSubset(options)), + unloadSubset: (options: LoadSubsetOptions) => + mutex.runExclusive(() => unloadSubset(options)), + } } }, // Expose the getSyncMetadata function @@ -442,6 +633,7 @@ export function powerSyncCollectionOptions< getKey, // Syncing should start immediately since we need to monitor the changes for mutations startSync: true, + syncMode, sync, onInsert: async (params) => { // The transaction here should only ever contain a single insert mutation diff --git a/packages/powersync-db-collection/src/sqlite-compiler.ts b/packages/powersync-db-collection/src/sqlite-compiler.ts new file mode 100644 index 000000000..e2df875fc --- /dev/null +++ b/packages/powersync-db-collection/src/sqlite-compiler.ts @@ -0,0 +1,354 @@ +import type { IR, LoadSubsetOptions } from '@tanstack/db' + +/** + * Result of compiling LoadSubsetOptions to SQLite + */ +export interface SQLiteCompiledQuery { + /** The WHERE clause (without "WHERE" keyword), e.g., "price > ?" */ + where?: string + /** The ORDER BY clause (without "ORDER BY" keyword), e.g., "price DESC" */ + orderBy?: string + /** The LIMIT value */ + limit?: number + /** Parameter values in order, to be passed to SQLite query */ + params: Array +} + +/** + * Options for controlling how SQL is compiled. + */ +export interface CompileSQLiteOptions { + /** + * When set, column references emit `json_extract(, '$.')` + * instead of `""`. The `id` column is excluded since it's stored + * as a direct column in the tracked table. + */ + jsonColumn?: string +} + +/** + * Compiles TanStack DB LoadSubsetOptions to SQLite query components. + * + * @example + * ```typescript + * const compiled = compileSQLite({ + * where: { type: 'func', name: 'gt', args: [ + * { type: 'ref', path: ['price'] }, + * { type: 'val', value: 100 } + * ]}, + * orderBy: [{ expression: { type: 'ref', path: ['price'] }, compareOptions: { direction: 'desc', nulls: 'last' } }], + * limit: 50 + * }) + * // Result: { where: '"price" > ?', orderBy: '"price" DESC', limit: 50, params: [100] } + * ``` + */ +export function compileSQLite( + options: LoadSubsetOptions, + compileOptions?: CompileSQLiteOptions, +): SQLiteCompiledQuery { + const { where, orderBy, limit } = options + + const params: Array = [] + const result: SQLiteCompiledQuery = { params } + + if (where) { + result.where = compileExpression(where, params, compileOptions) + } + + if (orderBy) { + result.orderBy = compileOrderBy(orderBy, params, compileOptions) + } + + if (limit !== undefined) { + result.limit = limit + } + + return result +} + +/** + * Quote SQLite identifiers to handle column names correctly. + * SQLite uses double quotes for identifiers. + */ +function quoteIdentifier(name: string): string { + // Escape any double quotes in the name by doubling them + const escaped = name.replace(/"/g, `""`) + return `"${escaped}"` +} + +/** + * Compiles a BasicExpression to a SQL string, mutating the params array. + */ +function compileExpression( + exp: IR.BasicExpression, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + switch (exp.type) { + case `val`: + params.push(exp.value) + return `?` + case `ref`: { + if (exp.path.length !== 1) { + throw new Error( + `SQLite compiler doesn't support nested properties: ${exp.path.join(`.`)}`, + ) + } + const columnName = exp.path[0]! + if (compileOptions?.jsonColumn && columnName !== `id`) { + return `json_extract(${compileOptions.jsonColumn}, '$.${columnName}')` + } + return quoteIdentifier(columnName) + } + case `func`: + return compileFunction(exp, params, compileOptions) + default: + throw new Error(`Unknown expression type: ${(exp as any).type}`) + } +} + +/** + * Compiles an OrderBy array to a SQL ORDER BY clause. + */ +function compileOrderBy( + orderBy: IR.OrderBy, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const clauses = orderBy.map((clause: IR.OrderByClause) => + compileOrderByClause(clause, params, compileOptions), + ) + return clauses.join(`, `) +} + +/** + * Compiles a single OrderByClause to SQL. + */ +function compileOrderByClause( + clause: IR.OrderByClause, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const { expression, compareOptions } = clause + let sql = compileExpression(expression, params, compileOptions) + + if (compareOptions.direction === `desc`) { + sql = `${sql} DESC` + } + + // SQLite supports NULLS FIRST/LAST (since 3.30.0) + if (compareOptions.nulls === `first`) { + sql = `${sql} NULLS FIRST` + } else { + // Default to NULLS LAST (nulls === 'last') + sql = `${sql} NULLS LAST` + } + + return sql +} + +/** + * Check if a BasicExpression represents a null/undefined value + */ +function isNullValue(exp: IR.BasicExpression): boolean { + return exp.type === `val` && (exp.value === null || exp.value === undefined) +} + +/** + * Compiles a function expression (operator) to SQL. + */ +function compileFunction( + exp: IR.Func, + params: Array, + compileOptions?: CompileSQLiteOptions, +): string { + const { name, args } = exp + + // Check for null values in comparison operators + if (isComparisonOp(name)) { + const hasNullArg = args.some((arg: IR.BasicExpression) => isNullValue(arg)) + if (hasNullArg) { + throw new Error( + `Cannot use null/undefined with '${name}' operator. ` + + `Use isNull() to check for null values.`, + ) + } + } + + // Compile arguments + const compiledArgs = args.map((arg: IR.BasicExpression) => + compileExpression(arg, params, compileOptions), + ) + + // Handle different operator types + switch (name) { + // Binary comparison operators + case `eq`: + case `gt`: + case `gte`: + case `lt`: + case `lte`: { + if (compiledArgs.length !== 2) { + throw new Error(`${name} expects 2 arguments`) + } + const opSymbol = getComparisonOp(name) + return `${compiledArgs[0]} ${opSymbol} ${compiledArgs[1]}` + } + + // Logical operators + case `and`: + case `or`: { + if (compiledArgs.length < 2) { + throw new Error(`${name} expects at least 2 arguments`) + } + const opKeyword = name === `and` ? `AND` : `OR` + return compiledArgs + .map((arg: string) => `(${arg})`) + .join(` ${opKeyword} `) + } + + case `not`: { + if (compiledArgs.length !== 1) { + throw new Error(`not expects 1 argument`) + } + // Check if argument is isNull/isUndefined for IS NOT NULL + const arg = args[0] + if (arg && arg.type === `func`) { + if (arg.name === `isNull` || arg.name === `isUndefined`) { + const innerArg = compileExpression( + arg.args[0]!, + params, + compileOptions, + ) + return `${innerArg} IS NOT NULL` + } + } + return `NOT (${compiledArgs[0]})` + } + + // Null checking + case `isNull`: + case `isUndefined`: { + if (compiledArgs.length !== 1) { + throw new Error(`${name} expects 1 argument`) + } + return `${compiledArgs[0]} IS NULL` + } + + // IN operator + case `in`: { + if (compiledArgs.length !== 2) { + throw new Error(`in expects 2 arguments (column and array)`) + } + // The second argument should be an array value + // We need to handle this specially - expand the array into multiple placeholders + const lastParamIndex = params.length - 1 + const arrayValue = params[lastParamIndex] + + if (!Array.isArray(arrayValue)) { + throw new Error(`in operator requires an array value`) + } + + // Remove the array param and add individual values + params.pop() + const placeholders = arrayValue.map(() => { + params.push(arrayValue[params.length - lastParamIndex]) + return `?` + }) + + // Re-add individual values properly + params.length = lastParamIndex // Reset to before array + for (const val of arrayValue) { + params.push(val) + } + + return `${compiledArgs[0]} IN (${placeholders.join(`, `)})` + } + + // String operators + case `like`: { + if (compiledArgs.length !== 2) { + throw new Error(`like expects 2 arguments`) + } + return `${compiledArgs[0]} LIKE ${compiledArgs[1]}` + } + + case `ilike`: { + if (compiledArgs.length !== 2) { + throw new Error(`ilike expects 2 arguments`) + } + return `${compiledArgs[0]} LIKE ${compiledArgs[1]} COLLATE NOCASE` + } + + // String case functions + case `upper`: { + if (compiledArgs.length !== 1) { + throw new Error(`upper expects 1 argument`) + } + return `UPPER(${compiledArgs[0]})` + } + + case `lower`: { + if (compiledArgs.length !== 1) { + throw new Error(`lower expects 1 argument`) + } + return `LOWER(${compiledArgs[0]})` + } + + case `length`: { + if (compiledArgs.length !== 1) { + throw new Error(`length expects 1 argument`) + } + return `LENGTH(${compiledArgs[0]})` + } + + case `concat`: { + if (compiledArgs.length < 1) { + throw new Error(`concat expects at least 1 argument`) + } + return `CONCAT(${compiledArgs.join(`, `)})` + } + + case `add`: { + if (compiledArgs.length !== 2) { + throw new Error(`add expects 2 arguments`) + } + return `${compiledArgs[0]} + ${compiledArgs[1]}` + } + + // Null fallback + case `coalesce`: { + if (compiledArgs.length < 1) { + throw new Error(`coalesce expects at least 1 argument`) + } + return `COALESCE(${compiledArgs.join(`, `)})` + } + + default: + throw new Error( + `Operator '${name}' is not supported in PowerSync on-demand sync. ` + + `Supported operators: eq, gt, gte, lt, lte, and, or, not, isNull, in, like, ilike, upper, lower, length, concat, add, coalesce`, + ) + } +} + +/** + * Check if operator is a comparison operator + */ +function isComparisonOp(name: string): boolean { + return [`eq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`].includes(name) +} + +/** + * Get the SQL symbol for a comparison operator + */ +function getComparisonOp(name: string): string { + const ops: Record = { + eq: `=`, + gt: `>`, + gte: `>=`, + lt: `<`, + lte: `<=`, + } + return ops[name]! +} diff --git a/packages/powersync-db-collection/tests/on-demand-sync.test.ts b/packages/powersync-db-collection/tests/on-demand-sync.test.ts new file mode 100644 index 000000000..a23210471 --- /dev/null +++ b/packages/powersync-db-collection/tests/on-demand-sync.test.ts @@ -0,0 +1,1499 @@ +import { randomUUID } from 'node:crypto' +import { tmpdir } from 'node:os' +import { PowerSyncDatabase, Schema, Table, column } from '@powersync/node' +import { + and, + createCollection, + createLiveQueryCollection, + eq, + gt, + gte, + lt, + or, +} from '@tanstack/db' +import { describe, expect, it, onTestFinished, vi } from 'vitest' +import { powerSyncCollectionOptions } from '../src' + +const APP_SCHEMA = new Schema({ + products: new Table({ + name: column.text, + price: column.integer, + category: column.text, + }), +}) + +describe(`On-Demand Sync Mode`, () => { + async function createDatabase() { + const db = new PowerSyncDatabase({ + database: { + dbFilename: `test-on-demand-${randomUUID()}.sqlite`, + dbLocation: tmpdir(), + implementation: { type: `node:sqlite` }, + }, + schema: APP_SCHEMA, + }) + onTestFinished(async () => { + // Wait a moment for any pending cleanup operations to complete + // before closing the database to prevent "operation on closed remote" errors + await new Promise((resolve) => setTimeout(resolve, 100)) + await db.disconnectAndClear() + await db.close() + }) + await db.disconnectAndClear() + return db + } + + async function createTestProducts(db: PowerSyncDatabase) { + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES + (uuid(), 'Product A', 50, 'electronics'), + (uuid(), 'Product B', 150, 'electronics'), + (uuid(), 'Product C', 25, 'clothing'), + (uuid(), 'Product D', 200, 'electronics'), + (uuid(), 'Product E', 75, 'clothing') + `) + } + + it(`should not load any data initially in on-demand mode`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Verify data exists in SQLite + const sqliteCount = await db.get<{ count: number }>( + `SELECT COUNT(*) as count FROM products`, + ) + expect(sqliteCount.count).toBe(5) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + // Wait for collection to be ready + await collection.stateWhenReady() + + // Verify NO data was loaded into the collection + expect(collection.size).toBe(0) + }) + + it(`should load only matching data when live query is created`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Verify collection is empty initially + expect(collection.size).toBe(0) + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for loadSubset to complete and data to appear + await vi.waitFor( + () => { + // The live query should have triggered loadSubset + // Only electronics with price > 100 should match: Product B (150), Product D (200) + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify the correct products were loaded + const loadedProducts = expensiveElectronics.toArray + const names = loadedProducts.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + + // Verify prices are correct + const prices = loadedProducts.map((p) => p.price).sort((a, b) => a! - b!) + expect(prices).toEqual([150, 200]) + }) + + it(`should reactively update live query when new matching data is inserted into SQLite`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for initial data to load + await vi.waitFor( + () => { + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify initial products + let names = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + + // Now insert a new matching product directly into SQLite + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 300, 'electronics') + `) + + // Wait for the diff trigger to propagate the change to the live query + await vi.waitFor( + () => { + // Should now have 3 products: B, D, and F + expect(expensiveElectronics.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Verify all products including the new one + names = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`, `Product F`]) + + // Verify the new product's price + const productF = expensiveElectronics.toArray.find( + (p) => p.name === `Product F`, + ) + expect(productF?.price).toBe(300) + }) + + it(`should not include non-matching data inserted into SQLite`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // Create a live query that filters for electronics over $100 + const expensiveElectronics = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => expensiveElectronics.cleanup()) + + // Preload triggers the live query to request data via loadSubset + await expensiveElectronics.preload() + + // Wait for initial data to load + await vi.waitFor( + () => { + expect(expensiveElectronics.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Verify initial products + const initialNames = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(initialNames).toEqual([`Product B`, `Product D`]) + + // Insert a non-matching product: electronics but too cheap + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Cheap Electronics', 50, 'electronics') + `) + + // Insert another non-matching product: expensive but wrong category + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Expensive Clothing', 500, 'clothing') + `) + + // Wait a bit to allow any potential (incorrect) updates to propagate + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Verify the live query still has only the original 2 products + expect(expensiveElectronics.size).toBe(2) + + // Verify the names haven't changed + const finalNames = expensiveElectronics.toArray.map((p) => p.name).sort() + expect(finalNames).toEqual([`Product B`, `Product D`]) + + // Verify the base collection only contains items matching active predicates + // Non-matching diff trigger items are filtered out in on-demand mode + expect(collection.size).toBe(2) // Only the 2 matching items from loadSubset + }) + + it(`should handle multiple live queries without losing predicate coverage`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + // Create collection with on-demand sync mode + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 (different predicate on same collection) + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + // Products B(150) and D(200) have price > 100 + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Now insert a new product that matches LQ1 (electronics) but NOT LQ2 (price <= 100) + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Cheap Gadget', 30, 'electronics') + `) + + // The diff trigger should use the OR of both active predicates: + // (category = 'electronics') OR (price > 100) + // 'Cheap Gadget' (electronics, price=30) matches the first predicate, + // so it should reach the base collection and appear in electronicsQuery. + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) // 3 original + Cheap Gadget + }, + { timeout: 2000 }, + ) + }) + + it(`should handle three live queries with combined predicate coverage`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + // Products B(150) and D(200) have price > 100 + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // LQ3: clothing category — a third predicate to exercise the 3-arg OR path + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => clothingQuery.cleanup()) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + // Products C(25) and E(75) are clothing + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Insert a product that only matches LQ3 (clothing, cheap) + // Diff trigger must OR all three predicates to catch this + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Shirt', 40, 'clothing') + `) + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(3) // C, E + New Shirt + }, + { timeout: 2000 }, + ) + + // Verify the other queries are unaffected + expect(electronicsQuery.size).toBe(3) + expect(expensiveQuery.size).toBe(2) + }) + + it(`should stop loading data for a predicate after its live query is cleaned up`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + + await collection.stateWhenReady() + + // LQ1: electronics category + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: clothing category + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await clothingQuery.preload() + + await vi.waitFor( + () => { + expect(clothingQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const electronicsCount = electronicsQuery.size // 3 + + // Kill LQ2 — its predicate should be removed and its rows evicted + clothingQuery.cleanup() + + // Wait for clothing rows to be evicted; collection shrinks to electronics-only + await vi.waitFor( + () => { + expect(collection.size).toBe(electronicsCount) + }, + { timeout: 2000 }, + ) + + // Insert a new clothing item — should NOT be picked up since LQ2 is gone + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Shirt', 40, 'clothing') + `) + + // Wait to allow any (incorrect) propagation + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should not have grown — clothing predicate is no longer active + expect(collection.size).toBe(electronicsCount) + + // Insert a new electronics item — should still be picked up by LQ1 + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'New Gadget', 99, 'electronics') + `) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) // 3 original + New Gadget + }, + { timeout: 2000 }, + ) + + // Kill LQ1 — no active predicates remain; electronics rows should be evicted + electronicsQuery.cleanup() + + await vi.waitFor( + () => { + expect(collection.size).toBe(0) + }, + { timeout: 2000 }, + ) + + // Insert items matching both former predicates — neither should be picked up + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Another Gadget', 120, 'electronics') + `) + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Another Shirt', 15, 'clothing') + `) + + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should remain empty — no active predicates + expect(collection.size).toBe(0) + }) + + describe(`Basic loadSubset behavior`, () => { + it(`should pass correct WHERE clause from live query filters to loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query using lt — only products with price < 50: Product C (25) + const cheapQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => lt(product.price, 50)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => cheapQuery.cleanup()) + + await cheapQuery.preload() + + await vi.waitFor( + () => { + expect(cheapQuery.size).toBe(1) + }, + { timeout: 2000 }, + ) + + const names = cheapQuery.toArray.map((p) => p.name) + expect(names).toEqual([`Product C`]) + }) + + it(`should pass ORDER BY and LIMIT to loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Top 2 most expensive products, ordered by price descending + const top2Query = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .orderBy(({ product }) => product.price, `desc`) + .limit(2) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => top2Query.cleanup()) + + await top2Query.preload() + + await vi.waitFor( + () => { + expect(top2Query.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const prices = top2Query.toArray.map((p) => p.price) + // Product D (200) and Product B (150) are the top 2 + expect(prices).toEqual([200, 150]) + }) + + it(`should handle complex filters (AND, OR) in loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Complex filter: (electronics AND price >= 150) OR (clothing AND price < 50) + // Matches: Product B (electronics, 150), Product D (electronics, 200), Product C (clothing, 25) + const complexQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => + or( + and( + eq(product.category, `electronics`), + gte(product.price, 150), + ), + and(eq(product.category, `clothing`), lt(product.price, 50)), + ), + ) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => complexQuery.cleanup()) + + await complexQuery.preload() + + await vi.waitFor( + () => { + expect(complexQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + const names = complexQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product C`, `Product D`]) + }) + + it(`should handle empty result from loadSubset`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query for a category that doesn't exist — no matching rows + const emptyQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `furniture`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => emptyQuery.cleanup()) + + await emptyQuery.preload() + + // Give it time to process + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(emptyQuery.size).toBe(0) + expect(collection.size).toBe(0) + }) + }) + + describe(`Reactive updates via diff trigger`, () => { + it(`should handle UPDATE to an existing row that still matches the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Update Product A's price — still electronics, still matches + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute(`UPDATE products SET price = 99 WHERE id = ?`, [ + productA!.id, + ]) + + await vi.waitFor( + () => { + const updated = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + expect(updated?.price).toBe(99) + }, + { timeout: 2000 }, + ) + + // Size unchanged — same row, just updated + expect(electronicsQuery.size).toBe(3) + }) + + it(`should handle UPDATE that causes a row to no longer match the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Change Product A from electronics to clothing — no longer matches + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute( + `UPDATE products SET category = 'clothing' WHERE id = ?`, + [productA!.id], + ) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + }) + + it(`should handle UPDATE that causes a row to start matching the predicate`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + // Products A(50), B(150), D(200) are electronics + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Change Product C from clothing to electronics — now matches + // Product C has id we need to look up from SQLite directly + const productC = await db.get<{ id: string }>( + `SELECT id FROM products WHERE name = 'Product C'`, + ) + await db.execute( + `UPDATE products SET category = 'electronics' WHERE id = ?`, + [productC.id], + ) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(4) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([ + `Product A`, + `Product B`, + `Product C`, + `Product D`, + ]) + }) + + it(`should handle DELETE of a matching row`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Delete Product A + const productA = electronicsQuery.toArray.find( + (p) => p.name === `Product A`, + ) + await db.execute(`DELETE FROM products WHERE id = ?`, [productA!.id]) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + const names = electronicsQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product B`, `Product D`]) + }) + }) + + describe(`Unload / cleanup`, () => { + it(`should handle rapid create-and-destroy of live queries without errors`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Rapidly create and destroy 5 live queries + for (let i = 0; i < 5; i++) { + const query = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + query.cleanup() + } + + // Give time for any async cleanup to settle + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Collection should still be functional — create one more and verify it works + const finalQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => finalQuery.cleanup()) + + await finalQuery.preload() + + await vi.waitFor( + () => { + expect(finalQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle re-creating a live query with the same predicate after cleanup`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Create first query + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await query1.preload() + + await vi.waitFor( + () => { + expect(query1.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Destroy it + query1.cleanup() + + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Re-create with same predicate + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => query2.cleanup()) + + await query2.preload() + + await vi.waitFor( + () => { + expect(query2.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // Verify reactive updates still work on the re-created query + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 300, 'electronics') + `) + + await vi.waitFor( + () => { + expect(query2.size).toBe(4) + }, + { timeout: 2000 }, + ) + }) + }) + + describe(`Edge cases`, () => { + it(`should handle loadSubset with no WHERE clause (load all data)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Query with no WHERE — selects all products + const allQuery = createLiveQueryCollection({ + query: (q) => + q.from({ product: collection }).select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => allQuery.cleanup()) + + await allQuery.preload() + + await vi.waitFor( + () => { + expect(allQuery.size).toBe(5) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle empty result from loadSubset (no matching rows in SQLite)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + const emptyQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `furniture`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => emptyQuery.cleanup()) + + await emptyQuery.preload() + + await new Promise((resolve) => setTimeout(resolve, 200)) + + expect(emptyQuery.size).toBe(0) + expect(collection.size).toBe(0) + }) + + it(`should handle concurrent loadSubset calls (multiple queries preloading simultaneously)`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Create three queries but don't await preload individually + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => electronicsQuery.cleanup()) + + const clothingQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `clothing`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => clothingQuery.cleanup()) + + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + // Preload all concurrently + await Promise.all([ + electronicsQuery.preload(), + clothingQuery.preload(), + expensiveQuery.preload(), + ]) + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) // A, B, D + expect(clothingQuery.size).toBe(2) // C, E + expect(expensiveQuery.size).toBe(2) // B, D + }, + { timeout: 2000 }, + ) + }) + }) + + describe(`Overlapping data across queries`, () => { + it(`should deduplicate rows when multiple live queries load the same data`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // LQ1: electronics category — matches A(50), B(150), D(200) + const electronicsQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => electronicsQuery.cleanup()) + + await electronicsQuery.preload() + + await vi.waitFor( + () => { + expect(electronicsQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + // LQ2: price > 100 — matches B(150), D(200) + // Products B and D overlap with LQ1 + const expensiveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => gt(product.price, 100)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + onTestFinished(() => expensiveQuery.cleanup()) + + await expensiveQuery.preload() + + await vi.waitFor( + () => { + expect(expensiveQuery.size).toBe(2) + }, + { timeout: 2000 }, + ) + + // Both loadSubset calls inserted rows B and D — base collection should have no duplicates + // Union of both subsets: A, B, D (B and D are shared) + const baseNames = collection.toArray.map((p: any) => p.name).sort() + expect(baseNames).toEqual([`Product A`, `Product B`, `Product D`]) + + // Both live queries return correct results over the shared data + const electronicsNames = electronicsQuery.toArray + .map((p) => p.name) + .sort() + expect(electronicsNames).toEqual([`Product A`, `Product B`, `Product D`]) + + const expensiveNames = expensiveQuery.toArray.map((p) => p.name).sort() + expect(expensiveNames).toEqual([`Product B`, `Product D`]) + + // Update a shared row — both queries should see the change + const productB = expensiveQuery.toArray.find( + (p) => p.name === `Product B`, + ) + await db.execute(`UPDATE products SET price = 175 WHERE id = ?`, [ + productB!.id, + ]) + + await vi.waitFor( + () => { + const inElectronics = electronicsQuery.toArray.find( + (p) => p.name === `Product B`, + ) + const inExpensive = expensiveQuery.toArray.find( + (p) => p.name === `Product B`, + ) + expect(inElectronics?.price).toBe(175) + expect(inExpensive?.price).toBe(175) + }, + { timeout: 2000 }, + ) + }) + + it(`should handle changing a live query's predicate by replacing the collection`, async () => { + const db = await createDatabase() + await createTestProducts(db) + + const collection = createCollection( + powerSyncCollectionOptions({ + database: db, + table: APP_SCHEMA.props.products, + syncMode: `on-demand`, + }), + ) + onTestFinished(() => collection.cleanup()) + await collection.stateWhenReady() + + // Start with all products (no WHERE) + let liveQuery = createLiveQueryCollection({ + query: (q) => + q.from({ product: collection }).select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + + await liveQuery.preload() + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(5) + }, + { timeout: 2000 }, + ) + + // Switch to only electronics + liveQuery.cleanup() + + liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ product: collection }) + .where(({ product }) => eq(product.category, `electronics`)) + .select(({ product }) => ({ + id: product.id, + name: product.name, + price: product.price, + category: product.category, + })), + }) + onTestFinished(() => liveQuery.cleanup()) + + await liveQuery.preload() + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(3) + }, + { timeout: 2000 }, + ) + + const names = liveQuery.toArray.map((p) => p.name).sort() + expect(names).toEqual([`Product A`, `Product B`, `Product D`]) + + // Verify reactive updates work on the new query + await db.execute(` + INSERT INTO products (id, name, price, category) + VALUES (uuid(), 'Product F', 99, 'electronics') + `) + + await vi.waitFor( + () => { + expect(liveQuery.size).toBe(4) + }, + { timeout: 2000 }, + ) + }) + }) +}) diff --git a/packages/powersync-db-collection/tests/sqlite-compiler.test.ts b/packages/powersync-db-collection/tests/sqlite-compiler.test.ts new file mode 100644 index 000000000..59c7d5d81 --- /dev/null +++ b/packages/powersync-db-collection/tests/sqlite-compiler.test.ts @@ -0,0 +1,329 @@ +import { describe, expect, it } from 'vitest' +import { IR } from '@tanstack/db' +import { compileSQLite } from '../src/sqlite-compiler' + +const val = (value: T) => new IR.Value(value) +// Helper to create expression nodes +const ref = (path: Array) => new IR.PropRef(path) +const func = (name: string, args: Array) => + new IR.Func(name, args) + +describe(`SQLite Compiler`, () => { + describe(`where clause compilation`, () => { + it(`should compile eq operator`, () => { + const result = compileSQLite({ + where: func(`eq`, [ref([`name`]), val(`test`)]), + }) + + expect(result.where).toBe(`"name" = ?`) + expect(result.params).toEqual([`test`]) + }) + + it(`should compile gt operator`, () => { + const result = compileSQLite({ + where: func(`gt`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" > ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile gte operator`, () => { + const result = compileSQLite({ + where: func(`gte`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" >= ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile lt operator`, () => { + const result = compileSQLite({ + where: func(`lt`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" < ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile lte operator`, () => { + const result = compileSQLite({ + where: func(`lte`, [ref([`price`]), val(100)]), + }) + + expect(result.where).toBe(`"price" <= ?`) + expect(result.params).toEqual([100]) + }) + + it(`should compile and operator with two conditions`, () => { + const result = compileSQLite({ + where: func(`and`, [ + func(`gt`, [ref([`price`]), val(50)]), + func(`lt`, [ref([`price`]), val(100)]), + ]), + }) + + expect(result.where).toBe(`("price" > ?) AND ("price" < ?)`) + expect(result.params).toEqual([50, 100]) + }) + + it(`should compile and operator with multiple conditions`, () => { + const result = compileSQLite({ + where: func(`and`, [ + func(`eq`, [ref([`status`]), val(`active`)]), + func(`gt`, [ref([`price`]), val(50)]), + func(`lt`, [ref([`price`]), val(100)]), + ]), + }) + + expect(result.where).toBe( + `("status" = ?) AND ("price" > ?) AND ("price" < ?)`, + ) + expect(result.params).toEqual([`active`, 50, 100]) + }) + + it(`should compile or operator`, () => { + const result = compileSQLite({ + where: func(`or`, [ + func(`eq`, [ref([`status`]), val(`active`)]), + func(`eq`, [ref([`status`]), val(`pending`)]), + ]), + }) + + expect(result.where).toBe(`("status" = ?) OR ("status" = ?)`) + expect(result.params).toEqual([`active`, `pending`]) + }) + + it(`should compile isNull operator`, () => { + const result = compileSQLite({ + where: func(`isNull`, [ref([`deleted_at`])]), + }) + + expect(result.where).toBe(`"deleted_at" IS NULL`) + expect(result.params).toEqual([]) + }) + + it(`should compile not(isNull) as IS NOT NULL`, () => { + const result = compileSQLite({ + where: func(`not`, [func(`isNull`, [ref([`deleted_at`])])]), + }) + + expect(result.where).toBe(`"deleted_at" IS NOT NULL`) + expect(result.params).toEqual([]) + }) + + it(`should compile like operator`, () => { + const result = compileSQLite({ + where: func(`like`, [ref([`name`]), val(`%test%`)]), + }) + + expect(result.where).toBe(`"name" LIKE ?`) + expect(result.params).toEqual([`%test%`]) + }) + + it(`should escape quotes in column names`, () => { + const result = compileSQLite({ + where: func(`eq`, [ref([`col"name`]), val(`test`)]), + }) + + expect(result.where).toBe(`"col""name" = ?`) + }) + + it(`should throw error for null values in comparison operators`, () => { + expect(() => + compileSQLite({ + where: func(`eq`, [ref([`name`]), val(null)]), + }), + ).toThrow(`Cannot use null/undefined with 'eq' operator`) + }) + + it(`should compile ilike operator`, () => { + const result = compileSQLite({ + where: func(`ilike`, [ref([`name`]), val(`%test%`)]), + }) + + expect(result.where).toBe(`"name" LIKE ? COLLATE NOCASE`) + expect(result.params).toEqual([`%test%`]) + }) + + it(`should compile upper function`, () => { + const result = compileSQLite({ + where: func(`eq`, [func(`upper`, [ref([`name`])]), val(`TEST`)]), + }) + + expect(result.where).toBe(`UPPER("name") = ?`) + expect(result.params).toEqual([`TEST`]) + }) + + it(`should compile lower function`, () => { + const result = compileSQLite({ + where: func(`eq`, [func(`lower`, [ref([`name`])]), val(`test`)]), + }) + + expect(result.where).toBe(`LOWER("name") = ?`) + expect(result.params).toEqual([`test`]) + }) + + it(`should compile coalesce function`, () => { + const result = compileSQLite({ + where: func(`eq`, [ + func(`coalesce`, [ref([`name`]), val(`default`)]), + val(`test`), + ]), + }) + + expect(result.where).toBe(`COALESCE("name", ?) = ?`) + expect(result.params).toEqual([`default`, `test`]) + }) + + it(`should compile length function`, () => { + const result = compileSQLite({ + where: func(`gt`, [func(`length`, [ref([`name`])]), val(5)]), + }) + + expect(result.where).toBe(`LENGTH("name") > ?`) + expect(result.params).toEqual([5]) + }) + + it(`should compile concat function with multiple args`, () => { + const result = compileSQLite({ + where: func(`eq`, [ + func(`concat`, [ref([`first_name`]), val(` `), ref([`last_name`])]), + val(`John Doe`), + ]), + }) + + expect(result.where).toBe(`CONCAT("first_name", ?, "last_name") = ?`) + expect(result.params).toEqual([` `, `John Doe`]) + }) + + it(`should compile add operator`, () => { + const result = compileSQLite({ + where: func(`gt`, [func(`add`, [ref([`price`]), val(10)]), val(100)]), + }) + + expect(result.where).toBe(`"price" + ? > ?`) + expect(result.params).toEqual([10, 100]) + }) + + it(`should throw for length with wrong arg count`, () => { + expect(() => + compileSQLite({ where: func(`length`, [ref([`a`]), ref([`b`])]) }), + ).toThrow(`length expects 1 argument`) + }) + + it(`should throw for add with wrong arg count`, () => { + expect(() => + compileSQLite({ where: func(`add`, [ref([`price`])]) }), + ).toThrow(`add expects 2 arguments`) + }) + + it(`should throw error for unsupported operators`, () => { + expect(() => + compileSQLite({ + where: func(`unsupported_op`, [ref([`name`]), val(`%test%`)]), + }), + ).toThrow(`Operator 'unsupported_op' is not supported`) + }) + }) + + describe(`orderBy compilation`, () => { + it(`should compile simple orderBy`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" NULLS LAST`) + expect(result.params).toEqual([]) + }) + + it(`should compile orderBy with desc direction`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" DESC NULLS LAST`) + }) + + it(`should compile orderBy with nulls first`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `asc`, nulls: `first` }, + }, + ], + }) + + expect(result.orderBy).toBe(`"price" NULLS FIRST`) + }) + + it(`should compile multiple orderBy clauses`, () => { + const result = compileSQLite({ + orderBy: [ + { + expression: ref([`category`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + }) + + expect(result.orderBy).toBe( + `"category" NULLS LAST, "price" DESC NULLS LAST`, + ) + }) + }) + + describe(`limit`, () => { + it(`should pass through limit`, () => { + const result = compileSQLite({ + limit: 50, + }) + + expect(result.limit).toBe(50) + }) + }) + + describe(`combined options`, () => { + it(`should compile where, orderBy, and limit together`, () => { + const result = compileSQLite({ + where: func(`gt`, [ref([`price`]), val(100)]), + orderBy: [ + { + expression: ref([`price`]), + compareOptions: { direction: `desc`, nulls: `last` }, + }, + ], + limit: 10, + }) + + expect(result.where).toBe(`"price" > ?`) + expect(result.orderBy).toBe(`"price" DESC NULLS LAST`) + expect(result.limit).toBe(10) + expect(result.params).toEqual([100]) + }) + + it(`should handle empty options`, () => { + const result = compileSQLite({}) + + expect(result.where).toBeUndefined() + expect(result.orderBy).toBeUndefined() + expect(result.limit).toBeUndefined() + expect(result.params).toEqual([]) + }) + }) +})