From fe413f72302c804275066312e983f61c1b121d1c Mon Sep 17 00:00:00 2001 From: Amine Date: Thu, 12 Feb 2026 14:25:08 +0800 Subject: [PATCH 1/3] feat: Added Sync Streams support and improved overall feature parity with the React SDK --- packages/nuxt/src/module.ts | 7 +- .../nuxt/src/runtime/utils/addImportsFrom.ts | 14 +- packages/vue/src/composables/powerSync.ts | 8 +- .../useAllSyncStreamsHaveSynced.ts | 102 ++++++ packages/vue/src/composables/useQuery.ts | 72 ++++- .../vue/src/composables/useSingleQuery.ts | 38 ++- packages/vue/src/composables/useStatus.ts | 30 +- packages/vue/src/composables/useSyncStream.ts | 64 ++++ .../vue/src/composables/useWatchedQuery.ts | 14 +- packages/vue/src/index.ts | 2 + packages/vue/tests/powerSync.test.ts | 23 ++ packages/vue/tests/streams.test.ts | 133 ++++++++ packages/vue/tests/useQuery.test.ts | 294 +++++++++++++++++- packages/vue/tests/useStatus.test.ts | 19 +- 14 files changed, 797 insertions(+), 23 deletions(-) create mode 100644 packages/vue/src/composables/useAllSyncStreamsHaveSynced.ts create mode 100644 packages/vue/src/composables/useSyncStream.ts create mode 100644 packages/vue/tests/powerSync.test.ts create mode 100644 packages/vue/tests/streams.test.ts diff --git a/packages/nuxt/src/module.ts b/packages/nuxt/src/module.ts index aaa2da2d6..c5d4c181f 100644 --- a/packages/nuxt/src/module.ts +++ b/packages/nuxt/src/module.ts @@ -139,7 +139,12 @@ export default defineNuxtModule({ 'usePowerSyncWatchedQuery', 'useQuery', 'useStatus', - 'useWatchedQuerySubscription' + 'useWatchedQuerySubscription', + 'useSyncStream', + { + name: 'AdditionalOptions', + type: true + } ], '@powersync/vue' ); diff --git a/packages/nuxt/src/runtime/utils/addImportsFrom.ts b/packages/nuxt/src/runtime/utils/addImportsFrom.ts index 74cdf2a0a..ff9ee898b 100644 --- a/packages/nuxt/src/runtime/utils/addImportsFrom.ts +++ b/packages/nuxt/src/runtime/utils/addImportsFrom.ts @@ -1,5 +1,15 @@ import { addImports } from '@nuxt/kit'; -export const addImportsFrom = (names: string[], from: string) => { - addImports(names.map((name) => ({ name, from }))); +interface ImportDefinition { + name: string; + type: boolean; +} +export const addImportsFrom = (names: (string | ImportDefinition)[], from: string) => { + addImports( + names.map((name) => ({ + name: typeof name === 'string' ? name : name.name, + type: typeof name === 'string' ? false : name.type, + from + })) + ); }; diff --git a/packages/vue/src/composables/powerSync.ts b/packages/vue/src/composables/powerSync.ts index 21c84b850..01d1bb849 100644 --- a/packages/vue/src/composables/powerSync.ts +++ b/packages/vue/src/composables/powerSync.ts @@ -3,7 +3,7 @@ import { App, MaybeRef, Ref, hasInjectionContext, inject, provide, ref } from 'v import { setupTopLevelWarningMessage } from './messages.js'; // Create a unique symbol for the PowerSync context -const PowerSyncKey = Symbol(); +export const POWERSYNC_KEY = Symbol('POWERSYNC_KEY'); /** * Create a Vue plugin to define the PowerSync client. @@ -13,7 +13,7 @@ const PowerSyncKey = Symbol(); */ export function createPowerSyncPlugin(powerSyncPluginOptions: { database: MaybeRef }) { const install = (app: App) => { - app.provide(PowerSyncKey, ref(powerSyncPluginOptions.database)); + app.provide(POWERSYNC_KEY, ref(powerSyncPluginOptions.database)); }; return { install }; } @@ -26,7 +26,7 @@ export function createPowerSyncPlugin(powerSyncPluginOptions: { database: MaybeR * If the key parameter is provided, the client will be provided under that key instead of the default PowerSync key. */ export function providePowerSync(database: MaybeRef, key: string | undefined = undefined) { - provide(key || PowerSyncKey, ref(database)); + provide(key || POWERSYNC_KEY, ref(database)); } /** @@ -42,7 +42,7 @@ export const usePowerSync = (key: string | undefined = undefined) => { if (!hasInjectionContext()) { throw setupTopLevelWarningMessage; } - const powerSync = inject | undefined>(key || PowerSyncKey); + const powerSync = inject | undefined>(key || POWERSYNC_KEY); if (!powerSync) { console.warn('[PowerSync warn]: No PowerSync client found.'); diff --git a/packages/vue/src/composables/useAllSyncStreamsHaveSynced.ts b/packages/vue/src/composables/useAllSyncStreamsHaveSynced.ts new file mode 100644 index 000000000..95e4da0fc --- /dev/null +++ b/packages/vue/src/composables/useAllSyncStreamsHaveSynced.ts @@ -0,0 +1,102 @@ +import { AbstractPowerSyncDatabase, SyncStatus, SyncStreamSubscription } from '@powersync/common'; +import { UseSyncStreamOptions } from './useSyncStream.js'; +import { computed, MaybeRef, Ref, ref, toValue, watch } from 'vue'; + +/** + * Additional options to control how `useQuery` behaves when subscribing to a stream. + */ +export interface QuerySyncStreamOptions extends UseSyncStreamOptions { + /** + * The name of the stream to subscribe to. + */ + name: string; + /** + * When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or + * downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true). + */ + waitForStream?: boolean; +} + +/** + * @internal + */ +export const useAllSyncStreamsHaveSynced = ( + db: MaybeRef, + streams: MaybeRef +): { synced: Ref } => { + // Compute hash to detect content changes (not just reference changes) + const hash = computed(() => { + const streamsValue = toValue(streams); + return streamsValue && JSON.stringify(streamsValue); + }); + + // Initialize synced state + const streamsValue = toValue(streams); + const synced = ref(streamsValue == null || streamsValue.every((e) => e.waitForStream != true)); + + watch( + hash, + (_currentHash, _oldHash, onCleanup) => { + const streamsValue = toValue(streams); + const dbValue = toValue(db); + + if (!dbValue) { + return; + } + + if (streamsValue) { + synced.value = false; + + const promises: Promise[] = []; + const abort = new AbortController(); + + for (const stream of streamsValue) { + promises.push(dbValue.syncStream(stream.name, stream.parameters).subscribe(stream)); + } + + // First, wait for all subscribe() calls to make all subscriptions active. + Promise.all(promises).then(async (resolvedStreams) => { + function allHaveSynced(status: SyncStatus) { + return resolvedStreams.every((s, i) => { + const request = streamsValue[i]; + return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced; + }); + } + + // Wait for the effect to be cancelled or all streams having synced. + await dbValue.waitForStatus(allHaveSynced, abort.signal); + + if (abort.signal.aborted) { + // Was cancelled + } else { + // Has synced, update public state. + synced.value = true; + + // Wait for cancellation before clearing subscriptions. + await new Promise((resolve) => { + abort.signal.addEventListener('abort', () => resolve()); + }); + } + + // Effect was definitely cancelled at this point, so drop the subscriptions. + for (const stream of resolvedStreams) { + stream.unsubscribe(); + } + }); + + // Cleanup: abort when dependencies change or component unmounts + onCleanup(() => { + abort.abort(); + }); + } else { + // There are no streams, so all of them have synced. + synced.value = true; + } + }, + { immediate: true } + ); + + return { + synced + }; +}; diff --git a/packages/vue/src/composables/useQuery.ts b/packages/vue/src/composables/useQuery.ts index f9bcd2c5e..0e174e6e4 100644 --- a/packages/vue/src/composables/useQuery.ts +++ b/packages/vue/src/composables/useQuery.ts @@ -1,7 +1,9 @@ import { type CompilableQuery } from '@powersync/common'; -import { type MaybeRef, type Ref } from 'vue'; +import { computed, ref, toValue, type MaybeRef, type Ref } from 'vue'; import { AdditionalOptions, useSingleQuery } from './useSingleQuery.js'; import { useWatchedQuery } from './useWatchedQuery.js'; +import { usePowerSync } from './powerSync.js'; +import { useAllSyncStreamsHaveSynced } from './useAllSyncStreamsHaveSynced.js'; export type WatchedQueryResult = { readonly data: Ref>>; @@ -20,6 +22,14 @@ export type WatchedQueryResult = { refresh?: () => Promise; }; +const createLoadingState = (): WatchedQueryResult => ({ + data: ref([]), + isLoading: ref(true), + isFetching: ref(true), + error: ref(undefined), + refresh: undefined +}); + /** * A composable to access the results of a watched query. * @@ -45,12 +55,60 @@ export type WatchedQueryResult = { export const useQuery = ( query: MaybeRef>, sqlParameters: MaybeRef = [], - options: AdditionalOptions = {} + options: MaybeRef> = {} ): WatchedQueryResult => { - switch (true) { - case options.runQueryOnce: - return useSingleQuery(query, sqlParameters, options); - default: - return useWatchedQuery(query, sqlParameters, options); + const ps = usePowerSync(); + if (!ps) { + const loadingState = createLoadingState(); + return { + ...loadingState, + isLoading: ref(false), + error: ref(new Error('PowerSync not configured.')) + }; } + + const { synced: streamsHaveSynced } = useAllSyncStreamsHaveSynced( + ps, + computed(() => toValue(options)?.streams) + ); + + const runOnce = computed(() => toValue(options)?.runQueryOnce === true); + + const single = useSingleQuery(query, sqlParameters, { + ...toValue(options), + active: computed(() => runOnce.value && streamsHaveSynced.value) + }); + + const watched = useWatchedQuery(query, sqlParameters, { + ...toValue(options), + active: computed(() => !runOnce.value && streamsHaveSynced.value) + }); + + const loadingState = createLoadingState(); + return { + data: computed(() => { + if (!streamsHaveSynced.value) return loadingState.data.value; + return runOnce.value ? single.data.value : watched.data.value; + }), + + isLoading: computed(() => { + if (!streamsHaveSynced.value) return true; + return runOnce.value ? single.isLoading.value : watched.isLoading.value; + }), + + isFetching: computed(() => { + if (!streamsHaveSynced.value) return false; + return runOnce.value ? single.isFetching.value : watched.isFetching.value; + }), + + error: computed(() => { + if (!streamsHaveSynced.value) return undefined; + return runOnce.value ? single.error.value : watched.error.value; + }), + + refresh: () => { + const currentMode = runOnce.value; + return currentMode ? single.refresh?.() : watched.refresh?.(); + } + }; }; diff --git a/packages/vue/src/composables/useSingleQuery.ts b/packages/vue/src/composables/useSingleQuery.ts index 3d970f7f8..cc6508ff1 100644 --- a/packages/vue/src/composables/useSingleQuery.ts +++ b/packages/vue/src/composables/useSingleQuery.ts @@ -7,6 +7,7 @@ import { } from '@powersync/common'; import { type MaybeRef, type Ref, ref, toValue, watchEffect } from 'vue'; import { usePowerSync } from './powerSync.js'; +import { QuerySyncStreamOptions } from './useAllSyncStreamsHaveSynced.js'; export interface AdditionalOptions extends Omit { runQueryOnce?: boolean; @@ -31,6 +32,28 @@ export interface AdditionalOptions extends Omit; + /** + * An optional array of sync streams (with names and parameters) backing the query. + * + * When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too). + * + * If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream + * is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't + * been downloaded. + * Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead, + * consistent sync snapshots will be made available as they've been processed by PowerSync. + * + * @experimental Sync streams are currently in alpha. + */ + streams?: QuerySyncStreamOptions[]; + + /** + * If true (default) the watched query will update its state to report + * on the fetching state of the query. + * Setting to false reduces the number of state changes if the fetch status + * is not relevant to the consumer. + */ + reportFetching?: boolean; } export type WatchedQueryResult = { @@ -53,7 +76,7 @@ export type WatchedQueryResult = { export const useSingleQuery = ( query: MaybeRef>, sqlParameters: MaybeRef = [], - options: AdditionalOptions = {} + options: AdditionalOptions & { active?: MaybeRef } = {} ): WatchedQueryResult => { const data = ref>>([]) as Ref>>; const error = ref(undefined); @@ -94,6 +117,12 @@ export const useSingleQuery = ( }; watchEffect(async (onCleanup) => { + const isActive = toValue(options.active ?? true); + if (!isActive) { + fetchData = undefined; + return; // Don't run if not active + } + const abortController = new AbortController(); // Abort any previous watches when the effect triggers again, or when the component is unmounted onCleanup(() => abortController.abort()); @@ -133,6 +162,11 @@ export const useSingleQuery = ( isLoading, isFetching, error, - refresh: () => fetchData?.() + refresh: () => { + if (!toValue(options.active ?? true)) { + return Promise.resolve(); + } + return fetchData?.(); + } }; }; diff --git a/packages/vue/src/composables/useStatus.ts b/packages/vue/src/composables/useStatus.ts index 3709bf40c..25118f23f 100644 --- a/packages/vue/src/composables/useStatus.ts +++ b/packages/vue/src/composables/useStatus.ts @@ -1,4 +1,6 @@ -import { usePowerSyncStatus } from './usePowerSyncStatus.js'; +import { SyncStatus } from '@powersync/common'; +import { ref, watchEffect } from 'vue'; +import { usePowerSync } from './powerSync.js'; /** * Retrieve the current synchronization status of PowerSync. @@ -12,4 +14,28 @@ import { usePowerSyncStatus } from './usePowerSyncStatus.js'; *