Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 0.0.10-alpha.1

- Fixed watched queries sometimes resolving to the wrong underlying tables after a schema change.
- Fixed some properties in Table not being public when they are meant to be.
- Fixed a bug where custom indexes were not being sent to the PowerSync SQLite extension.
- Added a new model-based syntax for defining the PowerSync schema (the old syntax is still functional). This syntax uses classes marked with attributes to define the PowerSync schema. The classes can then also be used for queries later on.

Expand Down
158 changes: 111 additions & 47 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -768,41 +768,61 @@ private async Task<IDisposable> WatchInternal<T>(
Func<string, object?[]?, Task<T[]>> getter
)
{
try
{
var resolvedTables = await ResolveTables(query, parameters, options);
var result = await getter(query, parameters);
handler.OnResult(result);
var subscription = new WatchSubscription();

var subscription = OnChange(new WatchOnChangeHandler
async Task ResetQuery()
{
try
{
OnChange = async (change) =>
var resolvedTables = await ResolveTables(query, parameters, options);
var result = await getter(query, parameters);
handler.OnResult(result);

var onChangeListener = OnChange(new WatchOnChangeHandler
{
try
OnChange = async (change) =>
{
var result = await getter(query, parameters);
handler.OnResult(result);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
}
},
OnError = handler.OnError
}, new SQLWatchOptions
{
Tables = resolvedTables,
Signal = options?.Signal,
ThrottleMs = options?.ThrottleMs
});
try
{
var result = await getter(query, parameters);
handler.OnResult(result);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
}
},
OnError = handler.OnError
}, new SQLWatchOptions
{
Tables = resolvedTables,
Signal = options?.Signal,
ThrottleMs = options?.ThrottleMs
});

return subscription;
subscription.SetOnChangeListener(onChangeListener);
}
catch (Exception ex)
{
handler.OnError?.Invoke(ex);
throw;
}
}
catch (Exception ex)

// Register initial subscription
await ResetQuery();

// Listen for schema changes and reset listener
var schemaListener = RunListener(async (e) =>
{
handler.OnError?.Invoke(ex);
throw;
}
if (e.SchemaChanged != null)
{
await ResetQuery();
}
});
subscription.SetSchemaListener(schemaListener);

return subscription;
}

private class ExplainedResult
Expand Down Expand Up @@ -869,7 +889,7 @@ void flushTableUpdates()
});
}

var cts = Database.RunListener((update) =>
var dbListenerCts = Database.RunListener((update) =>
{
if (update.TablesUpdated != null)
{
Expand All @@ -885,27 +905,29 @@ void flushTableUpdates()
}
});

CancellationTokenSource linkedCts;
if (options?.Signal.HasValue == true)
CancellationTokenSource stopRunningCts;

if (options?.Signal != null)
{
// Cancel on global CTS cancellation or user token cancellation
linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
watchSubscriptionCts.Token,
options.Signal.Value
);
stopRunningCts = linkedCts;
}
else
{
// Cancel on global CTS cancellation
linkedCts = watchSubscriptionCts;
stopRunningCts = watchSubscriptionCts;
}

var registration = linkedCts.Token.Register(() =>
var stopRunningReg = stopRunningCts.Token.Register(dbListenerCts.Cancel);

return new ActionDisposable(() =>
{
cts.Cancel();
stopRunningReg.Dispose();
dbListenerCts.Cancel();
dbListenerCts.Dispose();
});

return new WatchSubscription(cts, registration);
}

private static void HandleTableChanges(HashSet<string> changedTables, HashSet<string> watchedTables, Action<string[]> onDetectedChanges)
Expand Down Expand Up @@ -968,21 +990,63 @@ public class WatchOnChangeHandler
public Action<Exception>? OnError { get; set; }
}

public class WatchSubscription(CancellationTokenSource cts, CancellationTokenRegistration registration) : IDisposable
public class WatchSubscription : IDisposable
{
private readonly CancellationTokenSource _cts = cts;
private readonly CancellationTokenRegistration _registration = registration;
private IDisposable? _onChangeListener;
private IDisposable? _schemaListener;
private readonly object _lock = new();
private bool _disposed;

public bool Disposed { get { return _disposed; } }
internal void SetSchemaListener(IDisposable listener)
{
lock (_lock)
{
if (_disposed)
{
listener.Dispose();
return;
}
_schemaListener?.Dispose();
_schemaListener = listener;
}
}

internal void SetOnChangeListener(IDisposable listener)
{
lock (_lock)
{
if (_disposed)
{
listener.Dispose();
return;
}
_onChangeListener?.Dispose();
_onChangeListener = listener;
}
}

public void Dispose()
{
lock (_lock)
{
if (_disposed) return;
_disposed = true;

_onChangeListener?.Dispose();
_schemaListener?.Dispose();
}
}
}

