diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 0091701..05e329a 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -2,6 +2,8 @@ ## 0.0.10-alpha.1 +- Fixed watched queries sometimes resolving to the wrong underlying tables after a schema change. +- Fixed some properties in Table not being public when they are meant to be. - Fixed a bug where custom indexes were not being sent to the PowerSync SQLite extension. - Added a new model-based syntax for defining the PowerSync schema (the old syntax is still functional). This syntax uses classes marked with attributes to define the PowerSync schema. The classes can then also be used for queries later on. diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index ef78479..0eb3a43 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -768,41 +768,61 @@ private async Task WatchInternal( Func> getter ) { - try - { - var resolvedTables = await ResolveTables(query, parameters, options); - var result = await getter(query, parameters); - handler.OnResult(result); + var subscription = new WatchSubscription(); - var subscription = OnChange(new WatchOnChangeHandler + async Task ResetQuery() + { + try { - OnChange = async (change) => + var resolvedTables = await ResolveTables(query, parameters, options); + var result = await getter(query, parameters); + handler.OnResult(result); + + var onChangeListener = OnChange(new WatchOnChangeHandler { - try + OnChange = async (change) => { - var result = await getter(query, parameters); - handler.OnResult(result); - } - catch (Exception ex) - { - handler.OnError?.Invoke(ex); - } - }, - OnError = handler.OnError - }, new SQLWatchOptions - { - Tables = resolvedTables, - Signal = options?.Signal, - ThrottleMs = options?.ThrottleMs - }); + try + { + var result = await getter(query, parameters); + handler.OnResult(result); + } + catch (Exception ex) + { + handler.OnError?.Invoke(ex); + } + }, + OnError = handler.OnError + }, new SQLWatchOptions + { + Tables = resolvedTables, + Signal = options?.Signal, + ThrottleMs = options?.ThrottleMs + }); - return subscription; + subscription.SetOnChangeListener(onChangeListener); + } + catch (Exception ex) + { + handler.OnError?.Invoke(ex); + throw; + } } - catch (Exception ex) + + // Register initial subscription + await ResetQuery(); + + // Listen for schema changes and reset listener + var schemaListener = RunListener(async (e) => { - handler.OnError?.Invoke(ex); - throw; - } + if (e.SchemaChanged != null) + { + await ResetQuery(); + } + }); + subscription.SetSchemaListener(schemaListener); + + return subscription; } private class ExplainedResult @@ -869,7 +889,7 @@ void flushTableUpdates() }); } - var cts = Database.RunListener((update) => + var dbListenerCts = Database.RunListener((update) => { if (update.TablesUpdated != null) { @@ -885,27 +905,29 @@ void flushTableUpdates() } }); - CancellationTokenSource linkedCts; - if (options?.Signal.HasValue == true) + CancellationTokenSource stopRunningCts; + + if (options?.Signal != null) { - // Cancel on global CTS cancellation or user token cancellation - linkedCts = CancellationTokenSource.CreateLinkedTokenSource( + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( watchSubscriptionCts.Token, options.Signal.Value ); + stopRunningCts = linkedCts; } else { - // Cancel on global CTS cancellation - linkedCts = watchSubscriptionCts; + stopRunningCts = watchSubscriptionCts; } - var registration = linkedCts.Token.Register(() => + var stopRunningReg = stopRunningCts.Token.Register(dbListenerCts.Cancel); + + return new ActionDisposable(() => { - cts.Cancel(); + stopRunningReg.Dispose(); + dbListenerCts.Cancel(); + dbListenerCts.Dispose(); }); - - return new WatchSubscription(cts, registration); } private static void HandleTableChanges(HashSet changedTables, HashSet watchedTables, Action onDetectedChanges) @@ -968,21 +990,63 @@ public class WatchOnChangeHandler public Action? OnError { get; set; } } -public class WatchSubscription(CancellationTokenSource cts, CancellationTokenRegistration registration) : IDisposable +public class WatchSubscription : IDisposable { - private readonly CancellationTokenSource _cts = cts; - private readonly CancellationTokenRegistration _registration = registration; + private IDisposable? _onChangeListener; + private IDisposable? _schemaListener; + private readonly object _lock = new(); private bool _disposed; - public bool Disposed { get { return _disposed; } } + internal void SetSchemaListener(IDisposable listener) + { + lock (_lock) + { + if (_disposed) + { + listener.Dispose(); + return; + } + _schemaListener?.Dispose(); + _schemaListener = listener; + } + } + + internal void SetOnChangeListener(IDisposable listener) + { + lock (_lock) + { + if (_disposed) + { + listener.Dispose(); + return; + } + _onChangeListener?.Dispose(); + _onChangeListener = listener; + } + } + + public void Dispose() + { + lock (_lock) + { + if (_disposed) return; + _disposed = true; + + _onChangeListener?.Dispose(); + _schemaListener?.Dispose(); + } + } +} + +public class ActionDisposable(Action onDispose) : IDisposable +{ + private readonly Action _onDispose = onDispose; + private bool _disposed = false; public void Dispose() { if (_disposed) return; _disposed = true; - - _registration.Dispose(); - _cts.Cancel(); - _cts.Dispose(); + _onDispose(); } } diff --git a/PowerSync/PowerSync.Common/DB/Schema/Table.cs b/PowerSync/PowerSync.Common/DB/Schema/Table.cs index 095fb8c..eddabe9 100644 --- a/PowerSync/PowerSync.Common/DB/Schema/Table.cs +++ b/PowerSync/PowerSync.Common/DB/Schema/Table.cs @@ -84,22 +84,22 @@ public bool InsertOnly get { return Options.InsertOnly; } set { Options.InsertOnly = value; } } - string? ViewName + public string? ViewName { get { return Options.ViewName; } set { Options.ViewName = value; } } - bool TrackMetadata + public bool TrackMetadata { get { return Options.TrackMetadata; } set { Options.TrackMetadata = value; } } - TrackPreviousOptions? TrackPreviousValues + public TrackPreviousOptions? TrackPreviousValues { get { return Options.TrackPreviousValues; } set { Options.TrackPreviousValues = value; } } - bool IgnoreEmptyUpdates + public bool IgnoreEmptyUpdates { get { return Options.IgnoreEmptyUpdates; } set { Options.IgnoreEmptyUpdates = value; } diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs index 87d3479..69b95c5 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs @@ -7,6 +7,7 @@ namespace PowerSync.Common.Tests.Client; using Newtonsoft.Json; using PowerSync.Common.Client; +using PowerSync.Common.DB.Schema; using PowerSync.Common.Tests.Models; /// @@ -573,7 +574,7 @@ await tx.Execute( } [Fact(Timeout = 2000)] - public async void WatchDisposableSubscriptionTest() + public async Task WatchDisposableSubscriptionTest() { int callCount = 0; var semaphore = new SemaphoreSlim(0); @@ -609,7 +610,7 @@ await db.Execute( } [Fact(Timeout = 2000)] - public async void WatchDisposableCustomTokenTest() + public async Task WatchDisposableCustomTokenTest() { var customTokenSource = new CancellationTokenSource(); int callCount = 0; @@ -649,8 +650,8 @@ await db.Execute( Assert.Equal(2, callCount); } - [Fact(Timeout = 2500)] - public async void WatchSingleCancelledTest() + [Fact(Timeout = 3000)] + public async Task WatchSingleCancelledTest() { int callCount = 0; @@ -688,13 +689,107 @@ await db.Execute( ); // Ensure nothing received from cancelled result - bool receivedResult = await semCancelled.WaitAsync(100); - Assert.False(receivedResult, "Received update after disposal"); + Assert.False(await semCancelled.WaitAsync(100)); await semAlwaysRunning.WaitAsync(); Assert.Equal(5, callCount); } + [Fact(Timeout = 3000)] + public async Task WatchSchemaResetTest() + { + var dbId = Guid.NewGuid().ToString(); + var db = new PowerSyncDatabase(new() + { + Database = new SQLOpenOptions + { + DbFilename = $"powerSyncWatch_{dbId}.db", + }, + Schema = TestSchema.MakeOptionalSyncSchema(false) + }); + + var sem = new SemaphoreSlim(0); + long lastCount = -1; + + string querySql = "SELECT COUNT(*) AS count FROM assets"; + var query = await db.Watch(querySql, [], new WatchHandler + { + OnResult = (result) => + { + lastCount = result[0].count; + sem.Release(); + }, + OnError = error => throw error + }); + Assert.True(await sem.WaitAsync(100)); + Assert.Equal(0, lastCount); + + var resolved = await GetSourceTables(db, querySql); + Assert.Single(resolved); + Assert.Contains("ps_data_local__local_assets", resolved); + + for (int i = 0; i < 3; i++) + { + await db.Execute( + "insert into assets(id, description, make) values (?, ?, ?)", + [Guid.NewGuid().ToString(), "some desc", "some make"] + ); + Assert.True(await sem.WaitAsync(100)); + Assert.Equal(i + 1, lastCount); + } + Assert.Equal(3, lastCount); + + await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true)); + + resolved = await GetSourceTables(db, querySql); + Assert.Single(resolved); + Assert.Contains("ps_data__assets", resolved); + + Assert.True(await sem.WaitAsync(100)); + Assert.Equal(0, lastCount); + + await db.Execute("insert into assets select * from inactive_local_assets"); + Assert.True(await sem.WaitAsync(500)); + Assert.Equal(3, lastCount); + + // Sanity check + query.Dispose(); + + await db.Execute("delete from assets"); + Assert.False(await sem.WaitAsync(100)); + Assert.Equal(3, lastCount); + } + + private class ExplainedResult + { + public int addr = 0; + public string opcode = ""; + public int p1 = 0; + public int p2 = 0; + public int p3 = 0; + public string p4 = ""; + public int p5 = 0; + } + private record TableSelectResult(string tbl_name); + private async Task> GetSourceTables(PowerSyncDatabase db, string sql, object?[]? parameters = null) + { + var explained = await db.GetAll( + $"EXPLAIN {sql}", parameters + ); + + var rootPages = explained + .Where(row => row.opcode == "OpenRead" && row.p3 == 0) + .Select(row => row.p2) + .ToList(); + + var tables = await db.GetAll( + "SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", + [JsonConvert.SerializeObject(rootPages)] + ); + + return tables.Select(row => row.tbl_name).ToList(); + } + [Fact] public async Task Attributes_ColumnAliasing() { diff --git a/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs b/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs index d404e1b..d64280b 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/TestSchema.cs @@ -89,4 +89,27 @@ public static Schema GetSchemaWithCustomAssetOptions(TableOptions? assetOptions return new Schema(customAssets, Customers); } + + public static Schema MakeOptionalSyncSchema(bool synced) + { + string SyncedName(string name) => synced ? name : $"inactice_synced_{name}"; + string LocalName(string name) => synced ? $"inactive_local_{name}" : name; + + return new Schema( + new Table + { + Name = "assets", + Columns = AssetsColumns, + ViewName = SyncedName("assets"), + }, + new Table + { + Name = "local_assets", + Columns = AssetsColumns, + ViewName = LocalName("assets"), + LocalOnly = true, + } + ); + } } +