1 change: 1 addition & 0 deletions packages/powersync-db-collection/package.json
@@ -55,6 +55,7 @@
"@standard-schema/spec": "^1.1.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.8.0",
"async-mutex": "^0.5.0",
"debug": "^4.4.3",
"p-defer": "^4.0.1"
},
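Note: the new `async-mutex` dependency backs the on-demand sync path added in `powersync.ts` below. A minimal sketch of the pattern, using only the documented `Mutex.runExclusive` API; the function names here are illustrative, not taken from this PR:

```ts
import { Mutex } from 'async-mutex'

// Illustrative only: serialize trigger re-creation the way loadSubset /
// unloadSubset are serialized in powersync.ts below.
const mutex = new Mutex()

async function recreateDiffTrigger(reason: string): Promise<void> {
  // Stand-in for the dispose-and-recreate sequence in the real code.
  console.log(`recreating diff trigger (${reason})`)
}

export function queueTriggerUpdate(reason: string): Promise<void> {
  // runExclusive queues callers until the in-flight callback settles,
  // so two subset changes can never rebuild the trigger concurrently.
  return mutex.runExclusive(() => recreateDiffTrigger(reason))
}
```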
1 change: 1 addition & 0 deletions packages/powersync-db-collection/src/index.ts
@@ -1,3 +1,4 @@
export * from './definitions'
export * from './powersync'
export * from './PowerSyncTransactor'
export * from './sqlite-compiler'
296 changes: 244 additions & 52 deletions packages/powersync-db-collection/src/powersync.ts
@@ -1,10 +1,14 @@
import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common'
import { Mutex } from 'async-mutex'
import { or } from '@tanstack/db'
import { compileSQLite } from './sqlite-compiler'
import { PendingOperationStore } from './PendingOperationStore'
import { PowerSyncTransactor } from './PowerSyncTransactor'
import { DEFAULT_BATCH_SIZE } from './definitions'
import { asPowerSyncRecord, mapOperation } from './helpers'
import { convertTableToSchema } from './schema'
import { serializeForSQLite } from './serialization'
import type { LoadSubsetOptions, OperationType, SyncConfig } from '@tanstack/db'
import type {
AnyTableColumnType,
ExtractedTable,
@@ -24,9 +28,8 @@ import type {
PowerSyncCollectionUtils,
} from './definitions'
import type { PendingOperation } from './PendingOperationStore'
import type { SyncConfig } from '@tanstack/db'
import type { StandardSchemaV1 } from '@standard-schema/spec'
import type { Table, TriggerDiffRecord } from '@powersync/common'
import type { LockContext, Table, TriggerDiffRecord } from '@powersync/common'

/**
* Creates PowerSync collection options for use with a standard Collection.
@@ -225,6 +228,7 @@ export function powerSyncCollectionOptions<
table,
schema: inputSchema,
syncBatchSize = DEFAULT_BATCH_SIZE,
syncMode = 'eager',
...restConfig
} = config

@@ -296,11 +300,66 @@
*/
const sync: SyncConfig<OutputType, string> = {
sync: (params) => {
const { begin, write, commit, markReady } = params
const { begin, write, collection, commit, markReady } = params
const abortController = new AbortController()

// The sync function needs to be synchronous
async function start() {
let disposeTracking: (() => Promise<void>) | null = null

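// Dispatch on the configured sync mode: eager mirrors the entire view,
// on-demand only tracks rows matched by active query predicates.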
if (syncMode === `eager`) {
return runEagerSync()
} else {
return runOnDemandSync()
}

async function createDiffTrigger(options: {
when: Record<DiffTriggerOperation, string>
writeType: (rowId: string) => OperationType
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) => Promise<Array<TableType>>
onReady: () => void
}) {
const { when, writeType, batchQuery, onReady } = options

return await database.triggers.createDiffTrigger({
source: viewName,
destination: trackedTableName,
when,
hooks: {
beforeCreate: async (context) => {
let currentBatchCount = syncBatchSize
let cursor = 0
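// Page through the current snapshot in batches; the loop stops once a
// batch comes back smaller than syncBatchSize.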
while (currentBatchCount === syncBatchSize) {
begin()

const batchItems = await batchQuery(
context,
syncBatchSize,
cursor,
)
currentBatchCount = batchItems.length
cursor += currentBatchCount
for (const row of batchItems) {
write({
type: writeType(row.id),
value: deserializeSyncRow(row),
})
}
commit()
}
onReady()
database.logger.info(
`Sync is ready for ${viewName} into ${trackedTableName}`,
)
},
},
})
}

// The sync function needs to be synchronous.
async function start(afterOnChangeRegistered?: () => Promise<void>) {
database.logger.info(
`Sync is starting for ${viewName} into ${trackedTableName}`,
)
@@ -362,68 +421,200 @@ export function powerSyncCollectionOptions<
},
)

const disposeTracking = await database.triggers.createDiffTrigger({
source: viewName,
destination: trackedTableName,
when: {
[DiffTriggerOperation.INSERT]: `TRUE`,
[DiffTriggerOperation.UPDATE]: `TRUE`,
[DiffTriggerOperation.DELETE]: `TRUE`,
},
hooks: {
beforeCreate: async (context) => {
let currentBatchCount = syncBatchSize
let cursor = 0
while (currentBatchCount == syncBatchSize) {
begin()
const batchItems = await context.getAll<TableType>(
sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`,
[syncBatchSize, cursor],
)
currentBatchCount = batchItems.length
cursor += currentBatchCount
for (const row of batchItems) {
write({
type: `insert`,
value: deserializeSyncRow(row),
})
}
commit()
}
markReady()
database.logger.info(
`Sync is ready for ${viewName} into ${trackedTableName}`,
)
},
},
})
await afterOnChangeRegistered?.()

// If the abort controller was aborted while processing the request above
if (abortController.signal.aborted) {
await disposeTracking()
await disposeTracking?.()
} else {
abortController.signal.addEventListener(
`abort`,
() => {
disposeTracking()
disposeTracking?.()
},
{ once: true },
)
}
}

start().catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)
// Eager mode.
// Registers a diff trigger for the entire table.
function runEagerSync() {
start(async () => {
disposeTracking = await createDiffTrigger({
when: {
[DiffTriggerOperation.INSERT]: `TRUE`,
[DiffTriggerOperation.UPDATE]: `TRUE`,
[DiffTriggerOperation.DELETE]: `TRUE`,
},
writeType: (_rowId: string) => `insert`,
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) =>
lockContext.getAll<TableType>(
sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`,
[batchSize, cursor],
),
onReady: () => markReady(),
})
}).catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)

return () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
)
abortController.abort()
}
}

return () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
// On-demand mode.
// Registers a diff trigger for the active WHERE expressions.
function runOnDemandSync() {
start().catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)
abortController.abort()

// Tracks all active WHERE expressions for on-demand sync filtering.
// Each loadSubset call pushes its predicate; unloadSubset removes it.
const activeWhereExpressions: Array<LoadSubsetOptions['where']> = []
const mutex = new Mutex()

const loadSubset = async (options?: LoadSubsetOptions): Promise<void> => {
if (options) {
activeWhereExpressions.push(options.where)
}

if (activeWhereExpressions.length === 0) {
await disposeTracking?.()
return
}

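// OR together every active predicate so the trigger fires for rows
// matching any live query.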
const combinedWhere =
activeWhereExpressions.length === 1
? activeWhereExpressions[0]
: or(
activeWhereExpressions[0]!,
activeWhereExpressions[1]!,
...activeWhereExpressions.slice(2),
)

const compiledNewData = compileSQLite(
{ where: combinedWhere },
{ jsonColumn: 'NEW.data' },
)

const compiledOldData = compileSQLite(
{ where: combinedWhere },
{ jsonColumn: 'OLD.data' },
)

const compiledView = compileSQLite({ where: combinedWhere })

const newDataWhenClause = toInlinedWhereClause(compiledNewData)
const oldDataWhenClause = toInlinedWhereClause(compiledOldData)
const viewWhereClause = toInlinedWhereClause(compiledView)

await disposeTracking?.()

disposeTracking = await createDiffTrigger({
when: {
[DiffTriggerOperation.INSERT]: newDataWhenClause,
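// UPDATE must fire when either the new or the old row matches, so rows
// moving into or out of the filtered set are both observed.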
[DiffTriggerOperation.UPDATE]: `(${newDataWhenClause}) OR (${oldDataWhenClause})`,
[DiffTriggerOperation.DELETE]: oldDataWhenClause,
},
writeType: (rowId: string) =>
collection.has(rowId) ? `update` : `insert`,
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) =>
lockContext.getAll<TableType>(
`SELECT * FROM ${viewName} WHERE ${viewWhereClause} LIMIT ? OFFSET ?`,
[batchSize, cursor],
),
onReady: () => {},
})
}

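// Inlines the compiled parameters into the WHERE string so it can be
// embedded directly in trigger WHEN clauses and the view query.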
const toInlinedWhereClause = (compiled: {
where?: string
params: Array<unknown>
}): string => {
if (!compiled.where) return 'TRUE'
const sqlParts = compiled.where.split('?')
return sanitizeSQL(
sqlParts as unknown as TemplateStringsArray,
...compiled.params,
)
}

const unloadSubset = async (options: LoadSubsetOptions) => {
const idx = activeWhereExpressions.indexOf(options.where)
if (idx !== -1) {
activeWhereExpressions.splice(idx, 1)
}

// Evict rows that were exclusively loaded by the departing predicate.
// These are rows matching the departing WHERE that are no longer covered
// by any remaining active predicate.
const compiledDeparting = compileSQLite({ where: options.where })
const departingWhereSQL = toInlinedWhereClause(compiledDeparting)

let evictionSQL: string
if (activeWhereExpressions.length === 0) {
evictionSQL = `SELECT id FROM ${viewName} WHERE ${departingWhereSQL}`
} else {
const combinedRemaining =
activeWhereExpressions.length === 1
? activeWhereExpressions[0]!
: or(
activeWhereExpressions[0]!,
activeWhereExpressions[1]!,
...activeWhereExpressions.slice(2),
)
const compiledRemaining = compileSQLite({ where: combinedRemaining })
const remainingWhereSQL = toInlinedWhereClause(compiledRemaining)
evictionSQL = `SELECT id FROM ${viewName} WHERE (${departingWhereSQL}) AND NOT (${remainingWhereSQL})`
}

const rowsToEvict = await database.getAll<{ id: string }>(evictionSQL)
if (rowsToEvict.length > 0) {
begin()
for (const { id } of rowsToEvict) {
write({ type: `delete`, key: id })
}
commit()
}

// Recreate the diff trigger for the remaining active WHERE expressions.
await loadSubset()
}

markReady()

return {
cleanup: () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
)
abortController.abort()
},
loadSubset: (options: LoadSubsetOptions) =>
mutex.runExclusive(() => loadSubset(options)),
unloadSubset: (options: LoadSubsetOptions) =>
mutex.runExclusive(() => unloadSubset(options)),
}
}
},
// Expose the getSyncMetadata function
@@ -442,6 +633,7 @@ export function powerSyncCollectionOptions<
getKey,
// Syncing should start immediately since we need to monitor the changes for mutations
startSync: true,
syncMode,
sync,
onInsert: async (params) => {
// The transaction here should only ever contain a single insert mutation
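A minimal usage sketch of the new `syncMode` option, assuming the existing `powerSyncCollectionOptions` config shape (`database`, `table`) and TanStack DB's `createCollection`. Import paths and the database/table names are illustrative, and `on-demand` is an assumed literal since only the `eager` default appears in this diff:

```ts
import { createCollection } from '@tanstack/db'
import { powerSyncCollectionOptions } from '@tanstack/powersync-db-collection'
import { db, APP_SCHEMA } from './db' // illustrative app-level setup

const documents = createCollection(
  powerSyncCollectionOptions({
    database: db,
    table: APP_SCHEMA.props.documents,
    // New in this change: `eager` (the default) mirrors the whole table into
    // the collection; any other mode takes the on-demand path and only syncs
    // rows matched by the active query predicates.
    syncMode: `on-demand`, // assumed literal; not shown in this diff
  }),
)
```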