public class ActionDisposable(Action onDispose) : IDisposable
{
private readonly Action _onDispose = onDispose;
private bool _disposed = false;

public void Dispose()
{
if (_disposed) return;
_disposed = true;

_registration.Dispose();
_cts.Cancel();
_cts.Dispose();
_onDispose();
}
}
8 changes: 4 additions & 4 deletions PowerSync/PowerSync.Common/DB/Schema/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ public bool InsertOnly
get { return Options.InsertOnly; }
set { Options.InsertOnly = value; }
}
string? ViewName
public string? ViewName
{
get { return Options.ViewName; }
set { Options.ViewName = value; }
}
bool TrackMetadata
public bool TrackMetadata
{
get { return Options.TrackMetadata; }
set { Options.TrackMetadata = value; }
}
TrackPreviousOptions? TrackPreviousValues
public TrackPreviousOptions? TrackPreviousValues
{
get { return Options.TrackPreviousValues; }
set { Options.TrackPreviousValues = value; }
}
bool IgnoreEmptyUpdates
public bool IgnoreEmptyUpdates
{
get { return Options.IgnoreEmptyUpdates; }
set { Options.IgnoreEmptyUpdates = value; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace PowerSync.Common.Tests.Client;
using Newtonsoft.Json;

using PowerSync.Common.Client;
using PowerSync.Common.DB.Schema;
using PowerSync.Common.Tests.Models;

/// <summary>
Expand Down Expand Up @@ -573,7 +574,7 @@ await tx.Execute(
}

[Fact(Timeout = 2000)]
public async void WatchDisposableSubscriptionTest()
public async Task WatchDisposableSubscriptionTest()
{
int callCount = 0;
var semaphore = new SemaphoreSlim(0);
Expand Down Expand Up @@ -609,7 +610,7 @@ await db.Execute(
}

[Fact(Timeout = 2000)]
public async void WatchDisposableCustomTokenTest()
public async Task WatchDisposableCustomTokenTest()
{
var customTokenSource = new CancellationTokenSource();
int callCount = 0;
Expand Down Expand Up @@ -649,8 +650,8 @@ await db.Execute(
Assert.Equal(2, callCount);
}

[Fact(Timeout = 2500)]
public async void WatchSingleCancelledTest()
[Fact(Timeout = 3000)]
public async Task WatchSingleCancelledTest()
{
int callCount = 0;

Expand Down Expand Up @@ -688,13 +689,107 @@ await db.Execute(
);

// Ensure nothing received from cancelled result
bool receivedResult = await semCancelled.WaitAsync(100);
Assert.False(receivedResult, "Received update after disposal");
Assert.False(await semCancelled.WaitAsync(100));

await semAlwaysRunning.WaitAsync();
Assert.Equal(5, callCount);
}

[Fact(Timeout = 3000)]
public async Task WatchSchemaResetTest()
{
var dbId = Guid.NewGuid().ToString();
var db = new PowerSyncDatabase(new()
{
Database = new SQLOpenOptions
{
DbFilename = $"powerSyncWatch_{dbId}.db",
},
Schema = TestSchema.MakeOptionalSyncSchema(false)
});

var sem = new SemaphoreSlim(0);
long lastCount = -1;

string querySql = "SELECT COUNT(*) AS count FROM assets";
var query = await db.Watch(querySql, [], new WatchHandler<CountResult>
{
OnResult = (result) =>
{
lastCount = result[0].count;
sem.Release();
},
OnError = error => throw error
});
Assert.True(await sem.WaitAsync(100));
Assert.Equal(0, lastCount);

var resolved = await GetSourceTables(db, querySql);
Assert.Single(resolved);
Assert.Contains("ps_data_local__local_assets", resolved);

for (int i = 0; i < 3; i++)
{
await db.Execute(
"insert into assets(id, description, make) values (?, ?, ?)",
[Guid.NewGuid().ToString(), "some desc", "some make"]
);
Assert.True(await sem.WaitAsync(100));
Assert.Equal(i + 1, lastCount);
}
Assert.Equal(3, lastCount);

await db.UpdateSchema(TestSchema.MakeOptionalSyncSchema(true));

resolved = await GetSourceTables(db, querySql);
Assert.Single(resolved);
Assert.Contains("ps_data__assets", resolved);

Assert.True(await sem.WaitAsync(100));
Assert.Equal(0, lastCount);

await db.Execute("insert into assets select * from inactive_local_assets");
Assert.True(await sem.WaitAsync(500));
Assert.Equal(3, lastCount);

// Sanity check
query.Dispose();

await db.Execute("delete from assets");
Assert.False(await sem.WaitAsync(100));
Assert.Equal(3, lastCount);
}

private class ExplainedResult
{
public int addr = 0;
public string opcode = "";
public int p1 = 0;
public int p2 = 0;
public int p3 = 0;
public string p4 = "";
public int p5 = 0;
}
private record TableSelectResult(string tbl_name);
private async Task<List<string>> GetSourceTables(PowerSyncDatabase db, string sql, object?[]? parameters = null)
{
var explained = await db.GetAll<ExplainedResult>(
$"EXPLAIN {sql}", parameters
);

var rootPages = explained
.Where(row => row.opcode == "OpenRead" && row.p3 == 0)
.Select(row => row.p2)
.ToList();

var tables = await db.GetAll<TableSelectResult>(
"SELECT DISTINCT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))",
[JsonConvert.SerializeObject(rootPages)]
);

return tables.Select(row => row.tbl_name).ToList();
}

[Fact]
public async Task Attributes_ColumnAliasing()
{
Expand Down
Loading