Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
444 changes: 207 additions & 237 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,19 @@ async Task Connect(EstablishSyncStream instruction)
controlInvocations = new EventStream<EnqueuedCommand>();
try
{
controlInvocations?.RunListenerAsync(async (line) =>
_ = Task.Run(async () =>
{
await Control(line.Command, line.Payload);

// Triggers a local CRUD upload when the first sync line has been received.
// This allows uploading local changes that have been made while offline or disconnected.
if (!hadSyncLine)
await foreach (var line in controlInvocations.ListenAsync(new CancellationToken()))
{
TriggerCrudUpload();
hadSyncLine = true;
await Control(line.Command, line.Payload);

// Triggers a local CRUD upload when the first sync line has been received.
// This allows uploading local changes that have been made while offline or disconnected.
if (!hadSyncLine)
{
TriggerCrudUpload();
hadSyncLine = true;
}
}
});

Expand Down
12 changes: 5 additions & 7 deletions PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class MDSQLiteAdapter : EventStream<DBAdapterEvent>, IDBAdapter

protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions;
private CancellationTokenSource? tablesUpdatedCts;
private Task? tablesUpdatedTask;

private readonly AsyncLock writeMutex = new();
private readonly AsyncLock readMutex = new();
Expand Down Expand Up @@ -86,11 +87,7 @@ private async Task Init()

foreach (var statement in writeConnectionStatements)
{
for (int tries = 0; tries < 30; tries++)
{
await writeConnection!.Execute(statement);
tries = 30;
}
await writeConnection!.Execute(statement);
}

foreach (var statement in readConnectionStatements)
Expand All @@ -99,9 +96,9 @@ private async Task Init()
}

tablesUpdatedCts = new CancellationTokenSource();
var _ = Task.Run(() =>
tablesUpdatedTask = Task.Run(async () =>
{
foreach (var notification in writeConnection!.Listen(tablesUpdatedCts.Token))
await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token))
{
if (notification.TablesUpdated != null)
{
Expand Down Expand Up @@ -139,6 +136,7 @@ protected virtual void LoadExtension(SqliteConnection db)
public new void Close()
{
tablesUpdatedCts?.Cancel();
try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { }
base.Close();
writeConnection?.Close();
readConnection?.Close();
Expand Down
1 change: 1 addition & 0 deletions PowerSync/PowerSync.Common/PowerSync.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.66" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="9.0.1" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
Expand Down
23 changes: 11 additions & 12 deletions PowerSync/PowerSync.Common/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# PowerSync SDK .NET Common
# PowerSync SDK .NET Common

This package contains a .NET implementation of a PowerSync database connector and streaming sync bucket implementation.

Expand All @@ -16,7 +16,7 @@ This package is published on [NuGet](https://www.nuget.org/packages/PowerSync.Co
dotnet add package PowerSync.Common --prerelease
```

## Usage
## Usage

### Simple Query

Expand All @@ -39,24 +39,23 @@ static async Task Main() {
```

### Watched queries
Watched queries will automatically update when a dependant table is updated.
Awaiting `Watch()` ensures the watcher is fully initialized and ready to monitor database changes.

Watched queries will automatically update when a dependant table is updated.
Call `Watch()` synchronously to ensure the watcher is fully initialized before execution continues.

```csharp
await db.Watch("select * from lists", null, new WatchHandler<ListResult>
var cts = new CancellationTokenSource();
var listener = db.Watch<ListResult>("select * from lists", null, new SQLWatchOptions { Signal = cts.Token });
_ = Task.Run(async () =>
{
OnResult = (results) =>
await foreach (var results in listener)
{
table.Rows.Clear();
foreach (var line in results)
{
table.AddRow(line.id, line.name, line.owner_id, line.created_at);
}
},
OnError = (error) =>
{
Console.WriteLine("Error: " + error.Message);
}
});
}, cts.Token);
```

```
92 changes: 23 additions & 69 deletions PowerSync/PowerSync.Common/Utils/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,15 @@ public interface IEventStream<T>

Task EmitAsync(T item);

CancellationTokenSource RunListenerAsync(
Func<T, Task> callback);
IEnumerable<T> Listen(CancellationToken cancellationToken);

IAsyncEnumerable<T> ListenAsync(CancellationToken cancellationToken);

CancellationTokenSource RunListener(Action<T> callback);

IEnumerable<T> Listen(CancellationToken cancellationToken);

void Close();
}

public class EventStream<T> : IEventStream<T>
{

public bool Closed = false;

// Closest implementation to a ConcurrentSet<T> in .Net
Expand All @@ -51,25 +45,11 @@ public async Task EmitAsync(T item)
}
}

public CancellationTokenSource RunListenerAsync(
Func<T, Task> callback)
public IEnumerable<T> Listen(CancellationToken cancellationToken)
{
var cts = new CancellationTokenSource();
var started = new TaskCompletionSource<bool>();

_ = Task.Run(async () =>
{
started.SetResult(true);
await foreach (var value in ListenAsync(cts.Token))
{
await callback(value);
}

}, cts.Token);

started.Task.GetAwaiter().GetResult();

return cts;
var channel = Channel.CreateUnbounded<T>();
subscribers.TryAdd(channel, 0);
return ReadFromChannel(channel, cancellationToken);
}

public IAsyncEnumerable<T> ListenAsync(CancellationToken cancellationToken)
Expand All @@ -79,30 +59,28 @@ public IAsyncEnumerable<T> ListenAsync(CancellationToken cancellationToken)
return ReadFromChannelAsync(channel, cancellationToken);
}

public CancellationTokenSource RunListener(Action<T> callback)
private IEnumerable<T> ReadFromChannel(Channel<T> channel, CancellationToken cancellationToken)
{
var cts = new CancellationTokenSource();
var started = new TaskCompletionSource<bool>();

_ = Task.Run(() =>
try
{
started.SetResult(true);
foreach (var value in Listen(cts.Token))
while (!cancellationToken.IsCancellationRequested)
{
callback(value);
}
}, cts.Token);

started.Task.GetAwaiter().GetResult();

return cts;
}
if (!channel.Reader.WaitToReadAsync(cancellationToken).AsTask().Result)
{
// Channel was completed, exit the loop
break;
}

public IEnumerable<T> Listen(CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<T>();
subscribers.TryAdd(channel, 0);
return ReadFromChannel(channel, cancellationToken);
while (channel.Reader.TryRead(out var item))
{
yield return item;
}
}
}
finally
{
RemoveSubscriber(channel);
}
}

private async IAsyncEnumerable<T> ReadFromChannelAsync(
Expand Down Expand Up @@ -132,30 +110,6 @@ private async IAsyncEnumerable<T> ReadFromChannelAsync(
}
}

private IEnumerable<T> ReadFromChannel(Channel<T> channel, CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
if (!channel.Reader.WaitToReadAsync(cancellationToken).AsTask().Result)
{
// Channel was completed, exit the loop
break;
}

while (channel.Reader.TryRead(out var item))
{
yield return item;
}
}
}
finally
{
RemoveSubscriber(channel);
}
}

public void Close()
{
foreach (var subscriber in subscribers.Keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,16 @@ public async Task SyncDownCreateOperationTest()
var cts = new CancellationTokenSource();
var id = Uuid();

await db.Watch("select * from lists where id = ?", [id], new WatchHandler<ListResult>
_ = Task.Run(async () =>
{
OnResult = (x) =>
await foreach (var x in db.Watch<ListResult>("select * from lists where id = ?", [id], new() { Signal = cts.Token }))
{
// Verify that the item was added locally
if (x.Length == 1)
{
watched.SetResult(true);
cts.Cancel();
}
}
}, new SQLWatchOptions
{
Signal = cts.Token
});

await nodeClient.CreateList(id, name: "Test List magic");
Expand All @@ -106,9 +102,9 @@ public async Task SyncDownDeleteOperationTest()

await nodeClient.CreateList(id, name: "Test List to delete");

await db.Watch("select * from lists where id = ?", [id], new WatchHandler<ListResult>
_ = Task.Run(async () =>
{
OnResult = (x) =>
await foreach (var x in db.Watch<ListResult>("select * from lists where id = ?", [id], new() { Signal = cts.Token }))
{
// Verify that the item was added locally
if (x.Length == 1)
Expand All @@ -117,9 +113,6 @@ public async Task SyncDownDeleteOperationTest()
cts.Cancel();
}
}
}, new SQLWatchOptions
{
Signal = cts.Token
});

await watched.Task;
Expand All @@ -128,9 +121,9 @@ public async Task SyncDownDeleteOperationTest()
watched = new TaskCompletionSource<bool>();
cts = new CancellationTokenSource();

await db.Watch("select * from lists where id = ?", [id], new WatchHandler<ListResult>
_ = Task.Run(async () =>
{
OnResult = (x) =>
await foreach (var x in db.Watch<ListResult>("select * from lists where id = ?", [id], new() { Signal = cts.Token }))
{
// Verify that the item was deleted locally
if (x.Length == 0)
Expand All @@ -139,9 +132,6 @@ public async Task SyncDownDeleteOperationTest()
cts.Cancel();
}
}
}, new SQLWatchOptions
{
Signal = cts.Token
});

await watched.Task;
Expand All @@ -155,9 +145,9 @@ public async Task SyncDownLargeCreateOperationTest()
var id = Uuid();
var listName = Uuid();

await db.Watch("select * from lists where name = ?", [listName], new WatchHandler<ListResult>
_ = Task.Run(async () =>
{
OnResult = (x) =>
await foreach (var x in db.Watch<ListResult>("select * from lists where id = ?", [id], new() { Signal = cts.Token }))
{
// Verify that the item was added locally
if (x.Length == 100)
Expand All @@ -166,9 +156,6 @@ public async Task SyncDownLargeCreateOperationTest()
cts.Cancel();
}
}
}, new SQLWatchOptions
{
Signal = cts.Token
});

for (int i = 0; i < 100; i++)
Expand All @@ -187,9 +174,9 @@ public async Task SyncDownCreateOperationAfterLargeUploadTest()
var id = Uuid();
var listName = Uuid();

await db.Watch("select * from lists where name = ?", [listName], new WatchHandler<ListResult>
_ = Task.Run(async () =>
{
OnResult = (x) =>
await foreach (var x in db.Watch<ListResult>("select * from lists where id = ?", [id], new() { Signal = cts.Token }))
{
// Verify that the items were added locally
if (x.Length == 100)
Expand All @@ -203,9 +190,6 @@ public async Task SyncDownCreateOperationAfterLargeUploadTest()
cts.Cancel();
}
}
}, new SQLWatchOptions
{
Signal = cts.Token
});

for (int i = 0; i < 100; i++)
Expand Down
Loading