Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ on:
branches: [main, develop]
workflow_dispatch:

permissions:
contents: write
packages: write
discussions: write

env:
DOTNET_VERSION: "9.0.x"
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true
Expand Down
111 changes: 72 additions & 39 deletions R3.DynamicData.Tests/List/TransformAsyncTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using R3;
using R3.DynamicData.List;

#pragma warning disable SA1503, SA1513, SA1515, SA1107, SA1502, SA1508, SA1516

namespace R3.DynamicData.Tests.List;

public class TransformAsyncTests
Expand All @@ -10,11 +12,11 @@ public async Task TransformAsync_BasicTransformation()
{
var source = new SourceList<int>();
var results = new List<string>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub = source.Connect()
.TransformAsync(async x =>
{
await Task.Delay(10);
return x.ToString();
})
.Subscribe(changeSet =>
Expand All @@ -24,6 +26,7 @@ public async Task TransformAsync_BasicTransformation()
if (change.Reason == ListChangeReason.Add)
{
results.Add(change.Item);
if (results.Count == 3) tcs.TrySetResult(true);
}
}
});
Expand All @@ -32,8 +35,7 @@ public async Task TransformAsync_BasicTransformation()
source.Add(2);
source.Add(3);

// Wait for async transformations
await Task.Delay(100);
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Equal(3, results.Count);
Assert.Contains("1", results);
Expand All @@ -45,16 +47,33 @@ public async Task TransformAsync_BasicTransformation()
public async Task TransformAsync_WithCancellation()
{
var source = new SourceList<int>();
var transformStarted = new List<int>();
var transformCompleted = new List<int>();
var started = new List<int>();
var completed = new List<int>();
var results = new List<string>();
var startTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var neverCompleteTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub = source.Connect()
.TransformAsync(async (x, ct) =>
{
transformStarted.Add(x);
await Task.Delay(50, ct);
transformCompleted.Add(x);
started.Add(x);
if (started.Count == 1)
{
startTcs.TrySetResult(true);
}

try
{
// Wait indefinitely until cancellation; we never SetResult on purpose.
await neverCompleteTcs.Task.WaitAsync(ct);
}
catch (OperationCanceledException)
{
// Expected; do not mark completion.
throw;
}

completed.Add(x); // Should never execute for cancelled item.
return x.ToString();
})
.Subscribe(changeSet =>
Expand All @@ -69,29 +88,27 @@ public async Task TransformAsync_WithCancellation()
});

source.Add(1);
await Task.Delay(10); // Let transformation start
await startTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); // Ensure transformation started

// Remove before transformation completes
source.Remove(1);
source.Remove(1); // Cancel before completion

await Task.Delay(100); // Wait to see if cancelled transformation completes
await Task.Delay(100); // Allow cancellation to propagate

Assert.Single(transformStarted);
Assert.Empty(transformCompleted); // Should be cancelled
Assert.Empty(results); // Nothing should be emitted
Assert.Single(started);
Assert.Empty(completed);
Assert.Empty(results);
}

[Fact]
public async Task TransformAsync_PreservesOrder()
{
var source = new SourceList<int>();
var results = new List<string>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub = source.Connect()
.TransformAsync(async x =>
{
// Items with higher values take longer
await Task.Delay(x * 10);
return x.ToString();
})
.Subscribe(changeSet =>
Expand All @@ -101,15 +118,15 @@ public async Task TransformAsync_PreservesOrder()
if (change.Reason == ListChangeReason.Add)
{
results.Add(change.Item);
if (results.Count == 3) tcs.TrySetResult(true);
}
}
});

source.AddRange(new[] { 3, 2, 1 });

await Task.Delay(100);
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

// Despite different delays, results should arrive as they complete
Assert.Equal(3, results.Count);
}

