diff --git a/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs
deleted file mode 100644
index 14c83d6..0000000
--- a/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-using System;
-using BenchmarkDotNet.Attributes;
-using Microsoft.IO;
-
-namespace WebSocket.Rx.Benchmarks;
-
-///
-/// Measures the overhead of creating ReactiveWebSocketClient instances.
-///
-/// The constructor optionally accepts a RecyclableMemoryStreamManager.
-/// We measure:
-/// - Allocation cost without vs. with a shared MemoryStreamManager
-/// - GC pressure from many short-lived instances
-/// No network connection required.
-///
-[ShortRunJob]
-[MemoryDiagnoser]
-[HideColumns("Job", "RatioSD", "Error")]
-public class ClientInstantiationBenchmarks
-{
- private static readonly Uri ServerUri = new("ws://localhost:9999");
-
- // Shared manager – as it should be used in production code
- private readonly RecyclableMemoryStreamManager _sharedManager = new();
-
- // -----------------------------------------------------------------------
- // 1) Baseline: without a custom MemoryStreamManager (new() is created internally)
- // -----------------------------------------------------------------------
- [Benchmark(Baseline = true, Description = "new ReactiveWebSocketClient(url)")]
- public ReactiveWebSocketClient Instantiate_NoManager()
- {
- return new ReactiveWebSocketClient(ServerUri);
- }
-
- // -----------------------------------------------------------------------
- // 2) With a shared RecyclableMemoryStreamManager
- // -----------------------------------------------------------------------
- [Benchmark(Description = "new ReactiveWebSocketClient(url, sharedManager)")]
- public ReactiveWebSocketClient Instantiate_SharedManager()
- {
- return new ReactiveWebSocketClient(ServerUri, _sharedManager);
- }
-
- // -----------------------------------------------------------------------
- // 3) With a dedicated RecyclableMemoryStreamManager per instance
- // -----------------------------------------------------------------------
- [Benchmark(Description = "new ReactiveWebSocketClient(url, new Manager())")]
- public ReactiveWebSocketClient Instantiate_OwnManager()
- {
- return new ReactiveWebSocketClient(ServerUri, new RecyclableMemoryStreamManager());
- }
-
- // -----------------------------------------------------------------------
- // 4) GC pressure: 100 short-lived instances (disposed immediately)
- // -----------------------------------------------------------------------
- [Benchmark(Description = "100x create + Dispose")]
- public void Allocate_AndDispose_100()
- {
- for (var i = 0; i < 100; i++)
- {
- using var client = new ReactiveWebSocketClient(ServerUri, _sharedManager);
- }
- }
-}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
index 68b417e..5d1c9eb 100644
--- a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
+++ b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
@@ -1,4 +1,5 @@
using System;
+using System.Buffers;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
@@ -8,115 +9,210 @@
namespace WebSocket.Rx.Benchmarks;
+// ---------------------------------------------------------------------------
+// Shared infrastructure – server + client are started once per benchmark class
+// ---------------------------------------------------------------------------
+
///
-/// End-to-end benchmarks using ReactiveWebSocketServer and
-/// ReactiveWebSocketClient over loopback.
-///
-/// Measures:
-/// - Single round-trip latency (Send → Echo → MessageReceived)
-/// - Throughput for N sequential messages
-/// - Connection setup latency (ConnectAsync)
-///
-/// The server uses ReactiveWebSocketServer with a built-in echo
-/// mechanism via the Messages stream and SendAsTextAsync.
-///
-/// NOTE: These tests take longer to run. Execute individually with:
-/// dotnet run -c Release -- --filter *EndToEnd*
+/// Base class that spins up a loopback echo server and a connected client.
+/// Concrete benchmark classes inherit from this and add their own [Params].
///
-[ShortRunJob]
-[MemoryDiagnoser]
-[HideColumns("Job", "RatioSD", "Error")]
-public class EndToEndBenchmarks
+public abstract class EndToEndBase
{
- private ReactiveWebSocketServer _server = null!;
- private ReactiveWebSocketClient _client = null!;
- private string _prefix = null!;
- private Uri _serverUri = null!;
+ protected ReactiveWebSocketServer Server = null!;
+ protected ReactiveWebSocketClient Client = null!;
+ protected Uri ServerUri = null!;
[GlobalSetup]
- public async Task SetupAsync()
+ public virtual async Task SetupAsync()
{
var port = FreeTcpPort();
- _prefix = $"http://localhost:{port}/ws/";
- _serverUri = new Uri($"ws://localhost:{port}/ws/");
+ var prefix = $"http://localhost:{port}/ws/";
+ ServerUri = new Uri($"ws://localhost:{port}/ws/");
- _server = new ReactiveWebSocketServer(_prefix);
+ Server = new ReactiveWebSocketServer(prefix);
- _server.Messages.SubscribeAwait(async (msg, ct) =>
+ // Echo handler: text → text, binary → binary
+ Server.Messages.SubscribeAwait(async (msg, ct) =>
{
if (msg.Message.IsText)
{
- await _server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct);
+ await Server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct);
+ }
+ else if (msg.Message.IsBinary)
+ {
+ await Server.SendAsync(msg.Metadata.Id, msg.Message.Binary, WebSocketMessageType.Binary, ct);
}
});
- await _server.StartAsync();
+ await Server.StartAsync();
- _client = new ReactiveWebSocketClient(_serverUri)
- {
- IsReconnectionEnabled = false
- };
- var connectionHappenedTask = _client.ConnectionHappened.FirstAsync();
- await _client.StartAsync();
+ Client = new ReactiveWebSocketClient(ServerUri) { IsReconnectionEnabled = false };
- await connectionHappenedTask;
+ var connected = Client.ConnectionHappened.FirstAsync();
+ await Client.StartAsync();
+ await connected;
}
[GlobalCleanup]
- public async Task CleanupAsync()
+ public virtual async Task CleanupAsync()
+ {
+ await Client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
+ await Server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
+ await Client.DisposeAsync();
+ await Server.DisposeAsync();
+ }
+
+ [IterationCleanup]
+ public void IterationCleanup()
+ {
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ GC.Collect();
+ }
+
+ private static int FreeTcpPort()
{
- await _client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
- await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
- await _client.DisposeAsync();
- await _server.DisposeAsync();
+ var listener = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
+ listener.Start();
+ var port = ((IPEndPoint)listener.LocalEndpoint).Port;
+ listener.Stop();
+ return port;
}
+}
+// ---------------------------------------------------------------------------
+// 1) Latency benchmarks – no [Params], pure connection / single-message cost
+// ---------------------------------------------------------------------------
+
+///
+/// Measures single round-trip latency and connection setup/teardown overhead.
+///
+/// Run with:
+/// dotnet run -c Release -- --filter *Latency*
+///
+[MemoryDiagnoser]
+[GcServer(true)]
+[HideColumns("Job", "RatioSD")]
+public class LatencyBenchmarks : EndToEndBase
+{
// -----------------------------------------------------------------------
- // 1) Single Round-Trip
+ // 1a) Single text round-trip
// -----------------------------------------------------------------------
[Benchmark(Baseline = true, Description = "Single text round-trip")]
public async Task SingleRoundTrip()
{
- var echoTask = _client.MessageReceived
+ var echoTask = Client.MessageReceived
.Where(m => m is { IsText: true })
- .Where(m => m.Text.ToString() is "ping")
+ .Where(m => m.Text.Span.SequenceEqual("ping".AsSpan()))
.FirstAsync();
- await _client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text);
+ await Client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text);
return (await echoTask).Text.ToString();
}
// -----------------------------------------------------------------------
- // 2) N sequentielle Round-Trips
+ // 1b) ConnectAsync + StopAsync overhead
// -----------------------------------------------------------------------
- [Params(10, 50)] public int RoundTrips { get; set; }
+ [Benchmark(Description = "ConnectAsync + StopAsync (latency)")]
+ public async Task ConnectAndStop()
+ {
+ var client = new ReactiveWebSocketClient(ServerUri) { IsReconnectionEnabled = false };
+ await client.StartAsync();
+ await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
+ client.Dispose();
+ }
+}
+
+// ---------------------------------------------------------------------------
+// 2) Throughput benchmarks – small messages, varying message counts
+// ---------------------------------------------------------------------------
+
+///
+/// Measures sequential and concurrent throughput for small text messages.
+///
+/// Run with:
+/// dotnet run -c Release -- --filter *Throughput*
+///
+[MemoryDiagnoser]
+[GcServer(true)]
+[HideColumns("Job", "RatioSD")]
+public class ThroughputBenchmarks : EndToEndBase
+{
+ // RoundTrips is used by SequentialRoundTrips only.
+ // MessageCount is shared by EndToEndThroughput and TrySendThroughputWithConfirmation.
+ // Keeping them separate avoids a cartesian-product explosion.
+
+ [Params(10, 100)] public int RoundTrips { get; set; }
- [Benchmark(Description = "Sequential round-trips (N messages)")]
+ [Params(100, 500, 1000)] public int MessageCount { get; set; }
+
+ // -----------------------------------------------------------------------
+ // 2a) N sequential round-trips (send → wait for echo → repeat)
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "Sequential round-trips (N messages)")]
public async Task SequentialRoundTrips()
{
var received = 0;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- using var sub = _client.MessageReceived
+ using var sub = Client.MessageReceived
.Where(m => m.IsText)
- .Subscribe(_ => Interlocked.Increment(ref received));
+ .Subscribe(_ =>
+ {
+ if (Interlocked.Increment(ref received) >= RoundTrips)
+ {
+ tcs.TrySetResult();
+ }
+ });
for (var i = 0; i < RoundTrips; i++)
{
- await _client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token);
+ await Client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token);
}
- while (received < RoundTrips && !cts.IsCancellationRequested)
+ await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
+ return received;
+ }
+
+ // NOTE: Since SendAsync enqueues into a single Channel, concurrent sends
+ // do not parallelize actual I/O – they only pipeline the enqueue step.
+ // For large payloads the SendLoop becomes the bottleneck and concurrent
+ // offers no advantage over sequential.
+ // -----------------------------------------------------------------------
+ // 2b) End-to-end throughput (all sends fired concurrently via WhenAll)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "End-to-end throughput (N concurrent sends)")]
+ public async Task EndToEndThroughput()
+ {
+ var received = 0;
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var sub = Client.MessageReceived
+ .Where(m => m.IsText)
+ .Subscribe(_ =>
+ {
+ if (Interlocked.Increment(ref received) >= MessageCount)
+ {
+ tcs.TrySetResult();
+ }
+ });
+
+ var sends = new Task[MessageCount];
+ for (var i = 0; i < MessageCount; i++)
{
- await Task.Delay(1, cts.Token).ConfigureAwait(false);
+ sends[i] = Client.SendInstantAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token);
}
+ await Task.WhenAll(sends).ConfigureAwait(false);
+ await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
return received;
}
// -----------------------------------------------------------------------
- // 3) TrySendAsText throughput (fire-and-forget, no echo wait)
- // Measures pure Channel.TryWrite speed including the SendLoop
+ // 2c) TrySend fire-and-forget (Channel.TryWrite, no echo wait)
// -----------------------------------------------------------------------
[Benchmark(Description = "TrySendAsText throughput (no echo wait)")]
public int TrySendAsText_Throughput()
@@ -124,7 +220,7 @@ public int TrySendAsText_Throughput()
var sent = 0;
for (var i = 0; i < 1_000; i++)
{
- if (_client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text))
+ if (Client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text))
{
sent++;
}
@@ -134,27 +230,135 @@ public int TrySendAsText_Throughput()
}
// -----------------------------------------------------------------------
- // 4) ConnectAsync + StopAsync overhead (connection setup and teardown)
- // Creates a fresh client each time without waiting for an echo
+ // 2d) TrySend with echo confirmation (Channel.TryWrite + backpressure)
// -----------------------------------------------------------------------
- [Benchmark(Description = "ConnectAsync + StopAsync (latency)")]
- public async Task ConnectAndStop()
+ [Benchmark(Description = "TrySend throughput (fire + confirm all echoed)")]
+ public async Task TrySendThroughputWithConfirmation()
{
- var client = new ReactiveWebSocketClient(_serverUri)
+ var received = 0;
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var sub = Client.MessageReceived
+ .Where(m => m.IsText)
+ .Subscribe(_ =>
+ {
+ if (Interlocked.Increment(ref received) >= MessageCount)
+ {
+ tcs.TrySetResult();
+ }
+ });
+
+ for (var i = 0; i < MessageCount; i++)
{
- IsReconnectionEnabled = false
- };
- await client.StartAsync();
- await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
- client.Dispose();
+ while (!Client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text))
+ {
+ await Task.Yield();
+ }
+ }
+
+ await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
+ return received;
}
+}
- private static int FreeTcpPort()
+// ---------------------------------------------------------------------------
+// 3) Large-message benchmarks – binary payloads, varying size + count
+// ---------------------------------------------------------------------------
+
+///
+/// Measures throughput and latency for large binary payloads.
+///
+/// Payload is allocated once per iteration in [IterationSetup] so allocation
+/// cost is not included in the benchmark measurement.
+///
+/// Run with:
+/// dotnet run -c Release -- --filter *LargeMessage*
+///
+[MemoryDiagnoser]
+[GcServer(true)]
+[HideColumns("Job", "RatioSD")]
+public class LargeMessageBenchmarks : EndToEndBase
+{
+ [Params(1024, 16 * 1024, 256 * 1024)] public int PayloadBytes { get; set; }
+
+ [Params(10, 100)] public int MessageCount { get; set; }
+
+ private byte[] _payload = null!;
+
+ // Allocate outside the timed region so payload size doesn't skew allocations
+ [IterationSetup]
+ public void IterationSetup()
{
- var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
- l.Start();
- var port = ((IPEndPoint)l.LocalEndpoint).Port;
- l.Stop();
- return port;
+ _payload = ArrayPool.Shared.Rent(PayloadBytes);
+ Random.Shared.NextBytes(_payload.AsSpan(0, PayloadBytes));
+ }
+
+ [IterationCleanup]
+ public new void IterationCleanup()
+ {
+ ArrayPool.Shared.Return(_payload);
+ _payload = null!;
+ base.IterationCleanup();
+ }
+
+ // -----------------------------------------------------------------------
+ // 3a) Sequential large-message round-trips
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "Large message round-trip (sequential)")]
+ public async Task LargeMessageRoundTripSequential()
+ {
+ var received = 0;
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var sub = Client.MessageReceived
+ .Where(m => m.IsBinary)
+ .Subscribe(_ =>
+ {
+ if (Interlocked.Increment(ref received) >= MessageCount)
+ {
+ tcs.TrySetResult();
+ }
+ });
+
+ for (var i = 0; i < MessageCount; i++)
+ {
+ await Client.SendAsync(_payload.AsMemory(0, PayloadBytes), WebSocketMessageType.Binary, cts.Token);
+ }
+
+ await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
+ return received;
+ }
+
+ // -----------------------------------------------------------------------
+ // 3b) Concurrent large-message sends (WhenAll)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "Large message round-trip (concurrent)")]
+ public async Task LargeMessageRoundTripConcurrent()
+ {
+ var received = 0;
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var sub = Client.MessageReceived
+ .Where(m => m.IsBinary)
+ .Subscribe(_ =>
+ {
+ if (Interlocked.Increment(ref received) >= MessageCount)
+ {
+ tcs.TrySetResult();
+ }
+ });
+
+ var sends = new Task[MessageCount];
+ for (var i = 0; i < MessageCount; i++)
+ {
+ sends[i] = Client.SendAsync(_payload.AsMemory(0, PayloadBytes), WebSocketMessageType.Binary, cts.Token);
+ }
+
+ await Task.WhenAll(sends).ConfigureAwait(false);
+ await tcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
+ return received;
}
}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs
deleted file mode 100644
index e6ad8d0..0000000
--- a/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-using System;
-using BenchmarkDotNet.Attributes;
-using R3;
-
-namespace WebSocket.Rx.Benchmarks;
-
-///
-/// Measures the overhead of the ConnectionHappened,
-/// DisconnectionHappened and ErrorOccurred event streams.
-///
-/// These events are emitted via R3 Subject<T>.
-/// We use StreamFakeMessage as a reference and call the internal
-/// subjects directly via reflection – exercising exactly the same code
-/// paths as the production code does.
-///
-[ShortRunJob]
-[MemoryDiagnoser]
-[HideColumns("Job", "RatioSD", "Error")]
-public class EventStreamBenchmarks
-{
- [Params(10, 100)]
- public int EventCount { get; set; }
-
- private ReactiveWebSocketClient _client = null!;
-
- private Subject _connectionSource = null!;
- private Subject _disconnectionSource = null!;
- private Subject _errorSource = null!;
-
- [GlobalSetup]
- public void Setup()
- {
- _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));
-
- var t = typeof(ReactiveWebSocketClient);
- _connectionSource = (Subject)t.GetField("ConnectionHappenedSource",
- System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
- _disconnectionSource = (Subject)t.GetField("DisconnectionHappenedSource",
- System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
- _errorSource = (Subject)t.GetField("ErrorOccurredSource",
- System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
- }
-
- [GlobalCleanup]
- public void Cleanup() => _client.Dispose();
-
- // -----------------------------------------------------------------------
- // 1) Baseline: ConnectionHappened without subscriber
- // -----------------------------------------------------------------------
- [Benchmark(Baseline = true, Description = "ConnectionHappened (no subscriber)")]
- public void Emit_Connection_NoSubscriber()
- {
- for (var i = 0; i < EventCount; i++)
- {
- _connectionSource.OnNext(new Connected(ConnectReason.Initialized));
- }
- }
-
- // -----------------------------------------------------------------------
- // 2) ConnectionHappened with one subscriber
- // -----------------------------------------------------------------------
- [Benchmark(Description = "ConnectionHappened → 1 subscriber")]
- public int Emit_Connection_OneSubscriber()
- {
- var count = 0;
- using var sub = _client.ConnectionHappened.Subscribe(_ => count++);
- for (var i = 0; i < EventCount; i++)
- {
- _connectionSource.OnNext(new Connected(ConnectReason.Initialized));
- }
- return count;
- }
-
- // -----------------------------------------------------------------------
- // 3) DisconnectionHappened with one subscriber + Where filter
- // -----------------------------------------------------------------------
- [Benchmark(Description = "DisconnectionHappened → Where(ServerInitiated)")]
- public int Emit_Disconnection_Filtered()
- {
- var count = 0;
- using var sub = _client.DisconnectionHappened
- .Where(d => d.Reason == DisconnectReason.ServerInitiated)
- .Subscribe(_ => count++);
-
- for (var i = 0; i < EventCount; i++)
- {
- _disconnectionSource.OnNext(new Disconnected(
- i % 2 == 0 ? DisconnectReason.ServerInitiated : DisconnectReason.ClientInitiated));
- }
- return count;
- }
-
- // -----------------------------------------------------------------------
- // 4) ErrorOccurred with Select (extract error source)
- // -----------------------------------------------------------------------
- [Benchmark(Description = "ErrorOccurred → Select(Source)")]
- public int Emit_Error_Select()
- {
- var count = 0;
- using var sub = _client.ErrorOccurred
- .Select(e => e.Source)
- .Subscribe(_ => count++);
-
- for (var i = 0; i < EventCount; i++)
- {
- _errorSource.OnNext(new ErrorOccurred(ErrorSource.ReceiveLoop,
- new InvalidOperationException("test")));
- }
- return count;
- }
-
- // -----------------------------------------------------------------------
- // 5) All three streams simultaneously with one subscriber each
- // (mirrors typical production code)
- // -----------------------------------------------------------------------
- [Benchmark(Description = "All 3 streams → 1 subscriber each")]
- public int Emit_AllStreams()
- {
- var count = 0;
- using var s1 = _client.ConnectionHappened.Subscribe(_ => count++);
- using var s2 = _client.DisconnectionHappened.Subscribe(_ => count++);
- using var s3 = _client.ErrorOccurred.Subscribe(_ => count++);
-
- for (var i = 0; i < EventCount; i++)
- {
- _connectionSource.OnNext(new Connected(ConnectReason.Reconnected));
- _disconnectionSource.OnNext(new Disconnected(DisconnectReason.Dropped));
- }
-
- return count;
- }
-}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
index 5b1c9be..69c040e 100644
--- a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
+++ b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
@@ -15,7 +15,6 @@ namespace WebSocket.Rx.Benchmarks;
/// - ReceivedMessage allocation (Text vs. Binary)
/// - Rx operator overhead per subscriber (Where, Select, …)
///
-[ShortRunJob]
[MemoryDiagnoser]
[HideColumns("Job", "RatioSD", "Error")]
public class MessagePipelineBenchmarks
diff --git a/src/WebSocket.Rx.Benchmarks/Program.cs b/src/WebSocket.Rx.Benchmarks/Program.cs
index 4893ce3..6b7e70f 100644
--- a/src/WebSocket.Rx.Benchmarks/Program.cs
+++ b/src/WebSocket.Rx.Benchmarks/Program.cs
@@ -1,13 +1,25 @@
-using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Attributes;
+using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Environments;
using BenchmarkDotNet.Jobs;
+using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running;
+using BenchmarkDotNet.Toolchains.NativeAot;
var config = DefaultConfig.Instance
- .AddJob(Job.ShortRun
- .WithWarmupCount(1)
- .WithIterationCount(3)
- .WithInvocationCount(1)
- .WithUnrollFactor(1));
+ //.AddFilter(new SimpleFilter(bc => !bc.Descriptor.Categories.Contains("E2E")))
+ .AddJob(Job.Default
+ .WithToolchain(
+ NativeAotToolchain.CreateBuilder()
+ .UseNuGet()
+ .IlcOptimizationPreference()
+ .DisplayName("AOT-Speed")
+ .ToToolchain())
+ .WithRuntime(NativeAotRuntime.Net10_0)
+ .WithId("Speed"))
+ .AddJob(Job.Default
+ .WithRuntime(CoreRuntime.Core10_0)
+ .WithId("CoreCLR"));
BenchmarkSwitcher
.FromAssembly(typeof(Program).Assembly)
diff --git a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
index 6f5709a..23510af 100644
--- a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
+++ b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
@@ -1,5 +1,6 @@
using System;
using System.Net.WebSockets;
+using System.Text;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
@@ -20,7 +21,6 @@ namespace WebSocket.Rx.Benchmarks;
/// NOTE: The Channel is UnboundedChannel (SingleReader) –
/// without a consumer it fills up, but writes never block.
///
-[ShortRunJob]
[MemoryDiagnoser]
[HideColumns("Job", "RatioSD", "Error")]
public class SendChannelBenchmarks
@@ -61,24 +61,25 @@ public void Cleanup()
}
// -----------------------------------------------------------------------
- // 1) Baseline: TrySendAsBinary(string) – synchronous Channel.TryWrite
+ // 1) Baseline: TrySend(string) – synchronous Channel.TryWrite
// -----------------------------------------------------------------------
- [Benchmark(Baseline = true, Description = "TrySendAsBinary(string)")]
+ [Benchmark(Baseline = true, Description = "TrySend(string, WebSocketMessageType.Binary)")]
public bool TrySendAsBinary_String()
{
var result = false;
+ var payload = _textPayload.AsMemory();
for (var i = 0; i < MessageCount; i++)
{
- result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Binary);
+ result = _client.TrySend(payload, WebSocketMessageType.Binary);
}
return result;
}
// -----------------------------------------------------------------------
- // 2) TrySendAsBinary(byte[])
+ // 2) TrySend(byte[])
// -----------------------------------------------------------------------
- [Benchmark(Description = "TrySendAsBinary(byte[])")]
+ [Benchmark(Description = "TrySend(byte[], WebSocketMessageType.Binary)")]
public bool TrySendAsBinary_Bytes()
{
var result = false;
@@ -91,24 +92,25 @@ public bool TrySendAsBinary_Bytes()
}
// -----------------------------------------------------------------------
- // 3) TrySendAsText(string)
+ // 3) TrySend(string)
// -----------------------------------------------------------------------
- [Benchmark(Description = "TrySendAsText(string)")]
+ [Benchmark(Description = "TrySend(string, WebSocketMessageType.Text)")]
public bool TrySendAsText_String()
{
var result = false;
+ var payload = _textPayload.AsMemory();
for (var i = 0; i < MessageCount; i++)
{
- result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Text);
+ result = _client.TrySend(payload, WebSocketMessageType.Text);
}
return result;
}
// -----------------------------------------------------------------------
- // 4) TrySendAsText(byte[])
+ // 4) TrySend(byte[])
// -----------------------------------------------------------------------
- [Benchmark(Description = "TrySendAsText(byte[])")]
+ [Benchmark(Description = "TrySendAsText(byte[], WebSocketMessageType.Text)")]
public bool TrySendAsText_Bytes()
{
var result = false;
@@ -121,46 +123,49 @@ public bool TrySendAsText_Bytes()
}
// -----------------------------------------------------------------------
- // 5) SendAsBinaryAsync(string) – Channel.WriteAsync (ValueTask)
+ // 5) SendAsync(string) – Channel.WriteAsync (ValueTask)
// -----------------------------------------------------------------------
- [Benchmark(Description = "SendAsBinaryAsync(string)")]
+ [Benchmark(Description = "SendAsync(string, WebSocketMessageType.Binary)")]
public async Task SendAsBinaryAsync_String()
{
var result = false;
+ var payload = _textPayload.AsMemory();
for (var i = 0; i < MessageCount; i++)
{
- result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Binary);
+ result = await _client.SendAsync(payload, WebSocketMessageType.Binary);
}
return result;
}
// -----------------------------------------------------------------------
- // 6) SendAsTextAsync(string) – Channel.WriteAsync (ValueTask)
+ // 6) SendAsync(string) – Channel.WriteAsync (ValueTask)
// -----------------------------------------------------------------------
- [Benchmark(Description = "SendAsTextAsync(string)")]
+ [Benchmark(Description = "SendAsync(string, WebSocketMessageType.Text)")]
public async Task SendAsTextAsync_String()
{
var result = false;
+ var payload = _textPayload.AsMemory();
for (var i = 0; i < MessageCount; i++)
{
- result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Text);
+ result = await _client.SendAsync(payload, WebSocketMessageType.Text);
}
return result;
}
// -----------------------------------------------------------------------
- // 7) String → byte[] encoding overhead (Encoding.UTF8.GetBytes)
+ // 7) String → ReadOnlyMemory encoding overhead (Encoding.UTF8.GetBytes)
// – isolates the encoding cost hidden inside TrySend/SendAsync
// -----------------------------------------------------------------------
[Benchmark(Description = "Encoding.UTF8.GetBytes (overhead only)")]
- public byte[] EncodingOverhead()
+ public ReadOnlyMemory EncodingOverhead()
{
- byte[] result = [];
+ var result = new ReadOnlyMemory([]);
+ var payload = _textPayload.AsMemory();
for (var i = 0; i < MessageCount; i++)
{
- result = System.Text.Encoding.UTF8.GetBytes(_textPayload);
+ result = payload.ToPayload(Encoding.UTF8, WebSocketMessageType.Text).Data;
}
return result;
diff --git a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs
deleted file mode 100644
index 0d110c9..0000000
--- a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs
+++ /dev/null
@@ -1,104 +0,0 @@
-using System;
-using System.Net;
-using System.Net.WebSockets;
-using System.Threading.Tasks;
-using BenchmarkDotNet.Attributes;
-
-namespace WebSocket.Rx.Benchmarks;
-
-///
-/// Measures the overhead of the server broadcast methods on
-/// ReactiveWebSocketServer without real network connections.
-///
-/// Since the broadcast methods internally iterate the client list and
-/// use Task.WhenAll + LINQ, the overhead can already be meaningfully
-/// measured by calling them directly on a started server with no connected clients.
-///
-/// For a full test with N connected clients see EndToEndBenchmarks.
-///
-[ShortRunJob]
-[MemoryDiagnoser]
-[HideColumns("Job", "RatioSD", "Error")]
-public class ServerBroadcastBenchmarks
-{
- [Params(64, 1_024, 4_096)] public int PayloadSize { get; set; }
-
- private ReactiveWebSocketServer _server = null!;
- private string _textPayload = null!;
- private byte[] _binaryPayload = null!;
-
- [GlobalSetup]
- public async Task SetupAsync()
- {
- var port = FreeTcpPort();
- _server = new ReactiveWebSocketServer($"http://localhost:{port}/ws/");
- await _server.StartAsync();
-
- _textPayload = new string('x', PayloadSize);
- _binaryPayload = new byte[PayloadSize];
- }
-
- [GlobalCleanup]
- public async Task CleanupAsync()
- {
- await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
- await _server.DisposeAsync();
- }
-
- // -----------------------------------------------------------------------
- // 1) Baseline: BroadcastAsTextAsync – 0 clients (method overhead only)
- // -----------------------------------------------------------------------
- [Benchmark(Baseline = true, Description = "BroadcastAsTextAsync (0 clients)")]
- public async Task BroadcastAsTextAsync_Empty()
- {
- return await _server.BroadcastAsync(_textPayload.AsMemory(), WebSocketMessageType.Text);
- }
-
- // -----------------------------------------------------------------------
- // 2) BroadcastAsBinaryAsync with byte[]
- // -----------------------------------------------------------------------
- [Benchmark(Description = "BroadcastAsBinaryAsync(byte[]) (0 clients)")]
- public async Task BroadcastAsBinaryAsync_Bytes_Empty()
- {
- return await _server.BroadcastAsync(_binaryPayload, WebSocketMessageType.Binary);
- }
-
- // -----------------------------------------------------------------------
- // 3) TryBroadcastAsText – synchronous, no await
- // -----------------------------------------------------------------------
- [Benchmark(Description = "TryBroadcastAsText(string) (0 clients)")]
- public bool TryBroadcastAsText_Empty()
- {
- return _server.TryBroadcast(_textPayload.AsMemory(), WebSocketMessageType.Binary);
- }
-
- // -----------------------------------------------------------------------
- // 4) TryBroadcastAsBinary with byte[]
- // -----------------------------------------------------------------------
- [Benchmark(Description = "TryBroadcastAsBinary(byte[]) (0 clients)")]
- public bool TryBroadcastAsBinary_Empty()
- {
- return _server.TryBroadcast(_binaryPayload, WebSocketMessageType.Binary);
- }
-
- // -----------------------------------------------------------------------
- // 5) ClientCount query + ConnectedClients dictionary snapshot
- // (ConcurrentDictionary.ToDictionary() – called on every broadcast)
- // -----------------------------------------------------------------------
- [Benchmark(Description = "ClientCount + ConnectedClients snapshot")]
- public int ClientSnapshot()
- {
- var count = _server.ClientCount;
- var snapshot = _server.ConnectedClients;
- return count + snapshot.Count;
- }
-
- private static int FreeTcpPort()
- {
- var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
- l.Start();
- var port = ((IPEndPoint)l.LocalEndpoint).Port;
- l.Stop();
- return port;
- }
-}
\ No newline at end of file
diff --git a/src/WebSocket.Rx/Extensions.cs b/src/WebSocket.Rx/Extensions.cs
index c8e7ae6..cf97004 100644
--- a/src/WebSocket.Rx/Extensions.cs
+++ b/src/WebSocket.Rx/Extensions.cs
@@ -16,8 +16,8 @@ public Observable SendInstant(Observable messages)
{
return send.Type switch
{
- _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct),
- _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct),
+ _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct).ConfigureAwait(false),
+ _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct).ConfigureAwait(false),
_ => false
};
}, maxConcurrent: 1);
@@ -29,8 +29,8 @@ public Observable Send(Observable messages)
{
return send.Type switch
{
- _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct),
- _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct),
+ _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct).ConfigureAwait(false),
+ _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct).ConfigureAwait(false),
_ => false
};
}, maxConcurrent: 1);
@@ -59,8 +59,8 @@ public Observable SendInstant(Observable messages)
var msg = send.Message;
return msg.Type switch
{
- _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct),
- _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct),
+ _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct).ConfigureAwait(false),
+ _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct).ConfigureAwait(false),
_ => false
};
}, maxConcurrent: 1);
@@ -73,8 +73,8 @@ public Observable Send(Observable messages)
var msg = send.Message;
return msg.Type switch
{
- _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct),
- _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct),
+ _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct).ConfigureAwait(false),
+ _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct).ConfigureAwait(false),
_ => false
};
}, maxConcurrent: 1);
@@ -101,8 +101,8 @@ public Observable BroadcastInstant(Observable messages)
var msg = send.Message;
return msg.Type switch
{
- _ when msg.IsText => await server.BroadcastInstantAsync(msg.Text, msg.Type, ct),
- _ when msg.IsBinary => await server.BroadcastInstantAsync(msg.Binary, msg.Type, ct),
+ _ when msg.IsText => await server.BroadcastInstantAsync(msg.Text, msg.Type, ct).ConfigureAwait(false),
+ _ when msg.IsBinary => await server.BroadcastInstantAsync(msg.Binary, msg.Type, ct).ConfigureAwait(false),
_ => false
};
});
@@ -115,8 +115,8 @@ public Observable BroadcastAsync(Observable messages)
var msg = send.Message;
return msg.Type switch
{
- _ when msg.IsText => await server.BroadcastAsync(msg.Text, msg.Type, ct),
- _ when msg.IsBinary => await server.BroadcastAsync(msg.Binary, msg.Type, ct),
+ _ when msg.IsText => await server.BroadcastAsync(msg.Text, msg.Type, ct).ConfigureAwait(false),
+ _ when msg.IsBinary => await server.BroadcastAsync(msg.Binary, msg.Type, ct).ConfigureAwait(false),
_ => false
};
});
diff --git a/src/WebSocket.Rx/Internal/Extensions.cs b/src/WebSocket.Rx/Internal/Extensions.cs
index 0b4ef7b..323c4d2 100644
--- a/src/WebSocket.Rx/Internal/Extensions.cs
+++ b/src/WebSocket.Rx/Internal/Extensions.cs
@@ -10,7 +10,7 @@ public async Task Try(Func func)
{
try
{
- await func.Invoke(value);
+ await func.Invoke(value).ConfigureAwait(false);
}
catch (Exception)
{
diff --git a/src/WebSocket.Rx/ReactiveWebSocketClient.cs b/src/WebSocket.Rx/ReactiveWebSocketClient.cs
index 40339f0..1702617 100644
--- a/src/WebSocket.Rx/ReactiveWebSocketClient.cs
+++ b/src/WebSocket.Rx/ReactiveWebSocketClient.cs
@@ -10,8 +10,9 @@ namespace WebSocket.Rx;
public class ReactiveWebSocketClient : IReactiveWebSocketClient
{
- protected int DisposedValue;
+ private readonly SemaphoreSlim _sendLock = new(1, 1);
private readonly SemaphoreSlim _disposeLock = new(1, 1);
+ protected int DisposedValue;
protected readonly RecyclableMemoryStreamManager MemoryStreamManager;
protected CancellationTokenSource? MainCts;
@@ -64,7 +65,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
{
try
{
- await StartOrFailAsync(cancellationToken);
+ await StartOrFailAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -76,14 +77,14 @@ public async Task StartOrFailAsync(CancellationToken cancellationToken = default
{
ThrowIfDisposed();
- using (await ConnectionLock.LockAsync(cancellationToken))
+ using (await ConnectionLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (IsStarted)
{
return;
}
- await ConnectInternalAsync(ConnectReason.Initialized, true, cancellationToken);
+ await ConnectInternalAsync(ConnectReason.Initialized, true, cancellationToken).ConfigureAwait(false);
}
}
@@ -92,7 +93,7 @@ public async Task StopAsync(WebSocketCloseStatus status, string statusDesc
{
try
{
- return await StopOrFailAsync(status, statusDescription, cancellationToken);
+ return await StopOrFailAsync(status, statusDescription, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -121,7 +122,7 @@ public async Task StopOrFailAsync(WebSocketCloseStatus status, string stat
{
try
{
- await NativeClient.CloseAsync(status, statusDescription, cancellationToken);
+ await NativeClient.CloseAsync(status, statusDescription, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -132,7 +133,7 @@ public async Task StopOrFailAsync(WebSocketCloseStatus status, string stat
DisconnectionHappenedSource.OnNext(new Disconnected(DisconnectReason.ClientInitiated));
- await CleanupAsync();
+ await CleanupAsync().ConfigureAwait(false);
return true;
}
@@ -152,7 +153,7 @@ protected async Task CleanupAsync()
};
- await Task.WhenAll(tasks).Try(async x => await x.WaitAsync(TimeSpan.FromSeconds(10)));
+ await Task.WhenAll(tasks).Try(async x => await x.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false)).ConfigureAwait(false);
if (!IsDisposed)
{
@@ -173,9 +174,9 @@ public async Task ReconnectAsync(CancellationToken cancellationToken = default)
try
{
- using (await ConnectionLock.LockAsync(cancellationToken))
+ using (await ConnectionLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
- await ReconnectInternalAsync(false, cancellationToken);
+ await ReconnectInternalAsync(false, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
@@ -191,9 +192,9 @@ public async Task ReconnectOrFailAsync(CancellationToken cancellationToken = def
throw new InvalidOperationException("Client not started");
}
- using (await ConnectionLock.LockAsync())
+ using (await ConnectionLock.LockAsync().ConfigureAwait(false))
{
- await ReconnectInternalAsync(true, cancellationToken);
+ await ReconnectInternalAsync(true, cancellationToken).ConfigureAwait(false);
}
}
@@ -222,9 +223,9 @@ private async Task ReconnectInternalAsync(bool throwOnError, CancellationToken c
await Task.WhenAll(
SendLoopTask ?? Task.CompletedTask,
ReceiveLoopTask ?? Task.CompletedTask
- );
+ ).ConfigureAwait(false);
- await ConnectInternalAsync(ConnectReason.Reconnected, throwOnError, cancellationToken);
+ await ConnectInternalAsync(ConnectReason.Reconnected, throwOnError, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -257,7 +258,7 @@ private async Task ConnectInternalAsync(ConnectReason reason, bool throwOnError,
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(MainCts.Token, timeoutCts.Token, cancellationToken);
- await NativeClient.ConnectAsync(Url, linkedCts.Token);
+ await NativeClient.ConnectAsync(Url, linkedCts.Token).ConfigureAwait(false);
IsStarted = true;
IsRunning = true;
@@ -297,9 +298,9 @@ private async Task ScheduleReconnectAsync()
try
{
- using (await ConnectionLock.LockAsync())
+ using (await ConnectionLock.LockAsync().ConfigureAwait(false))
{
- await ReconnectInternalAsync(throwOnError: false);
+ await ReconnectInternalAsync(throwOnError: false).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -320,7 +321,7 @@ protected async Task SendLoopAsync(CancellationToken ct)
{
try
{
- await foreach (var payload in SendChannel.Reader.ReadAllAsync(ct))
+ await foreach (var payload in SendChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
{
if (IsDisposed)
{
@@ -329,7 +330,7 @@ protected async Task SendLoopAsync(CancellationToken ct)
using (payload)
{
- await SendAsync(payload.Data, payload.Type, true, ct);
+ await SendAsync(payload.Data, payload.Type, true, ct).ConfigureAwait(false);
}
}
}
@@ -356,15 +357,23 @@ protected virtual async Task SendAsync(ReadOnlyMemory data, WebSocke
bool endOfMessage,
CancellationToken cancellationToken = default)
{
- if (NativeClient.State is not WebSocketState.Open) return false;
-
- await NativeClient.SendAsync(
- data,
- type,
- endOfMessage,
- cancellationToken
- ).ConfigureAwait(false);
- return true;
+ await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ if (NativeClient.State is not WebSocketState.Open)
+ {
+ return false;
+ }
+
+ await NativeClient
+ .SendAsync(data, type, endOfMessage, cancellationToken)
+ .ConfigureAwait(false);
+ return true;
+ }
+ finally
+ {
+ _sendLock.Release();
+ }
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
@@ -391,8 +400,10 @@ private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
if (result.MessageType == WebSocketMessageType.Close)
{
await NativeClient
- .CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
- result.CloseStatusDescription ?? "", CancellationToken.None);
+ .CloseAsync(
+ result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
+ result.CloseStatusDescription ?? "", CancellationToken.None)
+ .ConfigureAwait(false);
var @event = new Disconnected(DisconnectReason.ServerInitiated, NativeClient.CloseStatus,
NativeClient.CloseStatusDescription, NativeClient.SubProtocol);
@@ -514,11 +525,12 @@ public async Task SendInstantAsync(ReadOnlyMemory message, WebSocket
public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type,
CancellationToken cancellationToken = default)
- => !message.IsEmpty && await WriteAsync(message.ToPayload(MessageEncoding, type), cancellationToken);
+ => !message.IsEmpty && await WriteAsync(message.ToPayload(MessageEncoding, type), cancellationToken)
+ .ConfigureAwait(false);
public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type,
CancellationToken cancellationToken = default)
- => !message.IsEmpty && await WriteAsync(new Payload(message, type), cancellationToken);
+ => !message.IsEmpty && await WriteAsync(new Payload(message, type), cancellationToken).ConfigureAwait(false);
public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type)
=> !message.IsEmpty && TryWrite(message.ToPayload(MessageEncoding, type));
@@ -529,7 +541,7 @@ public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type)
private async Task WriteAsync(Payload payload, CancellationToken cancellationToken = default)
{
if (!IsRunning) return false;
- await SendWriter.WriteAsync(payload, cancellationToken);
+ await SendWriter.WriteAsync(payload, cancellationToken).ConfigureAwait(false);
return true;
}
diff --git a/src/WebSocket.Rx/ReactiveWebSocketServer.cs b/src/WebSocket.Rx/ReactiveWebSocketServer.cs
index 0d2cac9..b7cdc45 100644
--- a/src/WebSocket.Rx/ReactiveWebSocketServer.cs
+++ b/src/WebSocket.Rx/ReactiveWebSocketServer.cs
@@ -156,7 +156,7 @@ private async Task ServerLoopAsync(CancellationToken cancellationToken = default
{
try
{
- var ctx = await _listener.GetContextAsync();
+ var ctx = await _listener.GetContextAsync().ConfigureAwait(false);
if (ctx.Request.IsWebSocketRequest)
{
var metadata = ctx.GetMetadata();
@@ -258,7 +258,7 @@ public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory mes
CancellationToken cancellationToken = default)
{
if (!_clients.TryGetValue(clientId, out var client)) return false;
- await client.Socket.SendInstantAsync(message, type, cancellationToken);
+ await client.Socket.SendInstantAsync(message, type, cancellationToken).ConfigureAwait(false);
return true;
}
@@ -266,7 +266,7 @@ public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory mes
CancellationToken cancellationToken = default)
{
if (!_clients.TryGetValue(clientId, out var client)) return false;
- await client.Socket.SendInstantAsync(message, type, cancellationToken);
+ await client.Socket.SendInstantAsync(message, type, cancellationToken).ConfigureAwait(false);
return true;
}
@@ -274,7 +274,7 @@ public async Task SendAsync(Guid clientId, ReadOnlyMemory message, W
CancellationToken cancellationToken = default)
{
if (!_clients.TryGetValue(clientId, out var client)) return false;
- await client.Socket.SendAsync(message, type, cancellationToken);
+ await client.Socket.SendAsync(message, type, cancellationToken).ConfigureAwait(false);
return true;
}
@@ -282,7 +282,7 @@ public async Task SendAsync(Guid clientId, ReadOnlyMemory message, W
CancellationToken cancellationToken = default)
{
if (!_clients.TryGetValue(clientId, out var client)) return false;
- await client.Socket.SendAsync(message, type, cancellationToken);
+ await client.Socket.SendAsync(message, type, cancellationToken).ConfigureAwait(false);
return true;
}
@@ -309,7 +309,7 @@ public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebS
{
var sockets = _clients.Values.Select(x => x.Socket).ToArray();
return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x,
- cancellationToken);
+ cancellationToken).ConfigureAwait(false);
}
public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type,
@@ -317,21 +317,21 @@ public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebS
{
var sockets = _clients.Values.Select(x => x.Socket).ToArray();
return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x,
- cancellationToken);
+ cancellationToken).ConfigureAwait(false);
}
public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type,
CancellationToken cancellationToken = default)
{
var sockets = _clients.Values.Select(x => x.Socket).ToArray();
- return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken);
+ return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken).ConfigureAwait(false);
}
public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type,
CancellationToken cancellationToken = default)
{
var sockets = _clients.Values.Select(x => x.Socket).ToArray();
- return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken);
+ return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken).ConfigureAwait(false);
}
public bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type)
@@ -599,7 +599,7 @@ await NativeServerSocket
{
try
{
- await NativeServerSocket.CloseAsync(status, statusDescription, cancellationToken);
+ await NativeServerSocket.CloseAsync(status, statusDescription, cancellationToken).ConfigureAwait(false);
}
catch
{
diff --git a/src/WebSocket.Rx/WebSocket.Rx.csproj b/src/WebSocket.Rx/WebSocket.Rx.csproj
index 9352959..9979517 100644
--- a/src/WebSocket.Rx/WebSocket.Rx.csproj
+++ b/src/WebSocket.Rx/WebSocket.Rx.csproj
@@ -31,6 +31,7 @@
+