From f4305e1ce71fb10a006a3a6cdcbb7483fe4961cd Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 18 Feb 2026 13:38:21 +0200 Subject: [PATCH 1/4] First stab --- .../Client/PowerSyncDatabase.cs | 401 ++++++++---------- .../Stream/StreamingSyncImplementation.cs | 19 +- .../MDSQLite/MDSQLiteConnection.cs | 1 + .../PowerSync.Common/PowerSync.Common.csproj | 1 + .../PowerSync.Common/Utils/EventStream.cs | 92 +--- .../SyncIntegrationTests.cs | 36 +- .../Client/PowerSyncDatabaseTests.cs | 254 +++++------ .../Utils/Sync/MockSyncService.cs | 23 +- demos/CommandLine/Demo.cs | 9 +- demos/MAUITodo/Views/ListsPage.xaml.cs | 14 +- demos/WPF/ViewModels/MainWindowViewModel.cs | 7 +- 11 files changed, 354 insertions(+), 503 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 0eb3a43..e57d199 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -1,5 +1,6 @@ namespace PowerSync.Common.Client; +using System.Runtime.CompilerServices; using System.Text.RegularExpressions; using System.Threading.Tasks; @@ -121,8 +122,9 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba public SyncStatus CurrentStatus { get; protected set; } + protected CancellationTokenSource masterCts = new(); + protected CancellationTokenSource? syncStreamStatusCts; - protected CancellationTokenSource watchSubscriptionCts = new(); public ILogger Logger { get; protected set; } @@ -425,16 +427,6 @@ public async Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnect await ConnectionManager.Connect(connector, resolvedOptions); } - /// - /// Unsubscribe from all currently watched queries. - /// - protected void UnsubscribeAllQueries() - { - watchSubscriptionCts.Cancel(); - watchSubscriptionCts.Dispose(); - watchSubscriptionCts = new(); - } - public async Task Disconnect() { await ConnectionManager.Disconnect(); @@ -490,10 +482,13 @@ public ISyncStream SyncStream(string name, Dictionary? parameter Emit(new PowerSyncDBEvent { Closing = true }); - UnsubscribeAllQueries(); await Disconnect(); base.Close(); + + masterCts.Cancel(); + masterCts.Dispose(); + ConnectionManager.Close(); BucketStorageAdapter?.Close(); @@ -742,87 +737,157 @@ public async Task WriteTransaction(Func> fn, DBLockO return await Database.WriteTransaction(fn, options); } - /// - /// Executes a read query every time the source tables are modified. - /// - /// Use to specify the minimum interval between queries. - /// Source tables are automatically detected using EXPLAIN QUERY PLAN. - /// - public Task Watch(string query, object?[]? parameters, WatchHandler handler, SQLWatchOptions? options = null) - => Task.Run(() => WatchInternal(query, parameters, handler, options, GetAll)); + public IAsyncEnumerable OnChange(SQLWatchOptions? options = null) + { + options ??= new SQLWatchOptions(); - /// - /// Executes a read query every time the source tables are modified. - /// - /// Use to specify the minimum interval between queries. - /// Source tables are automatically detected using EXPLAIN QUERY PLAN. - /// - public Task Watch(string query, object?[]? parameters, WatchHandler handler, SQLWatchOptions? options = null) - => Task.Run(() => WatchInternal(query, parameters, handler, options, GetAll)); - - private async Task WatchInternal( - string query, - object?[]? parameters, - WatchHandler handler, - SQLWatchOptions? options, - Func> getter + var tables = options?.Tables ?? []; + var powersyncTables = new HashSet( + tables.SelectMany(table => new[] { $"ps_data__{table}", $"ps_data_local__{table}" }) + ); + + var signal = options?.Signal != null + ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value) + : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); + + var listener = Database.ListenAsync(signal.Token); + + // Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the + // connection is established + return OnChangeCore(powersyncTables, listener, signal.Token, options?.TriggerImmediately == true); + } + + private async IAsyncEnumerable OnChangeCore( + HashSet watchedTables, + IAsyncEnumerable listener, + [EnumeratorCancellation] CancellationToken signal, + bool triggerImmediately ) { - var subscription = new WatchSubscription(); + if (triggerImmediately == true) + { + yield return new WatchOnChangeEvent { ChangedTables = [] }; + } - async Task ResetQuery() + HashSet changedTables = new(); + await foreach (var e in listener) { - try - { - var resolvedTables = await ResolveTables(query, parameters, options); - var result = await getter(query, parameters); - handler.OnResult(result); + if (signal.IsCancellationRequested) yield break; + if (e.TablesUpdated == null) continue; - var onChangeListener = OnChange(new WatchOnChangeHandler - { - OnChange = async (change) => - { - try - { - var result = await getter(query, parameters); - handler.OnResult(result); - } - catch (Exception ex) - { - handler.OnError?.Invoke(ex); - } - }, - OnError = handler.OnError - }, new SQLWatchOptions - { - Tables = resolvedTables, - Signal = options?.Signal, - ThrottleMs = options?.ThrottleMs - }); + changedTables.Clear(); + GetTablesFromNotification(e.TablesUpdated, changedTables); + changedTables.IntersectWith(watchedTables); - subscription.SetOnChangeListener(onChangeListener); - } - catch (Exception ex) + if (changedTables.Count == 0) continue; + + var update = new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; + + // Convert from 'ps_data__' to '' + for (int i = 0; i < update.ChangedTables.Length; i++) { - handler.OnError?.Invoke(ex); - throw; + update.ChangedTables[i] = InternalToFriendlyTableName(update.ChangedTables[i]); } + yield return update; } + } + + private static string InternalToFriendlyTableName(string internalName) + { + const string PS_DATA_PREFIX = "ps_data__"; + const string PS_DATA_LOCAL_PREFIX = "ps_data_local__"; + + if (internalName.StartsWith(PS_DATA_PREFIX)) + return internalName.Substring(PS_DATA_PREFIX.Length); - // Register initial subscription - await ResetQuery(); + if (internalName.StartsWith(PS_DATA_LOCAL_PREFIX)) + return internalName.Substring(PS_DATA_LOCAL_PREFIX.Length); - // Listen for schema changes and reset listener - var schemaListener = RunListener(async (e) => + return internalName; + } + + public async IAsyncEnumerable Watch( + string sql, + object?[]? parameters = null, + SQLWatchOptions? options = null + ) + { + options ??= new SQLWatchOptions(); + + // Stop watching on master CTS cancellation, or on user CTS cancellation + var signal = options.Signal != null + ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value) + : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); + + var schemaChanged = new TaskCompletionSource(); + + // Listen for schema changes in the background + _ = Task.Run(async () => { - if (e.SchemaChanged != null) + await foreach (var update in ListenAsync(signal.Token)) { - await ResetQuery(); + if (update.SchemaChanged != null) + { + // Swap schemaChanged with an unresolved TCS + var oldTcs = Interlocked.Exchange(ref schemaChanged, new()); + oldTcs.TrySetResult(true); + } } - }); - subscription.SetSchemaListener(schemaListener); + }, signal.Token); - return subscription; + // Re-register query on schema updates + bool isRestart = false; + while (!signal.Token.IsCancellationRequested) + { + // Resolve tables + HashSet powersyncTables = new(); + if (options?.Tables != null) + { + powersyncTables = [.. options + .Tables + .SelectMany(table => [$"ps_data__{table}", $"ps_data_local__{table}"] + )]; + } + else + { + powersyncTables = await GetSourceTables(sql, parameters); + } + + // Register new token for the current schema reset + using var restartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); + + var enumerator = OnRawTableChange( + powersyncTables, + restartCts.Token, + isRestart || options?.TriggerImmediately == true + ).GetAsyncEnumerator(restartCts.Token); + + // Continually wait for either OnChange or SchemaChanged to fire + while (true) + { + var currentSchemaTask = schemaChanged.Task; + var onChangeTask = enumerator.MoveNextAsync().AsTask(); + var completedTask = await Task.WhenAny(onChangeTask, currentSchemaTask); + + if (completedTask == currentSchemaTask) + { + restartCts.Cancel(); + isRestart = true; + // Let the current task complete/cancel gracefully + try { await onChangeTask; } + catch (OperationCanceledException) { } + + break; + } + + var update = enumerator.Current; + if (update.ChangedTables != null) + { + if (signal.IsCancellationRequested) yield break; + yield return await GetAll(sql, parameters); + } + } + } } private class ExplainedResult @@ -836,114 +901,54 @@ private class ExplainedResult public int p5 = 0; } private record TableSelectResult(string tbl_name); - public async Task ResolveTables(string sql, object?[]? parameters = null, SQLWatchOptions? options = null) + internal async Task> GetSourceTables(string sql, object?[]? parameters = null) { - List resolvedTables = options?.Tables != null ? [.. options.Tables] : []; + var explained = await GetAll( + $"EXPLAIN {sql}", parameters + ); - if (options?.Tables == null) - { - var explained = await GetAll( - $"EXPLAIN {sql}", parameters - ); + var rootPages = explained + .Where(row => row.opcode == "OpenRead" && row.p3 == 0) + .Select(row => row.p2) + .ToList(); - var rootPages = explained - .Where(row => row.opcode == "OpenRead" && row.p3 == 0) - .Select(row => row.p2) - .ToList(); - - var tables = await GetAll( - "SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - [JsonConvert.SerializeObject(rootPages)] - ); + var tables = await GetAll( + "SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", + [JsonConvert.SerializeObject(rootPages)] + ); - foreach (var table in tables) - { - resolvedTables.Add(POWERSYNC_TABLE_MATCH.Replace(table.tbl_name, "")); - } - } - return [.. resolvedTables]; + return [.. tables.Select(row => row.tbl_name)]; } - /// - /// Invokes the provided callback whenever any of the specified tables are modified. - /// - /// This is preferred over when multiple queries need to be performed - /// together in response to data changes. - /// - public IDisposable OnChange(WatchOnChangeHandler handler, SQLWatchOptions? options = null) + private async IAsyncEnumerable OnRawTableChange( + HashSet watchedTables, + [EnumeratorCancellation] CancellationToken token, + bool triggerImmediately = false + ) { - var resolvedOptions = options ?? new SQLWatchOptions(); - - string[] tables = resolvedOptions.Tables ?? []; - HashSet watchedTables = [.. tables.SelectMany(table => new[] { table, $"ps_data__{table}", $"ps_data_local__{table}" })]; - - var changedTables = new HashSet(); - var resolvedThrottleMs = resolvedOptions.ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS; - - void flushTableUpdates() + if (triggerImmediately) { - HandleTableChanges(changedTables, watchedTables, (intersection) => - { - if (resolvedOptions?.Signal?.IsCancellationRequested == true) return; - handler.OnChange(new WatchOnChangeEvent { ChangedTables = intersection }); - }); + yield return new WatchOnChangeEvent { ChangedTables = [] }; } - var dbListenerCts = Database.RunListener((update) => + HashSet changedTables = new(); + await foreach (var e in Database.ListenAsync(token)) { - if (update.TablesUpdated != null) + if (e.TablesUpdated != null) { - try - { - ProcessTableUpdates(update.TablesUpdated, changedTables); - flushTableUpdates(); - } - catch (Exception ex) - { - handler?.OnError?.Invoke(ex); - } - } - }); + if (token.IsCancellationRequested) break; - CancellationTokenSource stopRunningCts; + // Extract the changed tables and intersect with the watched tables + changedTables.Clear(); + GetTablesFromNotification(e.TablesUpdated, changedTables); + changedTables.IntersectWith(watchedTables); - if (options?.Signal != null) - { - var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( - watchSubscriptionCts.Token, - options.Signal.Value - ); - stopRunningCts = linkedCts; - } - else - { - stopRunningCts = watchSubscriptionCts; - } - - var stopRunningReg = stopRunningCts.Token.Register(dbListenerCts.Cancel); - - return new ActionDisposable(() => - { - stopRunningReg.Dispose(); - dbListenerCts.Cancel(); - dbListenerCts.Dispose(); - }); - } - - private static void HandleTableChanges(HashSet changedTables, HashSet watchedTables, Action onDetectedChanges) - { - if (changedTables.Count > 0) - { - var intersection = changedTables.Where(watchedTables.Contains).ToArray(); - if (intersection.Length > 0) - { - onDetectedChanges(intersection); + yield return new WatchOnChangeEvent { ChangedTables = [.. changedTables] }; } } - changedTables.Clear(); } - private static void ProcessTableUpdates(INotification updateNotification, HashSet changedTables) + private static void GetTablesFromNotification(INotification updateNotification, HashSet changedTables) { string[] tables = []; if (updateNotification is BatchedUpdateNotification batchedUpdate) @@ -971,6 +976,8 @@ public class SQLWatchOptions /// The minimum interval between queries in milliseconds. /// public int? ThrottleMs { get; set; } + + public bool TriggerImmediately { get; set; } = false; } public class WatchHandler @@ -990,63 +997,3 @@ public class WatchOnChangeHandler public Action? OnError { get; set; } } -public class WatchSubscription : IDisposable -{ - private IDisposable? _onChangeListener; - private IDisposable? _schemaListener; - private readonly object _lock = new(); - private bool _disposed; - - internal void SetSchemaListener(IDisposable listener) - { - lock (_lock) - { - if (_disposed) - { - listener.Dispose(); - return; - } - _schemaListener?.Dispose(); - _schemaListener = listener; - } - } - - internal void SetOnChangeListener(IDisposable listener) - { - lock (_lock) - { - if (_disposed) - { - listener.Dispose(); - return; - } - _onChangeListener?.Dispose(); - _onChangeListener = listener; - } - } - - public void Dispose() - { - lock (_lock) - { - if (_disposed) return; - _disposed = true; - - _onChangeListener?.Dispose(); - _schemaListener?.Dispose(); - } - } -} - -public class ActionDisposable(Action onDispose) : IDisposable -{ - private readonly Action _onDispose = onDispose; - private bool _disposed = false; - - public void Dispose() - { - if (_disposed) return; - _disposed = true; - _onDispose(); - } -} diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 244272b..94659b1 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -457,16 +457,19 @@ async Task Connect(EstablishSyncStream instruction) controlInvocations = new EventStream(); try { - controlInvocations?.RunListenerAsync(async (line) => + _ = Task.Run(async () => { - await Control(line.Command, line.Payload); - - // Triggers a local CRUD upload when the first sync line has been received. - // This allows uploading local changes that have been made while offline or disconnected. - if (!hadSyncLine) + await foreach (var line in controlInvocations.ListenAsync(new CancellationToken())) { - TriggerCrudUpload(); - hadSyncLine = true; + await Control(line.Command, line.Payload); + + // Triggers a local CRUD upload when the first sync line has been received. + // This allows uploading local changes that have been made while offline or disconnected. + if (!hadSyncLine) + { + TriggerCrudUpload(); + hadSyncLine = true; + } } }); diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index 1c30cd1..8a1a84e 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -212,6 +212,7 @@ public async Task Execute(string query, object?[]? parameters = DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text); + FlushUpdates(); return new NonQueryResult { InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), diff --git a/PowerSync/PowerSync.Common/PowerSync.Common.csproj b/PowerSync/PowerSync.Common/PowerSync.Common.csproj index c6f891d..05be800 100644 --- a/PowerSync/PowerSync.Common/PowerSync.Common.csproj +++ b/PowerSync/PowerSync.Common/PowerSync.Common.csproj @@ -28,6 +28,7 @@ + diff --git a/PowerSync/PowerSync.Common/Utils/EventStream.cs b/PowerSync/PowerSync.Common/Utils/EventStream.cs index 3f3d21b..5dc209f 100644 --- a/PowerSync/PowerSync.Common/Utils/EventStream.cs +++ b/PowerSync/PowerSync.Common/Utils/EventStream.cs @@ -10,21 +10,15 @@ public interface IEventStream Task EmitAsync(T item); - CancellationTokenSource RunListenerAsync( - Func callback); + IEnumerable Listen(CancellationToken cancellationToken); IAsyncEnumerable ListenAsync(CancellationToken cancellationToken); - CancellationTokenSource RunListener(Action callback); - - IEnumerable Listen(CancellationToken cancellationToken); - void Close(); } public class EventStream : IEventStream { - public bool Closed = false; // Closest implementation to a ConcurrentSet in .Net @@ -51,25 +45,11 @@ public async Task EmitAsync(T item) } } - public CancellationTokenSource RunListenerAsync( - Func callback) + public IEnumerable Listen(CancellationToken cancellationToken) { - var cts = new CancellationTokenSource(); - var started = new TaskCompletionSource(); - - _ = Task.Run(async () => - { - started.SetResult(true); - await foreach (var value in ListenAsync(cts.Token)) - { - await callback(value); - } - - }, cts.Token); - - started.Task.GetAwaiter().GetResult(); - - return cts; + var channel = Channel.CreateUnbounded(); + subscribers.TryAdd(channel, 0); + return ReadFromChannel(channel, cancellationToken); } public IAsyncEnumerable ListenAsync(CancellationToken cancellationToken) @@ -79,30 +59,28 @@ public IAsyncEnumerable ListenAsync(CancellationToken cancellationToken) return ReadFromChannelAsync(channel, cancellationToken); } - public CancellationTokenSource RunListener(Action callback) + private IEnumerable ReadFromChannel(Channel channel, CancellationToken cancellationToken) { - var cts = new CancellationTokenSource(); - var started = new TaskCompletionSource(); - - _ = Task.Run(() => + try { - started.SetResult(true); - foreach (var value in Listen(cts.Token)) + while (!cancellationToken.IsCancellationRequested) { - callback(value); - } - }, cts.Token); - - started.Task.GetAwaiter().GetResult(); - - return cts; - } + if (!channel.Reader.WaitToReadAsync(cancellationToken).AsTask().Result) + { + // Channel was completed, exit the loop + break; + } - public IEnumerable Listen(CancellationToken cancellationToken) - { - var channel = Channel.CreateUnbounded(); - subscribers.TryAdd(channel, 0); - return ReadFromChannel(channel, cancellationToken); + while (channel.Reader.TryRead(out var item)) + { + yield return item; + } + } + } + finally + { + RemoveSubscriber(channel); + } } private async IAsyncEnumerable ReadFromChannelAsync( @@ -132,30 +110,6 @@ private async IAsyncEnumerable ReadFromChannelAsync( } } - private IEnumerable ReadFromChannel(Channel channel, CancellationToken cancellationToken) - { - try - { - while (!cancellationToken.IsCancellationRequested) - { - if (!channel.Reader.WaitToReadAsync(cancellationToken).AsTask().Result) - { - // Channel was completed, exit the loop - break; - } - - while (channel.Reader.TryRead(out var item)) - { - yield return item; - } - } - } - finally - { - RemoveSubscriber(channel); - } - } - public void Close() { foreach (var subscriber in subscribers.Keys) diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index c2dc95a..699d9e4 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -77,20 +77,16 @@ public async Task SyncDownCreateOperationTest() var cts = new CancellationTokenSource(); var id = Uuid(); - await db.Watch("select * from lists where id = ?", [id], new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("select * from lists where id = ?", [id], new() { Signal = cts.Token })) { - // Verify that the item was added locally if (x.Length == 1) { watched.SetResult(true); cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); await nodeClient.CreateList(id, name: "Test List magic"); @@ -106,9 +102,9 @@ public async Task SyncDownDeleteOperationTest() await nodeClient.CreateList(id, name: "Test List to delete"); - await db.Watch("select * from lists where id = ?", [id], new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("select * from lists where id = ?", [id], new() { Signal = cts.Token })) { // Verify that the item was added locally if (x.Length == 1) @@ -117,9 +113,6 @@ public async Task SyncDownDeleteOperationTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); await watched.Task; @@ -128,9 +121,9 @@ public async Task SyncDownDeleteOperationTest() watched = new TaskCompletionSource(); cts = new CancellationTokenSource(); - await db.Watch("select * from lists where id = ?", [id], new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("select * from lists where id = ?", [id], new() { Signal = cts.Token })) { // Verify that the item was deleted locally if (x.Length == 0) @@ -139,9 +132,6 @@ public async Task SyncDownDeleteOperationTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); await watched.Task; @@ -155,9 +145,9 @@ public async Task SyncDownLargeCreateOperationTest() var id = Uuid(); var listName = Uuid(); - await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("select * from lists where id = ?", [id], new() { Signal = cts.Token })) { // Verify that the item was added locally if (x.Length == 100) @@ -166,9 +156,6 @@ public async Task SyncDownLargeCreateOperationTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); for (int i = 0; i < 100; i++) @@ -187,9 +174,9 @@ public async Task SyncDownCreateOperationAfterLargeUploadTest() var id = Uuid(); var listName = Uuid(); - await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("select * from lists where id = ?", [id], new() { Signal = cts.Token })) { // Verify that the items were added locally if (x.Length == 100) @@ -203,9 +190,6 @@ public async Task SyncDownCreateOperationAfterLargeUploadTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); for (int i = 0; i < 100; i++) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index 69b95c5..dde89e7 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -363,19 +363,22 @@ public async Task CallUpdateHookOnChangesTest() var cts = new CancellationTokenSource(); var result = new TaskCompletionSource(); - db.OnChange(new WatchOnChangeHandler + var onChange = db.OnChange(new SQLWatchOptions { - OnChange = (x) => + Tables = ["assets"], + Signal = cts.Token, + }); + + _ = Task.Run(async () => + { + await foreach (var update in onChange) { - result.SetResult(true); + result.TrySetResult(true); cts.Cancel(); - return Task.CompletedTask; } - }, new SQLWatchOptions - { - Tables = ["assets"], - Signal = cts.Token }); + // await Task.Delay(1000); + await db.Execute("INSERT INTO assets (id) VALUES(?)", ["099-onchange"]); await result.Task; @@ -387,9 +390,9 @@ public async Task ReflectWriteTransactionUpdatesOnReadConnectionsTest() var watched = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - await db.Watch("SELECT COUNT(*) as count FROM assets", null, new WatchHandler + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in db.Watch("SELECT COUNT(*) as count FROM assets", null, new() { Signal = cts.Token })) { if (x.First().count == 1) { @@ -397,9 +400,6 @@ public async Task ReflectWriteTransactionUpdatesOnReadConnectionsTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); await db.WriteTransaction(async tx => @@ -418,9 +418,11 @@ public async Task ReflectWriteLockUpdatesOnReadConnectionsTest() var watched = new TaskCompletionSource(); var cts = new CancellationTokenSource(); - await db.Watch("SELECT COUNT(*) as count FROM assets", null, new WatchHandler + var listener = db.Watch("SELECT COUNT(*) as count FROM assets", null, new() { Signal = cts.Token }); + + _ = Task.Run(async () => { - OnResult = (x) => + await foreach (var x in listener) { if (x.First().count == numberOfAssets) { @@ -428,9 +430,6 @@ public async Task ReflectWriteLockUpdatesOnReadConnectionsTest() cts.Cancel(); } } - }, new SQLWatchOptions - { - Signal = cts.Token }); await db.WriteLock(async tx => @@ -529,159 +528,121 @@ await db.Execute( Assert.Equal(make, dynamicAsset.make); } - [Fact(Timeout = 2000)] + [Fact(Timeout = 2500)] public async Task WatchDynamicTest() { string id = Guid.NewGuid().ToString(); - string description = "new description"; - string make = "some make"; + string description = "dynamic description"; + string make = "dynamic make"; - var watched = new TaskCompletionSource(); - var cts = new CancellationTokenSource(); + using var sem = new SemaphoreSlim(0); + dynamic? dynamicAsset = null; - await db.Watch("select id, description, make from assets", null, new WatchHandler + _ = Task.Run(async () => { - OnResult = (assets) => + await foreach (var assets in db.Watch( + "select id, description, make from assets", + null, + new() { TriggerImmediately = true })) { - // Only care about results after Execute is called - if (assets.Length == 0) return; - - Assert.Single(assets); - dynamic dynamicAsset = assets[0]; - Assert.Equal(id, dynamicAsset.id); - Assert.Equal(description, dynamicAsset.description); - Assert.Equal(make, dynamicAsset.make); - - watched.SetResult(true); - cts.Cancel(); - }, - OnError = (ex) => throw ex, - }, - new SQLWatchOptions - { - Signal = cts.Token - }); - - await db.WriteTransaction(async tx => - { - await tx.Execute( - "insert into assets (id, description, make) values (?, ?, ?)", - [id, description, make] - ); - }); - - await watched.Task; - } + if (assets.Length == 0) + { + sem.Release(); + continue; + } - [Fact(Timeout = 2000)] - public async Task WatchDisposableSubscriptionTest() - { - int callCount = 0; - var semaphore = new SemaphoreSlim(0); + dynamicAsset = assets[0]; - var query = await db.Watch("select id from assets", null, new() - { - OnResult = (results) => - { - callCount++; - semaphore.Release(); - }, - OnError = (ex) => Assert.Fail(ex.ToString()) + sem.Release(); + } }); - await semaphore.WaitAsync(); - Assert.Equal(1, callCount); await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] + "insert into assets (id, description, make) values (?, ?, ?)", + [id, description, make] ); - await semaphore.WaitAsync(); - Assert.Equal(2, callCount); + Assert.True(await sem.WaitAsync(500)); - query.Dispose(); + Assert.NotNull(dynamicAsset); - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); - bool receivedResult = await semaphore.WaitAsync(100); - Assert.False(receivedResult, "Received update after disposal"); - Assert.Equal(2, callCount); + Assert.Equal(id, dynamicAsset?.id); + Assert.Equal(description, dynamicAsset?.description); + Assert.Equal(make, dynamicAsset?.make); } [Fact(Timeout = 2000)] - public async Task WatchDisposableCustomTokenTest() + public async Task WatchCancelledTest() { - var customTokenSource = new CancellationTokenSource(); int callCount = 0; - var sem = new SemaphoreSlim(0); + using var cts = new CancellationTokenSource(); + using var sem = new SemaphoreSlim(0); - using var query = await db.Watch("select id, description, make from assets", null, new() + _ = Task.Run(async () => { - OnResult = (results) => + await foreach (var result in db.Watch( + "select id from assets", + null, + new() { Signal = cts.Token, TriggerImmediately = true } + )) { - callCount++; + Console.WriteLine($"Result received: {result.Length}"); + Interlocked.Increment(ref callCount); sem.Release(); - }, - OnError = (ex) => Assert.Fail(ex.ToString()) - }, new() - { - Signal = customTokenSource.Token + } }); - await sem.WaitAsync(); + Assert.True(await sem.WaitAsync(100)); Assert.Equal(1, callCount); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", [Guid.NewGuid().ToString(), "some desc", "some make"] ); - await sem.WaitAsync(); + + Assert.True(await sem.WaitAsync(100)); Assert.Equal(2, callCount); - customTokenSource.Cancel(); + cts.Cancel(); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", [Guid.NewGuid().ToString(), "some desc", "some make"] ); - bool receivedResult = await sem.WaitAsync(100); - Assert.False(receivedResult, "Received update after disposal"); + // This is failing + Assert.False(await sem.WaitAsync(100)); Assert.Equal(2, callCount); } [Fact(Timeout = 3000)] - public async Task WatchSingleCancelledTest() + public async Task WatchMultipleCancelledTest() { int callCount = 0; - var watchHandlerFactory = (SemaphoreSlim sem) => new WatchHandler + var runQuery = (CancellationTokenSource cts, SemaphoreSlim sem) => Task.Run(async () => { - OnResult = (result) => + await foreach (var update in db.Watch("select id from assets", null, new() { Signal = cts.Token, TriggerImmediately = true })) { Interlocked.Increment(ref callCount); sem.Release(); - }, - OnError = (ex) => Assert.Fail(ex.ToString()), - }; + } + }); + using var semAlwaysRunning = new SemaphoreSlim(0); + using var semCancelled = new SemaphoreSlim(0); + using var ctsAlwaysRunning = new CancellationTokenSource(); + using var ctsCancelled = new CancellationTokenSource(); - var semAlwaysRunning = new SemaphoreSlim(0); - var semCancelled = new SemaphoreSlim(0); - using var queryAlwaysRunning = await db.Watch("select id from assets", null, watchHandlerFactory(semAlwaysRunning)); - using var queryCancelled = await db.Watch("select id from assets", null, watchHandlerFactory(semCancelled)); - - await Task.WhenAll(semAlwaysRunning.WaitAsync(), semCancelled.WaitAsync()); - Assert.Equal(2, callCount); + _ = runQuery(ctsAlwaysRunning, semAlwaysRunning); + _ = runQuery(ctsCancelled, semCancelled); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", [Guid.NewGuid().ToString(), "some desc", "some make"] ); await Task.WhenAll(semAlwaysRunning.WaitAsync(), semCancelled.WaitAsync()); - Assert.Equal(4, callCount); + Assert.Equal(2, callCount); // Close one query - queryCancelled.Dispose(); + ctsCancelled.Cancel(); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", @@ -692,7 +653,19 @@ await db.Execute( Assert.False(await semCancelled.WaitAsync(100)); await semAlwaysRunning.WaitAsync(); - Assert.Equal(5, callCount); + Assert.Equal(3, callCount); + + // Sanity check + ctsAlwaysRunning.Cancel(); + + await db.Execute( + "insert into assets(id, description, make) values (?, ?, ?)", + [Guid.NewGuid().ToString(), "some desc", "some make"] + ); + + Assert.False(await semAlwaysRunning.WaitAsync(100)); + Assert.False(await semCancelled.WaitAsync(100)); + Assert.Equal(3, callCount); } [Fact(Timeout = 3000)] @@ -708,23 +681,27 @@ public async Task WatchSchemaResetTest() Schema = TestSchema.MakeOptionalSyncSchema(false) }); - var sem = new SemaphoreSlim(0); + using var sem = new SemaphoreSlim(0); long lastCount = -1; - string querySql = "SELECT COUNT(*) AS count FROM assets"; - var query = await db.Watch(querySql, [], new WatchHandler + var cts = new CancellationTokenSource(); + + const string QUERY = "SELECT COUNT(*) AS count FROM assets"; + _ = Task.Run(async () => { - OnResult = (result) => + await foreach (var result in db.Watch(QUERY, null, new() { Signal = cts.Token, TriggerImmediately = true })) { - lastCount = result[0].count; + if (result.Length > 0) + { + lastCount = result[0].count; + } sem.Release(); - }, - OnError = error => throw error + } }); Assert.True(await sem.WaitAsync(100)); Assert.Equal(0, lastCount); - var resolved = await GetSourceTables(db, querySql); + var resolved = await db.GetSourceTables(QUERY, null); Assert.Single(resolved); Assert.Contains("ps_data_local__local_assets", resolved); @@ -741,7 +718,7 @@ await db.Execute( await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true)); - resolved = await GetSourceTables(db, querySql); + resolved = await db.GetSourceTables(QUERY); Assert.Single(resolved); Assert.Contains("ps_data__assets", resolved); @@ -753,43 +730,14 @@ await db.Execute( Assert.Equal(3, lastCount); // Sanity check - query.Dispose(); + cts.Cancel(); + await Task.Delay(100); await db.Execute("delete from assets"); Assert.False(await sem.WaitAsync(100)); Assert.Equal(3, lastCount); } - private class ExplainedResult - { - public int addr = 0; - public string opcode = ""; - public int p1 = 0; - public int p2 = 0; - public int p3 = 0; - public string p4 = ""; - public int p5 = 0; - } - private record TableSelectResult(string tbl_name); - private async Task> GetSourceTables(PowerSyncDatabase db, string sql, object?[]? parameters = null) - { - var explained = await db.GetAll( - $"EXPLAIN {sql}", parameters - ); - - var rootPages = explained - .Where(row => row.opcode == "OpenRead" && row.p3 == 0) - .Select(row => row.p2) - .ToList(); - - var tables = await db.GetAll( - "SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - [JsonConvert.SerializeObject(rootPages)] - ); - - return tables.Select(row => row.tbl_name).ToList(); - } - [Fact] public async Task Attributes_ColumnAliasing() { diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs index c21957c..61f8e4a 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs @@ -60,14 +60,17 @@ private ILogger createLogger() public static async Task NextStatus(PowerSyncDatabase db) { var tcs = new TaskCompletionSource(); - CancellationTokenSource? cts = null; + var cts = new CancellationTokenSource(); - cts = db.RunListenerAsync(async (update) => + _ = Task.Run(async () => { - if (update.StatusChanged != null) + await foreach (var update in db.ListenAsync(cts.Token)) { - tcs.TrySetResult(update.StatusChanged); - cts?.Cancel(); + if (update.StatusChanged != null) + { + tcs.TrySetResult(update.StatusChanged); + cts?.Cancel(); + } } }); @@ -160,10 +163,14 @@ public override async Task PostStreamRaw(SyncStreamOptions options) var pipe = new Pipe(); var writer = pipe.Writer; - var x = syncService.RunListenerAsync(async (line) => + var cts = new CancellationTokenSource(); + _ = Task.Run(async () => { - var bytes = Encoding.UTF8.GetBytes(line + "\n"); - await writer.WriteAsync(bytes); + await foreach (var line in syncService.ListenAsync(cts.Token)) + { + var bytes = Encoding.UTF8.GetBytes(line + "\n"); + await writer.WriteAsync(bytes); + } }); return pipe.Reader.AsStream(); diff --git a/demos/CommandLine/Demo.cs b/demos/CommandLine/Demo.cs index 62abe10..f8eaafb 100644 --- a/demos/CommandLine/Demo.cs +++ b/demos/CommandLine/Demo.cs @@ -122,11 +122,14 @@ static async Task Main() }; var connected = false; - db.RunListener((update) => + _ = Task.Run(async () => { - if (update.StatusChanged != null) + await foreach (var update in db.ListenAsync(new CancellationToken())) { - connected = update.StatusChanged.Connected; + if (update.StatusChanged != null) + { + connected = update.StatusChanged.Connected; + } } }); diff --git a/demos/MAUITodo/Views/ListsPage.xaml.cs b/demos/MAUITodo/Views/ListsPage.xaml.cs index 5103795..3b64d7c 100644 --- a/demos/MAUITodo/Views/ListsPage.xaml.cs +++ b/demos/MAUITodo/Views/ListsPage.xaml.cs @@ -20,15 +20,17 @@ protected override async void OnAppearing() { base.OnAppearing(); - database.Db.RunListener((update) => + _ = Task.Run(async () => { - if (update.StatusChanged != null) + await foreach (var update in database.Db.ListenAsync(new CancellationToken())) { - MainThread.BeginInvokeOnMainThread(() => + if (update.StatusChanged != null) { - WifiStatusItem.IconImageSource = update.StatusChanged.Connected ? "wifi.png" : "wifi_off.png"; - }); - + MainThread.BeginInvokeOnMainThread(() => + { + WifiStatusItem.IconImageSource = update.StatusChanged.Connected ? "wifi.png" : "wifi_off.png"; + }); + } } }); diff --git a/demos/WPF/ViewModels/MainWindowViewModel.cs b/demos/WPF/ViewModels/MainWindowViewModel.cs index 6297ab3..0312d2f 100644 --- a/demos/WPF/ViewModels/MainWindowViewModel.cs +++ b/demos/WPF/ViewModels/MainWindowViewModel.cs @@ -32,15 +32,16 @@ public MainWindowViewModel(PowerSyncDatabase db) { _db = db; // Set up the listener to track the status changes - _db.RunListener( - (update) => + _ = Task.Run(async () => + { + await foreach (var update in _db.ListenAsync(new CancellationToken())) { if (update.StatusChanged != null) { Connected = update.StatusChanged.Connected; } } - ); + }); } #endregion } From 3d562f3f08624c7a65921383417843e8bd9515ed Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 18 Feb 2026 14:46:53 +0200 Subject: [PATCH 2/4] Fix locks leaking between tests, small fixes --- .../Client/PowerSyncDatabase.cs | 43 +++++-- .../Client/PowerSyncDatabaseTests.cs | 115 +++++++++--------- 2 files changed, 93 insertions(+), 65 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index e57d199..761935f 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -806,7 +806,7 @@ private static string InternalToFriendlyTableName(string internalName) return internalName; } - public async IAsyncEnumerable Watch( + public IAsyncEnumerable Watch( string sql, object?[]? parameters = null, SQLWatchOptions? options = null @@ -819,6 +819,24 @@ public async IAsyncEnumerable Watch( ? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value) : CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); + // Establish the initial DB listener synchronously before returning the IAsyncEnumerable, + // so that table changes between Watch() being called and iteration starting are not missed. + // This mirrors the pattern used in OnChange(). + var initialRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); + var initialListener = Database.ListenAsync(initialRestartCts.Token); + + return WatchCore(sql, parameters, options, signal, initialRestartCts, initialListener); + } + + private async IAsyncEnumerable WatchCore( + string sql, + object?[]? parameters, + SQLWatchOptions options, + CancellationTokenSource signal, + CancellationTokenSource initialRestartCts, + IAsyncEnumerable initialListener + ) + { var schemaChanged = new TaskCompletionSource(); // Listen for schema changes in the background @@ -837,10 +855,13 @@ public async IAsyncEnumerable Watch( // Re-register query on schema updates bool isRestart = false; + var currentRestartCts = initialRestartCts; + var currentListener = initialListener; + while (!signal.Token.IsCancellationRequested) { // Resolve tables - HashSet powersyncTables = new(); + HashSet powersyncTables; if (options?.Tables != null) { powersyncTables = [.. options @@ -853,14 +874,12 @@ public async IAsyncEnumerable Watch( powersyncTables = await GetSourceTables(sql, parameters); } - // Register new token for the current schema reset - using var restartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); - var enumerator = OnRawTableChange( powersyncTables, - restartCts.Token, + currentListener, + currentRestartCts.Token, isRestart || options?.TriggerImmediately == true - ).GetAsyncEnumerator(restartCts.Token); + ).GetAsyncEnumerator(currentRestartCts.Token); // Continually wait for either OnChange or SchemaChanged to fire while (true) @@ -871,12 +890,17 @@ public async IAsyncEnumerable Watch( if (completedTask == currentSchemaTask) { - restartCts.Cancel(); + currentRestartCts.Cancel(); isRestart = true; // Let the current task complete/cancel gracefully try { await onChangeTask; } catch (OperationCanceledException) { } + // Establish a new listener BEFORE resolving source tables in the next iteration, + // so that changes during the async GetSourceTables call are not missed. + currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token); + currentListener = Database.ListenAsync(currentRestartCts.Token); + break; } @@ -922,6 +946,7 @@ internal async Task> GetSourceTables(string sql, object?[]? para private async IAsyncEnumerable OnRawTableChange( HashSet watchedTables, + IAsyncEnumerable listener, [EnumeratorCancellation] CancellationToken token, bool triggerImmediately = false ) @@ -932,7 +957,7 @@ private async IAsyncEnumerable OnRawTableChange( } HashSet changedTables = new(); - await foreach (var e in Database.ListenAsync(token)) + await foreach (var e in listener) { if (e.TablesUpdated != null) { diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index dde89e7..a0f8d2a 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -4,24 +4,27 @@ namespace PowerSync.Common.Tests.Client; using Microsoft.Data.Sqlite; -using Newtonsoft.Json; - using PowerSync.Common.Client; -using PowerSync.Common.DB.Schema; using PowerSync.Common.Tests.Models; /// /// dotnet test -v n --framework net8.0 --filter "PowerSyncDatabaseTests" /// +[Collection("PowerSyncDatabaseTests")] public class PowerSyncDatabaseTests : IAsyncLifetime { private PowerSyncDatabase db = default!; + private CancellationTokenSource testCts = default!; + string dbName = default!; public async Task InitializeAsync() { + testCts = new(); + dbName = $"transactions-{Guid.NewGuid():N}.db"; + db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "powersyncDataBaseTransactions.db" }, + Database = new SQLOpenOptions { DbFilename = dbName }, Schema = TestSchema.AppSchema, }); await db.Init(); @@ -29,8 +32,16 @@ public async Task InitializeAsync() public async Task DisposeAsync() { + testCts.Cancel(); + await db.DisconnectAndClear(); await db.Close(); + + try { File.Delete(dbName); } + catch { } + + testCts.Dispose(); + testCts = new(); } private record IdResult(string id); @@ -360,24 +371,22 @@ await db.WriteLock(async context => [Fact(Timeout = 2000)] public async Task CallUpdateHookOnChangesTest() { - var cts = new CancellationTokenSource(); var result = new TaskCompletionSource(); - var onChange = db.OnChange(new SQLWatchOptions + var listener = db.OnChange(new SQLWatchOptions { Tables = ["assets"], - Signal = cts.Token, + Signal = testCts.Token, }); _ = Task.Run(async () => { - await foreach (var update in onChange) + await foreach (var update in listener) { result.TrySetResult(true); - cts.Cancel(); + testCts.Cancel(); } - }); - // await Task.Delay(1000); + }, testCts.Token); await db.Execute("INSERT INTO assets (id) VALUES(?)", ["099-onchange"]); @@ -389,45 +398,47 @@ public async Task ReflectWriteTransactionUpdatesOnReadConnectionsTest() { var watched = new TaskCompletionSource(); - var cts = new CancellationTokenSource(); + var listener = db.Watch("SELECT COUNT(*) as count FROM assets", null, new() { Signal = testCts.Token }); _ = Task.Run(async () => { - await foreach (var x in db.Watch("SELECT COUNT(*) as count FROM assets", null, new() { Signal = cts.Token })) + await foreach (var x in listener) { if (x.First().count == 1) { watched.SetResult(true); - cts.Cancel(); + testCts.Cancel(); } } - }); + }, testCts.Token); await db.WriteTransaction(async tx => { await tx.Execute("INSERT INTO assets (id) VALUES(?)", ["099-watch"]); + await tx.Commit(); }); await watched.Task; } - [Fact(Timeout = 2000)] + [Fact(Timeout = 5000)] public async Task ReflectWriteLockUpdatesOnReadConnectionsTest() { - var numberOfAssets = 10_000; + var numberOfAssets = 10; var watched = new TaskCompletionSource(); - var cts = new CancellationTokenSource(); - var listener = db.Watch("SELECT COUNT(*) as count FROM assets", null, new() { Signal = cts.Token }); - + var listener = db.Watch( + "SELECT COUNT(*) as count FROM assets", + null, + new() { Signal = testCts.Token, TriggerImmediately = true }); _ = Task.Run(async () => { await foreach (var x in listener) { if (x.First().count == numberOfAssets) { - watched.SetResult(true); - cts.Cancel(); + watched.TrySetResult(true); + testCts.Cancel(); } } }); @@ -538,21 +549,16 @@ public async Task WatchDynamicTest() using var sem = new SemaphoreSlim(0); dynamic? dynamicAsset = null; + var listener = db.Watch("select id, description, make from assets", null, new() { TriggerImmediately = true }); _ = Task.Run(async () => { - await foreach (var assets in db.Watch( - "select id, description, make from assets", - null, - new() { TriggerImmediately = true })) + await foreach (var assets in listener) { - if (assets.Length == 0) + if (assets.Length > 0) { - sem.Release(); - continue; + dynamicAsset = assets[0]; } - dynamicAsset = assets[0]; - sem.Release(); } }); @@ -574,18 +580,13 @@ await db.Execute( public async Task WatchCancelledTest() { int callCount = 0; - using var cts = new CancellationTokenSource(); using var sem = new SemaphoreSlim(0); + var listener = db.Watch("select id from assets", null, new() { Signal = testCts.Token, TriggerImmediately = true }); _ = Task.Run(async () => { - await foreach (var result in db.Watch( - "select id from assets", - null, - new() { Signal = cts.Token, TriggerImmediately = true } - )) + await foreach (var result in listener) { - Console.WriteLine($"Result received: {result.Length}"); Interlocked.Increment(ref callCount); sem.Release(); } @@ -601,7 +602,7 @@ await db.Execute( Assert.True(await sem.WaitAsync(100)); Assert.Equal(2, callCount); - cts.Cancel(); + testCts.Cancel(); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", @@ -618,21 +619,26 @@ public async Task WatchMultipleCancelledTest() { int callCount = 0; - var runQuery = (CancellationTokenSource cts, SemaphoreSlim sem) => Task.Run(async () => + void RunQuery(CancellationTokenSource cts, SemaphoreSlim sem) { - await foreach (var update in db.Watch("select id from assets", null, new() { Signal = cts.Token, TriggerImmediately = true })) + var listener = db.Watch("select id from assets", null, new() { Signal = cts.Token }); + _ = Task.Run(async () => { - Interlocked.Increment(ref callCount); - sem.Release(); - } - }); + await foreach (var update in listener) + { + Interlocked.Increment(ref callCount); + sem.Release(); + } + }); + } + using var semAlwaysRunning = new SemaphoreSlim(0); using var semCancelled = new SemaphoreSlim(0); - using var ctsAlwaysRunning = new CancellationTokenSource(); - using var ctsCancelled = new CancellationTokenSource(); + using var ctsAlwaysRunning = CancellationTokenSource.CreateLinkedTokenSource(testCts.Token); + using var ctsCancelled = CancellationTokenSource.CreateLinkedTokenSource(testCts.Token); - _ = runQuery(ctsAlwaysRunning, semAlwaysRunning); - _ = runQuery(ctsCancelled, semCancelled); + RunQuery(ctsAlwaysRunning, semAlwaysRunning); + RunQuery(ctsCancelled, semCancelled); await db.Execute( "insert into assets(id, description, make) values (?, ?, ?)", @@ -682,14 +688,13 @@ public async Task WatchSchemaResetTest() }); using var sem = new SemaphoreSlim(0); - long lastCount = -1; - - var cts = new CancellationTokenSource(); + long lastCount = 0; const string QUERY = "SELECT COUNT(*) AS count FROM assets"; + var listener = db.Watch(QUERY, null, new() { Signal = testCts.Token }); _ = Task.Run(async () => { - await foreach (var result in db.Watch(QUERY, null, new() { Signal = cts.Token, TriggerImmediately = true })) + await foreach (var result in listener) { if (result.Length > 0) { @@ -698,8 +703,6 @@ public async Task WatchSchemaResetTest() sem.Release(); } }); - Assert.True(await sem.WaitAsync(100)); - Assert.Equal(0, lastCount); var resolved = await db.GetSourceTables(QUERY, null); Assert.Single(resolved); @@ -730,7 +733,7 @@ await db.Execute( Assert.Equal(3, lastCount); // Sanity check - cts.Cancel(); + testCts.Cancel(); await Task.Delay(100); await db.Execute("delete from assets"); From 6b9e016cedcf6a6ed0cb1692250a8bae0e06b519 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Wed, 18 Feb 2026 16:32:07 +0200 Subject: [PATCH 3/4] Better cleanup in tests, fix issues with orphaned tasks / disposal order --- .../Client/PowerSyncDatabase.cs | 16 +- .../MDSQLite/MDSQLiteAdapter.cs | 12 +- .../MDSQLite/MDSQLiteConnection.cs | 1 - .../Client/PowerSyncDatabaseTests.cs | 135 +++++++------ .../Client/Sync/CRUDTests.cs | 191 +++++++++++------- .../Client/Sync/SyncStreamsTests.cs | 4 +- .../Client/Sync/SyncTests.cs | 4 +- .../Utils/Sync/MockSyncService.cs | 22 +- 8 files changed, 236 insertions(+), 149 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 761935f..b18f21a 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -286,18 +286,22 @@ public async Task WaitForStatus(Func predicate, CancellationTo } var tcs = new TaskCompletionSource(); - var cts = new CancellationTokenSource(); + var cts = CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token); - _ = Task.Run(() => + _ = Task.Run(async () => { - foreach (var update in Listen(cts.Token)) + try { - if (update.StatusChanged != null && predicate(update.StatusChanged)) + await foreach (var update in ListenAsync(cts.Token)) { - cts.Cancel(); - tcs.TrySetResult(true); + if (update.StatusChanged != null && predicate(update.StatusChanged)) + { + cts.Cancel(); + tcs.TrySetResult(true); + } } } + catch (OperationCanceledException) { } }); cancellationToken?.Register(() => diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index e331ebc..6400fe9 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -31,6 +31,7 @@ public class MDSQLiteAdapter : EventStream, IDBAdapter protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions; private CancellationTokenSource? tablesUpdatedCts; + private Task? tablesUpdatedTask; private readonly AsyncLock writeMutex = new(); private readonly AsyncLock readMutex = new(); @@ -86,11 +87,7 @@ private async Task Init() foreach (var statement in writeConnectionStatements) { - for (int tries = 0; tries < 30; tries++) - { - await writeConnection!.Execute(statement); - tries = 30; - } + await writeConnection!.Execute(statement); } foreach (var statement in readConnectionStatements) @@ -99,9 +96,9 @@ private async Task Init() } tablesUpdatedCts = new CancellationTokenSource(); - var _ = Task.Run(() => + tablesUpdatedTask = Task.Run(async () => { - foreach (var notification in writeConnection!.Listen(tablesUpdatedCts.Token)) + await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token)) { if (notification.TablesUpdated != null) { @@ -139,6 +136,7 @@ protected virtual void LoadExtension(SqliteConnection db) public new void Close() { tablesUpdatedCts?.Cancel(); + try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { } base.Close(); writeConnection?.Close(); readConnection?.Close(); diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index 8a1a84e..1c30cd1 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -212,7 +212,6 @@ public async Task Execute(string query, object?[]? parameters = DynamicParameters? dynamicParams = PrepareQuery(ref query, parameters); int rowsAffected = await Db.ExecuteAsync(query, dynamicParams, commandType: CommandType.Text); - FlushUpdates(); return new NonQueryResult { InsertId = raw.sqlite3_last_insert_rowid(Db.Handle), diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index a0f8d2a..dbcbbf8 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -6,6 +6,7 @@ namespace PowerSync.Common.Tests.Client; using PowerSync.Common.Client; using PowerSync.Common.Tests.Models; +using PowerSync.Common.Tests.Utils; /// /// dotnet test -v n --framework net8.0 --filter "PowerSyncDatabaseTests" @@ -678,96 +679,114 @@ await db.Execute( public async Task WatchSchemaResetTest() { var dbId = Guid.NewGuid().ToString(); + var localDbName = $"powerSyncWatch_{dbId}.db"; var db = new PowerSyncDatabase(new() { Database = new SQLOpenOptions { - DbFilename = $"powerSyncWatch_{dbId}.db", + DbFilename = localDbName, }, Schema = TestSchema.MakeOptionalSyncSchema(false) }); - using var sem = new SemaphoreSlim(0); - long lastCount = 0; - - const string QUERY = "SELECT COUNT(*) AS count FROM assets"; - var listener = db.Watch(QUERY, null, new() { Signal = testCts.Token }); - _ = Task.Run(async () => + try { - await foreach (var result in listener) + using var sem = new SemaphoreSlim(0); + long lastCount = 0; + + const string QUERY = "SELECT COUNT(*) AS count FROM assets"; + var listener = db.Watch(QUERY, null, new() { Signal = testCts.Token }); + _ = Task.Run(async () => { - if (result.Length > 0) + await foreach (var result in listener) { - lastCount = result[0].count; + if (result.Length > 0) + { + lastCount = result[0].count; + } + sem.Release(); } - sem.Release(); - } - }); + }); - var resolved = await db.GetSourceTables(QUERY, null); - Assert.Single(resolved); - Assert.Contains("ps_data_local__local_assets", resolved); + var resolved = await db.GetSourceTables(QUERY, null); + Assert.Single(resolved); + Assert.Contains("ps_data_local__local_assets", resolved); - for (int i = 0; i < 3; i++) - { - await db.Execute( - "insert into assets(id, description, make) values (?, ?, ?)", - [Guid.NewGuid().ToString(), "some desc", "some make"] - ); - Assert.True(await sem.WaitAsync(100)); - Assert.Equal(i + 1, lastCount); - } - Assert.Equal(3, lastCount); + for (int i = 0; i < 3; i++) + { + await db.Execute( + "insert into assets(id, description, make) values (?, ?, ?)", + [Guid.NewGuid().ToString(), "some desc", "some make"] + ); + Assert.True(await sem.WaitAsync(100)); + Assert.Equal(i + 1, lastCount); + } + Assert.Equal(3, lastCount); - await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true)); + await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true)); - resolved = await db.GetSourceTables(QUERY); - Assert.Single(resolved); - Assert.Contains("ps_data__assets", resolved); + resolved = await db.GetSourceTables(QUERY); + Assert.Single(resolved); + Assert.Contains("ps_data__assets", resolved); - Assert.True(await sem.WaitAsync(100)); - Assert.Equal(0, lastCount); + Assert.True(await sem.WaitAsync(100)); + Assert.Equal(0, lastCount); - await db.Execute("insert into assets select * from inactive_local_assets"); - Assert.True(await sem.WaitAsync(500)); - Assert.Equal(3, lastCount); + await db.Execute("insert into assets select * from inactive_local_assets"); + Assert.True(await sem.WaitAsync(500)); + Assert.Equal(3, lastCount); - // Sanity check - testCts.Cancel(); - await Task.Delay(100); + // Sanity check + testCts.Cancel(); + await Task.Delay(100); - await db.Execute("delete from assets"); - Assert.False(await sem.WaitAsync(100)); - Assert.Equal(3, lastCount); + await db.Execute("delete from assets"); + Assert.False(await sem.WaitAsync(100)); + Assert.Equal(3, lastCount); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] public async Task Attributes_ColumnAliasing() { + var localDbName = $"PowerSyncAttributesTest-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "PowerSyncAttributesTest.db" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchemaAttributes.AppSchema, }); - await db.DisconnectAndClear(); + try + { + await db.DisconnectAndClear(); - var id = Guid.NewGuid().ToString(); - var description = "Test description"; - var completed = false; - var createdAt = DateTimeOffset.Now; + var id = Guid.NewGuid().ToString(); + var description = "Test description"; + var completed = false; + var createdAt = DateTimeOffset.Now; - await db.Execute( - "INSERT INTO todos(id, description, completed, created_at, list_id) VALUES(?, ?, ?, ?, uuid())", - [id, description, completed, createdAt] - ); + await db.Execute( + "INSERT INTO todos(id, description, completed, created_at, list_id) VALUES(?, ?, ?, ?, uuid())", + [id, description, completed, createdAt] + ); - var results = await db.GetAll("SELECT * FROM todos"); - Assert.Single(results); - var row = results.First(); - Assert.Equal(id, row.TodoId); - Assert.Equal(description, row.Description); - Assert.Equal(completed, row.Completed); - Assert.Equal(createdAt, row.CreatedAt); + var results = await db.GetAll("SELECT * FROM todos"); + Assert.Single(results); + var row = results.First(); + Assert.Equal(id, row.TodoId); + Assert.Equal(description, row.Description); + Assert.Equal(completed, row.Completed); + Assert.Equal(createdAt, row.CreatedAt); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index 6f877e9..54054b3 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -17,7 +17,7 @@ public class CRUDTests : IAsyncLifetime { private PowerSyncDatabase db = default!; private readonly string testId = Guid.NewGuid().ToString(); - private readonly string dbName = "crud-test.db"; + private readonly string dbName = $"crud-{Guid.NewGuid():N}.db"; private record CrudEntryData(string data); @@ -47,50 +47,69 @@ private async Task ResetDB(PowerSyncDatabase db) [Fact] public async Task IncludeMetadataTest() { + var localDbName = $"IncludeMetadataTest-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "IncludeMetadataTest.db" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions { TrackMetadata = true }), }); - await ResetDB(db); + try + { + await ResetDB(db); - await db.Execute("INSERT INTO assets (id, description, _metadata) VALUES(uuid(), 'xxxx', 'so meta');"); + await db.Execute("INSERT INTO assets (id, description, _metadata) VALUES(uuid(), 'xxxx', 'so meta');"); - var batch = await db.GetNextCrudTransaction(); - Assert.Equal("so meta", batch?.Crud[0].Metadata); + var batch = await db.GetNextCrudTransaction(); + Assert.Equal("so meta", batch?.Crud[0].Metadata); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] public async Task IncludeOldValuesTest() { + var localDbName = $"IncludeOldValuesTest-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "IncludeOldValuesTest.db" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions { TrackPreviousValues = new TrackPreviousOptions() }), }); - await ResetDB(db); + try + { + await ResetDB(db); - await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry"]); - await db.Execute("DELETE FROM ps_crud;"); - await db.Execute("UPDATE assets SET description = ?", ["new name"]); + await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["new name"]); - var batch = await db.GetNextCrudTransaction(); - Assert.True(batch?.Crud[0].PreviousValues?.ContainsKey("description")); - Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); + var batch = await db.GetNextCrudTransaction(); + Assert.True(batch?.Crud[0].PreviousValues?.ContainsKey("description")); + Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] public async Task IncludeOldValuesWithColumnFilterTest() { + var localDbName = $"IncludeOldValuesWithColumnFilterTest-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "IncludeOldValuesWithColumnFilterTest.db" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions { TrackPreviousValues = new TrackPreviousOptions @@ -99,25 +118,34 @@ public async Task IncludeOldValuesWithColumnFilterTest() } }), }); - await ResetDB(db); - - await db.Execute("INSERT INTO assets (id, description, make) VALUES(?, ?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry", "make1"]); - await db.Execute("DELETE FROM ps_crud;"); - await db.Execute("UPDATE assets SET description = ?, make = ?", ["new name", "make2"]); - - var batch = await db.GetNextCrudTransaction(); - Assert.NotNull(batch?.Crud[0].PreviousValues); - Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); - Assert.False(batch?.Crud[0].PreviousValues!.ContainsKey("make")); + try + { + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description, make) VALUES(?, ?, ?);", ["a185b7e1-dffa-4a9a-888c-15c0f0cac4b3", "entry", "make1"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?, make = ?", ["new name", "make2"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.NotNull(batch?.Crud[0].PreviousValues); + Assert.Equal("entry", batch?.Crud[0].PreviousValues?["description"]); + Assert.False(batch?.Crud[0].PreviousValues!.ContainsKey("make")); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] public async Task IncludeOldValuesWhenChangedTest() { + var localDbName = $"oldValuesDb-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "oldValuesDb" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions { TrackPreviousValues = new TrackPreviousOptions @@ -126,37 +154,54 @@ public async Task IncludeOldValuesWhenChangedTest() } }), }); - await ResetDB(db); - - await db.Execute("INSERT INTO assets (id, description, make) VALUES(uuid(), ?, ?);", ["name", "make1"]); - await db.Execute("DELETE FROM ps_crud;"); - await db.Execute("UPDATE assets SET description = ?", ["new name"]); - - var batch = await db.GetNextCrudTransaction(); - Assert.Single(batch!.Crud); - Assert.NotNull(batch.Crud[0].PreviousValues); - Assert.Equal("name", batch.Crud[0].PreviousValues!["description"]); - Assert.False(batch.Crud[0].PreviousValues!.ContainsKey("make")); + try + { + await ResetDB(db); + + await db.Execute("INSERT INTO assets (id, description, make) VALUES(uuid(), ?, ?);", ["name", "make1"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["new name"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.Single(batch!.Crud); + Assert.NotNull(batch.Crud[0].PreviousValues); + Assert.Equal("name", batch.Crud[0].PreviousValues!["description"]); + Assert.False(batch.Crud[0].PreviousValues!.ContainsKey("make")); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] public async Task IgnoreEmptyUpdateTest() { + var localDbName = $"IgnoreEmptyUpdateTest-{Guid.NewGuid():N}.db"; var db = new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "IgnoreEmptyUpdateTest.db" }, + Database = new SQLOpenOptions { DbFilename = localDbName }, Schema = TestSchema.GetSchemaWithCustomAssetOptions(new TableOptions { IgnoreEmptyUpdates = true }), }); - await ResetDB(db); - await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", [testId, "name"]); - await db.Execute("DELETE FROM ps_crud;"); - await db.Execute("UPDATE assets SET description = ?", ["name"]); - - var batch = await db.GetNextCrudTransaction(); - Assert.Null(batch); + try + { + await ResetDB(db); + await db.Execute("INSERT INTO assets (id, description) VALUES(?, ?);", [testId, "name"]); + await db.Execute("DELETE FROM ps_crud;"); + await db.Execute("UPDATE assets SET description = ?", ["name"]); + + var batch = await db.GetNextCrudTransaction(); + Assert.Null(batch); + } + finally + { + await db.Close(); + DatabaseUtils.CleanDb(localDbName); + } } [Fact] @@ -309,37 +354,45 @@ public async Task InsertOnlyTablesTest() await insertOnlyDb.Init(); - var initialCrudRows = await insertOnlyDb.GetAll("SELECT * FROM ps_crud"); - Assert.Empty(initialCrudRows); + try + { + var initialCrudRows = await insertOnlyDb.GetAll("SELECT * FROM ps_crud"); + Assert.Empty(initialCrudRows); - await insertOnlyDb.Execute("INSERT INTO logs(id, level, content) VALUES(?, ?, ?)", [testId, "INFO", "test log"]); + await insertOnlyDb.Execute("INSERT INTO logs(id, level, content) VALUES(?, ?, ?)", [testId, "INFO", "test log"]); - var crudEntry = await insertOnlyDb.Get("SELECT data FROM ps_crud ORDER BY id"); + var crudEntry = await insertOnlyDb.Get("SELECT data FROM ps_crud ORDER BY id"); - Assert.Equal( - JsonConvert.SerializeObject(new - { - op = "PUT", - type = "logs", - id = testId, - data = new { content = "test log", level = "INFO" } - }), - crudEntry.data - ); + Assert.Equal( + JsonConvert.SerializeObject(new + { + op = "PUT", + type = "logs", + id = testId, + data = new { content = "test log", level = "INFO" } + }), + crudEntry.data + ); - var logRows = await insertOnlyDb.GetAll("SELECT * FROM logs"); - Assert.Empty(logRows); + var logRows = await insertOnlyDb.GetAll("SELECT * FROM logs"); + Assert.Empty(logRows); - var tx = await insertOnlyDb.GetNextCrudTransaction(); - Assert.Equal(1, tx!.TransactionId); + var tx = await insertOnlyDb.GetNextCrudTransaction(); + Assert.Equal(1, tx!.TransactionId); - var expectedCrudEntry = new CrudEntry(1, UpdateType.PUT, "logs", testId, 1, new Dictionary - { - { "content", "test log" }, - { "level", "INFO" } - }); + var expectedCrudEntry = new CrudEntry(1, UpdateType.PUT, "logs", testId, 1, new Dictionary + { + { "content", "test log" }, + { "level", "INFO" } + }); - Assert.True(tx.Crud.First().Equals(expectedCrudEntry)); + Assert.True(tx.Crud.First().Equals(expectedCrudEntry)); + } + finally + { + await insertOnlyDb.Close(); + DatabaseUtils.CleanDb(uniqueDbName); + } } private record QuantityResult(long quantity); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs index 9ab305a..13c4e59 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncStreamsTests.cs @@ -29,10 +29,12 @@ public async Task InitializeAsync() public async Task DisposeAsync() { - syncService.Close(); + await db.Disconnect(); await db.Execute("DELETE FROM ps_stream_subscriptions"); await db.DisconnectAndClear(); + syncService.Close(); await db.Close(); + DatabaseUtils.CleanDb(db.Database.Name); } [Fact] diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs index 3bb22b4..a2de5d7 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/SyncTests.cs @@ -1,6 +1,7 @@ namespace PowerSync.Common.Tests.Client.Sync; using PowerSync.Common.Client; +using PowerSync.Common.Tests.Utils; using PowerSync.Common.Tests.Utils.Sync; @@ -21,9 +22,10 @@ public async Task InitializeAsync() public async Task DisposeAsync() { - syncService.Close(); await db.DisconnectAndClear(); + syncService.Close(); await db.Close(); + DatabaseUtils.CleanDb(db.Database.Name); } private readonly string[] syncInitialWithListCreation = [ diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs index 61f8e4a..3a93f36 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Utils/Sync/MockSyncService.cs @@ -33,14 +33,15 @@ public void PushLine(string line) Emit(line); } - public PowerSyncDatabase CreateDatabase() + public PowerSyncDatabase CreateDatabase(string? dbFilename = null) { + dbFilename ??= $"sync-stream-{Guid.NewGuid():N}.db"; var connector = new TestConnector(); var mockRemote = new MockRemote(connector, this, _requests); return new PowerSyncDatabase(new PowerSyncDatabaseOptions { - Database = new SQLOpenOptions { DbFilename = "sync-stream.db" }, + Database = new SQLOpenOptions { DbFilename = dbFilename }, Schema = TestSchemaTodoList.AppSchema, RemoteFactory = _ => mockRemote, Logger = createLogger() @@ -163,13 +164,22 @@ public override async Task PostStreamRaw(SyncStreamOptions options) var pipe = new Pipe(); var writer = pipe.Writer; - var cts = new CancellationTokenSource(); + var cts = CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken); _ = Task.Run(async () => { - await foreach (var line in syncService.ListenAsync(cts.Token)) + try + { + await foreach (var line in syncService.ListenAsync(cts.Token)) + { + var bytes = Encoding.UTF8.GetBytes(line + "\n"); + await writer.WriteAsync(bytes); + } + } + finally { - var bytes = Encoding.UTF8.GetBytes(line + "\n"); - await writer.WriteAsync(bytes); + await writer.CompleteAsync(); + cts.Cancel(); + cts.Dispose(); } }); From b7d87d29d5dd30771d6b01865f80fb06bd9fb460 Mon Sep 17 00:00:00 2001 From: LucDeCaf Date: Thu, 19 Feb 2026 12:18:44 +0200 Subject: [PATCH 4/4] Cleanup, update readme, update demos --- .../Client/PowerSyncDatabase.cs | 6 ---- PowerSync/PowerSync.Common/README.md | 23 ++++++------ demos/CommandLine/Demo.cs | 11 +++--- demos/MAUITodo/Views/ListsPage.xaml.cs | 9 ++--- demos/MAUITodo/Views/TodoListPage.xaml.cs | 9 ++--- demos/WPF/ViewModels/TodoListViewModel.cs | 35 ++++++++----------- demos/WPF/ViewModels/TodoViewModel.cs | 35 ++++++++----------- 7 files changed, 49 insertions(+), 79 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index b18f21a..be9b43a 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -1009,12 +1009,6 @@ public class SQLWatchOptions public bool TriggerImmediately { get; set; } = false; } -public class WatchHandler -{ - public Action OnResult { get; set; } = null!; - public Action? OnError { get; set; } -} - public class WatchOnChangeEvent { public string[] ChangedTables { get; set; } = []; diff --git a/PowerSync/PowerSync.Common/README.md b/PowerSync/PowerSync.Common/README.md index cb8c852..89cd3d2 100644 --- a/PowerSync/PowerSync.Common/README.md +++ b/PowerSync/PowerSync.Common/README.md @@ -1,4 +1,4 @@ -# PowerSync SDK .NET Common +# PowerSync SDK .NET Common This package contains a .NET implementation of a PowerSync database connector and streaming sync bucket implementation. @@ -16,7 +16,7 @@ This package is published on [NuGet](https://www.nuget.org/packages/PowerSync.Co dotnet add package PowerSync.Common --prerelease ``` -## Usage +## Usage ### Simple Query @@ -39,24 +39,23 @@ static async Task Main() { ``` ### Watched queries -Watched queries will automatically update when a dependant table is updated. -Awaiting `Watch()` ensures the watcher is fully initialized and ready to monitor database changes. + +Watched queries will automatically update when a dependant table is updated. +Call `Watch()` synchronously to ensure the watcher is fully initialized before execution continues. ```csharp -await db.Watch("select * from lists", null, new WatchHandler +var cts = new CancellationTokenSource(); +var listener = db.Watch("select * from lists", null, new SQLWatchOptions { Signal = cts.Token }); +_ = Task.Run(async () => { - OnResult = (results) => + await foreach (var results in listener) { table.Rows.Clear(); foreach (var line in results) { table.AddRow(line.id, line.name, line.owner_id, line.created_at); } - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); } -}); +}, cts.Token); +``` -``` \ No newline at end of file diff --git a/demos/CommandLine/Demo.cs b/demos/CommandLine/Demo.cs index f8eaafb..623581f 100644 --- a/demos/CommandLine/Demo.cs +++ b/demos/CommandLine/Demo.cs @@ -67,23 +67,20 @@ static async Task Main() bool running = true; - await db.Watch("select * from lists", null, new WatchHandler + var listener = db.Watch("select * from lists"); + _ = Task.Run(async () => { - OnResult = (results) => + await foreach (var results in listener) { table.Rows.Clear(); foreach (var line in results) { table.AddRow(line.id, line.name, line.owner_id, line.created_at); } - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); } }); - var _ = Task.Run(async () => + _ = Task.Run(async () => { while (running) { diff --git a/demos/MAUITodo/Views/ListsPage.xaml.cs b/demos/MAUITodo/Views/ListsPage.xaml.cs index 3b64d7c..8f0be46 100644 --- a/demos/MAUITodo/Views/ListsPage.xaml.cs +++ b/demos/MAUITodo/Views/ListsPage.xaml.cs @@ -34,15 +34,12 @@ protected override async void OnAppearing() } }); - await database.Db.Watch("select * from lists", null, new WatchHandler + var listener = database.Db.Watch("select * from lists", null, new() { TriggerImmediately = true }); + _ = Task.Run(async () => { - OnResult = (results) => + await foreach (var results in listener) { MainThread.BeginInvokeOnMainThread(() => { ListsCollection.ItemsSource = results.ToList(); }); - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); } }); } diff --git a/demos/MAUITodo/Views/TodoListPage.xaml.cs b/demos/MAUITodo/Views/TodoListPage.xaml.cs index e709593..72cff04 100644 --- a/demos/MAUITodo/Views/TodoListPage.xaml.cs +++ b/demos/MAUITodo/Views/TodoListPage.xaml.cs @@ -24,15 +24,12 @@ protected override async void OnAppearing() { base.OnAppearing(); - await database.Db.Watch("select * from todos where list_id = ?", [selectedList.ID], new WatchHandler + var listener = database.Db.Watch("select * from todos where list_id = ?", [selectedList.ID], new() { TriggerImmediately = true }); + _ = Task.Run(async () => { - OnResult = (results) => + await foreach (var results in listener) { MainThread.BeginInvokeOnMainThread(() => { TodoItemsCollection.ItemsSource = results.ToList(); }); - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); } }); } diff --git a/demos/WPF/ViewModels/TodoListViewModel.cs b/demos/WPF/ViewModels/TodoListViewModel.cs index c27ff90..55ffa19 100644 --- a/demos/WPF/ViewModels/TodoListViewModel.cs +++ b/demos/WPF/ViewModels/TodoListViewModel.cs @@ -147,32 +147,25 @@ ORDER BY last_completed_at DESC NULLS LAST; "; - await _db.Watch( - query, - null, - new WatchHandler + var listener = _db.Watch(query); + _ = Task.Run(async () => + { + await foreach (var results in listener) { - OnResult = (results) => + var dispatcher = System.Windows.Application.Current?.Dispatcher; + if (dispatcher != null) { - var dispatcher = System.Windows.Application.Current?.Dispatcher; - if (dispatcher != null) + dispatcher.Invoke(() => { - dispatcher.Invoke(() => + TodoLists.Clear(); + foreach (var result in results) { - TodoLists.Clear(); - foreach (var result in results) - { - TodoLists.Add(result); - } - }); - } - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); - }, + TodoLists.Add(result); + } + }); + } } - ); + }); } private async Task AddList(string newListName) diff --git a/demos/WPF/ViewModels/TodoViewModel.cs b/demos/WPF/ViewModels/TodoViewModel.cs index 85cd1e8..fe96afa 100644 --- a/demos/WPF/ViewModels/TodoViewModel.cs +++ b/demos/WPF/ViewModels/TodoViewModel.cs @@ -105,32 +105,25 @@ public void SetList(TodoList list) private async void WatchForChanges() { - await _db.Watch( - "SELECT * FROM todos where list_id = ? ORDER BY created_at;", - [_list!.Id], - new WatchHandler + var listener = _db.Watch("SELECT * FROM todos where list_id = ? ORDER BY created_at;", [_list!.Id]); + _ = Task.Run(async () => + { + await foreach (var results in listener) { - OnResult = (results) => + var dispatcher = System.Windows.Application.Current?.Dispatcher; + if (dispatcher != null) { - var dispatcher = System.Windows.Application.Current?.Dispatcher; - if (dispatcher != null) + dispatcher.Invoke(() => { - dispatcher.Invoke(() => + Todos.Clear(); + foreach (var result in results) { - Todos.Clear(); - foreach (var result in results) - { - Todos.Add(result); - } - }); - } - }, - OnError = (error) => - { - Console.WriteLine("Error: " + error.Message); - }, + Todos.Add(result); + } + }); + } } - ); + }); } private async void LoadTodos()