Expand Down Expand Up @@ -162,17 +179,28 @@ public async Task TransformAsync_Clear()
{
var source = new SourceList<int>();
var results = new List<IChangeSet<string>>();
var addsDoneTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var addCount = 0;

using var sub = source.Connect()
.TransformAsync(async x =>
{
await Task.Delay(10);
return x.ToString();
})
.Subscribe(results.Add);
.Subscribe(cs =>
{
results.Add(cs);
foreach (var c in cs)
{
if (c.Reason == ListChangeReason.Add)
{
if (Interlocked.Increment(ref addCount) == 3) addsDoneTcs.TrySetResult(true);
Comment on lines +195 to +197
Copy link

Copilot AI Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 'if' statements can be combined.

Suggested change
if (c.Reason == ListChangeReason.Add)
{
if (Interlocked.Increment(ref addCount) == 3) addsDoneTcs.TrySetResult(true);
if (c.Reason == ListChangeReason.Add && Interlocked.Increment(ref addCount) == 3)
{
addsDoneTcs.TrySetResult(true);

Copilot uses AI. Check for mistakes.
}
}
Comment on lines +193 to +199
Copy link

Copilot AI Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach loop implicitly filters its target sequence - consider filtering the sequence explicitly using '.Where(...)'.

Copilot uses AI. Check for mistakes.
});

source.AddRange(new[] { 1, 2, 3 });
await Task.Delay(100);
await addsDoneTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

// Should have 3 Add operations
Assert.Equal(3, results.Count);
Expand All @@ -189,11 +217,12 @@ public async Task TransformAsync_RemoveRange()
{
var source = new SourceList<int>();
var currentState = new List<string>();
var addsDoneTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var addCount = 0;

using var sub = source.Connect()
.TransformAsync(async x =>
{
await Task.Delay(10);
return x.ToString();
})
.Subscribe(changes =>
Expand All @@ -203,6 +232,7 @@ public async Task TransformAsync_RemoveRange()
if (change.Reason == ListChangeReason.Add)
{
currentState.Add(change.Item);
if (Interlocked.Increment(ref addCount) == 5) addsDoneTcs.TrySetResult(true);
}
else if (change.Reason == ListChangeReason.Remove)
{
Expand All @@ -212,12 +242,11 @@ public async Task TransformAsync_RemoveRange()
});

source.AddRange(new[] { 1, 2, 3, 4, 5 });
await Task.Delay(150);
await addsDoneTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Equal(5, currentState.Count);

source.RemoveRange(1, 3);
await Task.Delay(50);

Assert.Equal(2, currentState.Count);
}
Expand All @@ -227,11 +256,11 @@ public async Task TransformAsync_ConcurrentTransformations()
{
var source = new SourceList<int>();
var results = new List<string>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub = source.Connect()
.TransformAsync(async x =>
{
await Task.Delay(20);
return x.ToString();
})
.Subscribe(changeSet =>
Expand All @@ -241,6 +270,7 @@ public async Task TransformAsync_ConcurrentTransformations()
if (change.Reason == ListChangeReason.Add)
{
results.Add(change.Item);
if (results.Count == 10) tcs.TrySetResult(true);
}
}
});
Expand All @@ -251,7 +281,7 @@ public async Task TransformAsync_ConcurrentTransformations()
source.Add(i);
}

await Task.Delay(200);
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Equal(10, results.Count);
}
Expand All @@ -263,6 +293,7 @@ public async Task TransformAsync_Remove_Before_Complete()
var results = new List<string>();
var tcs1 = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs2 = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var resultsTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var calls = new Dictionary<int, TaskCompletionSource<bool>> { { 1, tcs1 }, { 2, tcs2 } };

using var sub = source.Connect()
Expand All @@ -282,20 +313,17 @@ public async Task TransformAsync_Remove_Before_Complete()
if (change.Reason == ListChangeReason.Add)
{
results.Add(change.Item);
resultsTcs.TrySetResult(true);
}
}
});

source.Add(1);
await Task.Delay(10);
source.Add(2);
await Task.Delay(10);

source.Remove(1);
await Task.Delay(10);
source.Remove(1); // Remove 1 before it completes

tcs2.SetResult(true);
await Task.Delay(50);
await resultsTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Single(results);
Assert.Equal("2", results[0]);
Expand All @@ -306,6 +334,7 @@ public async Task TransformAsync_NoOverload_BasicTransformation()
{
var source = new SourceList<int>();
var results = new List<string>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub = source.Connect()
.TransformAsync(x => Task.FromResult(x.ToString()))
Expand All @@ -316,12 +345,13 @@ public async Task TransformAsync_NoOverload_BasicTransformation()
if (change.Reason == ListChangeReason.Add)
{
results.Add(change.Item);
tcs.TrySetResult(true);
}
}
});

source.Add(42);
await Task.Delay(50);
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

Assert.Single(results);
Assert.Equal("42", results[0]);
Expand All @@ -332,26 +362,29 @@ public async Task TransformAsync_DisposalCleansUp()
{
var source = new SourceList<int>();
var transformCount = 0;
var neverCompleteTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var startedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var started = 0;

var sub = source.Connect()
.TransformAsync(async x =>
{
Interlocked.Increment(ref transformCount);
await Task.Delay(50);
var count = Interlocked.Increment(ref transformCount);
Copy link

Copilot AI Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment to count is useless, since its value is never read.

Suggested change
var count = Interlocked.Increment(ref transformCount);
Interlocked.Increment(ref transformCount);

Copilot uses AI. Check for mistakes.
if (Interlocked.Increment(ref started) == 2) startedTcs.TrySetResult(true);
await neverCompleteTcs.Task.WaitAsync(CancellationToken.None);
return x.ToString();
})
.Subscribe(_ => { });

source.Add(1);
source.Add(2);

await Task.Delay(20); // Let transformations start
await startedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));

// Dispose before transformations complete
sub.Dispose();

await Task.Delay(100);

// No need to wait - startedTcs guarantees both started, and disposal is synchronous
// Transformations should have been cancelled
Assert.Equal(2, transformCount); // Both started
}
Expand Down
Loading
Loading