Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/nuxt/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ export default defineNuxtModule<PowerSyncNuxtModuleOptions>({
'usePowerSyncWatchedQuery',
'useQuery',
'useStatus',
'useWatchedQuerySubscription'
'useWatchedQuerySubscription',
'useSyncStream',
{
name: 'AdditionalOptions',
type: true
}
],
'@powersync/vue'
);
Expand Down
14 changes: 12 additions & 2 deletions packages/nuxt/src/runtime/utils/addImportsFrom.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { addImports } from '@nuxt/kit';

export const addImportsFrom = (names: string[], from: string) => {
addImports(names.map((name) => ({ name, from })));
interface ImportDefinition {
name: string;
type: boolean;
}
export const addImportsFrom = (names: (string | ImportDefinition)[], from: string) => {
addImports(
names.map((name) => ({
name: typeof name === 'string' ? name : name.name,
type: typeof name === 'string' ? false : name.type,
from
}))
);
};
2 changes: 1 addition & 1 deletion packages/vue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"homepage": "https://docs.powersync.com",
"peerDependencies": {
"vue": "*",
"@powersync/common": "workspace:^1.46.0"
"@powersync/common": "workspace:*"
},
"devDependencies": {
"@powersync/common": "workspace:*",
Expand Down
8 changes: 4 additions & 4 deletions packages/vue/src/composables/powerSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { App, MaybeRef, Ref, hasInjectionContext, inject, provide, ref } from 'v
import { setupTopLevelWarningMessage } from './messages.js';

// Create a unique symbol for the PowerSync context
const PowerSyncKey = Symbol();
export const POWERSYNC_KEY = Symbol('POWERSYNC_KEY');

/**
* Create a Vue plugin to define the PowerSync client.
Expand All @@ -13,7 +13,7 @@ const PowerSyncKey = Symbol();
*/
export function createPowerSyncPlugin(powerSyncPluginOptions: { database: MaybeRef<AbstractPowerSyncDatabase> }) {
const install = (app: App) => {
app.provide(PowerSyncKey, ref(powerSyncPluginOptions.database));
app.provide(POWERSYNC_KEY, ref(powerSyncPluginOptions.database));
};
return { install };
}
Expand All @@ -26,7 +26,7 @@ export function createPowerSyncPlugin(powerSyncPluginOptions: { database: MaybeR
* If the key parameter is provided, the client will be provided under that key instead of the default PowerSync key.
*/
export function providePowerSync(database: MaybeRef<AbstractPowerSyncDatabase>, key: string | undefined = undefined) {
provide(key || PowerSyncKey, ref(database));
provide(key || POWERSYNC_KEY, ref(database));
}

/**
Expand All @@ -42,7 +42,7 @@ export const usePowerSync = (key: string | undefined = undefined) => {
if (!hasInjectionContext()) {
throw setupTopLevelWarningMessage;
}
const powerSync = inject<Ref<AbstractPowerSyncDatabase> | undefined>(key || PowerSyncKey);
const powerSync = inject<Ref<AbstractPowerSyncDatabase> | undefined>(key || POWERSYNC_KEY);

if (!powerSync) {
console.warn('[PowerSync warn]: No PowerSync client found.');
Expand Down
102 changes: 102 additions & 0 deletions packages/vue/src/composables/useAllSyncStreamsHaveSynced.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { AbstractPowerSyncDatabase, SyncStatus, SyncStreamSubscription } from '@powersync/common';
import { UseSyncStreamOptions } from './useSyncStream.js';
import { computed, MaybeRef, Ref, ref, toValue, watch } from 'vue';

/**
* Additional options to control how `useQuery` behaves when subscribing to a stream.
*/
export interface QuerySyncStreamOptions extends UseSyncStreamOptions {
/**
* The name of the stream to subscribe to.
*/
name: string;
/**
* When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or
* downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true).
*/
waitForStream?: boolean;
}

/**
* @internal
*/
export const useAllSyncStreamsHaveSynced = (
db: MaybeRef<AbstractPowerSyncDatabase>,
streams: MaybeRef<QuerySyncStreamOptions[] | undefined>
): { synced: Ref<boolean | undefined> } => {
// Compute hash to detect content changes (not just reference changes)
const hash = computed(() => {
const streamsValue = toValue(streams);
return streamsValue && JSON.stringify(streamsValue);
});

// Initialize synced state
const streamsValue = toValue(streams);
const synced = ref(streamsValue == null || streamsValue.every((e) => e.waitForStream != true));

watch(
hash,
(_currentHash, _oldHash, onCleanup) => {
const streamsValue = toValue(streams);
const dbValue = toValue(db);

if (!dbValue) {
return;
}

if (streamsValue) {
synced.value = false;

const promises: Promise<SyncStreamSubscription>[] = [];
const abort = new AbortController();

for (const stream of streamsValue) {
promises.push(dbValue.syncStream(stream.name, stream.parameters).subscribe(stream));
}

// First, wait for all subscribe() calls to make all subscriptions active.
Promise.all(promises).then(async (resolvedStreams) => {
function allHaveSynced(status: SyncStatus) {
return resolvedStreams.every((s, i) => {
const request = streamsValue[i];
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
});
}

// Wait for the effect to be cancelled or all streams having synced.
await dbValue.waitForStatus(allHaveSynced, abort.signal);

if (abort.signal.aborted) {
// Was cancelled
} else {
// Has synced, update public state.
synced.value = true;

// Wait for cancellation before clearing subscriptions.
await new Promise<void>((resolve) => {
abort.signal.addEventListener('abort', () => resolve());
});
}

// Effect was definitely cancelled at this point, so drop the subscriptions.
for (const stream of resolvedStreams) {
stream.unsubscribe();
}
});

// Cleanup: abort when dependencies change or component unmounts
onCleanup(() => {
abort.abort();
});
} else {
// There are no streams, so all of them have synced.
synced.value = true;
}
},
{ immediate: true }
);

return {
synced
};
};
72 changes: 65 additions & 7 deletions packages/vue/src/composables/useQuery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type CompilableQuery } from '@powersync/common';
import { type MaybeRef, type Ref } from 'vue';
import { computed, ref, toValue, type MaybeRef, type Ref } from 'vue';
import { AdditionalOptions, useSingleQuery } from './useSingleQuery.js';
import { useWatchedQuery } from './useWatchedQuery.js';
import { usePowerSync } from './powerSync.js';
import { useAllSyncStreamsHaveSynced } from './useAllSyncStreamsHaveSynced.js';

export type WatchedQueryResult<T> = {
readonly data: Ref<ReadonlyArray<Readonly<T>>>;
Expand All @@ -20,6 +22,14 @@ export type WatchedQueryResult<T> = {
refresh?: () => Promise<void>;
};

const createLoadingState = <T>(): WatchedQueryResult<T> => ({
data: ref([]),
isLoading: ref(true),
isFetching: ref(true),
error: ref(undefined),
refresh: undefined
});

/**
* A composable to access the results of a watched query.
*
Expand All @@ -45,12 +55,60 @@ export type WatchedQueryResult<T> = {
export const useQuery = <T = any>(
query: MaybeRef<string | CompilableQuery<T>>,
sqlParameters: MaybeRef<any[]> = [],
options: AdditionalOptions<T> = {}
options: MaybeRef<AdditionalOptions<T>> = {}
): WatchedQueryResult<T> => {
switch (true) {
case options.runQueryOnce:
return useSingleQuery(query, sqlParameters, options);
default:
return useWatchedQuery(query, sqlParameters, options);
const ps = usePowerSync();
if (!ps) {
const loadingState = createLoadingState<T>();
return {
...loadingState,
isLoading: ref(false),
error: ref(new Error('PowerSync not configured.'))
};
}

const { synced: streamsHaveSynced } = useAllSyncStreamsHaveSynced(
ps,
computed(() => toValue(options)?.streams)
);

const runOnce = computed(() => toValue(options)?.runQueryOnce === true);

const single = useSingleQuery(query, sqlParameters, {
...toValue(options),
active: computed(() => runOnce.value && streamsHaveSynced.value)
});

const watched = useWatchedQuery<T>(query, sqlParameters, {
...toValue(options),
active: computed(() => !runOnce.value && streamsHaveSynced.value)
});

const loadingState = createLoadingState<T>();
return {
data: computed(() => {
if (!streamsHaveSynced.value) return loadingState.data.value;
return runOnce.value ? single.data.value : watched.data.value;
}),

isLoading: computed(() => {
if (!streamsHaveSynced.value) return true;
return runOnce.value ? single.isLoading.value : watched.isLoading.value;
}),

isFetching: computed(() => {
if (!streamsHaveSynced.value) return false;
return runOnce.value ? single.isFetching.value : watched.isFetching.value;
}),

error: computed(() => {
if (!streamsHaveSynced.value) return undefined;
return runOnce.value ? single.error.value : watched.error.value;
}),

refresh: () => {
const currentMode = runOnce.value;
return currentMode ? single.refresh?.() : watched.refresh?.();
}
};
};
38 changes: 36 additions & 2 deletions packages/vue/src/composables/useSingleQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from '@powersync/common';
import { type MaybeRef, type Ref, ref, toValue, watchEffect } from 'vue';
import { usePowerSync } from './powerSync.js';
import { QuerySyncStreamOptions } from './useAllSyncStreamsHaveSynced.js';

export interface AdditionalOptions<RowType = unknown> extends Omit<SQLOnChangeOptions, 'signal'> {
runQueryOnce?: boolean;
Expand All @@ -31,6 +32,28 @@ export interface AdditionalOptions<RowType = unknown> extends Omit<SQLOnChangeOp
* ```
*/
rowComparator?: DifferentialWatchedQueryComparator<RowType>;
/**
* An optional array of sync streams (with names and parameters) backing the query.
*
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
*
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
* been downloaded.
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
* consistent sync snapshots will be made available as they've been processed by PowerSync.
*
* @experimental Sync streams are currently in alpha.
*/
streams?: QuerySyncStreamOptions[];

/**
* If true (default) the watched query will update its state to report
* on the fetching state of the query.
* Setting to false reduces the number of state changes if the fetch status
* is not relevant to the consumer.
*/
reportFetching?: boolean;
}

export type WatchedQueryResult<T> = {
Expand All @@ -53,7 +76,7 @@ export type WatchedQueryResult<T> = {
export const useSingleQuery = <T = any>(
query: MaybeRef<string | CompilableQuery<T>>,
sqlParameters: MaybeRef<any[]> = [],
options: AdditionalOptions<T> = {}
options: AdditionalOptions<T> & { active?: MaybeRef<boolean> } = {}
): WatchedQueryResult<T> => {
const data = ref<ReadonlyArray<Readonly<T>>>([]) as Ref<ReadonlyArray<Readonly<T>>>;
const error = ref<Error | undefined>(undefined);
Expand Down Expand Up @@ -94,6 +117,12 @@ export const useSingleQuery = <T = any>(
};

watchEffect(async (onCleanup) => {
const isActive = toValue(options.active ?? true);
if (!isActive) {
fetchData = undefined;
return; // Don't run if not active
}

const abortController = new AbortController();
// Abort any previous watches when the effect triggers again, or when the component is unmounted
onCleanup(() => abortController.abort());
Expand Down Expand Up @@ -133,6 +162,11 @@ export const useSingleQuery = <T = any>(
isLoading,
isFetching,
error,
refresh: () => fetchData?.()
refresh: () => {
if (!toValue(options.active ?? true)) {
return Promise.resolve();
}
return fetchData?.();
}
};
};
30 changes: 28 additions & 2 deletions packages/vue/src/composables/useStatus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { usePowerSyncStatus } from './usePowerSyncStatus.js';
import { SyncStatus } from '@powersync/common';
import { ref, watchEffect } from 'vue';
import { usePowerSync } from './powerSync.js';

/**
* Retrieve the current synchronization status of PowerSync.
Expand All @@ -12,4 +14,28 @@ import { usePowerSyncStatus } from './usePowerSyncStatus.js';
* <script>
* ```
*/
export const useStatus = usePowerSyncStatus;
export const useStatus = () => {
const powerSync = usePowerSync();
const status = ref(new SyncStatus({}));

if (!powerSync) {
return status;
}

status.value = powerSync.value.currentStatus || status.value;

watchEffect((onCleanup) => {
const listener = powerSync.value.registerListener({
statusChanged: (newStatus: SyncStatus) => {
status.value = newStatus;
}
});

// Cleanup previous listener when the effect triggers again, or when the component is unmounted
onCleanup(() => {
listener();
});
});

return status;
};
Loading
Loading