From 54b46e2c68dc231ac35442cd86324e574b7fb513 Mon Sep 17 00:00:00 2001 From: st0o0 <64534642+st0o0@users.noreply.github.com> Date: Fri, 20 Feb 2026 13:02:54 +0100 Subject: [PATCH] Add AOT Benchmark config --- .../ClientInstantiationBenchmarks.cs | 64 ---- .../EndToEndBenchmarks.cs | 346 ++++++++++++++---- .../EventStreamBenchmarks.cs | 132 ------- .../MessagePipelineBenchmarks.cs | 1 - src/WebSocket.Rx.Benchmarks/Program.cs | 24 +- .../SendChannelBenchmarks.cs | 47 +-- .../ServerBroadcastBenchmarks.cs | 104 ------ src/WebSocket.Rx/Extensions.cs | 24 +- src/WebSocket.Rx/Internal/Extensions.cs | 2 +- src/WebSocket.Rx/ReactiveWebSocketClient.cs | 78 ++-- src/WebSocket.Rx/ReactiveWebSocketServer.cs | 20 +- src/WebSocket.Rx/WebSocket.Rx.csproj | 1 + 12 files changed, 388 insertions(+), 455 deletions(-) delete mode 100644 src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs delete mode 100644 src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs delete mode 100644 src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs diff --git a/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs deleted file mode 100644 index 14c83d6..0000000 --- a/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs +++ /dev/null @@ -1,64 +0,0 @@ -using System; -using BenchmarkDotNet.Attributes; -using Microsoft.IO; - -namespace WebSocket.Rx.Benchmarks; - -/// -/// Measures the overhead of creating ReactiveWebSocketClient instances. -/// -/// The constructor optionally accepts a RecyclableMemoryStreamManager. -/// We measure: -/// - Allocation cost without vs. with a shared MemoryStreamManager -/// - GC pressure from many short-lived instances -/// No network connection required. -/// -[ShortRunJob] -[MemoryDiagnoser] -[HideColumns("Job", "RatioSD", "Error")] -public class ClientInstantiationBenchmarks -{ - private static readonly Uri ServerUri = new("ws://localhost:9999"); - - // Shared manager – as it should be used in production code - private readonly RecyclableMemoryStreamManager _sharedManager = new(); - - // ----------------------------------------------------------------------- - // 1) Baseline: without a custom MemoryStreamManager (new() is created internally) - // ----------------------------------------------------------------------- - [Benchmark(Baseline = true, Description = "new ReactiveWebSocketClient(url)")] - public ReactiveWebSocketClient Instantiate_NoManager() - { - return new ReactiveWebSocketClient(ServerUri); - } - - // ----------------------------------------------------------------------- - // 2) With a shared RecyclableMemoryStreamManager - // ----------------------------------------------------------------------- - [Benchmark(Description = "new ReactiveWebSocketClient(url, sharedManager)")] - public ReactiveWebSocketClient Instantiate_SharedManager() - { - return new ReactiveWebSocketClient(ServerUri, _sharedManager); - } - - // ----------------------------------------------------------------------- - // 3) With a dedicated RecyclableMemoryStreamManager per instance - // ----------------------------------------------------------------------- - [Benchmark(Description = "new ReactiveWebSocketClient(url, new Manager())")] - public ReactiveWebSocketClient Instantiate_OwnManager() - { - return new ReactiveWebSocketClient(ServerUri, new RecyclableMemoryStreamManager()); - } - - // ----------------------------------------------------------------------- - // 4) GC pressure: 100 short-lived instances (disposed immediately) - // ----------------------------------------------------------------------- - [Benchmark(Description = "100x create + Dispose")] - public void Allocate_AndDispose_100() - { - for (var i = 0; i < 100; i++) - { - using var client = new ReactiveWebSocketClient(ServerUri, _sharedManager); - } - } -} \ No newline at end of file diff --git a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs index 68b417e..5d1c9eb 100644 --- a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Net; using System.Net.WebSockets; using System.Threading; @@ -8,115 +9,210 @@ namespace WebSocket.Rx.Benchmarks; +// --------------------------------------------------------------------------- +// Shared infrastructure – server + client are started once per benchmark class +// --------------------------------------------------------------------------- + /// -/// End-to-end benchmarks using ReactiveWebSocketServer and -/// ReactiveWebSocketClient over loopback. -/// -/// Measures: -/// - Single round-trip latency (Send → Echo → MessageReceived) -/// - Throughput for N sequential messages -/// - Connection setup latency (ConnectAsync) -/// -/// The server uses ReactiveWebSocketServer with a built-in echo -/// mechanism via the Messages stream and SendAsTextAsync. -/// -/// NOTE: These tests take longer to run. Execute individually with: -/// dotnet run -c Release -- --filter *EndToEnd* +/// Base class that spins up a loopback echo server and a connected client. +/// Concrete benchmark classes inherit from this and add their own [Params]. /// -[ShortRunJob] -[MemoryDiagnoser] -[HideColumns("Job", "RatioSD", "Error")] -public class EndToEndBenchmarks +public abstract class EndToEndBase { - private ReactiveWebSocketServer _server = null!; - private ReactiveWebSocketClient _client = null!; - private string _prefix = null!; - private Uri _serverUri = null!; + protected ReactiveWebSocketServer Server = null!; + protected ReactiveWebSocketClient Client = null!; + protected Uri ServerUri = null!; [GlobalSetup] - public async Task SetupAsync() + public virtual async Task SetupAsync() { var port = FreeTcpPort(); - _prefix = $"http://localhost:{port}/ws/"; - _serverUri = new Uri($"ws://localhost:{port}/ws/"); + var prefix = $"http://localhost:{port}/ws/"; + ServerUri = new Uri($"ws://localhost:{port}/ws/"); - _server = new ReactiveWebSocketServer(_prefix); + Server = new ReactiveWebSocketServer(prefix); - _server.Messages.SubscribeAwait(async (msg, ct) => + // Echo handler: text → text, binary → binary + Server.Messages.SubscribeAwait(async (msg, ct) => { if (msg.Message.IsText) { - await _server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct); + await Server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct); + } + else if (msg.Message.IsBinary) + { + await Server.SendAsync(msg.Metadata.Id, msg.Message.Binary, WebSocketMessageType.Binary, ct); } }); - await _server.StartAsync(); + await Server.StartAsync(); - _client = new ReactiveWebSocketClient(_serverUri) - { - IsReconnectionEnabled = false - }; - var connectionHappenedTask = _client.ConnectionHappened.FirstAsync(); - await _client.StartAsync(); + Client = new ReactiveWebSocketClient(ServerUri) { IsReconnectionEnabled = false }; - await connectionHappenedTask; + var connected = Client.ConnectionHappened.FirstAsync(); + await Client.StartAsync(); + await connected; } [GlobalCleanup] - public async Task CleanupAsync() + public virtual async Task CleanupAsync() + { + await Client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done"); + await Server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done"); + await Client.DisposeAsync(); + await Server.DisposeAsync(); + } + + [IterationCleanup] + public void IterationCleanup() + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + } + + private static int FreeTcpPort() { - await _client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done"); - await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done"); - await _client.DisposeAsync(); - await _server.DisposeAsync(); + var listener = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; } +} +// --------------------------------------------------------------------------- +// 1) Latency benchmarks – no [Params], pure connection / single-message cost +// --------------------------------------------------------------------------- + +/// +/// Measures single round-trip latency and connection setup/teardown overhead. +/// +/// Run with: +/// dotnet run -c Release -- --filter *Latency* +/// +[MemoryDiagnoser] +[GcServer(true)] +[HideColumns("Job", "RatioSD")] +public class LatencyBenchmarks : EndToEndBase +{ // ----------------------------------------------------------------------- - // 1) Single Round-Trip + // 1a) Single text round-trip // ----------------------------------------------------------------------- [Benchmark(Baseline = true, Description = "Single text round-trip")] public async Task SingleRoundTrip() { - var echoTask = _client.MessageReceived + var echoTask = Client.MessageReceived .Where(m => m is { IsText: true }) - .Where(m => m.Text.ToString() is "ping") + .Where(m => m.Text.Span.SequenceEqual("ping".AsSpan())) .FirstAsync(); - await _client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text); + await Client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text); return (await echoTask).Text.ToString(); } // ----------------------------------------------------------------------- - // 2) N sequentielle Round-Trips + // 1b) ConnectAsync + StopAsync overhead // ----------------------------------------------------------------------- - [Params(10, 50)] public int RoundTrips { get; set; } + [Benchmark(Description = "ConnectAsync + StopAsync (latency)")] + public async Task ConnectAndStop() + { + var client = new ReactiveWebSocketClient(ServerUri) { IsReconnectionEnabled = false }; + await client.StartAsync(); + await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done"); + client.Dispose(); + } +} + +// --------------------------------------------------------------------------- +// 2) Throughput benchmarks – small messages, varying message counts +// --------------------------------------------------------------------------- + +/// +/// Measures sequential and concurrent throughput for small text messages. +/// +/// Run with: +/// dotnet run -c Release -- --filter *Throughput* +/// +[MemoryDiagnoser] +[GcServer(true)] +[HideColumns("Job", "RatioSD")] +public class ThroughputBenchmarks : EndToEndBase +{ + // RoundTrips is used by SequentialRoundTrips only. + // MessageCount is shared by EndToEndThroughput and TrySendThroughputWithConfirmation. + // Keeping them separate avoids a cartesian-product explosion. + + [Params(10, 100)] public int RoundTrips { get; set; } - [Benchmark(Description = "Sequential round-trips (N messages)")] + [Params(100, 500, 1000)] public int MessageCount { get; set; } + + // ----------------------------------------------------------------------- + // 2a) N sequential round-trips (send → wait for echo → repeat) + // ----------------------------------------------------------------------- + [Benchmark(Baseline = true, Description = "Sequential round-trips (N messages)")] public async Task SequentialRoundTrips() { var received = 0; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - using var sub = _client.MessageReceived + using var sub = Client.MessageReceived .Where(m => m.IsText) - .Subscribe(_ => Interlocked.Increment(ref received)); + .Subscribe(_ => + { + if (Interlocked.Increment(ref received) >= RoundTrips) + { + tcs.TrySetResult(); + } + }); for (var i = 0; i < RoundTrips; i++) { - await _client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token); + await Client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token); } - while (received < RoundTrips && !cts.IsCancellationRequested) + await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false); + return received; + } + + // NOTE: Since SendAsync enqueues into a single Channel, concurrent sends + // do not parallelize actual I/O – they only pipeline the enqueue step. + // For large payloads the SendLoop becomes the bottleneck and concurrent + // offers no advantage over sequential. + // ----------------------------------------------------------------------- + // 2b) End-to-end throughput (all sends fired concurrently via WhenAll) + // ----------------------------------------------------------------------- + [Benchmark(Description = "End-to-end throughput (N concurrent sends)")] + public async Task EndToEndThroughput() + { + var received = 0; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var sub = Client.MessageReceived + .Where(m => m.IsText) + .Subscribe(_ => + { + if (Interlocked.Increment(ref received) >= MessageCount) + { + tcs.TrySetResult(); + } + }); + + var sends = new Task[MessageCount]; + for (var i = 0; i < MessageCount; i++) { - await Task.Delay(1, cts.Token).ConfigureAwait(false); + sends[i] = Client.SendInstantAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token); } + await Task.WhenAll(sends).ConfigureAwait(false); + await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false); return received; } // ----------------------------------------------------------------------- - // 3) TrySendAsText throughput (fire-and-forget, no echo wait) - // Measures pure Channel.TryWrite speed including the SendLoop + // 2c) TrySend fire-and-forget (Channel.TryWrite, no echo wait) // ----------------------------------------------------------------------- [Benchmark(Description = "TrySendAsText throughput (no echo wait)")] public int TrySendAsText_Throughput() @@ -124,7 +220,7 @@ public int TrySendAsText_Throughput() var sent = 0; for (var i = 0; i < 1_000; i++) { - if (_client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text)) + if (Client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text)) { sent++; } @@ -134,27 +230,135 @@ public int TrySendAsText_Throughput() } // ----------------------------------------------------------------------- - // 4) ConnectAsync + StopAsync overhead (connection setup and teardown) - // Creates a fresh client each time without waiting for an echo + // 2d) TrySend with echo confirmation (Channel.TryWrite + backpressure) // ----------------------------------------------------------------------- - [Benchmark(Description = "ConnectAsync + StopAsync (latency)")] - public async Task ConnectAndStop() + [Benchmark(Description = "TrySend throughput (fire + confirm all echoed)")] + public async Task TrySendThroughputWithConfirmation() { - var client = new ReactiveWebSocketClient(_serverUri) + var received = 0; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var sub = Client.MessageReceived + .Where(m => m.IsText) + .Subscribe(_ => + { + if (Interlocked.Increment(ref received) >= MessageCount) + { + tcs.TrySetResult(); + } + }); + + for (var i = 0; i < MessageCount; i++) { - IsReconnectionEnabled = false - }; - await client.StartAsync(); - await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done"); - client.Dispose(); + while (!Client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text)) + { + await Task.Yield(); + } + } + + await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false); + return received; } +} - private static int FreeTcpPort() +// --------------------------------------------------------------------------- +// 3) Large-message benchmarks – binary payloads, varying size + count +// --------------------------------------------------------------------------- + +/// +/// Measures throughput and latency for large binary payloads. +/// +/// Payload is allocated once per iteration in [IterationSetup] so allocation +/// cost is not included in the benchmark measurement. +/// +/// Run with: +/// dotnet run -c Release -- --filter *LargeMessage* +/// +[MemoryDiagnoser] +[GcServer(true)] +[HideColumns("Job", "RatioSD")] +public class LargeMessageBenchmarks : EndToEndBase +{ + [Params(1024, 16 * 1024, 256 * 1024)] public int PayloadBytes { get; set; } + + [Params(10, 100)] public int MessageCount { get; set; } + + private byte[] _payload = null!; + + // Allocate outside the timed region so payload size doesn't skew allocations + [IterationSetup] + public void IterationSetup() { - var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0); - l.Start(); - var port = ((IPEndPoint)l.LocalEndpoint).Port; - l.Stop(); - return port; + _payload = ArrayPool.Shared.Rent(PayloadBytes); + Random.Shared.NextBytes(_payload.AsSpan(0, PayloadBytes)); + } + + [IterationCleanup] + public new void IterationCleanup() + { + ArrayPool.Shared.Return(_payload); + _payload = null!; + base.IterationCleanup(); + } + + // ----------------------------------------------------------------------- + // 3a) Sequential large-message round-trips + // ----------------------------------------------------------------------- + [Benchmark(Baseline = true, Description = "Large message round-trip (sequential)")] + public async Task LargeMessageRoundTripSequential() + { + var received = 0; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var sub = Client.MessageReceived + .Where(m => m.IsBinary) + .Subscribe(_ => + { + if (Interlocked.Increment(ref received) >= MessageCount) + { + tcs.TrySetResult(); + } + }); + + for (var i = 0; i < MessageCount; i++) + { + await Client.SendAsync(_payload.AsMemory(0, PayloadBytes), WebSocketMessageType.Binary, cts.Token); + } + + await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false); + return received; + } + + // ----------------------------------------------------------------------- + // 3b) Concurrent large-message sends (WhenAll) + // ----------------------------------------------------------------------- + [Benchmark(Description = "Large message round-trip (concurrent)")] + public async Task LargeMessageRoundTripConcurrent() + { + var received = 0; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var sub = Client.MessageReceived + .Where(m => m.IsBinary) + .Subscribe(_ => + { + if (Interlocked.Increment(ref received) >= MessageCount) + { + tcs.TrySetResult(); + } + }); + + var sends = new Task[MessageCount]; + for (var i = 0; i < MessageCount; i++) + { + sends[i] = Client.SendAsync(_payload.AsMemory(0, PayloadBytes), WebSocketMessageType.Binary, cts.Token); + } + + await Task.WhenAll(sends).ConfigureAwait(false); + await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false); + return received; } } \ No newline at end of file diff --git a/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs deleted file mode 100644 index e6ad8d0..0000000 --- a/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs +++ /dev/null @@ -1,132 +0,0 @@ -using System; -using BenchmarkDotNet.Attributes; -using R3; - -namespace WebSocket.Rx.Benchmarks; - -/// -/// Measures the overhead of the ConnectionHappened, -/// DisconnectionHappened and ErrorOccurred event streams. -/// -/// These events are emitted via R3 Subject<T>. -/// We use StreamFakeMessage as a reference and call the internal -/// subjects directly via reflection – exercising exactly the same code -/// paths as the production code does. -/// -[ShortRunJob] -[MemoryDiagnoser] -[HideColumns("Job", "RatioSD", "Error")] -public class EventStreamBenchmarks -{ - [Params(10, 100)] - public int EventCount { get; set; } - - private ReactiveWebSocketClient _client = null!; - - private Subject _connectionSource = null!; - private Subject _disconnectionSource = null!; - private Subject _errorSource = null!; - - [GlobalSetup] - public void Setup() - { - _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999")); - - var t = typeof(ReactiveWebSocketClient); - _connectionSource = (Subject)t.GetField("ConnectionHappenedSource", - System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!; - _disconnectionSource = (Subject)t.GetField("DisconnectionHappenedSource", - System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!; - _errorSource = (Subject)t.GetField("ErrorOccurredSource", - System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!; - } - - [GlobalCleanup] - public void Cleanup() => _client.Dispose(); - - // ----------------------------------------------------------------------- - // 1) Baseline: ConnectionHappened without subscriber - // ----------------------------------------------------------------------- - [Benchmark(Baseline = true, Description = "ConnectionHappened (no subscriber)")] - public void Emit_Connection_NoSubscriber() - { - for (var i = 0; i < EventCount; i++) - { - _connectionSource.OnNext(new Connected(ConnectReason.Initialized)); - } - } - - // ----------------------------------------------------------------------- - // 2) ConnectionHappened with one subscriber - // ----------------------------------------------------------------------- - [Benchmark(Description = "ConnectionHappened → 1 subscriber")] - public int Emit_Connection_OneSubscriber() - { - var count = 0; - using var sub = _client.ConnectionHappened.Subscribe(_ => count++); - for (var i = 0; i < EventCount; i++) - { - _connectionSource.OnNext(new Connected(ConnectReason.Initialized)); - } - return count; - } - - // ----------------------------------------------------------------------- - // 3) DisconnectionHappened with one subscriber + Where filter - // ----------------------------------------------------------------------- - [Benchmark(Description = "DisconnectionHappened → Where(ServerInitiated)")] - public int Emit_Disconnection_Filtered() - { - var count = 0; - using var sub = _client.DisconnectionHappened - .Where(d => d.Reason == DisconnectReason.ServerInitiated) - .Subscribe(_ => count++); - - for (var i = 0; i < EventCount; i++) - { - _disconnectionSource.OnNext(new Disconnected( - i % 2 == 0 ? DisconnectReason.ServerInitiated : DisconnectReason.ClientInitiated)); - } - return count; - } - - // ----------------------------------------------------------------------- - // 4) ErrorOccurred with Select (extract error source) - // ----------------------------------------------------------------------- - [Benchmark(Description = "ErrorOccurred → Select(Source)")] - public int Emit_Error_Select() - { - var count = 0; - using var sub = _client.ErrorOccurred - .Select(e => e.Source) - .Subscribe(_ => count++); - - for (var i = 0; i < EventCount; i++) - { - _errorSource.OnNext(new ErrorOccurred(ErrorSource.ReceiveLoop, - new InvalidOperationException("test"))); - } - return count; - } - - // ----------------------------------------------------------------------- - // 5) All three streams simultaneously with one subscriber each - // (mirrors typical production code) - // ----------------------------------------------------------------------- - [Benchmark(Description = "All 3 streams → 1 subscriber each")] - public int Emit_AllStreams() - { - var count = 0; - using var s1 = _client.ConnectionHappened.Subscribe(_ => count++); - using var s2 = _client.DisconnectionHappened.Subscribe(_ => count++); - using var s3 = _client.ErrorOccurred.Subscribe(_ => count++); - - for (var i = 0; i < EventCount; i++) - { - _connectionSource.OnNext(new Connected(ConnectReason.Reconnected)); - _disconnectionSource.OnNext(new Disconnected(DisconnectReason.Dropped)); - } - - return count; - } -} \ No newline at end of file diff --git a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs index 5b1c9be..69c040e 100644 --- a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs @@ -15,7 +15,6 @@ namespace WebSocket.Rx.Benchmarks; /// - ReceivedMessage allocation (Text vs. Binary) /// - Rx operator overhead per subscriber (Where, Select, …) /// -[ShortRunJob] [MemoryDiagnoser] [HideColumns("Job", "RatioSD", "Error")] public class MessagePipelineBenchmarks diff --git a/src/WebSocket.Rx.Benchmarks/Program.cs b/src/WebSocket.Rx.Benchmarks/Program.cs index 4893ce3..6b7e70f 100644 --- a/src/WebSocket.Rx.Benchmarks/Program.cs +++ b/src/WebSocket.Rx.Benchmarks/Program.cs @@ -1,13 +1,25 @@ -using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Environments; using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Reports; using BenchmarkDotNet.Running; +using BenchmarkDotNet.Toolchains.NativeAot; var config = DefaultConfig.Instance - .AddJob(Job.ShortRun - .WithWarmupCount(1) - .WithIterationCount(3) - .WithInvocationCount(1) - .WithUnrollFactor(1)); + //.AddFilter(new SimpleFilter(bc => !bc.Descriptor.Categories.Contains("E2E"))) + .AddJob(Job.Default + .WithToolchain( + NativeAotToolchain.CreateBuilder() + .UseNuGet() + .IlcOptimizationPreference() + .DisplayName("AOT-Speed") + .ToToolchain()) + .WithRuntime(NativeAotRuntime.Net10_0) + .WithId("Speed")) + .AddJob(Job.Default + .WithRuntime(CoreRuntime.Core10_0) + .WithId("CoreCLR")); BenchmarkSwitcher .FromAssembly(typeof(Program).Assembly) diff --git a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs index 6f5709a..23510af 100644 --- a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs @@ -1,5 +1,6 @@ using System; using System.Net.WebSockets; +using System.Text; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; @@ -20,7 +21,6 @@ namespace WebSocket.Rx.Benchmarks; /// NOTE: The Channel is UnboundedChannel (SingleReader) – /// without a consumer it fills up, but writes never block. /// -[ShortRunJob] [MemoryDiagnoser] [HideColumns("Job", "RatioSD", "Error")] public class SendChannelBenchmarks @@ -61,24 +61,25 @@ public void Cleanup() } // ----------------------------------------------------------------------- - // 1) Baseline: TrySendAsBinary(string) – synchronous Channel.TryWrite + // 1) Baseline: TrySend(string) – synchronous Channel.TryWrite // ----------------------------------------------------------------------- - [Benchmark(Baseline = true, Description = "TrySendAsBinary(string)")] + [Benchmark(Baseline = true, Description = "TrySend(string, WebSocketMessageType.Binary)")] public bool TrySendAsBinary_String() { var result = false; + var payload = _textPayload.AsMemory(); for (var i = 0; i < MessageCount; i++) { - result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Binary); + result = _client.TrySend(payload, WebSocketMessageType.Binary); } return result; } // ----------------------------------------------------------------------- - // 2) TrySendAsBinary(byte[]) + // 2) TrySend(byte[]) // ----------------------------------------------------------------------- - [Benchmark(Description = "TrySendAsBinary(byte[])")] + [Benchmark(Description = "TrySend(byte[], WebSocketMessageType.Binary)")] public bool TrySendAsBinary_Bytes() { var result = false; @@ -91,24 +92,25 @@ public bool TrySendAsBinary_Bytes() } // ----------------------------------------------------------------------- - // 3) TrySendAsText(string) + // 3) TrySend(string) // ----------------------------------------------------------------------- - [Benchmark(Description = "TrySendAsText(string)")] + [Benchmark(Description = "TrySend(string, WebSocketMessageType.Text)")] public bool TrySendAsText_String() { var result = false; + var payload = _textPayload.AsMemory(); for (var i = 0; i < MessageCount; i++) { - result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Text); + result = _client.TrySend(payload, WebSocketMessageType.Text); } return result; } // ----------------------------------------------------------------------- - // 4) TrySendAsText(byte[]) + // 4) TrySend(byte[]) // ----------------------------------------------------------------------- - [Benchmark(Description = "TrySendAsText(byte[])")] + [Benchmark(Description = "TrySendAsText(byte[], WebSocketMessageType.Text)")] public bool TrySendAsText_Bytes() { var result = false; @@ -121,46 +123,49 @@ public bool TrySendAsText_Bytes() } // ----------------------------------------------------------------------- - // 5) SendAsBinaryAsync(string) – Channel.WriteAsync (ValueTask) + // 5) SendAsync(string) – Channel.WriteAsync (ValueTask) // ----------------------------------------------------------------------- - [Benchmark(Description = "SendAsBinaryAsync(string)")] + [Benchmark(Description = "SendAsync(string, WebSocketMessageType.Binary)")] public async Task SendAsBinaryAsync_String() { var result = false; + var payload = _textPayload.AsMemory(); for (var i = 0; i < MessageCount; i++) { - result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Binary); + result = await _client.SendAsync(payload, WebSocketMessageType.Binary); } return result; } // ----------------------------------------------------------------------- - // 6) SendAsTextAsync(string) – Channel.WriteAsync (ValueTask) + // 6) SendAsync(string) – Channel.WriteAsync (ValueTask) // ----------------------------------------------------------------------- - [Benchmark(Description = "SendAsTextAsync(string)")] + [Benchmark(Description = "SendAsync(string, WebSocketMessageType.Text)")] public async Task SendAsTextAsync_String() { var result = false; + var payload = _textPayload.AsMemory(); for (var i = 0; i < MessageCount; i++) { - result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Text); + result = await _client.SendAsync(payload, WebSocketMessageType.Text); } return result; } // ----------------------------------------------------------------------- - // 7) String → byte[] encoding overhead (Encoding.UTF8.GetBytes) + // 7) String → ReadOnlyMemory encoding overhead (Encoding.UTF8.GetBytes) // – isolates the encoding cost hidden inside TrySend/SendAsync // ----------------------------------------------------------------------- [Benchmark(Description = "Encoding.UTF8.GetBytes (overhead only)")] - public byte[] EncodingOverhead() + public ReadOnlyMemory EncodingOverhead() { - byte[] result = []; + var result = new ReadOnlyMemory([]); + var payload = _textPayload.AsMemory(); for (var i = 0; i < MessageCount; i++) { - result = System.Text.Encoding.UTF8.GetBytes(_textPayload); + result = payload.ToPayload(Encoding.UTF8, WebSocketMessageType.Text).Data; } return result; diff --git a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs deleted file mode 100644 index 0d110c9..0000000 --- a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs +++ /dev/null @@ -1,104 +0,0 @@ -using System; -using System.Net; -using System.Net.WebSockets; -using System.Threading.Tasks; -using BenchmarkDotNet.Attributes; - -namespace WebSocket.Rx.Benchmarks; - -/// -/// Measures the overhead of the server broadcast methods on -/// ReactiveWebSocketServer without real network connections. -/// -/// Since the broadcast methods internally iterate the client list and -/// use Task.WhenAll + LINQ, the overhead can already be meaningfully -/// measured by calling them directly on a started server with no connected clients. -/// -/// For a full test with N connected clients see EndToEndBenchmarks. -/// -[ShortRunJob] -[MemoryDiagnoser] -[HideColumns("Job", "RatioSD", "Error")] -public class ServerBroadcastBenchmarks -{ - [Params(64, 1_024, 4_096)] public int PayloadSize { get; set; } - - private ReactiveWebSocketServer _server = null!; - private string _textPayload = null!; - private byte[] _binaryPayload = null!; - - [GlobalSetup] - public async Task SetupAsync() - { - var port = FreeTcpPort(); - _server = new ReactiveWebSocketServer($"http://localhost:{port}/ws/"); - await _server.StartAsync(); - - _textPayload = new string('x', PayloadSize); - _binaryPayload = new byte[PayloadSize]; - } - - [GlobalCleanup] - public async Task CleanupAsync() - { - await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "done"); - await _server.DisposeAsync(); - } - - // ----------------------------------------------------------------------- - // 1) Baseline: BroadcastAsTextAsync – 0 clients (method overhead only) - // ----------------------------------------------------------------------- - [Benchmark(Baseline = true, Description = "BroadcastAsTextAsync (0 clients)")] - public async Task BroadcastAsTextAsync_Empty() - { - return await _server.BroadcastAsync(_textPayload.AsMemory(), WebSocketMessageType.Text); - } - - // ----------------------------------------------------------------------- - // 2) BroadcastAsBinaryAsync with byte[] - // ----------------------------------------------------------------------- - [Benchmark(Description = "BroadcastAsBinaryAsync(byte[]) (0 clients)")] - public async Task BroadcastAsBinaryAsync_Bytes_Empty() - { - return await _server.BroadcastAsync(_binaryPayload, WebSocketMessageType.Binary); - } - - // ----------------------------------------------------------------------- - // 3) TryBroadcastAsText – synchronous, no await - // ----------------------------------------------------------------------- - [Benchmark(Description = "TryBroadcastAsText(string) (0 clients)")] - public bool TryBroadcastAsText_Empty() - { - return _server.TryBroadcast(_textPayload.AsMemory(), WebSocketMessageType.Binary); - } - - // ----------------------------------------------------------------------- - // 4) TryBroadcastAsBinary with byte[] - // ----------------------------------------------------------------------- - [Benchmark(Description = "TryBroadcastAsBinary(byte[]) (0 clients)")] - public bool TryBroadcastAsBinary_Empty() - { - return _server.TryBroadcast(_binaryPayload, WebSocketMessageType.Binary); - } - - // ----------------------------------------------------------------------- - // 5) ClientCount query + ConnectedClients dictionary snapshot - // (ConcurrentDictionary.ToDictionary() – called on every broadcast) - // ----------------------------------------------------------------------- - [Benchmark(Description = "ClientCount + ConnectedClients snapshot")] - public int ClientSnapshot() - { - var count = _server.ClientCount; - var snapshot = _server.ConnectedClients; - return count + snapshot.Count; - } - - private static int FreeTcpPort() - { - var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0); - l.Start(); - var port = ((IPEndPoint)l.LocalEndpoint).Port; - l.Stop(); - return port; - } -} \ No newline at end of file diff --git a/src/WebSocket.Rx/Extensions.cs b/src/WebSocket.Rx/Extensions.cs index c8e7ae6..cf97004 100644 --- a/src/WebSocket.Rx/Extensions.cs +++ b/src/WebSocket.Rx/Extensions.cs @@ -16,8 +16,8 @@ public Observable SendInstant(Observable messages) { return send.Type switch { - _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct), - _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct), + _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct).ConfigureAwait(false), + _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct).ConfigureAwait(false), _ => false }; }, maxConcurrent: 1); @@ -29,8 +29,8 @@ public Observable Send(Observable messages) { return send.Type switch { - _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct), - _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct), + _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct).ConfigureAwait(false), + _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct).ConfigureAwait(false), _ => false }; }, maxConcurrent: 1); @@ -59,8 +59,8 @@ public Observable SendInstant(Observable messages) var msg = send.Message; return msg.Type switch { - _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct), - _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct), + _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct).ConfigureAwait(false), + _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct).ConfigureAwait(false), _ => false }; }, maxConcurrent: 1); @@ -73,8 +73,8 @@ public Observable Send(Observable messages) var msg = send.Message; return msg.Type switch { - _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct), - _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct), + _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct).ConfigureAwait(false), + _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct).ConfigureAwait(false), _ => false }; }, maxConcurrent: 1); @@ -101,8 +101,8 @@ public Observable BroadcastInstant(Observable messages) var msg = send.Message; return msg.Type switch { - _ when msg.IsText => await server.BroadcastInstantAsync(msg.Text, msg.Type, ct), - _ when msg.IsBinary => await server.BroadcastInstantAsync(msg.Binary, msg.Type, ct), + _ when msg.IsText => await server.BroadcastInstantAsync(msg.Text, msg.Type, ct).ConfigureAwait(false), + _ when msg.IsBinary => await server.BroadcastInstantAsync(msg.Binary, msg.Type, ct).ConfigureAwait(false), _ => false }; }); @@ -115,8 +115,8 @@ public Observable BroadcastAsync(Observable messages) var msg = send.Message; return msg.Type switch { - _ when msg.IsText => await server.BroadcastAsync(msg.Text, msg.Type, ct), - _ when msg.IsBinary => await server.BroadcastAsync(msg.Binary, msg.Type, ct), + _ when msg.IsText => await server.BroadcastAsync(msg.Text, msg.Type, ct).ConfigureAwait(false), + _ when msg.IsBinary => await server.BroadcastAsync(msg.Binary, msg.Type, ct).ConfigureAwait(false), _ => false }; }); diff --git a/src/WebSocket.Rx/Internal/Extensions.cs b/src/WebSocket.Rx/Internal/Extensions.cs index 0b4ef7b..323c4d2 100644 --- a/src/WebSocket.Rx/Internal/Extensions.cs +++ b/src/WebSocket.Rx/Internal/Extensions.cs @@ -10,7 +10,7 @@ public async Task Try(Func func) { try { - await func.Invoke(value); + await func.Invoke(value).ConfigureAwait(false); } catch (Exception) { diff --git a/src/WebSocket.Rx/ReactiveWebSocketClient.cs b/src/WebSocket.Rx/ReactiveWebSocketClient.cs index 40339f0..1702617 100644 --- a/src/WebSocket.Rx/ReactiveWebSocketClient.cs +++ b/src/WebSocket.Rx/ReactiveWebSocketClient.cs @@ -10,8 +10,9 @@ namespace WebSocket.Rx; public class ReactiveWebSocketClient : IReactiveWebSocketClient { - protected int DisposedValue; + private readonly SemaphoreSlim _sendLock = new(1, 1); private readonly SemaphoreSlim _disposeLock = new(1, 1); + protected int DisposedValue; protected readonly RecyclableMemoryStreamManager MemoryStreamManager; protected CancellationTokenSource? MainCts; @@ -64,7 +65,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) { try { - await StartOrFailAsync(cancellationToken); + await StartOrFailAsync(cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -76,14 +77,14 @@ public async Task StartOrFailAsync(CancellationToken cancellationToken = default { ThrowIfDisposed(); - using (await ConnectionLock.LockAsync(cancellationToken)) + using (await ConnectionLock.LockAsync(cancellationToken).ConfigureAwait(false)) { if (IsStarted) { return; } - await ConnectInternalAsync(ConnectReason.Initialized, true, cancellationToken); + await ConnectInternalAsync(ConnectReason.Initialized, true, cancellationToken).ConfigureAwait(false); } } @@ -92,7 +93,7 @@ public async Task StopAsync(WebSocketCloseStatus status, string statusDesc { try { - return await StopOrFailAsync(status, statusDescription, cancellationToken); + return await StopOrFailAsync(status, statusDescription, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -121,7 +122,7 @@ public async Task StopOrFailAsync(WebSocketCloseStatus status, string stat { try { - await NativeClient.CloseAsync(status, statusDescription, cancellationToken); + await NativeClient.CloseAsync(status, statusDescription, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -132,7 +133,7 @@ public async Task StopOrFailAsync(WebSocketCloseStatus status, string stat DisconnectionHappenedSource.OnNext(new Disconnected(DisconnectReason.ClientInitiated)); - await CleanupAsync(); + await CleanupAsync().ConfigureAwait(false); return true; } @@ -152,7 +153,7 @@ protected async Task CleanupAsync() }; - await Task.WhenAll(tasks).Try(async x => await x.WaitAsync(TimeSpan.FromSeconds(10))); + await Task.WhenAll(tasks).Try(async x => await x.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false)).ConfigureAwait(false); if (!IsDisposed) { @@ -173,9 +174,9 @@ public async Task ReconnectAsync(CancellationToken cancellationToken = default) try { - using (await ConnectionLock.LockAsync(cancellationToken)) + using (await ConnectionLock.LockAsync(cancellationToken).ConfigureAwait(false)) { - await ReconnectInternalAsync(false, cancellationToken); + await ReconnectInternalAsync(false, cancellationToken).ConfigureAwait(false); } } catch (Exception ex) @@ -191,9 +192,9 @@ public async Task ReconnectOrFailAsync(CancellationToken cancellationToken = def throw new InvalidOperationException("Client not started"); } - using (await ConnectionLock.LockAsync()) + using (await ConnectionLock.LockAsync().ConfigureAwait(false)) { - await ReconnectInternalAsync(true, cancellationToken); + await ReconnectInternalAsync(true, cancellationToken).ConfigureAwait(false); } } @@ -222,9 +223,9 @@ private async Task ReconnectInternalAsync(bool throwOnError, CancellationToken c await Task.WhenAll( SendLoopTask ?? Task.CompletedTask, ReceiveLoopTask ?? Task.CompletedTask - ); + ).ConfigureAwait(false); - await ConnectInternalAsync(ConnectReason.Reconnected, throwOnError, cancellationToken); + await ConnectInternalAsync(ConnectReason.Reconnected, throwOnError, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -257,7 +258,7 @@ private async Task ConnectInternalAsync(ConnectReason reason, bool throwOnError, using var linkedCts = CancellationTokenSource .CreateLinkedTokenSource(MainCts.Token, timeoutCts.Token, cancellationToken); - await NativeClient.ConnectAsync(Url, linkedCts.Token); + await NativeClient.ConnectAsync(Url, linkedCts.Token).ConfigureAwait(false); IsStarted = true; IsRunning = true; @@ -297,9 +298,9 @@ private async Task ScheduleReconnectAsync() try { - using (await ConnectionLock.LockAsync()) + using (await ConnectionLock.LockAsync().ConfigureAwait(false)) { - await ReconnectInternalAsync(throwOnError: false); + await ReconnectInternalAsync(throwOnError: false).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -320,7 +321,7 @@ protected async Task SendLoopAsync(CancellationToken ct) { try { - await foreach (var payload in SendChannel.Reader.ReadAllAsync(ct)) + await foreach (var payload in SendChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) { if (IsDisposed) { @@ -329,7 +330,7 @@ protected async Task SendLoopAsync(CancellationToken ct) using (payload) { - await SendAsync(payload.Data, payload.Type, true, ct); + await SendAsync(payload.Data, payload.Type, true, ct).ConfigureAwait(false); } } } @@ -356,15 +357,23 @@ protected virtual async Task SendAsync(ReadOnlyMemory data, WebSocke bool endOfMessage, CancellationToken cancellationToken = default) { - if (NativeClient.State is not WebSocketState.Open) return false; - - await NativeClient.SendAsync( - data, - type, - endOfMessage, - cancellationToken - ).ConfigureAwait(false); - return true; + await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (NativeClient.State is not WebSocketState.Open) + { + return false; + } + + await NativeClient + .SendAsync(data, type, endOfMessage, cancellationToken) + .ConfigureAwait(false); + return true; + } + finally + { + _sendLock.Release(); + } } private async Task ReceiveLoopAsync(CancellationToken cancellationToken) @@ -391,8 +400,10 @@ private async Task ReceiveLoopAsync(CancellationToken cancellationToken) if (result.MessageType == WebSocketMessageType.Close) { await NativeClient - .CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure, - result.CloseStatusDescription ?? "", CancellationToken.None); + .CloseAsync( + result.CloseStatus ?? WebSocketCloseStatus.NormalClosure, + result.CloseStatusDescription ?? "", CancellationToken.None) + .ConfigureAwait(false); var @event = new Disconnected(DisconnectReason.ServerInitiated, NativeClient.CloseStatus, NativeClient.CloseStatusDescription, NativeClient.SubProtocol); @@ -514,11 +525,12 @@ public async Task SendInstantAsync(ReadOnlyMemory message, WebSocket public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) - => !message.IsEmpty && await WriteAsync(message.ToPayload(MessageEncoding, type), cancellationToken); + => !message.IsEmpty && await WriteAsync(message.ToPayload(MessageEncoding, type), cancellationToken) + .ConfigureAwait(false); public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) - => !message.IsEmpty && await WriteAsync(new Payload(message, type), cancellationToken); + => !message.IsEmpty && await WriteAsync(new Payload(message, type), cancellationToken).ConfigureAwait(false); public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type) => !message.IsEmpty && TryWrite(message.ToPayload(MessageEncoding, type)); @@ -529,7 +541,7 @@ public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type) private async Task WriteAsync(Payload payload, CancellationToken cancellationToken = default) { if (!IsRunning) return false; - await SendWriter.WriteAsync(payload, cancellationToken); + await SendWriter.WriteAsync(payload, cancellationToken).ConfigureAwait(false); return true; } diff --git a/src/WebSocket.Rx/ReactiveWebSocketServer.cs b/src/WebSocket.Rx/ReactiveWebSocketServer.cs index 0d2cac9..b7cdc45 100644 --- a/src/WebSocket.Rx/ReactiveWebSocketServer.cs +++ b/src/WebSocket.Rx/ReactiveWebSocketServer.cs @@ -156,7 +156,7 @@ private async Task ServerLoopAsync(CancellationToken cancellationToken = default { try { - var ctx = await _listener.GetContextAsync(); + var ctx = await _listener.GetContextAsync().ConfigureAwait(false); if (ctx.Request.IsWebSocketRequest) { var metadata = ctx.GetMetadata(); @@ -258,7 +258,7 @@ public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory mes CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendInstantAsync(message, type, cancellationToken); + await client.Socket.SendInstantAsync(message, type, cancellationToken).ConfigureAwait(false); return true; } @@ -266,7 +266,7 @@ public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory mes CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendInstantAsync(message, type, cancellationToken); + await client.Socket.SendInstantAsync(message, type, cancellationToken).ConfigureAwait(false); return true; } @@ -274,7 +274,7 @@ public async Task SendAsync(Guid clientId, ReadOnlyMemory message, W CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsync(message, type, cancellationToken); + await client.Socket.SendAsync(message, type, cancellationToken).ConfigureAwait(false); return true; } @@ -282,7 +282,7 @@ public async Task SendAsync(Guid clientId, ReadOnlyMemory message, W CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsync(message, type, cancellationToken); + await client.Socket.SendAsync(message, type, cancellationToken).ConfigureAwait(false); return true; } @@ -309,7 +309,7 @@ public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebS { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x, - cancellationToken); + cancellationToken).ConfigureAwait(false); } public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, @@ -317,21 +317,21 @@ public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebS { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x, - cancellationToken); + cancellationToken).ConfigureAwait(false); } public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken); + return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken).ConfigureAwait(false); } public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken); + return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken).ConfigureAwait(false); } public bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type) @@ -599,7 +599,7 @@ await NativeServerSocket { try { - await NativeServerSocket.CloseAsync(status, statusDescription, cancellationToken); + await NativeServerSocket.CloseAsync(status, statusDescription, cancellationToken).ConfigureAwait(false); } catch { diff --git a/src/WebSocket.Rx/WebSocket.Rx.csproj b/src/WebSocket.Rx/WebSocket.Rx.csproj index 9352959..9979517 100644 --- a/src/WebSocket.Rx/WebSocket.Rx.csproj +++ b/src/WebSocket.Rx/WebSocket.Rx.csproj @@ -31,6 +31,7 @@ +